Hi, This is the first time I've programmed a parallel solution in R and Bioconductor, so I wanted to make a post and see if I've done this optimally/right!
My problem was to do compute a series of sliding window based sequence identity scans between many pairs two sequences. To do this, I programmed two iterators. One iterates sliding windows:
windows <- function(obj, ...){ UseMethod('windows') } getIterVal <- function(obj, ...){ UseMethod('getIterVal') } windows.default <- function(obj, width, step, checkFunc = function(...) TRUE){ n <- length(obj) if(width < 1){stop("Window width must be ≥ 1.")} if(step < 1){stop("step must be ≥ 1.")} if(any(width > n)){stop("The window size cannot be greater than number of data elements.")} state <- new.env() state$i <- 1L state$obj <- obj state$width <- width state$step <- step it <- list(state=state, length=n, checkFunc=checkFunc) class(it) <- c("containerwindow", "window", "iter") return(it) } getIterVal.containerwindow <- function(obj, ...) { i <- obj$state$i if (i > obj$length) stop('SubscriptOutOfBounds', call.=FALSE) start <- i end <- start + obj$state$width - 1 return(obj$state$obj[start:end]) } nextElem.containerwindow <- function(obj, ...) { repeat { tryCatch({ val <- getIterVal(obj) if(obj$checkFunc(val)){ obj$state$i <- obj$state$i + obj$state$step return(val) } obj$state$i <- obj$state$i + obj$state$step }, error = function(e){ if(any(nzchar(e$message))){ if(identical(e$message, "SubscriptOutOfBounds")){ stop("StopIteration", call.=FALSE) } else { stop(e$message, call.=FALSE) } } else { stop("Abort", call.=e) } }) } }
This iterator can then be used in a function that takes a DNAStringSet of two sequences:
slidingSimilarity <- function(dna, windowSize, stepSize){ conMat <- colSums(consensusMatrix(dna) != 0) > 1 itr <- windows(conMat, width = windowSize, step = stepSize, checkFunc = function(i) !any(is.na(i))); dists <- foreach(x = itr, .combine = c) %do% { sum(x) } dists <- 100 - round((dists / windowSize) * 100) windowStarts <- seq(from = 1, to = width(dna)[1], by = stepSize) windowEnds <- seq(from = windowSize, to = width(dna)[1], by = stepSize) ranges <- IRanges(start = windowStarts[1:length(windowEnds)], end = windowEnds) data <- RangedData(ranges = ranges, rawSS = dists) return(data) }
From two aligned sequences in a DNAStringSet - I get a neat sliding window scan of sequence identity!
But now say I wanted to do many pairs of sequences, this may take some time, so I'd want to parallelise. However, I want to avoid passing the entire DNAStringSet object (or copies) around, as this could be memory intensive. The idea came to me this can be done with an iterator too, which should only pass around one or two sequences, and not the entire dataset. That's the idea anyway, and I try to accomplish it by defining an iterator as follows.
pairsRef <- function(obj, ...){ UseMethod('pairsRef') } pairsRef.DNAStringSet <- function(obj, ref = NULL, checkFunc = function(...) TRUE){ state <- new.env() state$i <- 0L state$obj <- obj if(is.null(ref)){ state$ref <- names(obj)[1] } else { state$ref <- ref } state$nonRefs <- names(obj) state$nonRefs <- state$nonRefs[state$nonRefs != state$ref] it <- list(state=state, checkFunc=checkFunc) class(it) <- c("pairsRef", "iter") return(it) } getIterVal.pairsRef <- function(obj, plus = 0L, ...) { i <- obj$state$i + plus if(i > length(obj$state$nonRefs)) stop('SubscriptOutOfBounds', call.=FALSE) names <- c(obj$state$ref, obj$state$nonRefs[i]) return(obj$state$obj[names]) } nextElem.pairsRef <- function(obj, ...) { repeat { tryCatch({ val <- getIterVal(obj, 1L) if(obj$checkFunc(val)){ obj$state$i <- obj$state$i + 1L return(val) } obj$state$i <- obj$state$i + 1L }, error = function(e){ if(any(nzchar(e$message))){ if(identical(e$message, "SubscriptOutOfBounds")){ stop("StopIteration", call.=FALSE) } else { stop(e$message, call.=FALSE) } } else { stop("Abort", call.=e) } }) } }
Using the iterator many analyses of pairs of sequences can be done using multiple cores:
library(Biostrings) library(iterators) library(foreach) library(doMC) nworkers <- 3 registerDoMC(nworkers) sequences <- readDNAStringSet(fastaFile, format = "fasta", use.names = TRUE) results <- foreach(x = pairsRef(sequences)) %dopar% { slidingSimilarity(x, windowSize, stepSize) }
So, does this work for avoiding copying whole objects, and is it the right sort of thing to do? Comments, suggestions, and advice is very much appreciated.
Thanks,
Ben.
That's a lot of code and it would be interesting to actually run it on an appropriate fasta file that we all have access to, e.g., one available in
dir(system.file(package="Biostrings", "extdata"))
.If you want your package to be portable/cross-platform, don't use doMC, it does not work on windows.
Use the parallel package, or better yet, BiocParallel, which will automatically select the appropriate back end for you depending on your platform.
Thanks Dan! This looks like it might be exactly what I need. One question if anyone knows this - does the BiocPar methods or apply, mapply and so on work like I believe my own iterators do, in that they only send a subset of the data to a worker, to avoid the overhead of copying a large data object.
Yes, the typical approach is to separate data into '
tasks
' chunks (argument to*Param()
constructors), and send each chunk to a worker. The defaulttasks=0
divides the data into as many chunks as workers.tasks=length(X)
would divide X into chunks of size 1; this tends to be quite inefficient, but would be appropriate if each worker did a lot of work, the work load varied considerably between tasks, or one was very concerned about managing memory.But one of the funny things is that you can often get much better performance increase, much less complexity, and much more robust code by writing efficient vectorized code. So really your first stop is, of course IMHO, to add to your question with a runnable example and expected output.