Multiprocessing with Python

Circumventing memory and time constraints with multiprocessing

Posted by James on November 22, 2014

Introduction

This is a quick and dirty tutorial about a hack we used to fit our data within the constraints of our machine's memory.

When working with some clients, you might find that their "database" is simply a repository of csv or excel files, and you'll have to make do, often completing your work without ever updating their data warehouse. Most of the time these files would be better served by being stored in some simple DB framework, but time might not allow for that. This method came about from constraints set on our time, our machine, and our scope.

So here's a good example of something we came across: let's suppose you have to combine a bunch of tables together for feature generation (and you're not using Neo4j, MongoDB, or some other database, but rather have tables stored in csvs, tsvs, etc.), but you know that if you were to try to combine them all, the resulting dataframe would not fit into memory. Your first thought might be: 'I will just combine it in parts, then save those parts,' as sketched below. That seems like a great solution, but it will probably be a relatively slow process. That is, unless we use multiple cores.
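As a toy illustration of the 'combine it in parts' idea, you could stream one large table in chunks, merge each chunk against a smaller lookup table, and append each piece to disk so the full combined dataframe never sits in memory. This is only a sketch; the file names, chunk size, and join key ('title') are made up for illustration.

import pandas as pd

# hypothetical file names and join key, just to sketch the chunked approach
lookup_df = pd.read_csv('small_lookup_table.csv')

first_chunk = True
for chunk in pd.read_csv('huge_table.csv', chunksize=100000):
    merged = pd.merge(chunk, lookup_df, on='title', how='inner')
    # append each merged piece to disk instead of holding the full result in memory
    merged.to_csv('combined_output.csv', mode='a', header=first_chunk, index=False)
    first_chunk = False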

The Goal

The goal was to combine information about related job titles for every job title we have (~10,000) with the codes the government assigns to those related titles, then combine that result with state-specific information for each of the related titles, and finally use features generated via word2vec to amplify existing features in our client's pipeline.

And to do it very fast... because who likes waiting around for things? Think of it as a multi-table join, but outside of a standard relational database.
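Conceptually, that pipeline is just a chain of merges plus a filter. A rough sketch (the dataframe and key column names here are placeholders, not the real ones):

import pandas as pd

# conceptual only: related titles -> government codes -> state-level BLS rows
def related_title_features(related_titles_df, codes_df, bls_df, state_name):
    with_codes = pd.merge(codes_df, related_titles_df, on='title')
    with_bls = pd.merge(with_codes, bls_df, on='code')
    return with_bls[with_bls['area_title'] == state_name]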

The Data

Jobs Data

(one example row, shown as column: value pairs)

referencenumber: 1652398203
title: Sales Associate
postdate: 2014-07-09 13:47:18
url: URL link
company: Company Name
city: City
state: State
description: Our Sales Associates are...
industry: Sales / Sales Management / Business Development
price: low
updatedate: 2014-08-02 07:17:27
feeddate: FeedDate
jobboard: URL Link
hash: 1e4a85e9660f23...
matches(features): ['sales','cold-calling'...]

Titles Data

ID: 82
Title: Pediatricians, General

OES Data

(one example row, shown as column: value pairs)

area: 99
area_title: U.S.
area_type: 1
naics: 000000
naics_title: Cross-industry
own_code: 1235
occ_code: 00-0000
occ_title: All Occupations
group: total
tot_emp: 132,588,810
emp_prse: 0.1
h_mean: 22.33
a_mean: 46,440
mean_prse: 0.1
h_pct10: 8.74
h_pct25: 10.90
h_median: 16.87
h_pct75: 27.34
h_pct90: 42.47
a_pct10: 18,190
a_pct25: 22,670
a_median: 35,080
a_pct75: 56,860
a_pct90: 88,330
jobs_1000, loc_quotient, pct_total, annual, hourly: (blank in this example row)

SOC Table

2010 SOC Code: 11-1011
2010 SOC Title: Chief Executives
2010 SOC Direct Match Title: CEO
Illustrative Example: (blank in this example row)
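Before the example script below can run, these files need to be loaded into the dataframes it refers to. A minimal sketch of that setup might look like the following; the file names, paths, and column choices are assumptions made for illustration, and only the variable names match the script.

import pandas as pd

# hypothetical file names; the variable names match those used in the script below
oes_data_df = pd.read_csv('oes_data.csv')             # BLS wage data (numbers still contain commas)
code_title_df = pd.read_csv('soc_code_titles.csv')    # columns: ['code', 'title']
titles_states = pd.read_csv('job_titles_states.csv')  # one row per (title, state) pair

# one dataframe of related titles and similarity scores per job title,
# keyed by the title itself, e.g. dictionary_of_dataframes['Data Scientist']
dictionary_of_dataframes = {
    title: pd.read_csv('related_titles/%s.csv' % title)  # columns: ['title', 'score']
    for title in titles_states['title'].unique()
}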

Example Script

The following is an example of how we can use multiprocessing to both speed up an operation AND stay within the constraints of our box's memory. The first part of the script is problem specific; feel free to skip it and focus on the second portion of the code, which focuses on the multiprocessing engine.

#import the necessary packages
import pandas as pd
import us
import numpy as np
from multiprocessing import Pool,cpu_count,Queue,Manager

# the data in one particular column was a number in that horrible Excel form
# where '12000' is stored as '12,000', with that beautiful, useless comma in there.
# did I mention that Excel bothers me?
# instead of converting the numbers right away, we only convert them when we need them
def median_maker(column):
    return np.median([int(x.replace(',','')) for x in column])

# dictionary_of_dataframes contains a dataframe with information for each title; e.g. the title is 'Data Scientist'
# related_title_score_df is the dataframe of information for the title; columns = ['title','score'] 
### where title is a similar_title and score is how closely the two are related, e.g. 'Data Analyst', 0.871
# code_title_df contains columns ['code','title']
# oes_data_df is a HUGE dataframe with all of the Bureau of Labor Statistics(BLS) data for a given time period (YAY FREE DATA, BOO BAD CENSUS DATA!)


def job_title_location_matcher(title,location):
    try:
        related_title_score_df = dictionary_of_dataframes[title]
        # we limit dataframe1 to only those related_titles that are above 
        # a previously established threshold
        related_title_score_df = related_title_score_df[related_title_score_df['score']>80]

        #we merge the related titles with another table and its codes
        codes_relTitles_scores = pd.merge(code_title_df,related_title_score_df)
        codes_relTitles_scores = codes_relTitles_scores.drop_duplicates()

        # merge the two dataframes by the codes
        merged_df = pd.merge(codes_relTitles_scores, oes_data_df)
        #limit the BLS data to the state we want
        all_merged = merged_df[merged_df['area_title']==str(us.states.lookup(location).name)]

        #calculate some summary statistics for the time we want
        group_med_emp,group_mean,group_pct10,group_pct25,group_median,group_pct75,group_pct90 = all_merged[['tot_emp','a_mean','a_pct10','a_pct25','a_median','a_pct75','a_pct90']].apply(median_maker)
        row = [title,location,group_med_emp,group_mean,group_pct10,group_pct25, group_median, group_pct75, group_pct90]
        #convert it all to strings so we can combine them all when writing to file
        row_string = [str(x) for x in row]
        return row_string
    except:
        # if it doesn't work for a particular title/state combination, just throw it out;
        # there are enough combinations to make the loss insignificant
        return None

Here is where the magic happens:

#runs the matching function for one (title, state) row and puts the answer in the queue
def worker(row, q):
    ans = job_title_location_matcher(row[0], row[1])
    # skip the rows the matcher threw out
    if ans is not None:
        q.put(ans)



# this writes to the file while there are still things that could be in the queue.
# a single listener process owns the file, so multiple worker processes can "write"
# to the same file without blocking each other -- they just put rows on the queue
def listener(q, filename):
    f = open(filename, 'w')
    while 1:
        m = q.get()
        if m == 'kill':
            break
        f.write(','.join(m) + '\n')
        f.flush()
    f.close()

def main():
    #load all your data, then throw out all unnecessary tables/columns
    filename = 'skill_TEST_POOL.txt'

    #sets up the necessary multiprocessing tasks
    manager = Manager()
    q = manager.Queue()
    pool = Pool(cpu_count() + 2)
    #start the listener first so it is ready to write results as they come in
    watcher = pool.apply_async(listener, (q, filename))

    jobs = []
    #titles_states is a dataframe of millions of job titles and the states they were found in
    for row in titles_states.itertuples(index=False):
        job = pool.apply_async(worker, (row, q))
        jobs.append(job)

    for job in jobs:
        job.get()
    q.put('kill')
    pool.close()
    pool.join()
        
if __name__ == "__main__":
    main()


Because of the size of the respective dataframes (~100 GB in total), fitting them all into memory just wasn't going to happen. By writing the final dataframe to disk line by line and never actually storing it in memory, we were able to do all the calculations and combinations we needed. The 'standard method' here would have been to just write a 'write_line' function at the end of 'job_title_location_matcher', but then it would have processed only one instance at a time. That would have taken nearly ~2 days to finish because of the number of title/state combinations we have; instead we used multiprocessing, and it took ~2 hours.
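For contrast, a serial version of that 'standard method' would look roughly like the sketch below: one title/state pair at a time, no pool, no queue. It assumes the same dataframes and matcher function as the script above, and the output file name is made up.

# a sketch of the single-process alternative the paragraph above describes
def serial_main():
    with open('skill_TEST_SERIAL.txt', 'w') as f:
        for row in titles_states.itertuples(index=False):
            ans = job_title_location_matcher(row[0], row[1])
            if ans is not None:
                f.write(','.join(ans) + '\n')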

While the underpinnings of this tutorial might be outside the realm of your use case, multiprocessing lets you work around a lot of your computer's constraints. Here, we were working on a c3.8xlarge Ubuntu EC2 instance with 32 cores and 60 GB of RAM (that's a lot of RAM, but with all the merges we were still not able to fit the data in memory). The key takeaway is that we were able to effectively work through ~100 GB of data on a 60 GB machine while cutting the run-time down by ~25x. When automating a process, however large, using your cores through multiprocessing gets the maximum amount of efficiency out of your machine. This might not be news for some, but for others, simply thinking about how to speed up a process via multiprocessing is a huge benefit to the automation of any task. As an aside, this segment is somewhat of a continuation of our other blog post on skill assets in the job market.

Check out some of the stuff we do