Parallel processing in R: Data Ingest Example

Active Analytics Ltd: posted 08 June 2014 by Chibisi Chima-Okereke

Introduction

In a previous blog, we built a small web app using acquisition data from Fannie Mae. The data set is rather large and consists of some 51 text files which are 2.7 GB unzipped. It does take quite some time if you choose to read in the files one after another in a sequential fashion. R has lots of tools for parallelizing processes to use all the computing resources on a machine or cluster.

In this blog entry we use the foreach and doParallel package to demonstrate how to run parallel code in R simply and easily. In this case we read in the data files in parallel and return a large data frame of the data read from all the files.

Setting up

Loading our environment

First we load the packages and specify the parallel backend

require(doParallel)
nWorkers <- 7
registerDoParallel(nWorkers)

# We get the job files
jobFiles <- list.files(pattern = "Acquisition_", 
	path = "../data/", full.names = TRUE)

Our worker functions

We then create two functions, one to split the files to be read for each "thread", and another to process the files for each thread.

# Function to split a vector down to job list for each node
splitJobs <- function(jobs, nWorkers)
{
	nJobs <- length(jobs)
	nWorkers <- nWorkers - 1
	nNodeTasks <- nJobs %/% nWorkers
	splits <- c(rep(1:nWorkers, rep(nNodeTasks, nWorkers)), 
		rep((nWorkers + 1), nJobs%%nWorkers))
	jobSplits <- split(jobs, splits)
	names(jobSplits) <- NULL
	return(jobSplits)
}

# Function to read a vector of text files
readTextFiles <- function(files, ...){
	tables <- lapply(files, read.table, ...)
	tables <- do.call(rbind, tables)
	return(tables)
}

Executing the process

Now we read in the data

jobList <- splitJobs(jobFiles, nWorkers)

ptime <- system.time(performanceData <- foreach(i = 1:nWorkers, .combine = rbind) %dopar% {
	job <- jobList[[i]]
	readTextFiles(job, sep = "|", header = FALSE)
})

gc()

The time taken on my 8 core i7 core laptop, 16 Gb RAM running Ubuntu 12.04:

ptime
   user  system elapsed 
411.404  10.472 124.638 

Let's assign the column names:

# Assign some column names
colNames <- c("loanID", "channel", "sellerName", "origInterestRate", "origUPB", "origLoanTerm", 
              "originationDate", "firstPaymentDate", "origLTV", "origCombinedLTV", "NoBorrowers",
              "debtIncomeRatio", "creditScore", "firstTimeBuyer", "loanPurpose", "PropertyType",
              "numberOfUnits", "occupancyStatus", "propertyState", "zip", "mortgageInsurancePercentage",
              "productType")

colnames(performanceData) <- colNames

For course you could always parallelize without batching:

ptime2 <- system.time(performanceData2 <- foreach(i = 1:length(jobFiles), .combine = rbind) %dopar% {
	read.table(jobFiles[i], sep = "|", header = FALSE)
})

But it is a little slower probably because all the rbind is done at the end.

> ptime2
   user  system elapsed 
426.792  29.780 134.726 

Here is the same timing for loading the data sequentially

sTime <- system.time(
	performanceData3 <- readTextFiles(jobFiles, sep = "|", header = FALSE)
)
sTime
   user  system elapsed 
243.884  25.076 269.635 

Conclusion

There are a lot of tools in R that allow parallel and high performance computing in general, for more information check out the high performance computing taskview.

We hope that little time saving tip helps your productivity.

Data Science Consulting & Software Training

Active Analytics Ltd. is a data science consultancy, and Open Source Statistical Software Training company. Please contact us for more details or to comment on the blog.

Dr. Chibisi Chima-Okereke, R Training, Statistics and Data Analysis.