Bioconductor并行(parallel)运算

评论4,527

当我们进入大数据时代之后,并行运算就十分重要了。什么是大数据呢?引用一句流行语吧:

Big data is hype. Everybody talks about big data; nobody knows exactly what it is.

对于生物学来说,大数据对应的应该是第二代测序技术的兴起。所以我们谈到并行运算,主要是为了处理测序数据。在R中,有许多并行运算的包,HighPerformanceComputing就列举了当前所有的用于并行运算的软件包。为了将它更简单的呈现,我绘制了一张图,以便大家能够对常用的一些并行运算的软件包有一个大约的印象,当人们谈及这些包时,我们会有一个印象。 Bioconductor并行(parallel)运算 从上图中,可以看到,snow几乎关联了所有的包。但是,因为它很基础,所以很多包对它进行了再一次包装,以达到编程人员即使不了解硬件信息,也可以很轻松地移植并行运算代码的目的。而本文的重点,当然是bioconductor中最新出现的用于parallel运算的包:BiocParallel。它的优势在于它支持了很多bioconductor的类,比如GRanges数据。 下面,我们就通过一个实例来了解如何使用BiocParallel,以及在使用的过程中应该注意些什么。

> ##获取一个大一点的数据
> library(GEOquery)
> setwd("Downloads/")
> getGEOSuppFiles("GSM917672")
[1] "ftp://ftp.ncbi.nlm.nih.gov/geo/samples/GSM917nnn/GSM917672/suppl/"
trying URL 'ftp://ftp.ncbi.nlm.nih.gov/geo/samples/GSM917nnn/GSM917672/suppl//GSM917672_286_+_density.wig.gz'
ftp data connection made, file length 17133766 bytes
opened URL
==================================================
downloaded 16.3 Mb
 
trying URL 'ftp://ftp.ncbi.nlm.nih.gov/geo/samples/GSM917nnn/GSM917672/suppl//GSM917672_286_-_density.wig.gz'
ftp data connection made, file length 16134038 bytes
opened URL
==================================================
downloaded 15.4 Mb
 
> setwd("GSM917672/")
> sapply(dir(), gunzip)
$`GSM917672_286_-_density.wig.gz`
character(0)
 
$`GSM917672_286_+_density.wig.gz`
character(0)
> file <- dir()[1]
> ##将数据读入内存
> s <- file.info(file)$size
> buf <- readChar(file, s, useBytes=TRUE)
> ##将数据按照WIG文件格式说明分割成小段,每一段只包含一段完整的WIG峰值
> buf <- strsplit(buf, "\n", fixed=TRUE, useBytes=TRUE)[[1]]
> infoLine <- grep("Step", buf)
> block <- c(infoLine, length(buf)+1)
> dif <- diff(block)
> block <- rep(infoLine, dif)
> buf <- split(buf, block)
> ##WIG文件的处理
> library(GenomicRanges)
> parser <- function(ele){
+ 	library(GenomicRanges)##在并行运算中,所有的包都需要再次载入,而在非并行运算中只需要载入一次
+ 	firstline <- ele[1]
+ 	firstline <- unlist(strsplit(firstline, "\\s"))
+ 	firstline <- firstline[firstline!=""]
+ 	structure <- firstline[1]
+ 	firstline <- firstline[-1]
+ 	firstline <- do.call(rbind,strsplit(firstline, "=", fixed=TRUE))
+ 	firstline <- firstline[match(c("chrom","span","start","step"), 
+ 								 firstline[,1]),]
+ 	wiginfo <- c(structure, firstline[,2])
+ 	names(wiginfo) <- c("structure", "chrom","span","start","step")
+ 	start <- as.numeric(wiginfo["start"])
+ 	end <- start + 1:((length(ele)-1) * as.numeric(wiginfo["step"])) - 1
+ 	start <- c(start, end+1)[1:(length(ele)-1)]
+ 	value <- as.numeric(ele[2:length(ele)])
+ 	GRanges(seqnames=wiginfo["chrom"],
+ 			ranges=IRanges(start, end),
+ 			score=value)
+ }
> ##使用pbapply中pblapply来比较运算速度。因为这个过程中也同样需要输出进度条
> library(pbapply)
> system.time(r1 <- pblapply(buf[1:100], parser))
  |++++++++++++++++++++++++++++++++++++++++++++++++++| 100%
   user  system elapsed 
  1.474   0.017   1.512 
> system.time(r1 <- pblapply(buf[1:10000], parser))
  |++++++++++++++++++++++++++++++++++++++++++++++++++| 100%
   user  system elapsed 
152.830   0.867 157.519 
> system.time(r1 <- bplapply(buf[1:50000], parser))
    user   system  elapsed 
1247.566    6.536 1257.071 
> ##使用BiocParallel来进行运算
> library("BiocParallel")
> library(BatchJobs)
> cluster.functions <- makeClusterFunctionsMulticore()
Setting for worker localhost: ncpus=8
> bpParams <- BatchJobsParam(cluster.functions=cluster.functions)
> system.time(r2 <- bplapply(buf[1:10000], parser, BPPARAM=bpParams))
SubmitJobs |+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++| 100% (00:00:00)
Waiting [S:0 R:0 D:10000 E:0] |++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++| 100% (00:00:00)0:16)
 
   user  system elapsed 
  0.425   0.325  10.675 
> system.time(r2 <- bplapply(buf[1:10000], parser, BPPARAM=bpParams))
SubmitJobs |+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++| 100% (00:00:00)
Waiting [S:0 R:0 D:10000 E:0] |++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++| 100% (00:00:00)0:16)
 
   user  system elapsed 
  7.898   2.351 166.118  
> system.time(r2 <- bplapply(buf[1:50000], parser, BPPARAM=bpParams))
SubmitJobs |+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++| 100% (00:00:00)
Waiting [S:0 R:0 D:50000 E:0] |++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++| 100% (00:00:00):02:51))
 
   user  system elapsed 
 55.197  11.359 732.627 
>  
> ##比较运算结果是否一致
> identical(r1, r2)
[1] TRUE

从上面的步骤来看,使用BiocParallel进行运算分为三步走, 第一步,书写线程安全的程序,因为每个内核都会重新启动一个R来进行运算,所以程序的书写必须保证每个全新启动的R都可以正确的在其环境中查找到你所需要的所有变量或者函数。 第二步,设置环境参数。这一步使用makeClusterFunctions相关的函数。示例中我们使用了makeClusterFunctionsMulticore函数,其它还有makeClusterFunctionsInteractive, makeClusterFunctionsLocal, makeClusterFunctionsSSH, makeClusterFunctionsTorque, makeClusterFunctionsSGE, makeClusterFunctionsSLURM,具体可参见:?makeClusterFunctions。 第三步,使用bplapply函数提交运算。 从运算的时效性来看,如果每个单独的线程都只是很小的运算量,使用多核并不一定能节约时间,反而有可能因为等待线程结果而花费更多的时间。所以这种时候,不如使用GPU运算。如果每个单独的线程运算量很大时,使用多核就可以极大的节约时间了。 对R中的并行运算的中文博文,还可以参考:http://algosolo.com/competition/429.html 原文来自:http://pgfe.umassmed.edu/ou/archives/3396

发表评论

匿名网友