This is a quick-and-dirty tutorial on a hack we used to fit our data within the constraints of our machine's memory.
When working with some clients, you may find that their "database" is simply a repository of csv or excel files, and you'll have to make do, often completing your work without ever updating their data warehouse. Most of the time these files would be better served stored in some simple DB framework, but time might not allow for that. This method came about from constraints on our time, our machine, and our scope.
Here's a good example of something we came across: suppose you have to combine a bunch of tables for feature generation (you're not using Neo4j, MongoDB, or some other database, but rather have tables stored as csvs, tsvs, etc.), and you know that if you tried to combine them all at once, the resulting dataframe would not fit into memory. Your first thought might be: "I'll just combine them in parts, then save those parts." That seems like a great solution, but it will probably be a relatively slow process. That is, unless we use multiple cores.
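To make the "combine in parts" idea concrete, here is a minimal sketch of merging piecewise so the full result never sits in memory at once. The file names, column names, and `merge_in_chunks` helper are hypothetical stand-ins, not part of our actual pipeline:

```python
# merge a large csv against a small lookup table chunk by chunk,
# appending each merged piece to disk instead of concatenating in RAM
# (file/column names here are hypothetical stand-ins)
import pandas as pd

def merge_in_chunks(big_csv, lookup_csv, out_csv, key, chunksize=100_000):
    lookup = pd.read_csv(lookup_csv)  # small enough to hold in memory
    first = True
    for chunk in pd.read_csv(big_csv, chunksize=chunksize):
        merged = chunk.merge(lookup, on=key, how='left')
        # write mode 'w' for the first piece, then append without headers
        merged.to_csv(out_csv, mode='w' if first else 'a',
                      header=first, index=False)
        first = False
```

This is the single-core version; the rest of the post is about parallelizing that kind of work.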
The goal was to take every job title we have (~10,000), combine it with information about related job titles, attach the codes the government assigns to those related titles, combine that result with state-specific information for each related title, and then use features generated via word2vec to amplify existing features in our client's pipeline.
And to do it very fast, because who likes waiting around for things? Think of it as a multi-table join, but outside of a standard relational database. For context, here are sample rows from the raw files we were working with:
|1652398203||Sales Associate||2014-07-09 13:47:18||URL link||Company Name||City||State||Our Sales Associates are...||Sales / Sales Management / Business Development||low||2014-08-02 07:17:27||FeedDate||URL Link||1e4a85e9660f23...||['sales','cold-calling'...]|
|99||U.S.||1||000000||Cross-industry||1235||00-0000||All Occupations||total||" 132,588,810 "||0.1||22.33||" 46,440 "||0.1||8.74||10.90||16.87||27.34||42.47||" 18,190 "||" 22,670 "||" 35,080 "||" 56,860 "||" 88,330 "|
|2010 SOC Code||2010 SOC Title||2010 SOC Direct Match Title||Illustrative Example|
The following is an example of how we can use multiprocessing to both speed up an operation AND stay within the memory constraints of our box. The first part of the script is problem-specific; feel free to skim it and focus on the second portion of the code, which covers the multiprocessing engine.
```python
# import the necessary packages
import pandas as pd
import us
import numpy as np
from multiprocessing import Pool, cpu_count, Queue, Manager

# one column stored numbers in that horrible Excel form where
# '12000' is '12,000', with that beautiful, useless comma in there.
# Did I mention Excel bothers me?
# Instead of converting the numbers right away, we only convert them when we need to.
def median_maker(column):
    return np.median([int(x.replace(',', '')) for x in column])

# dictionary_of_dataframes maps each title (e.g. 'Data Scientist') to a dataframe of its info
# related_title_score_df is the dataframe of information for a title; columns = ['title', 'score'],
#   where title is a similar title and score is how closely the two are related,
#   e.g. 'Data Analyst', 0.871
# code_title_df contains columns ['code', 'title']
# oes_data_df is a HUGE dataframe with all of the Bureau of Labor Statistics (BLS)
#   data for a given time period (YAY FREE DATA, BOO BAD CENSUS DATA!)
```
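As a quick sanity check of that comma-stripping conversion (the helper is repeated here so the snippet is self-contained):

```python
import numpy as np

def median_maker(column):
    # strip Excel's thousands separators, then take the median
    return np.median([int(x.replace(',', '')) for x in column])

print(median_maker(['12,000', '8,500', '46,440']))  # → 12000.0
```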
```python
def job_title_location_matcher(title, location):
    try:
        related_title_score_df = dictionary_of_dataframes[title]
        # keep only the related titles above a previously established threshold
        related_title_score_df = related_title_score_df[related_title_score_df['score'] > 80]
        # merge the related titles with another table and its codes
        codes_relTitles_scores = pd.merge(code_title_df, related_title_score_df)
        codes_relTitles_scores = codes_relTitles_scores.drop_duplicates()
        # merge the two dataframes by the codes
        merged_df = pd.merge(codes_relTitles_scores, oes_data_df)
        # limit the BLS data to the state we want
        all_merged = merged_df[merged_df['area_title'] == str(us.states.lookup(location).name)]
        # calculate some summary statistics for the time we want
        (group_med_emp, group_mean, group_pct10, group_pct25,
         group_median, group_pct75, group_pct90) = all_merged[
            ['tot_emp', 'a_mean', 'a_pct10', 'a_pct25',
             'a_median', 'a_pct75', 'a_pct90']].apply(median_maker)
        row = [title, location, group_med_emp, group_mean, group_pct10,
               group_pct25, group_median, group_pct75, group_pct90]
        # convert it all to strings so we can combine them all when writing to file
        return [str(x) for x in row]
    except Exception:
        # if it doesn't work for a particular title/state, just throw it out;
        # there are enough combinations to make the loss insignificant
        pass
```
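The merge chain above is easier to see with toy stand-in data. Every dataframe and value below is made up for illustration; only the shape of the operation matches:

```python
# a toy run of the threshold -> merge -> merge -> state-filter chain,
# with made-up stand-in data
import pandas as pd

related = pd.DataFrame({'title': ['Data Analyst', 'Barista'],
                        'score': [87, 12]})
codes = pd.DataFrame({'code': ['15-2051', '35-3031'],
                      'title': ['Data Analyst', 'Barista']})
oes = pd.DataFrame({'code': ['15-2051', '15-2051'],
                    'area_title': ['New York', 'Ohio'],
                    'a_median': ['95,000', '78,000']})

related = related[related['score'] > 80]   # threshold the related titles
with_codes = pd.merge(codes, related)      # attach government codes (joins on 'title')
merged = pd.merge(with_codes, oes)         # attach the BLS rows (joins on 'code')
one_state = merged[merged['area_title'] == 'New York']
print(one_state['a_median'].tolist())  # → ['95,000']
```

Note that `pd.merge` with no `on=` argument joins on whatever columns the two frames share, which is what the real code relies on as well.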
Here is where the magic happens:
```python
# runs the matching function and puts the answer in the queue
def worker(row, q):
    # row is a (title, state) tuple
    ans = job_title_location_matcher(row[0], row[1])
    if ans is not None:
        q.put(ans)

# this writes to the file while there are still things in the queue;
# funneling all writes through one process lets multiple workers
# write to the same file without blocking each other
def listener(q, filename):
    f = open(filename, 'w')
    while 1:
        m = q.get()
        if m == 'kill':
            break
        f.write(','.join(m) + '\n')
        f.flush()
    f.close()

def main():
    # load all your data, then throw out all unnecessary tables/columns
    filename = 'skill_TEST_POOL.txt'

    # set up the necessary multiprocessing machinery
    manager = Manager()
    q = manager.Queue()
    pool = Pool(cpu_count() + 2)
    watcher = pool.apply_async(listener, (q, filename))

    jobs = []
    # titles_states is a dataframe of millions of job titles and states they were found in
    for i in titles_states.itertuples(index=False):
        job = pool.apply_async(worker, (i, q))
        jobs.append(job)

    for job in jobs:
        job.get()

    q.put('kill')
    pool.close()
    pool.join()

if __name__ == "__main__":
    main()
```
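Because the script above depends on our client's data, here is a self-contained toy version of the same worker/listener pattern that you can actually run. The `worker` here just squares numbers as a stand-in for the real matching function; everything else (the shared `Manager` queue, the single listener owning the file, the `'kill'` sentinel) mirrors the structure above:

```python
# minimal, runnable sketch of the worker/listener pattern:
# many workers push rows onto a shared queue, one listener owns the file
from multiprocessing import Pool, Manager

def worker(n, q):
    # stand-in for job_title_location_matcher: produce one output row
    q.put([str(n), str(n * n)])

def listener(q, filename):
    # sole owner of the output file; drains the queue until 'kill'
    with open(filename, 'w') as f:
        while True:
            m = q.get()
            if m == 'kill':
                break
            f.write(','.join(m) + '\n')
            f.flush()

def main(filename='toy_rows.txt'):
    manager = Manager()
    q = manager.Queue()
    pool = Pool(2)
    watcher = pool.apply_async(listener, (q, filename))
    jobs = [pool.apply_async(worker, (n, q)) for n in range(5)]
    for job in jobs:
        job.get()        # make sure every worker has finished
    q.put('kill')        # tell the listener to stop
    watcher.get()        # wait for the file to be fully written
    pool.close()
    pool.join()

if __name__ == '__main__':
    main()
```

One design note: the pool is sized with a couple of extra processes (`cpu_count() + 2` in the real script) because the listener permanently occupies one worker slot.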
Because of the size of the respective dataframes (~100 GB in total), fitting them all into memory just wasn't going to happen. By writing the final dataframe line by line and never actually storing it in memory, we were able to do all the calculations and combinations we needed. The "standard method" would have been to call a "write_line" function at the end of "job_title_location_matcher", but that would have processed only one instance at a time. Given the number of title/state combinations we have, that would have taken nearly two days to finish; instead, with multiprocessing, it took ~2 hours.
While the specifics of this tutorial might be outside the realm of your use-case, multiprocessing lets you beat a lot of your computer's constraints. Here, we were working on a c3.8xlarge Ubuntu EC2 instance with 32 cores and 60 GB of RAM (that's a lot of RAM, but with all the merges we were still not able to fit the data in memory). The key takeaway is that we effectively processed ~100 GB of data on a 60 GB machine, all while cutting the run-time down by ~25x. When automating a process, however large, using your cores through multiprocessing gets the maximum efficiency out of your machine. This might not be news for some, but for others, simply thinking about how to speed up processes via multiprocessing is a huge benefit to automating any task. As an aside, this segment is somewhat of a continuation of our other blog post on skill assets in the job market.