当我们进入大数据时代之后,并行运算就十分重要了。什么是大数据呢?引用一句流行语吧:
Big data is hype. Everybody talks about big data; nobody knows exactly what it is.
对于生物学来说,大数据对应的应该是第二代测序技术的兴起。所以我们谈到并行运算,主要是为了处理测序数据。在R中,有许多并行运算的包,HighPerformanceComputing就列举了当前所有的用于并行运算的软件包。为了将它更简单的呈现,我绘制了一张图,以便大家能够对常用的一些并行运算的软件包有一个大约的印象,当人们谈及这些包时,我们会有一个印象。
从上图中,可以看到,snow几乎关联了所有的包。但是,因为它很基础,所以很多包对它进行了再一次包装,以达到编程人员即使不了解硬件信息,也可以很轻松地移植并行运算代码的目的。而本文的重点,当然是bioconductor中最新出现的用于parallel运算的包:BiocParallel。它的优势在于它支持了很多bioconductor的类,比如GRanges数据。 下面,我们就通过一个实例来了解如何使用BiocParallel,以及在使用的过程中应该注意些什么。
> ##获取一个大一点的数据
> library(GEOquery)
> setwd("Downloads/")
> getGEOSuppFiles("GSM917672")
[1] "ftp://ftp.ncbi.nlm.nih.gov/geo/samples/GSM917nnn/GSM917672/suppl/"
trying URL 'ftp://ftp.ncbi.nlm.nih.gov/geo/samples/GSM917nnn/GSM917672/suppl//GSM917672_286_+_density.wig.gz'
ftp data connection made, file length 17133766 bytes
opened URL
==================================================
downloaded 16.3 Mb
trying URL 'ftp://ftp.ncbi.nlm.nih.gov/geo/samples/GSM917nnn/GSM917672/suppl//GSM917672_286_-_density.wig.gz'
ftp data connection made, file length 16134038 bytes
opened URL
==================================================
downloaded 15.4 Mb
> setwd("GSM917672/")
> sapply(dir(), gunzip)
$`GSM917672_286_-_density.wig.gz`
character(0)
$`GSM917672_286_+_density.wig.gz`
character(0)
> file <- dir()[1]
> ##将数据读入内存
> s <- file.info(file)$size
> buf <- readChar(file, s, useBytes=TRUE)
> ##将数据按照WIG文件格式说明分割成小段,每一段只包含一段完整的WIG峰值
> buf <- strsplit(buf, "\n", fixed=TRUE, useBytes=TRUE)[[1]]
> infoLine <- grep("Step", buf)
> block <- c(infoLine, length(buf)+1)
> dif <- diff(block)
> block <- rep(infoLine, dif)
> buf <- split(buf, block)
> ##WIG文件的处理
> library(GenomicRanges)
> parser <- function(ele){
+ library(GenomicRanges)##在并行运算中,所有的包都需要再次载入,而在非并行运算中只需要载入一次
+ firstline <- ele[1]
+ firstline <- unlist(strsplit(firstline, "\\s"))
+ firstline <- firstline[firstline!=""]
+ structure <- firstline[1]
+ firstline <- firstline[-1]
+ firstline <- do.call(rbind,strsplit(firstline, "=", fixed=TRUE))
+ firstline <- firstline[match(c("chrom","span","start","step"),
+ firstline[,1]),]
+ wiginfo <- c(structure, firstline[,2])
+ names(wiginfo) <- c("structure", "chrom","span","start","step")
+ start <- as.numeric(wiginfo["start"])
+ end <- start + 1:((length(ele)-1) * as.numeric(wiginfo["step"])) - 1
+ start <- c(start, end+1)[1:(length(ele)-1)]
+ value <- as.numeric(ele[2:length(ele)])
+ GRanges(seqnames=wiginfo["chrom"],
+ ranges=IRanges(start, end),
+ score=value)
+ }
> ##使用pbapply中pblapply来比较运算速度。因为这个过程中也同样需要输出进度条
> library(pbapply)
> system.time(r1 <- pblapply(buf[1:100], parser))
|++++++++++++++++++++++++++++++++++++++++++++++++++| 100%
user system elapsed
1.474 0.017 1.512
> system.time(r1 <- pblapply(buf[1:10000], parser))
|++++++++++++++++++++++++++++++++++++++++++++++++++| 100%
user system elapsed
152.830 0.867 157.519
> system.time(r1 <- bplapply(buf[1:50000], parser))
user system elapsed
1247.566 6.536 1257.071
> ##使用BiocParallel来进行运算
> library("BiocParallel")
> library(BatchJobs)
> cluster.functions <- makeClusterFunctionsMulticore()
Setting for worker localhost: ncpus=8
> bpParams <- BatchJobsParam(cluster.functions=cluster.functions)
> system.time(r2 <- bplapply(buf[1:10000], parser, BPPARAM=bpParams))
SubmitJobs |+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++| 100% (00:00:00)
Waiting [S:0 R:0 D:10000 E:0] |++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++| 100% (00:00:00)0:16)
user system elapsed
0.425 0.325 10.675
> system.time(r2 <- bplapply(buf[1:10000], parser, BPPARAM=bpParams))
SubmitJobs |+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++| 100% (00:00:00)
Waiting [S:0 R:0 D:10000 E:0] |++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++| 100% (00:00:00)0:16)
user system elapsed
7.898 2.351 166.118
> system.time(r2 <- bplapply(buf[1:50000], parser, BPPARAM=bpParams))
SubmitJobs |+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++| 100% (00:00:00)
Waiting [S:0 R:0 D:50000 E:0] |++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++| 100% (00:00:00):02:51))
user system elapsed
55.197 11.359 732.627
>
> ##比较运算结果是否一致
> identical(r1, r2)
[1] TRUE从上面的步骤来看,使用BiocParallel进行运算分为三步走, 第一步,书写线程安全的程序,因为每个内核都会重新启动一个R来进行运算,所以程序的书写必须保证每个全新启动的R都可以正确的在其环境中查找到你所需要的所有变量或者函数。 第二步,设置环境参数。这一步使用makeClusterFunctions相关的函数。示例中我们使用了makeClusterFunctionsMulticore函数,其它还有makeClusterFunctionsInteractive, makeClusterFunctionsLocal, makeClusterFunctionsSSH, makeClusterFunctionsTorque, makeClusterFunctionsSGE, makeClusterFunctionsSLURM,具体可参见:?makeClusterFunctions。 第三步,使用bplapply函数提交运算。 从运算的时效性来看,如果每个单独的线程都只是很小的运算量,使用多核并不一定能节约时间,反而有可能因为等待线程结果而花费更多的时间。所以这种时候,不如使用GPU运算。如果每个单独的线程运算量很大时,使用多核就可以极大的节约时间了。 对R中的并行运算的中文博文,还可以参考:http://algosolo.com/competition/429.html 原文来自:http://pgfe.umassmed.edu/ou/archives/3396

