Unable to calculate in parallel what can be calculated in a for loop
@lluis-revilla-sancho

I am using registerDoParallel(2) to run some analyses in parallel with foreach and %dopar%. However, there is one calculation that works sequentially but that I can't get to run in parallel.

The function I want to use in parallel is this one:

# Calculates the degree of overlap of the GO BP ontologies of entrez ids.
go_cor <- function(e_a, e_b, chip = "hgu133plus2.db", mapfun = NULL,
                   Ontology = "BP", ...) {
  # https://support.bioconductor.org/p/85702/#85732

  if (is.na(e_a) | is.na(e_b)) {
    return(NA)
  }
  # Ensure proper format
  e_a <- as.character(e_a)
  e_b <- as.character(e_b)

  if (isTRUE(mapfun)) {  # isTRUE() also handles the NULL default safely
    mapfunc <- function(z) {
      mget(z, revmap(org.Hs.egGO2EG), ifnotfound = NA)
    }

    LP <- simLL(e_a, e_b, Ontology, measure = "LP", mapfun = mapfunc)
    UI <- simLL(e_a, e_b, Ontology, measure = "UI", mapfun = mapfunc)
  } else {
    LP <- simLL(e_a, e_b, Ontology, measure = "LP", chip = chip)
    UI <- simLL(e_a, e_b, Ontology, measure = "UI", chip = chip)
  }
  out <- NA
  if (length(LP) > 1 | length(UI) > 1) {
    if (is.na(LP["sim"]) | is.na(UI["sim"])) {
      return(NA)
    } else {
      mean.g1 <- s.path(LP$g1)
      mean.g2 <- s.path(LP$g2)
      out <- (UI$sim / LP$sim) * max(mean.g1, mean.g2, na.rm = TRUE)
      return(out)
    }
  } else if (is.na(LP) | is.na(UI)) {
    return(out)
  }
  return(out)
}

# Longest of the shortest paths from the leaves of the graph to its root.
s.path <- function(ig) {
  lfi <- graph::leaves(ig, "in")
  degs <- graph::degree(ig)
  root <- names(degs$outDegree)[degs$outDegree == 0]
  paths <- RBGL::sp.between(ig, lfi, root)
  plens <- Biobase::subListExtract(paths, "length", simplify = TRUE)
  max(plens)
}

The function is called with code similar to this:

comb <- list(c("12", "28"), c("12", "52"), c("12", "159"), c("12", "175"),
             c("12", "280"), c("28", "52"), c("28", "159"), c("28", "175"),
             c("28", "280"), c("52", "159"), c("52", "175"), c("52", "280"),
             c("159", "175"), c("159", "280"), c("175", "280"))

go.mat <- foreach(i = seq_len(length(comb)),
                  .combine = c, verbose = TRUE) %dopar% {
    ## message("new comb ", paste(comb))
    combi <- comb[i]
    go_cor(combi[1], combi[2], mapfun = TRUE, Ontology = "BP")
}

When I use %dopar%, or %do% with registerDoSEQ(), I get only the first score, but when I use a for loop, `for (i in 1:length(comb)) {...}`, it calculates more than one score.

Is there anything in the Bioconductor packages I use inside the go_cor function that prevents me from running it in parallel? I thought it could be because the right libraries were not loaded on the workers, but I used `.package =  c("org.Hs.eg.db", "RBGL", "Rgraphviz", "graph", "Biobase", "AnnotationDbi")` and it still didn't work. By "didn't work" I mean that I get neither an error nor a message explaining why it is failing, just a single value in go.mat and nothing else.

 

> sessionInfo()
R version 3.2.3 (2015-12-10)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: Ubuntu 14.04.2 LTS

locale:
 [1] LC_CTYPE=en_US.UTF-8       LC_NUMERIC=C              
 [3] LC_TIME=es_ES.UTF-8        LC_COLLATE=en_US.UTF-8    
 [5] LC_MONETARY=es_ES.UTF-8    LC_MESSAGES=en_US.UTF-8   
 [7] LC_PAPER=es_ES.UTF-8       LC_NAME=C                 
 [9] LC_ADDRESS=C               LC_TELEPHONE=C            
[11] LC_MEASUREMENT=es_ES.UTF-8 LC_IDENTIFICATION=C       

attached base packages:
 [1] grid      stats4    parallel  stats     graphics  grDevices utils    
 [8] datasets  methods   base     

other attached packages:
 [1] topGO_2.22.0         SparseM_1.72         Rgraphviz_2.14.0    
 [4] RBGL_1.46.0          org.Hs.eg.db_3.2.3   GOstats_2.36.0      
 [7] graph_1.48.0         Category_2.36.0      GO.db_3.2.2         
[10] RSQLite_1.0.0        DBI_0.5-1            AnnotationDbi_1.32.3
[13] IRanges_2.4.8        S4Vectors_0.8.11     Matrix_1.2-7.1      
[16] Biobase_2.30.0       BiocGenerics_0.16.1  doParallel_1.0.10   
[19] iterators_1.0.8      foreach_1.4.3        BiocParallel_1.4.3  

loaded via a namespace (and not attached):
 [1] splines_3.2.3          xtable_1.8-2           lattice_0.20-34       
 [4] tools_3.2.3            AnnotationForge_1.12.2 genefilter_1.52.1     
 [7] lambda.r_1.1.9         futile.logger_1.4.3    survival_2.39-5       
[10] GSEABase_1.32.0        futile.options_1.0.0   codetools_0.2-15      
[13] XML_3.98-1.4           annotate_1.48.0      
> registered()
$MulticoreParam
class: MulticoreParam
  bpjobname:BPJOB; bpworkers:10; bptasks:0; bptimeout:Inf; bpRNGseed:; bpisup:FALSE
  bplog:FALSE; bpthreshold:INFO; bplogdir:NA
  bpstopOnError:FALSE; bpprogressbar:FALSE
  bpresultdir:NA
cluster type: FORK

$SnowParam
class: SnowParam
  bpjobname:BPJOB; bpworkers:10; bptasks:0; bptimeout:Inf; bpRNGseed:; bpisup:FALSE
  bplog:FALSE; bpthreshold:INFO; bplogdir:NA
  bpstopOnError:FALSE; bpprogressbar:FALSE
  bpresultdir:NA
cluster type: SOCK

$SerialParam
class: SerialParam
  bplog:FALSE; bpthreshold:INFO
  bpcatchErrors:FALSE

EDIT: After some modifications (per Mike's answer) I can see all the iterations, but there is an error:

task 2 failed - "rsqlite_query_fetch: failed first step: database disk image is malformed"
@martin-morgan-1513

I made your example as minimal as possible by removing parts until I had just what was needed to reproduce the error:

suppressPackageStartupMessages({
    library("GOstats")
    library("org.Hs.eg.db")

    library("foreach")
    library("doParallel")
    registerDoParallel()
})

go_cor <- function(e_a, e_b) {
    mapfunc <- function(z)
        mget(z, revmap(org.Hs.egGO2EG), ifnotfound = NA)
    simLL(e_a, e_b, "BP", measure = "LP", mapfun = mapfunc)
}

comb <- list(c("12", "28"), c("12", "52"), c("12", "159"), c("12", "175"),
    c("12", "280"), c("28", "52"), c("28", "159"), c("28", "175"),
    c("28", "280"), c("52", "159"), c("52", "175"), c("52", "280"),
    c("159", "175"), c("159", "280"), c("175", "280"))

foreach(combi = comb) %dopar%
    go_cor(combi[1], combi[2])

I agree with Robert that the likely problem is that multiple workers are using the same database connection, and I tried to solve this by loading the database packages in the workers only:

suppressPackageStartupMessages({
    library("foreach")
    library("doParallel")
    registerDoParallel()
})

go_cor <- function(e_a, e_b) {
    suppressPackageStartupMessages({
        library("GOstats")
        library("org.Hs.eg.db")
    })
    mapfunc <- function(z)
        mget(z, revmap(org.Hs.egGO2EG), ifnotfound = NA)
    simLL(e_a, e_b, "BP", measure = "LP", mapfun = mapfunc)
}

comb <- list(c("12", "28"), c("12", "52"), c("12", "159"), c("12", "175"),
    c("12", "280"), c("28", "52"), c("28", "159"), c("28", "175"),
    c("28", "280"), c("52", "159"), c("52", "175"), c("52", "280"),
    c("159", "175"), c("159", "280"), c("175", "280"))

foreach(combi = comb) %dopar%
    go_cor(combi[1], combi[2])

This seems to work.
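
One further detail, offered as an observation about R indexing rather than as a cause confirmed anywhere in this thread: `foreach(combi = comb)` hands each pair to the loop body as a character vector, whereas the original `combi <- comb[i]` yields a list of length one, so `combi[2]` does not hold the second gene ID. A minimal base-R sketch of the difference (the shortened `comb` and the names `single` and `double` are only illustrative):

```r
comb <- list(c("12", "28"), c("12", "52"), c("12", "159"))

i <- 1
single <- comb[i]    # single bracket: a list of length 1 wrapping the pair
double <- comb[[i]]  # double bracket: the character vector itself

is.list(single)      # TRUE
length(single)       # 1
single[2]            # out of range for a length-1 list

double[1]            # first Entrez ID of the pair
double[2]            # second Entrez ID of the pair
```

If the index-based loop is kept, `comb[[i]]` would be the form that passes two character IDs to go_cor().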

Mike Smith ★ 6.6k
@mike-smith

A couple of thoughts on this bit of code:

go.mat <- foreach(i = seq_len(length(comb),
                      .combine = c, verbose = TRUE) %dopar% {
                        # message("new comb ", paste(comb))
                        combi <- comb[i]
                        go_cor(combi[1], combi[2], mapfun = TRUE, Ontology = "BP")
                      }

Are you missing a bracket to close seq_len() before the .combine? Also, the verbose argument to foreach() needs a leading period, i.e. .verbose = TRUE. I think that without it you're passing verbose into the evaluation environment, and that may be influencing how many times the loop body is executed.

Does this work any better?

 

go.mat <- foreach(i = seq_len(length(comb)),
                  .combine = c,
                  .verbose = TRUE) %dopar% {
                      # message("new comb ", paste(comb))
                      combi <- comb[i]
                      go_cor(combi[1], combi[2], mapfun = TRUE, Ontology = "BP")
                  }
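
To illustrate why the missing period matters, here is a small sketch that uses only the foreach package and `%do%`, so no parallel backend is involved. Arguments to foreach() that do not start with a period are treated as iteration variables, and the loop stops when the shortest one is exhausted, so `verbose = TRUE` acts as an iterator of length one. The variable names `one` and `full` are just for illustration.

```r
library(foreach)

# 'verbose' (no leading period) becomes a second iteration variable of
# length 1, so the loop ends after the first iteration.
one <- foreach(i = 1:5, verbose = TRUE, .combine = c) %do% i

# '.verbose' is a genuine foreach option and does not limit the loop.
full <- foreach(i = 1:5, .verbose = FALSE, .combine = c) %do% i

length(one)   # a single result
length(full)  # one result per element of 1:5
```

This would be consistent with getting a single value in go.mat and no error message.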

The bracket was a mistake I made when writing the reproducible example (RStudio makes it so easy to accommodate). The .verbose might be the reason; at least now it takes longer to finish. I hope it won't be that, it would be too embarrassing. Thanks, Mike.


Although the .verbose fix helped, it didn't work completely: I can see all the iterations, but there is an error:

task 2 failed - "rsqlite_query_fetch: failed first step: database disk image is malformed"

 

And in the verbose output:

numValues: 843, numResults: 0, stopped: TRUE
got results for task 1
numValues: 843, numResults: 1, stopped: TRUE
returning status FALSE
got results for task 2
accumulate got an error result
numValues: 843, numResults: 2, stopped: TRUE
returning status FALSE


I would suggest updating to the latest version of R and the various packages. Yours is nearly a year old, and this runs without errors for me. Here's the output from go.mat and my sessionInfo():

> go.mat
 [1] 0.2516854 0.2146341 0.2335456 0.2400000 0.2400000 0.5490196 0.2777778 0.4949495 0.2424242 0.1730769 0.8115942 0.5555556 0.2222222
[14] 0.2040816 0.5072464
> sessionInfo()
R Under development (unstable) (2016-08-31 r71184)
Platform: x86_64-pc-linux-gnu (64-bit)
Running under: Ubuntu 14.04.4 LTS

locale:
 [1] LC_CTYPE=en_US.UTF-8       LC_NUMERIC=C               LC_TIME=en_US.UTF-8        LC_COLLATE=en_US.UTF-8     LC_MONETARY=de_DE.UTF-8   
 [6] LC_MESSAGES=en_US.UTF-8    LC_PAPER=de_DE.UTF-8       LC_NAME=C                  LC_ADDRESS=C               LC_TELEPHONE=C            
[11] LC_MEASUREMENT=de_DE.UTF-8 LC_IDENTIFICATION=C       

attached base packages:
[1] stats4    parallel  stats     graphics  grDevices utils     datasets  methods   base     

other attached packages:
 [1] BiocParallel_1.8.1   GO.db_3.4.0          org.Hs.eg.db_3.4.0   GOstats_2.40.0       graph_1.52.0         Category_2.40.0     
 [7] Matrix_1.2-7.1       AnnotationDbi_1.36.0 IRanges_2.8.0        S4Vectors_0.12.0     Biobase_2.34.0       BiocGenerics_0.20.0 
[13] BiocInstaller_1.24.0 foreach_1.4.3       

loaded via a namespace (and not attached):
 [1] splines_3.4.0          xtable_1.8-2           lattice_0.20-34        tools_3.4.0            grid_3.4.0            
 [6] AnnotationForge_1.16.0 DBI_0.5-1              genefilter_1.56.0      iterators_1.0.8        survival_2.39-5       
[11] RBGL_1.50.0            GSEABase_1.36.0        bitops_1.0-6           codetools_0.2-15       RCurl_1.95-4.8        
[16] RSQLite_1.0.0          compiler_3.4.0         XML_3.98-1.4           annotate_1.52.0       

Sometimes the error is in task 2, but if I run it again it might come in task X or Y. This could already be addressed in newer versions. Thanks for testing it.


I have updated to the latest R and Bioconductor versions, but the problem is still there. I have trouble providing a useful reproducible error because the error is different each time. The error in the task is "accumulate got an error result", which makes foreach stop with either "INTEGER() can only be applied to a 'integer', not a 'double'" or "rsqlite_query_fetch: failed: database disk image is malformed", but it happens at different combinations.
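
When the failing task changes from run to run, one way to see which tasks failed, and with which message, is foreach's `.errorhandling = "pass"` option, which returns the error object in place of the result instead of stopping the whole loop. The sketch below simulates failures with stop() rather than using the annotation database, so the error text is made up for illustration; only foreach and `%do%` are assumed.

```r
library(foreach)

res <- foreach(i = 1:4, .errorhandling = "pass") %do% {
    # Simulated failure on even tasks; a stand-in for the SQLite error.
    if (i %% 2 == 0) stop("simulated failure in task ", i)
    i
}

# Flag the tasks whose result is an error object.
failed <- vapply(res, inherits, logical(1), what = "error")
which(failed)

# Inspect the message of one failed task.
conditionMessage(res[[2]])
```

With the real go_cor() call, the same pattern would collect the per-task messages so they can be compared across runs.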


Lluis,

This error suggests to me that at some point more than one process is trying to access the SQLite database behind an annotation package through the same connection, that is, through the same annotation object. Instead, each process should load the annotation package independently, which will create its own unique SQLite connection within that process.

cheers,

robert.
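
A minimal sketch of per-process initialisation, using only the base parallel package: a socket (PSOCK) cluster starts fresh R processes, and clusterEvalQ() runs setup code once in each of them. Here the setup just records the worker's process ID in a hypothetical variable `worker_state`; in the real case it would be replaced by calls such as library("org.Hs.eg.db"), on the assumption stated above that loading the package inside each process gives it its own connection.

```r
library(parallel)

cl <- makeCluster(2)  # two PSOCK workers, each a separate R process

# Per-worker setup, evaluated once in every worker's global environment.
# Stand-in for library("GOstats"); library("org.Hs.eg.db").
invisible(clusterEvalQ(cl, {
    worker_state <- paste("initialised in process", Sys.getpid())
    NULL
}))

states <- unlist(clusterEvalQ(cl, worker_state))
pids <- unlist(clusterEvalQ(cl, Sys.getpid()))
stopCluster(cl)

pids  # differ from each other and from Sys.getpid() in the main session
```

On Unix-like systems, registerDoParallel(2) without a cluster object uses forked workers, which start as copies of the main session, including any packages already loaded there. Passing a cluster created this way to registerDoParallel(cl) might therefore be worth trying, although this thread does not confirm that it resolves the error.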


It might be, but if I load the annotation package independently in each worker via foreach(..., .packages = c("GOstats", "org.Hs.eg.db")) I get the same error.
