snow

Rで並列処理 (snowライブラリ)から、パッケージsnowの使い方その1。

install.packages("snow")
library(snow)
#localhostに2個のノードを割り当てられます。
cl <- makeSOCKcluster(c("localhost", "localhost"))
#試しにgetwd()でそれぞれの環境のカレントディレクトリを取得してみます。
clusterCall(cl, getwd)
#1000行100列 (マイクロアレイデータを想定、行が遺伝子、列がサンプル)のデータを作り、各行に対してWelchのt検定を行ってみます。
mat <- matrix(rnorm(100000), 1000, 100)
f <- factor(rep(0:1, each=50))
# 各ノードに変数をエクスポートするには、仕様上グローバル変数にする必要がある。
g <<- f
clusterExport(cl, "g")
#単独ノード
system.time(apply(mat, 1, function(x) t.test(x ~ g)$p.value) -> c1)
# 2ノード 
system.time(parRapply(cl, mat, function(x) t.test(x ~ g)$p.value) -> c2)
# もちろん、結果は一致しないと困る
sum(abs(c1-c2))

実は、factorの扱いに慣れていないので、正直なところ私にとって使いにくい…
 
Rで並列計算から、パッケージsnowの使い方その2。

###並列計算用に関数作成###
start_simulation <- function() { 
###データの生成###
a <- rnorm(400)
b <- rnorm(400)
###回帰###
c <- lm(a ~ b)
###とりあえず、係数でも記録します###
return(c(a = coef(c)[1], 
         b = coef(c)[2])
       )
}
###繰り返し数###
k <- 10000

library(snow)
###乱数の都合でこいつも使います。ないと面白いことになります。###
library(rlecuyer)
#Core 2 Duoなので2つです。クアッドコアとかなら4ですかね。開きすぎると閉じさせられます。
#Firewallは無視です。ブロック解除です。
cl <- makeCluster(2, type = "SOCK")
###乱数種はここでセット###
clusterSetupRNG (cl,seed=123)
###並列計算するfunctionをここで明示###
clusterExport(cl, "start_simulation")
###スタート。ついでに時間も計っちゃいましょう###
###係数はpに、時間はtimesに入るようになっています###
times1 <- system.time(p <- parSapply(cl, 1:k, function(x){start_simulation()})) #並列計算
times2 <- system.time(for(ki in 1:k){start_simulation()}) #単独
times1;times2

乱数の設定ができるらしい。

clusterExport()

で、CPUで計算すべき関数を渡すようである。
 
Rで円周率計算+並列計算から、パッケージsnowの使い方その3。

library(snow)
set.seed(1)
tens <- 10^c(1:5)
# データ数を10倍しながら、かかる時間を調べている
elapsed.time <- matrix(0, length(tens), 2)
dimnames(elapsed.time) <- list(tens, c("single", "multi"))
for( points.num in 1:length(tens) ){
	coords <- matrix(runif(2*tens[points.num]),ncol=2)
	start <- proc.time()
	cl <- makeSOCKcluster(rep("localhost", 2)) # 2は利用するマシンのコア数
	elapsed.time[points.num, 2] <- system.time(myPI <- sum(parRapply(cl,coords,function(x){sqrt(sum(x^2))})<1)/points.num*4)[3]
	cl <- makeSOCKcluster(rep("localhost", 1)) # 1は利用するマシンのコア数
	elapsed.time[points.num, 1] <- system.time(myPI <- sum(parRapply(cl,coords,function(x){sqrt(sum(x^2))})<1)/points.num*4)[3]
}
stopCluster(cl)
elapsed.time
       single multi
10      0.009 0.003
100     0.097 0.004
1000    0.943 0.014
10000   9.781 0.074
1e+05 104.304 0.744