Distributed Computing
Distributed — Module分散並列処理のためのツール。
Distributed.addprocs — Functionaddprocs(manager::ClusterManager; kwargs...) -> プロセス識別子のリスト指定されたクラスターマネージャーを介してワーカープロセスを起動します。
例えば、Beowulfクラスタは、パッケージClusterManagers.jlに実装されたカスタムクラスターマネージャーを介してサポートされています。
新しく起動されたワーカーがマスターからの接続確立を待つ秒数は、ワーカープロセスの環境内の変数JULIA_WORKER_TIMEOUTを介して指定できます。TCP/IPをトランスポートとして使用する場合にのみ関連します。
REPLをブロックせずにワーカーを起動するには、またはプログラム的にワーカーを起動する場合は、addprocsを独自のタスクで実行します。
例
# 忙しいクラスタでは、非同期に`addprocs`を呼び出す
t = @async addprocs(...)# ワーカーがオンラインになるときに利用する
if nprocs() > 1 # 少なくとも1つの新しいワーカーが利用可能であることを確認
.... # 分散実行を行う
end# 新しく起動されたワーカーIDまたはエラーメッセージを取得する
if istaskdone(t) # `addprocs`が完了したか確認し、`fetch`がブロックしないようにする
if nworkers() == N
new_pids = fetch(t)
else
fetch(t)
end
endaddprocs(machines; tunnel=false, sshflags=``, max_parallel=10, kwargs...) -> プロセス識別子のリストリモートマシン上にSSHを介してワーカープロセスを追加します。構成はキーワード引数で行います(下記参照)。特に、exenameキーワードを使用して、リモートマシン上のjuliaバイナリへのパスを指定できます。
machinesは、[user@]host[:port] [bind_addr[:port]]の形式の文字列として与えられる「マシンスペック」のベクターです。userは現在のユーザーにデフォルト設定され、portは標準SSHポートに設定されます。[bind_addr[:port]]が指定されている場合、他のワーカーは指定されたbind_addrとportでこのワーカーに接続します。
machinesベクター内でタプルを使用するか、(machine_spec, count)の形式を使用することで、リモートホスト上で複数のプロセスを起動することが可能です。ここで、countは指定されたホスト上で起動するワーカーの数です。ワーカー数に:autoを渡すと、リモートホスト上のCPUスレッドの数と同じ数のワーカーが起動します。
例:
addprocs([
"remote1", # 現在のユーザー名でログインして'remote1'に1つのワーカー
"user@remote2", # 'user'ユーザー名でログインして'remote2'に1つのワーカー
"user@remote3:2222", # 'remote3'のSSHポートを'2222'に指定
("user@remote4", 4), # 'remote4'に4つのワーカーを起動
("user@remote5", :auto), # 'remote5'のCPUスレッド数と同じ数のワーカーを起動
])キーワード引数:
tunnel:trueの場合、マスタープロセスからワーカーに接続するためにSSHトンネリングが使用されます。デフォルトはfalseです。multiplex:trueの場合、SSHトンネリングにSSHマルチプレクシングが使用されます。デフォルトはfalseです。ssh: ワーカーを起動するために使用されるSSHクライアント実行可能ファイルの名前またはパス。デフォルトは"ssh"です。sshflags: 追加のsshオプションを指定します。例:sshflags=`-i /home/foo/bar.pem`max_parallel: ホストに並行して接続される最大ワーカー数を指定します。デフォルトは10です。shell: ワーカー上でsshが接続するシェルの種類を指定します。shell=:posix: POSIX互換のUnix/Linuxシェル(sh、ksh、bash、dash、zshなど)。デフォルト。shell=:csh: Unix Cシェル(csh、tcsh)。shell=:wincmd: Microsoft Windowscmd.exe。
dir: ワーカー上の作業ディレクトリを指定します。デフォルトはホストの現在のディレクトリ(pwd()で見つけたもの)です。enable_threaded_blas:trueの場合、追加されたプロセスでBLASが複数のスレッドで実行されます。デフォルトはfalseです。exename:julia実行可能ファイルの名前。デフォルトは"$(Sys.BINDIR)/julia"または"$(Sys.BINDIR)/julia-debug"です。すべてのリモートマシンで共通のJuliaバージョンを使用することが推奨されます。そうしないと、シリアル化やコード配布が失敗する可能性があります。exeflags: ワーカープロセスに渡される追加のフラグ。Cmd、1つのフラグを保持するString、またはフラグごとに1つの要素を持つ文字列のコレクションである必要があります。例: $--threads=auto project=.$、"--compile-trace=stderr"または["--threads=auto", "--compile=all"]。topology: ワーカーが互いに接続する方法を指定します。接続されていないワーカー間でメッセージを送信するとエラーが発生します。topology=:all_to_all: すべてのプロセスが互いに接続されています。デフォルト。topology=:master_worker: ドライバープロセス、すなわちpid1のみがワーカーに接続します。ワーカーは互いに接続しません。topology=:custom: クラスターマネージャのlaunchメソッドがWorkerConfigのidentおよびconnect_identsフィールドを介して接続トポロジーを指定します。クラスターマネージャのIDidentを持つワーカーは、connect_identsで指定されたすべてのワーカーに接続します。
lazy:topology=:all_to_allの場合のみ適用されます。trueの場合、ワーカー間の接続は遅延して設定されます。すなわち、ワーカー間のリモート呼び出しの最初のインスタンスで設定されます。デフォルトはtrueです。env:env=["JULIA_DEPOT_PATH"=>"/depot"]のような文字列ペアの配列を提供して、リモートマシン上で環境変数が設定されるように要求します。デフォルトでは、環境変数JULIA_WORKER_TIMEOUTのみが自動的にローカルからリモート環境に渡されます。cmdline_cookie: 認証クッキーを--workerコマンドラインオプションを介して渡します。クッキーをssh stdioを介して渡す(より安全な)デフォルトの動作は、古い(ConPTY以前の)JuliaまたはWindowsバージョンを使用するWindowsワーカーでハングする可能性があるため、その場合はcmdline_cookie=trueが回避策を提供します。
環境変数:
マスタープロセスが新しく起動したワーカーとの接続を60.0秒以内に確立できない場合、ワーカーはそれを致命的な状況と見なし、終了します。このタイムアウトは環境変数JULIA_WORKER_TIMEOUTを介して制御できます。マスタープロセス上のJULIA_WORKER_TIMEOUTの値は、新しく起動したワーカーが接続確立を待つ秒数を指定します。
addprocs(np::Integer=Sys.CPU_THREADS; restrict=true, kwargs...) -> プロセス識別子のリストローカルホストで np ワーカーを起動し、内蔵の LocalManager を使用します。
ローカルワーカーは、メインプロセスから現在のパッケージ環境(すなわち、アクティブプロジェクト、LOAD_PATH、および DEPOT_PATH)を継承します。
ワーカーは ~/.julia/config/startup.jl スタートアップスクリプトを実行せず、他の実行中のプロセスとグローバル状態(コマンドラインスイッチ、グローバル変数、新しいメソッド定義、読み込まれたモジュールなど)を同期しないことに注意してください。
キーワード引数:
restrict::Bool:true(デフォルト)の場合、バインディングは127.0.0.1に制限されます。dir,exename,exeflags,env,topology,lazy,enable_threaded_blas:SSHManagerと同じ効果があります。addprocs(machines::AbstractVector)のドキュメントを参照してください。
Distributed.nprocs — Functionnprocs()利用可能なプロセスの数を取得します。
例
julia> nprocs()
3
julia> workers()
2-element Array{Int64,1}:
2
3Distributed.nworkers — Functionnworkers()利用可能なワーカープロセスの数を取得します。これはnprocs()より1少ないです。nprocs() == 1の場合はnprocs()と等しくなります。
例
$ julia -p 2
julia> nprocs()
3
julia> nworkers()
2Distributed.procs — Methodprocs()プロセス識別子のリストを返します。pid 1(workers() には含まれない)を含みます。
例
$ julia -p 2
julia> procs()
3-element Array{Int64,1}:
1
2
3Distributed.procs — Methodprocs(pid::Integer)同じ物理ノード上のすべてのプロセス識別子のリストを返します。具体的には、pid と同じ IP アドレスにバインドされているすべてのワーカーが返されます。
Distributed.workers — Functionworkers()すべてのワーカープロセス識別子のリストを返します。
例
$ julia -p 2
julia> workers()
2-element Array{Int64,1}:
2
3Distributed.rmprocs — Functionrmprocs(pids...; waitfor=typemax(Int))指定されたワーカーを削除します。プロセス1のみがワーカーを追加または削除できることに注意してください。
引数 waitfor は、ワーカーがシャットダウンするまでの待機時間を指定します:
- 指定しない場合、
rmprocsは要求されたすべてのpidsが削除されるまで待機します。 - 要求された
waitfor秒前にすべてのワーカーを終了できない場合、ErrorExceptionが発生します。 waitforの値が0の場合、呼び出しは即座に戻り、ワーカーは別のタスクで削除される予定です。スケジュールされたTaskオブジェクトが返されます。ユーザーは、他の並列呼び出しを行う前にタスクに対してwaitを呼び出す必要があります。
例
$ julia -p 5
julia> t = rmprocs(2, 3, waitfor=0)
Task (runnable) @0x0000000107c718d0
julia> wait(t)
julia> workers()
3-element Array{Int64,1}:
4
5
6Distributed.interrupt — Functioninterrupt(pids::Integer...)指定されたワーカーで現在実行中のタスクを中断します。これは、ローカルマシンでCtrl-Cを押すのと同等です。引数が指定されていない場合、すべてのワーカーが中断されます。
interrupt(pids::AbstractVector=workers())指定されたワーカーで現在実行中のタスクを中断します。これは、ローカルマシンでCtrl-Cを押すのと同等です。引数が指定されていない場合、すべてのワーカーが中断されます。
Distributed.myid — Functionmyid()現在のプロセスのIDを取得します。
例
julia> myid()
1
julia> remotecall_fetch(() -> myid(), 4)
4Distributed.pmap — Functionpmap(f, [::AbstractWorkerPool], c...; distributed=true, batch_size=1, on_error=nothing, retry_delays=[], retry_check=nothing) -> collectionコレクション c を変換し、利用可能なワーカーとタスクを使用して各要素に f を適用します。
複数のコレクション引数がある場合、要素ごとに f を適用します。
f はすべてのワーカープロセスで利用可能でなければなりません。詳細については、Code Availability and Loading Packages を参照してください。
ワーカープールが指定されていない場合、すべての利用可能なワーカーが CachingPool を介して使用されます。
デフォルトでは、pmap は指定されたすべてのワーカーに計算を分散します。ローカルプロセスのみを使用し、タスクに分散させるには、distributed=false を指定します。これは asyncmap を使用するのと同等です。たとえば、pmap(f, c; distributed=false) は asyncmap(f,c; ntasks=()->nworkers()) と同等です。
pmap は batch_size 引数を介してプロセスとタスクの混合を使用することもできます。バッチサイズが 1 より大きい場合、コレクションは複数のバッチで処理され、各バッチの長さは batch_size 以下になります。バッチは、空いているワーカーに対して単一のリクエストとして送信され、ローカルの asyncmap が複数の同時タスクを使用してバッチの要素を処理します。
エラーが発生すると、pmap はコレクションの残りの部分の処理を停止します。この動作をオーバーライドするには、引数 on_error を介してエラーハンドリング関数を指定できます。この関数は単一の引数、すなわち例外を受け取ります。関数はエラーを再スローすることで処理を停止するか、処理を続行するために任意の値を返し、その値は結果とともに呼び出し元に返されます。
次の2つの例を考えてみましょう。最初の例は例外オブジェクトをインラインで返し、2番目は例外の代わりに 0 を返します:
julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=identity)
4-element Array{Any,1}:
1
ErrorException("foo")
3
ErrorException("foo")
julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=ex->0)
4-element Array{Int64,1}:
1
0
3
0エラーは、失敗した計算を再試行することで処理することもできます。キーワード引数 retry_delays と retry_check は、retry にキーワード引数 delays と check として渡されます。バッチ処理が指定されている場合、バッチ全体が失敗した場合、バッチ内のすべてのアイテムが再試行されます。
on_error と retry_delays の両方が指定されている場合、再試行の前に on_error フックが呼び出されることに注意してください。on_error が例外をスロー(または再スロー)しない場合、その要素は再試行されません。
例:エラーが発生した場合、要素に対して最大 3 回 f を再試行し、再試行の間に遅延を入れない。
pmap(f, c; retry_delays = zeros(3))例:例外が InexactError 型でない場合にのみ f を再試行し、最大 3 回まで指数的に増加する遅延を設定します。すべての InexactError の発生に対して NaN を返します。
pmap(f, c; on_error = e->(isa(e, InexactError) ? NaN : rethrow()), retry_delays = ExponentialBackOff(n = 3))Distributed.RemoteException — TypeRemoteException(captured)リモート計算での例外はキャプチャされ、ローカルで再スローされます。RemoteExceptionは、ワーカーのpidとキャプチャされた例外をラップします。CapturedExceptionは、リモート例外と例外が発生したときのコールスタックのシリアライズ可能な形式をキャプチャします。
Distributed.ProcessExitedException — TypeProcessExitedException(worker_id::Int)クライアントのJuliaプロセスが終了した後、死んだ子プロセスを参照しようとすると、この例外がスローされます。
Distributed.Future — TypeFuture(w::Int, rrid::RRID, v::Union{Some, Nothing}=nothing)Futureは、終了ステータスと時間が不明な単一の計算のプレースホルダーです。複数の潜在的な計算については、RemoteChannelを参照してください。AbstractRemoteRefを識別するにはremoteref_idを参照してください。
Distributed.RemoteChannel — TypeRemoteChannel(pid::Integer=myid())プロセス pid の Channel{Any}(1) への参照を作成します。デフォルトの pid は現在のプロセスです。
RemoteChannel(f::Function, pid::Integer=myid())特定のサイズと型のリモートチャネルへの参照を作成します。f は pid で実行されるときに AbstractChannel の実装を返さなければならない関数です。
例えば、RemoteChannel(()->Channel{Int}(10), pid) は、pid 上の型 Int でサイズ 10 のチャネルへの参照を返します。
デフォルトの pid は現在のプロセスです。
Base.fetch — Methodfetch(x::Future)Future の値を待機して取得します。取得した値はローカルにキャッシュされます。同じ参照に対する fetch のさらなる呼び出しはキャッシュされた値を返します。リモート値が例外である場合、リモート例外とバックトレースをキャプチャする RemoteException をスローします。
Base.fetch — Methodfetch(c::RemoteChannel)RemoteChannel から値を待って取得します。発生する例外は Future と同じです。取得したアイテムは削除されません。
fetch(x::Any)xを返します。
Distributed.remotecall — Methodremotecall(f, id::Integer, args...; kwargs...) -> Future指定されたプロセスで与えられた引数に対して関数 f を非同期に呼び出します。 Future を返します。キーワード引数がある場合、それらは f に渡されます。
Distributed.remotecall_wait — Methodremotecall_wait(f, id::Integer, args...; kwargs...)指定されたワーカーID id の Worker に対して、1つのメッセージでより高速な wait(remotecall(...)) を実行します。キーワード引数がある場合、それらは f に渡されます。
wait および remotecall も参照してください。
Distributed.remotecall_fetch — Methodremotecall_fetch(f, id::Integer, args...; kwargs...)fetch(remotecall(...))を1つのメッセージで実行します。キーワード引数がある場合、それらはfに渡されます。リモート例外はRemoteExceptionにキャッチされ、スローされます。
他にfetchとremotecallも参照してください。
例
$ julia -p 2
julia> remotecall_fetch(sqrt, 2, 4)
2.0
julia> remotecall_fetch(sqrt, 2, -4)
ERROR: On worker 2:
DomainError with -4.0:
sqrtは負の実引数で呼び出されましたが、複素引数で呼び出された場合にのみ複素結果を返します。sqrt(Complex(x))を試してください。
...Distributed.remotecall_eval — Functionremotecall_eval(m::Module, procs, expression)モジュール m の下で、procs で指定されたプロセス上で式を実行します。いずれかのプロセスでのエラーは CompositeException に収集され、スローされます。
他にも @everywhere を参照してください。
Distributed.remote_do — Methodremote_do(f, id::Integer, args...; kwargs...) -> nothingワーカー id で非同期に f を実行します。 remotecall とは異なり、計算の結果を保存せず、完了を待つ方法もありません。
成功した呼び出しは、リモートノードでの実行のためにリクエストが受け入れられたことを示します。
同じワーカーへの連続した remotecall は、呼び出された順序で直列化されますが、リモートワーカーでの実行順序は不確定です。たとえば、remote_do(f1, 2); remotecall(f2, 2); remote_do(f3, 2) は、f1 への呼び出しを直列化し、その後に f2 と f3 をその順序で実行します。しかし、f1 がワーカー 2 で f3 よりも先に実行されることは保証されていません。
f によってスローされた例外は、リモートワーカーの stderr に印刷されます。
キーワード引数がある場合、それらは f に渡されます。
Base.put! — Methodput!(rr::RemoteChannel, args...)RemoteChannelに値のセットを格納します。チャネルが満杯の場合、スペースが利用可能になるまでブロックします。最初の引数を返します。
Base.put! — Methodput!(rr::Future, v)Future rr に値を格納します。Future は一度だけ書き込むことができるリモート参照です。すでに設定された Future に対して put! を行うと Exception がスローされます。すべての非同期リモート呼び出しは Future を返し、完了時に呼び出しの戻り値に値を設定します。
Base.take! — Methodtake!(rr::RemoteChannel, args...)RemoteChannel rr から値を取得し、その過程で値を削除します。
Base.isready — Methodisready(rr::RemoteChannel, args...)RemoteChannel に値が格納されているかどうかを判断します。この関数はレースコンディションを引き起こす可能性があることに注意してください。結果を受け取る時点で、もはや真でない可能性があります。ただし、Future に対しては安全に使用できます。なぜなら、Future は一度だけ割り当てられるからです。
Base.isready — Methodisready(rr::Future)Future に値が格納されているかどうかを判断します。
引数 Future が別のノードによって所有されている場合、この呼び出しは応答を待つためにブロックします。rr を別のタスクで待機するか、ローカルの Channel をプロキシとして使用することをお勧めします:
p = 1
f = Future(p)
errormonitor(@async put!(f, remotecall_fetch(long_computation, p)))
isready(f) # ブロックしないDistributed.AbstractWorkerPool — TypeAbstractWorkerPoolWorkerPool や CachingPool のようなワーカープールのスーパークラスです。AbstractWorkerPool は以下を実装する必要があります:
push!- 新しいワーカーを全体のプールに追加する(利用可能 + ビジー)put!- ワーカーを利用可能なプールに戻すtake!- 利用可能なプールからワーカーを取得する(リモート関数実行に使用)wait- ワーカーが利用可能になるまでブロックするlength- 全体のプールにおける利用可能なワーカーの数isready- プールでのtake!がブロックする場合は false を返し、そうでなければ true を返す
上記のデフォルト実装(AbstractWorkerPool 上)は以下のフィールドを必要とします:
channel::Channel{Int}workers::Set{Int}
ここで channel は自由なワーカーピッドを含み、workers はこのプールに関連付けられたすべてのワーカーのセットです。
Distributed.WorkerPool — TypeWorkerPool(workers::Union{Vector{Int},AbstractRange{Int}})ベクターまたはワーカーIDの範囲からWorkerPoolを作成します。
例
$ julia -p 3
julia> WorkerPool([2, 3])
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([2, 3]), RemoteChannel{Channel{Any}}(1, 1, 6))
julia> WorkerPool(2:4)
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 7))Distributed.CachingPool — TypeCachingPool(workers::Vector{Int})AbstractWorkerPoolの実装です。 remote, remotecall_fetch, pmap(および関数をリモートで実行する他のリモート呼び出し)は、特にクロージャ(大量のデータをキャプチャする可能性がある)において、ワーカーノード上でシリアライズ/デシリアライズされた関数をキャッシュすることで恩恵を受けます。
リモートキャッシュは、返されたCachingPoolオブジェクトのライフタイムの間維持されます。キャッシュを早期にクリアするには、clear!(pool)を使用します。
グローバル変数については、クロージャ内でキャプチャされるのはバインディングのみで、データはキャプチャされません。グローバルデータをキャプチャするには、letブロックを使用できます。
例
const foo = rand(10^8);
wp = CachingPool(workers())
let foo = foo
pmap(i -> sum(foo) + i, wp, 1:100);
end上記は、fooを各ワーカーに一度だけ転送します。
Distributed.default_worker_pool — Functiondefault_worker_pool()AbstractWorkerPool にアイドル状態の workers が含まれています - remote(f) および pmap で使用されます(デフォルトで)。default_worker_pool!(pool) を介して明示的に設定されない限り、デフォルトのワーカープールは WorkerPool に初期化されます。
例
$ julia -p 3
julia> default_worker_pool()
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:3), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 4))Distributed.clear! — Functionclear!(syms, pids=workers(); mod=Main)モジュール内のグローバルバインディングを nothing に初期化することでクリアします。 syms は Symbol 型であるか、Symbol のコレクションである必要があります。 pids と mod は、グローバル変数が再初期化されるプロセスとモジュールを特定します。 mod の下で定義されている名前のみがクリアされます。
グローバル定数をクリアするよう要求された場合、例外が発生します。
clear!(pool::CachingPool) -> poolすべての参加ワーカーからすべてのキャッシュされた関数を削除します。
Distributed.remote — Functionremote([p::AbstractWorkerPool], f) -> Function利用可能なワーカーで関数 f を実行する無名関数を返します(提供された場合は WorkerPool p から取得) remotecall_fetch を使用して。
Distributed.remotecall — Methodremotecall(f, pool::AbstractWorkerPool, args...; kwargs...) -> FutureWorkerPool のバリアントである remotecall(f, pid, ....)。pool から空いているワーカーを待って取得し、それに対して remotecall を実行します。
例
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> f = remotecall(maximum, wp, A)
Future(2, 1, 6, nothing)この例では、タスクは pid 2 で実行され、pid 1 から呼び出されました。
Distributed.remotecall_wait — Methodremotecall_wait(f, pool::AbstractWorkerPool, args...; kwargs...) -> FutureWorkerPool の remotecall_wait(f, pid, ....) のバリアント。pool から空いているワーカーを待機して取得し、それに対して remotecall_wait を実行します。
例
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> f = remotecall_wait(maximum, wp, A)
Future(3, 1, 9, nothing)
julia> fetch(f)
0.9995177101692958Distributed.remotecall_fetch — Methodremotecall_fetch(f, pool::AbstractWorkerPool, args...; kwargs...) -> resultWorkerPool の remotecall_fetch(f, pid, ....) のバリアント。pool から空いているワーカーを待って取得し、それに対して remotecall_fetch を実行します。
例
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> remotecall_fetch(maximum, wp, A)
0.9995177101692958Distributed.remote_do — Methodremote_do(f, pool::AbstractWorkerPool, args...; kwargs...) -> nothingWorkerPool の remote_do(f, pid, ....) のバリアントです。pool から空いているワーカーを待って取得し、それに対して remote_do を実行します。
remote_do() の結果が完了するのを待つことはできないため、ワーカーはすぐにプールに戻されます(つまり、過剰なサブスクリプションを引き起こす可能性があります)。
Distributed.@spawn — Macro@spawn expr式の周りにクロージャを作成し、自動的に選択されたプロセスで実行し、結果へのFutureを返します。このマクロは非推奨です; 代わりに@spawnat :any exprを使用するべきです。
例
julia> addprocs(3);
julia> f = @spawn myid()
Future(2, 1, 5, nothing)
julia> fetch(f)
2
julia> f = @spawn myid()
Future(3, 1, 7, nothing)
julia> fetch(f)
3Distributed.@spawnat — Macro@spawnat p expr式の周りにクロージャを作成し、プロセス p で非同期にクロージャを実行します。結果への Future を返します。
p が引用されたリテラルシンボル :any の場合、システムは自動的に使用するプロセッサを選択します。:any を使用すると、負荷分散は適用されないため、負荷分散が必要な場合は WorkerPool と remotecall(f, ::WorkerPool) の使用を検討してください。
例
julia> addprocs(3);
julia> f = @spawnat 2 myid()
Future(2, 1, 3, nothing)
julia> fetch(f)
2
julia> f = @spawnat :any myid()
Future(3, 1, 7, nothing)
julia> fetch(f)
3Distributed.@fetch — Macro@fetch exprfetch(@spawnat :any expr) と同等です。fetch と @spawnat を参照してください。
例
julia> addprocs(3);
julia> @fetch myid()
2
julia> @fetch myid()
3
julia> @fetch myid()
4
julia> @fetch myid()
2Distributed.@fetchfrom — Macro@fetchfromfetch(@spawnat p expr) と同等です。 fetch と @spawnat を参照してください。
例
julia> addprocs(3);
julia> @fetchfrom 2 myid()
2
julia> @fetchfrom 4 myid()
4Distributed.@distributed — Macro@distributed分散メモリの並列forループの形式:
@distributed [reducer] for var = range
body
end指定された範囲は分割され、すべてのワーカーでローカルに実行されます。オプションのリデューサ関数が指定されている場合、@distributedは各ワーカーでローカルなリデュースを行い、呼び出しプロセスで最終的なリデュースを行います。
リデューサ関数がない場合、@distributedは非同期に実行されます。つまり、すべての利用可能なワーカーで独立したタスクを生成し、完了を待たずにすぐに戻ります。完了を待つには、呼び出しの前に@syncを付けます。例えば:
@sync @distributed for var = range
body
endDistributed.@everywhere — Macro@everywhere [procs()] exprすべての procs で Main の下で式を実行します。プロセスのいずれかでエラーが発生した場合、それらは CompositeException に収集され、スローされます。例えば:
@everywhere bar = 1はすべての現在のプロセスで Main.bar を定義します。後で追加されたプロセス(例えば addprocs() を使用して)には、式は定義されません。
@spawnat とは異なり、@everywhere はローカル変数をキャプチャしません。代わりに、ローカル変数は補間を使用してブロードキャストできます:
foo = 1
@everywhere bar = $fooオプションの引数 procs は、式を実行するプロセスのサブセットを指定することを可能にします。
remotecall_eval(Main, procs, expr) を呼び出すのと似ていますが、2つの追加機能があります:
usingとimportステートメントは、パッケージが事前コンパイルされることを保証するために、呼び出しプロセスで最初に実行されます。includeによって使用される現在のソースファイルパスが他のプロセスに伝播されます。
Distributed.remoteref_id — Functionremoteref_id(r::AbstractRemoteRef) -> RRIDFutures と RemoteChannels は次のフィールドによって識別されます:
where- 参照されているオブジェクト/ストレージが実際に存在するノードを指します。whence- リモート参照が作成されたノードを指します。これは、参照されている実際のオブジェクトが存在するノードとは異なることに注意してください。たとえば、マスタープロセスからRemoteChannel(2)を呼び出すと、whereの値は 2 になり、whenceの値は 1 になります。idは、whenceで指定されたワーカーから作成されたすべての参照の中で一意です。
まとめると、whence と id はすべてのワーカーの中で参照を一意に識別します。
remoteref_id は低レベルの API で、リモート参照の whence と id の値をラップした RRID オブジェクトを返します。
Distributed.channel_from_id — Functionchannel_from_id(id) -> cremoteref_id[@ref]によって返されたidのバックエンドAbstractChannelを返す低レベルAPIです。この呼び出しは、バックエンドチャネルが存在するノードでのみ有効です。
Distributed.worker_id_from_socket — Functionworker_id_from_socket(s) -> pidIO接続またはWorkerを与えると、それに接続されているワーカーのpidを返す低レベルAPIです。これは、受信プロセスIDに応じて書き出されるデータを最適化するために、型のカスタムserializeメソッドを書く際に便利です。
Distributed.cluster_cookie — Methodcluster_cookie() -> cookieクラスタクッキーを返します。
Distributed.cluster_cookie — Methodcluster_cookie(cookie) -> cookie渡されたクッキーをクラスタークッキーとして設定し、それを返します。
Cluster Manager Interface
このインターフェースは、異なるクラスタ環境でJuliaワーカーを起動および管理するためのメカニズムを提供します。Baseには2種類のマネージャーが存在します:同じホスト上で追加のワーカーを起動するためのLocalManagerと、sshを介してリモートホスト上で起動するためのSSHManagerです。TCP/IPソケットは、プロセス間で接続し、メッセージを転送するために使用されます。クラスタマネージャーが異なるトランスポートを提供することも可能です。
Distributed.ClusterManager — TypeClusterManagerクラスタのワーカープロセスを制御するクラスタマネージャーのスーパークラスです。クラスタマネージャーは、ワーカーを追加、削除、および通信する方法を実装します。SSHManager と LocalManager はこのサブタイプです。
Distributed.WorkerConfig — TypeWorkerConfigClusterManagerによって使用されるタイプで、クラスターに追加されたワーカーを制御します。いくつかのフィールドは、すべてのクラスター管理者によってホストにアクセスするために使用されます:
io– ワーカーにアクセスするために使用される接続(IOのサブタイプまたはNothing)host– ホストアドレス(StringまたはNothing)port– ワーカーに接続するためにホスト上で使用されるポート(IntまたはNothing)
いくつかは、クラスター管理者がすでに初期化されたホストにワーカーを追加するために使用されます:
count– ホスト上で起動するワーカーの数exename– ホスト上のJulia実行可能ファイルへのパス、デフォルトは"$(Sys.BINDIR)/julia"または"$(Sys.BINDIR)/julia-debug"exeflags– Juliaをリモートで起動する際に使用するフラグ
userdataフィールドは、外部管理者によって各ワーカーの情報を保存するために使用されます。
いくつかのフィールドはSSHManagerや類似の管理者によって使用されます:
tunnel–true(トンネリングを使用)、false(トンネリングを使用しない)、またはnothing(管理者のデフォルトを使用)multiplex–true(トンネリングのためにSSHマルチプレクシングを使用)またはfalseforward– sshの-Lオプションに使用される転送オプションbind_addr– リモートホストでバインドするアドレスsshflags– SSH接続を確立する際に使用するフラグmax_parallel– ホスト上で並行して接続する最大ワーカー数
いくつかのフィールドはLocalManagerとSSHManagerの両方によって使用されます:
connect_at– これはワーカー間の接続かドライバーからワーカーへのセットアップコールかを決定しますprocess– 接続されるプロセス(通常、管理者はaddprocs中にこれを割り当てます)ospid– ホストOSに従ったプロセスID、ワーカープロセスを中断するために使用environ– Local/SSH管理者によって一時的な情報を保存するために使用されるプライベート辞書ident–ClusterManagerによって識別されたワーカーconnect_idents– カスタムトポロジーを使用する場合にワーカーが接続しなければならないワーカーIDのリストenable_threaded_blas–true、false、またはnothing、ワーカーでスレッド化されたBLASを使用するかどうか
Distributed.launch — Functionlaunch(manager::ClusterManager, params::Dict, launched::Array, launch_ntfy::Condition)クラスターマネージャによって実装されます。この関数によって起動されたすべてのJuliaワーカーについて、launchedにWorkerConfigエントリを追加し、launch_ntfyに通知する必要があります。この関数は、managerによって要求されたすべてのワーカーが起動されると終了しなければなりません。paramsは、addprocsが呼び出されたすべてのキーワード引数の辞書です。
Distributed.manage — Functionmanage(manager::ClusterManager, id::Integer, config::WorkerConfig. op::Symbol)クラスターマネージャーによって実装されます。これは、ワーカーのライフタイム中にマスタープロセスで呼び出され、適切な op 値が使用されます:
- ワーカーがJuliaワーカープールに追加/削除されるときに
:register/:deregisterとともに。 interrupt(workers)が呼び出されたときに:interruptとともに。ClusterManagerは適切なワーカーに割り込み信号を送信する必要があります。- クリーンアップ目的で
:finalizeとともに。
Base.kill — Methodkill(manager::ClusterManager, pid::Int, config::WorkerConfig)クラスターマネージャによって実装されます。これは、rmprocsによってマスタープロセスで呼び出されます。pidで指定されたリモートワーカーを終了させるべきです。kill(manager::ClusterManager.....)はpidでリモートのexit()を実行します。
Sockets.connect — Methodconnect(manager::ClusterManager, pid::Int, config::WorkerConfig) -> (instrm::IO, outstrm::IO)クラスターマネージャーによってカスタムトランスポートを使用して実装されます。これは、configによって指定されたpidのワーカーとの論理的な接続を確立し、IOオブジェクトのペアを返す必要があります。pidから現在のプロセスへのメッセージはinstrmから読み取られ、pidに送信されるメッセージはoutstrmに書き込まれます。カスタムトランスポートの実装は、メッセージが完全にかつ順序通りに配信され、受信されることを保証しなければなりません。connect(manager::ClusterManager.....)は、ワーカー間のTCP/IPソケット接続を設定します。
Distributed.init_worker — Functioninit_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager())カスタムトランスポートを実装するクラスターマネージャによって呼び出されます。新しく起動されたプロセスをワーカーとして初期化します。コマンドライン引数 --worker[=<cookie>] は、TCP/IPソケットを使用してプロセスをワーカーとして初期化する効果があります。cookie は cluster_cookie です。
Distributed.start_worker — Functionstart_worker([out::IO=stdout], cookie::AbstractString=readline(stdin); close_stdin::Bool=true, stderr_to_stdout::Bool=true)start_worker は、TCP/IP 経由で接続するワーカープロセスのデフォルトのエントリポイントである内部関数です。プロセスを Julia クラスターのワーカーとして設定します。
ホスト:ポート情報はストリーム out に書き込まれます(デフォルトは stdout です)。
この関数は、必要に応じて stdin からクッキーを読み取り、空いているポートでリッスンします(または指定された場合は --bind-to コマンドラインオプションのポート)し、受信した TCP 接続とリクエストを処理するタスクをスケジュールします。また、(オプションで)stdin を閉じ、stderr を stdout にリダイレクトします。
戻り値はありません。
Distributed.process_messages — Functionprocess_messages(r_stream::IO, w_stream::IO, incoming::Bool=true)クラスター管理者によってカスタムトランスポートを使用して呼び出されます。リモートワーカーから最初のメッセージを受信したときに呼び出す必要があります。カスタムトランスポートはリモートワーカーへの論理接続を管理し、受信メッセージ用とリモートワーカー宛のメッセージ用の2つのIOオブジェクトを提供する必要があります。incomingがtrueの場合、リモートピアが接続を開始しました。接続を開始したペアのいずれかがクラスタークッキーとそのJuliaバージョン番号を送信して認証ハンドシェイクを行います。
cluster_cookieも参照してください。
Distributed.default_addprocs_params — Functiondefault_addprocs_params(mgr::ClusterManager) -> Dict{Symbol, Any}クラスターマネージャによって実装されます。addprocs(mgr)を呼び出すときに渡されるデフォルトのキーワードパラメータ。最小限のオプションセットはdefault_addprocs_params()を呼び出すことで利用可能です。