Distributed Computing
Distributed
— Module分散並列処理のためのツール。
Distributed.addprocs
— Functionaddprocs(manager::ClusterManager; kwargs...) -> プロセス識別子のリスト
指定されたクラスターマネージャーを介してワーカープロセスを起動します。
例えば、Beowulfクラスタは、パッケージClusterManagers.jl
に実装されたカスタムクラスターマネージャーを介してサポートされています。
新しく起動されたワーカーがマスターからの接続確立を待つ秒数は、ワーカープロセスの環境内の変数JULIA_WORKER_TIMEOUT
を介して指定できます。TCP/IPをトランスポートとして使用する場合にのみ関連します。
REPLをブロックせずにワーカーを起動するには、またはプログラム的にワーカーを起動する場合は、addprocs
を独自のタスクで実行します。
例
# 忙しいクラスタでは、`addprocs`を非同期に呼び出す
t = @async addprocs(...)
# ワーカーがオンラインになるときに利用する
if nprocs() > 1 # 少なくとも1つの新しいワーカーが利用可能であることを確認
.... # 分散実行を行う
end
# 新しく起動されたワーカーIDまたはエラーメッセージを取得する
if istaskdone(t) # `addprocs`が完了したか確認し、`fetch`がブロックしないようにする
if nworkers() == N
new_pids = fetch(t)
else
fetch(t)
end
end
addprocs(machines; tunnel=false, sshflags=``, max_parallel=10, kwargs...) -> プロセス識別子のリスト
リモートマシン上にSSHを介してワーカープロセスを追加します。構成はキーワード引数で行います(下記参照)。特に、exename
キーワードを使用して、リモートマシン上のjulia
バイナリへのパスを指定できます。
machines
は、[user@]host[:port] [bind_addr[:port]]
の形式の文字列として与えられる「マシンスペック」のベクターです。user
は現在のユーザーにデフォルト設定され、port
は標準SSHポートに設定されます。[bind_addr[:port]]
が指定されている場合、他のワーカーは指定されたbind_addr
とport
でこのワーカーに接続します。
machines
ベクター内でタプルを使用するか、(machine_spec, count)
の形式を使用することで、リモートホスト上で複数のプロセスを起動することが可能です。ここで、count
は指定されたホスト上で起動するワーカーの数です。ワーカー数に:auto
を渡すと、リモートホスト上のCPUスレッドの数と同じ数のワーカーが起動します。
例:
addprocs([
"remote1", # 現在のユーザー名でログインして'remote1'に1つのワーカー
"user@remote2", # 'user'ユーザー名でログインして'remote2'に1つのワーカー
"user@remote3:2222", # 'remote3'のSSHポートを'2222'に指定
("user@remote4", 4), # 'remote4'に4つのワーカーを起動
("user@remote5", :auto), # 'remote5'のCPUスレッド数と同じ数のワーカーを起動
])
キーワード引数:
tunnel
:true
の場合、マスタープロセスからワーカーに接続するためにSSHトンネリングが使用されます。デフォルトはfalse
です。multiplex
:true
の場合、SSHトンネリングにSSHマルチプレクシングが使用されます。デフォルトはfalse
です。ssh
: ワーカーを起動するために使用されるSSHクライアント実行可能ファイルの名前またはパス。デフォルトは"ssh"
です。sshflags
: 追加のsshオプションを指定します。例:sshflags=`-i /home/foo/bar.pem`
max_parallel
: ホストに並行して接続する最大ワーカー数を指定します。デフォルトは10です。shell
: ワーカー上でsshが接続するシェルの種類を指定します。shell=:posix
: POSIX互換のUnix/Linuxシェル(sh、ksh、bash、dash、zshなど)。デフォルト。shell=:csh
: Unix Cシェル(csh、tcsh)。shell=:wincmd
: Microsoft Windowscmd.exe
。
dir
: ワーカー上の作業ディレクトリを指定します。デフォルトはホストの現在のディレクトリ(pwd()
で見つけたもの)です。enable_threaded_blas
:true
の場合、追加されたプロセスでBLASが複数のスレッドで実行されます。デフォルトはfalse
です。exename
:julia
実行可能ファイルの名前。デフォルトは"$(Sys.BINDIR)/julia"
または"$(Sys.BINDIR)/julia-debug"
です。すべてのリモートマシンで共通のJuliaバージョンを使用することが推奨されます。そうしないと、シリアル化やコード配布が失敗する可能性があります。exeflags
: ワーカープロセスに渡される追加のフラグ。topology
: ワーカーが互いに接続する方法を指定します。接続されていないワーカー間でメッセージを送信するとエラーが発生します。topology=:all_to_all
: すべてのプロセスが互いに接続されています。デフォルト。topology=:master_worker
: ドライバープロセス、すなわちpid
1のみがワーカーに接続します。ワーカー同士は接続しません。topology=:custom
: クラスターマネージャのlaunch
メソッドがWorkerConfig
内のident
およびconnect_idents
フィールドを介して接続トポロジーを指定します。クラスターマネージャのIDident
を持つワーカーは、connect_idents
で指定されたすべてのワーカーに接続します。
lazy
:topology=:all_to_all
の場合のみ適用されます。true
の場合、ワーカー間の接続は遅延して設定されます。すなわち、ワーカー間のリモート呼び出しの最初のインスタンスで設定されます。デフォルトはtrue
です。env
:env=["JULIA_DEPOT_PATH"=>"/depot"]
のような文字列ペアの配列を提供して、リモートマシン上で環境変数が設定されるように要求します。デフォルトでは、環境変数JULIA_WORKER_TIMEOUT
のみが自動的にローカルからリモート環境に渡されます。cmdline_cookie
:--worker
コマンドラインオプションを介して認証クッキーを渡します。クッキーをssh stdioを介して渡す(より安全な)デフォルトの動作は、古い(ConPTY以前の)JuliaまたはWindowsバージョンを使用するWindowsワーカーでハングする可能性があるため、その場合はcmdline_cookie=true
がワークアラウンドを提供します。
キーワード引数ssh
、shell
、env
およびcmdline_cookie
はJulia 1.6で追加されました。
環境変数:
マスタープロセスが新しく起動したワーカーとの接続を60.0秒以内に確立できない場合、ワーカーはそれを致命的な状況と見なし、終了します。このタイムアウトは環境変数JULIA_WORKER_TIMEOUT
を介して制御できます。マスタープロセス上のJULIA_WORKER_TIMEOUT
の値は、新しく起動したワーカーが接続確立を待つ秒数を指定します。
addprocs(np::Integer=Sys.CPU_THREADS; restrict=true, kwargs...) -> プロセス識別子のリスト
ローカルホストで np
ワーカーを起動し、内蔵の LocalManager
を使用します。
ローカルワーカーは、メインプロセスから現在のパッケージ環境(すなわち、アクティブプロジェクト、LOAD_PATH
、および DEPOT_PATH
)を継承します。
ワーカーは ~/.julia/config/startup.jl
スタートアップスクリプトを実行せず、他の実行中のプロセスとグローバル状態(コマンドラインスイッチ、グローバル変数、新しいメソッド定義、読み込まれたモジュールなど)を同期しないことに注意してください。
キーワード引数:
restrict::Bool
:true
(デフォルト)の場合、バインディングは127.0.0.1
に制限されます。dir
,exename
,exeflags
,env
,topology
,lazy
,enable_threaded_blas
:SSHManager
と同様の効果があります。addprocs(machines::AbstractVector)
のドキュメントを参照してください。
パッケージ環境の継承と env
キーワード引数は、Julia 1.9 で追加されました。
Distributed.nprocs
— Functionnprocs()
利用可能なプロセスの数を取得します。
例
julia> nprocs()
3
julia> workers()
2-element Array{Int64,1}:
2
3
Distributed.nworkers
— Functionnworkers()
利用可能なワーカープロセスの数を取得します。これはnprocs()
より1少ないです。nprocs() == 1
の場合はnprocs()
と等しくなります。
例
$ julia -p 2
julia> nprocs()
3
julia> nworkers()
2
Distributed.procs
— Methodprocs()
プロセス識別子のリストを返します。pid 1(workers()
には含まれていない)を含みます。
例
$ julia -p 2
julia> procs()
3-element Array{Int64,1}:
1
2
3
Distributed.procs
— Methodprocs(pid::Integer)
同じ物理ノード上のすべてのプロセス識別子のリストを返します。具体的には、pid
と同じ IP アドレスにバインドされているすべてのワーカーが返されます。
Distributed.workers
— Functionworkers()
すべてのワーカープロセス識別子のリストを返します。
例
$ julia -p 2
julia> workers()
2-element Array{Int64,1}:
2
3
Distributed.rmprocs
— Functionrmprocs(pids...; waitfor=typemax(Int))
指定されたワーカーを削除します。ワーカーの追加または削除はプロセス1のみが行えます。
引数 waitfor
は、ワーカーがシャットダウンするまでの待機時間を指定します:
- 指定しない場合、
rmprocs
は要求されたすべてのpids
が削除されるまで待機します。 - 要求された
waitfor
秒の前にすべてのワーカーを終了できない場合、ErrorException
が発生します。 waitfor
の値が0の場合、呼び出しは即座に戻り、ワーカーは別のタスクで削除される予定です。スケジュールされたTask
オブジェクトが返されます。ユーザーは、他の並列呼び出しを行う前にタスクに対してwait
を呼び出す必要があります。
例
$ julia -p 5
julia> t = rmprocs(2, 3, waitfor=0)
Task (runnable) @0x0000000107c718d0
julia> wait(t)
julia> workers()
3-element Array{Int64,1}:
4
5
6
Distributed.interrupt
— Functioninterrupt(pids::Integer...)
指定されたワーカー上で現在実行中のタスクを中断します。これは、ローカルマシンでCtrl-Cを押すことに相当します。引数が指定されていない場合、すべてのワーカーが中断されます。
interrupt(pids::AbstractVector=workers())
指定されたワーカー上で現在実行中のタスクを中断します。これは、ローカルマシンでCtrl-Cを押すのと同等です。引数が指定されていない場合、すべてのワーカーが中断されます。
Distributed.myid
— Functionmyid()
現在のプロセスのIDを取得します。
例
julia> myid()
1
julia> remotecall_fetch(() -> myid(), 4)
4
Distributed.pmap
— Functionpmap(f, [::AbstractWorkerPool], c...; distributed=true, batch_size=1, on_error=nothing, retry_delays=[], retry_check=nothing) -> collection
コレクション c
を変換し、利用可能なワーカーとタスクを使用して各要素に f
を適用します。
複数のコレクション引数がある場合、要素ごとに f
を適用します。
f
はすべてのワーカープロセスで利用可能でなければなりません。詳細については、コードの可用性とパッケージの読み込みを参照してください。
ワーカープールが指定されていない場合、すべての利用可能なワーカーが CachingPool
を介して使用されます。
デフォルトでは、pmap
は指定されたすべてのワーカーに計算を分散します。ローカルプロセスのみを使用し、タスクに分散させるには、distributed=false
を指定します。これは asyncmap
を使用するのと同等です。例えば、pmap(f, c; distributed=false)
は asyncmap(f,c; ntasks=()->nworkers())
と同等です。
pmap
は batch_size
引数を介してプロセスとタスクの混合を使用することもできます。バッチサイズが1より大きい場合、コレクションは複数のバッチで処理され、それぞれの長さは batch_size
以下です。バッチは、空いているワーカーに対して単一のリクエストとして送信され、ローカルの asyncmap
が複数の同時タスクを使用してバッチの要素を処理します。
エラーが発生すると、pmap
はコレクションの残りの部分の処理を停止します。この動作をオーバーライドするには、引数 on_error
を介してエラーハンドリング関数を指定できます。この関数は単一の引数、すなわち例外を受け取ります。関数はエラーを再スローすることで処理を停止することができます。また、処理を続行するために、呼び出し元に結果とインラインで返される任意の値を返すことができます。
次の2つの例を考えてみましょう。最初の例は例外オブジェクトをインラインで返し、2番目の例は例外の代わりに0を返します:
julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=identity)
4-element Array{Any,1}:
1
ErrorException("foo")
3
ErrorException("foo")
julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=ex->0)
4-element Array{Int64,1}:
1
0
3
0
エラーは、失敗した計算を再試行することで処理することもできます。キーワード引数 retry_delays
と retry_check
は、それぞれキーワード引数 delays
と check
として retry
に渡されます。バッチ処理が指定されている場合、バッチ全体が失敗した場合、バッチ内のすべてのアイテムが再試行されます。
on_error
と retry_delays
の両方が指定されている場合、再試行の前に on_error
フックが呼び出されることに注意してください。on_error
が例外をスロー(または再スロー)しない場合、その要素は再試行されません。
例:エラーが発生した場合、要素に対して最大3回 f
を再試行し、再試行の間に遅延を設けない。
pmap(f, c; retry_delays = zeros(3))
例:例外が InexactError
型でない場合にのみ f
を再試行し、最大3回まで指数的に増加する遅延を設ける。すべての InexactError
の発生に対して NaN
を返す。
pmap(f, c; on_error = e->(isa(e, InexactError) ? NaN : rethrow()), retry_delays = ExponentialBackOff(n = 3))
Distributed.RemoteException
— TypeRemoteException(captured)
リモート計算での例外はキャプチャされ、ローカルで再スローされます。RemoteException
は、ワーカーのpid
とキャプチャされた例外をラップします。CapturedException
は、リモート例外と例外が発生したときのコールスタックのシリアライズ可能な形式をキャプチャします。
Distributed.ProcessExitedException
— TypeProcessExitedException(worker_id::Int)
クライアントのJuliaプロセスが終了した後、死んだ子プロセスを参照しようとすると、この例外がスローされます。
Distributed.Future
— TypeFuture(w::Int, rrid::RRID, v::Union{Some, Nothing}=nothing)
Future
は、終了ステータスと時間が不明な単一の計算のプレースホルダーです。複数の潜在的な計算については、RemoteChannel
を参照してください。AbstractRemoteRef
を識別するにはremoteref_id
を参照してください。
Distributed.RemoteChannel
— TypeRemoteChannel(pid::Integer=myid())
プロセス pid
の Channel{Any}(1)
への参照を作成します。デフォルトの pid
は現在のプロセスです。
RemoteChannel(f::Function, pid::Integer=myid())
特定のサイズと型のリモートチャネルへの参照を作成します。f
は pid
で実行されるときに AbstractChannel
の実装を返さなければならない関数です。
例えば、RemoteChannel(()->Channel{Int}(10), pid)
は、pid
上の型 Int
でサイズ 10 のチャネルへの参照を返します。
デフォルトの pid
は現在のプロセスです。
Base.fetch
— Methodfetch(x::Future)
Future
の値を待機して取得します。取得した値はローカルにキャッシュされます。同じ参照に対するさらなる fetch
の呼び出しはキャッシュされた値を返します。リモート値が例外である場合、リモート例外とバックトレースをキャプチャする RemoteException
をスローします。
Base.fetch
— Methodfetch(c::RemoteChannel)
RemoteChannel
から値を待って取得します。発生する例外は Future
と同じです。取得したアイテムは削除されません。
fetch(x::Any)
x
を返します。
Distributed.remotecall
— Methodremotecall(f, id::Integer, args...; kwargs...) -> Future
指定されたプロセスで与えられた引数に対して関数 f
を非同期に呼び出します。 Future
を返します。キーワード引数がある場合、それらは f
に渡されます。
Distributed.remotecall_wait
— Methodremotecall_wait(f, id::Integer, args...; kwargs...)
指定されたワーカーID id
の Worker
に対して、1つのメッセージでより高速な wait(remotecall(...))
を実行します。キーワード引数がある場合は、f
に渡されます。
wait
および remotecall
も参照してください。
Distributed.remotecall_fetch
— Methodremotecall_fetch(f, id::Integer, args...; kwargs...)
fetch(remotecall(...))
を1つのメッセージで実行します。キーワード引数がある場合、それらはf
に渡されます。リモート例外はRemoteException
にキャッチされ、スローされます。
他にfetch
とremotecall
も参照してください。
例
$ julia -p 2
julia> remotecall_fetch(sqrt, 2, 4)
2.0
julia> remotecall_fetch(sqrt, 2, -4)
ERROR: On worker 2:
DomainError with -4.0:
sqrtは負の実引数で呼び出されましたが、複素引数で呼び出された場合にのみ複素結果を返します。sqrt(Complex(x))を試してください。
...
Distributed.remote_do
— Methodremote_do(f, id::Integer, args...; kwargs...) -> nothing
ワーカー id
で非同期に f
を実行します。 remotecall
とは異なり、計算の結果を保存せず、完了を待つ方法もありません。
成功した呼び出しは、リモートノードでの実行のためにリクエストが受け入れられたことを示します。
同じワーカーへの連続した remotecall
は、呼び出された順序で直列化されますが、リモートワーカーでの実行の順序は不確定です。たとえば、remote_do(f1, 2); remotecall(f2, 2); remote_do(f3, 2)
は、f1
への呼び出しを直列化し、その後に f2
と f3
をその順序で続けます。しかし、f1
がワーカー 2 で f3
よりも先に実行されることは保証されていません。
f
によってスローされた例外は、リモートワーカーの stderr
に印刷されます。
キーワード引数がある場合、それらは f
に渡されます。
Base.put!
— Methodput!(rr::RemoteChannel, args...)
RemoteChannel
に値のセットを格納します。チャネルが満杯の場合、空きが出るまでブロックします。最初の引数を返します。
Base.put!
— Methodput!(rr::Future, v)
値をFuture
rr
に格納します。Future
は一度だけ書き込むことができるリモート参照です。すでに設定されたFuture
に対してput!
を実行すると、Exception
がスローされます。すべての非同期リモート呼び出しはFuture
を返し、完了時に呼び出しの戻り値に値を設定します。
Base.take!
— Methodtake!(rr::RemoteChannel, args...)
RemoteChannel
rr
から値を取得し、プロセスの中で値を削除します。
Base.isready
— Methodisready(rr::RemoteChannel, args...)
RemoteChannel
に値が格納されているかどうかを判断します。この関数はレースコンディションを引き起こす可能性があることに注意してください。結果を受け取る時点で、もはや真でない可能性があります。しかし、Future
に対しては一度だけ割り当てられるため、安全に使用できます。
Base.isready
— Methodisready(rr::Future)
Future
に値が格納されているかどうかを判断します。
引数 Future
が別のノードによって所有されている場合、この呼び出しは回答を待つためにブロックします。rr
を別のタスクで待機するか、ローカルの Channel
をプロキシとして使用することをお勧めします:
p = 1
f = Future(p)
errormonitor(@async put!(f, remotecall_fetch(long_computation, p)))
isready(f) # ブロックしない
Distributed.AbstractWorkerPool
— TypeAbstractWorkerPool
WorkerPool
やCachingPool
などのワーカープールのスーパタイプです。AbstractWorkerPool
は以下を実装する必要があります:
push!
- 新しいワーカーを全体のプールに追加する(利用可能 + ビジー)put!
- ワーカーを利用可能なプールに戻すtake!
- 利用可能なプールからワーカーを取得する(リモート関数実行に使用するため)length
- 全体のプールに利用可能なワーカーの数isready
- プールでtake!
がブロックされる場合はfalseを返し、そうでなければtrueを返す
上記のデフォルト実装(AbstractWorkerPool
上)は以下のフィールドを必要とします:
channel::Channel{Int}
workers::Set{Int}
ここで、channel
は自由なワーカーピッドを含み、workers
はこのプールに関連付けられたすべてのワーカーのセットです。
Distributed.WorkerPool
— TypeWorkerPool(workers::Union{Vector{Int},AbstractRange{Int}})
ベクターまたはワーカーIDの範囲から WorkerPool
を作成します。
例
$ julia -p 3
julia> WorkerPool([2, 3])
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([2, 3]), RemoteChannel{Channel{Any}}(1, 1, 6))
julia> WorkerPool(2:4)
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 7))
Distributed.CachingPool
— TypeCachingPool(workers::Vector{Int})
AbstractWorkerPool
の実装です。 remote
, remotecall_fetch
, pmap
(および関数をリモートで実行する他のリモート呼び出し)は、特にクロージャ(大量のデータをキャプチャする可能性がある)において、ワーカーノード上でシリアライズ/デシリアライズされた関数をキャッシュすることで恩恵を受けます。
リモートキャッシュは、返されたCachingPool
オブジェクトのライフタイムの間維持されます。キャッシュを早期にクリアするには、clear!(pool)
を使用します。
グローバル変数については、データではなくバインディングのみがクロージャにキャプチャされます。グローバルデータをキャプチャするにはlet
ブロックを使用できます。
例
const foo = rand(10^8);
wp = CachingPool(workers())
let foo = foo
pmap(i -> sum(foo) + i, wp, 1:100);
end
上記は、foo
を各ワーカーに一度だけ転送します。
Distributed.default_worker_pool
— Functiondefault_worker_pool()
AbstractWorkerPool
はアイドル状態の workers
を含んでおり、remote(f)
および pmap
(デフォルトで)によって使用されます。default_worker_pool!(pool)
を介して明示的に設定されない限り、デフォルトのワーカープールは WorkerPool
に初期化されます。
例
$ julia -p 3
julia> default_worker_pool()
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:3), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 4))
Distributed.clear!
— Functionclear!(syms, pids=workers(); mod=Main)
モジュール内のグローバルバインディングを nothing
に初期化することでクリアします。 syms
は Symbol
型であるか、Symbol
のコレクションである必要があります。 pids
と mod
は、グローバル変数を再初期化するプロセスとモジュールを特定します。 mod
の下で定義されていると見なされる名前のみがクリアされます。
グローバル定数をクリアしようとすると例外が発生します。
clear!(pool::CachingPool) -> pool
すべての参加ワーカーからすべてのキャッシュされた関数を削除します。
Distributed.remote
— Functionremote([p::AbstractWorkerPool], f) -> Function
利用可能なワーカーで関数 f
を実行する無名関数を返します(提供されている場合は WorkerPool
p
から取得) remotecall_fetch
を使用して。
Distributed.remotecall
— Methodremotecall(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future
WorkerPool
の remotecall(f, pid, ....)
のバリアントです。pool
から空いているワーカーを待って取得し、それに対して remotecall
を実行します。
例
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> f = remotecall(maximum, wp, A)
Future(2, 1, 6, nothing)
この例では、タスクは pid 2 で実行され、pid 1 から呼び出されました。
Distributed.remotecall_wait
— Methodremotecall_wait(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future
WorkerPool
の remotecall_wait(f, pid, ....)
のバリアント。pool
から空いているワーカーを待機して取得し、それに対して remotecall_wait
を実行します。
例
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> f = remotecall_wait(maximum, wp, A)
Future(3, 1, 9, nothing)
julia> fetch(f)
0.9995177101692958
Distributed.remotecall_fetch
— Methodremotecall_fetch(f, pool::AbstractWorkerPool, args...; kwargs...) -> result
WorkerPool
の remotecall_fetch(f, pid, ....)
のバリアントです。pool
から空いているワーカーを待って取得し、それに対して remotecall_fetch
を実行します。
例
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> remotecall_fetch(maximum, wp, A)
0.9995177101692958
Distributed.remote_do
— Methodremote_do(f, pool::AbstractWorkerPool, args...; kwargs...) -> nothing
WorkerPool
のバリアントである remote_do(f, pid, ....)
。pool
から空いているワーカーを待機して取得し、それに対して remote_do
を実行します。
Distributed.@spawn
— Macro@spawn expr
式の周りにクロージャを作成し、自動的に選択されたプロセスで実行し、結果へのFuture
を返します。このマクロは非推奨です; 代わりに@spawnat :any expr
を使用するべきです。
例
julia> addprocs(3);
julia> f = @spawn myid()
Future(2, 1, 5, nothing)
julia> fetch(f)
2
julia> f = @spawn myid()
Future(3, 1, 7, nothing)
julia> fetch(f)
3
Julia 1.3以降、このマクロは非推奨です。代わりに@spawnat :any
を使用してください。
Distributed.@spawnat
— Macro@spawnat p expr
式の周りにクロージャを作成し、プロセス p
で非同期にクロージャを実行します。結果への Future
を返します。p
が引用されたリテラルシンボル :any
の場合、システムは自動的に使用するプロセッサを選択します。
例
julia> addprocs(3);
julia> f = @spawnat 2 myid()
Future(2, 1, 3, nothing)
julia> fetch(f)
2
julia> f = @spawnat :any myid()
Future(3, 1, 7, nothing)
julia> fetch(f)
3
:any
引数は Julia 1.3 以降で利用可能です。
Distributed.@fetch
— Macro@fetch expr
fetch(@spawnat :any expr)
と同等です。 fetch
と @spawnat
を参照してください。
例
julia> addprocs(3);
julia> @fetch myid()
2
julia> @fetch myid()
3
julia> @fetch myid()
4
julia> @fetch myid()
2
Distributed.@fetchfrom
— Macro@fetchfrom
fetch(@spawnat p expr)
と同等です。 fetch
と @spawnat
を参照してください。
例
julia> addprocs(3);
julia> @fetchfrom 2 myid()
2
julia> @fetchfrom 4 myid()
4
Distributed.@distributed
— Macro@distributed
分散メモリの並列forループの形式:
@distributed [reducer] for var = range
body
end
指定された範囲は分割され、すべてのワーカーでローカルに実行されます。オプションのリデューサ関数が指定されている場合、@distributed
は各ワーカーでローカルなリデュースを行い、呼び出しプロセスで最終的なリデュースを行います。
リデューサ関数がない場合、@distributed
は非同期に実行されることに注意してください。つまり、すべての利用可能なワーカーで独立したタスクを生成し、完了を待たずにすぐに戻ります。完了を待つには、呼び出しの前に@sync
を付けます。例えば:
@sync @distributed for var = range
body
end
Distributed.@everywhere
— Macro@everywhere [procs()] expr
すべての procs
で Main
の下で式を実行します。プロセスのいずれかでエラーが発生した場合、それらは CompositeException
に収集され、スローされます。例えば:
@everywhere bar = 1
は、すべての現在のプロセスで Main.bar
を定義します。後で追加されたプロセス(例えば addprocs()
を使用して)は、式が定義されません。
@spawnat
とは異なり、@everywhere
はローカル変数をキャプチャしません。代わりに、ローカル変数は補間を使用してブロードキャストできます:
foo = 1
@everywhere bar = $foo
オプションの引数 procs
は、式を実行するプロセスのサブセットを指定することを可能にします。
remotecall_eval(Main, procs, expr)
を呼び出すのと似ていますが、2つの追加機能があります:
- `using` と `import` ステートメントは、最初に呼び出しプロセスで実行され、パッケージが事前コンパイルされることを保証します。
- `include` によって使用される現在のソースファイルパスが他のプロセスに伝播されます。
Distributed.remoteref_id
— Functionremoteref_id(r::AbstractRemoteRef) -> RRID
Future
s と RemoteChannel
s は次のフィールドによって識別されます:
where
- 参照によって指される基盤となるオブジェクト/ストレージが実際に存在するノードを指します。whence
- リモート参照が作成されたノードを指します。これは、実際に存在する基盤となるオブジェクトが指されているノードとは異なることに注意してください。たとえば、マスタープロセスからRemoteChannel(2)
を呼び出すと、where
の値は 2 になり、whence
の値は 1 になります。id
は、whence
で指定されたワーカーから作成されたすべての参照の中で一意です。
これらを合わせると、whence
と id
はすべてのワーカーにわたって参照を一意に識別します。
remoteref_id
は低レベルの API で、リモート参照の whence
と id
の値をラップした RRID
オブジェクトを返します。
Distributed.channel_from_id
— Functionchannel_from_id(id) -> c
remoteref_id
[@ref]によって返されたid
のバックエンドAbstractChannel
を返す低レベルAPIです。この呼び出しは、バックエンドチャネルが存在するノードでのみ有効です。
Distributed.worker_id_from_socket
— Functionworker_id_from_socket(s) -> pid
IO
接続またはWorker
を与えると、それに接続されているワーカーのpid
を返す低レベルAPIです。これは、受信プロセスIDに応じて書き出されるデータを最適化するために、型のカスタムserialize
メソッドを書く際に便利です。
Distributed.cluster_cookie
— Methodcluster_cookie() -> cookie
クラスタクッキーを返します。
Distributed.cluster_cookie
— Methodcluster_cookie(cookie) -> cookie
渡されたクッキーをクラスタークッキーとして設定し、それを返します。
Cluster Manager Interface
このインターフェースは、異なるクラスタ環境でJuliaワーカーを起動および管理するためのメカニズムを提供します。Baseには2種類のマネージャーが存在します:同じホスト上で追加のワーカーを起動するためのLocalManager
と、ssh
を介してリモートホストで起動するためのSSHManager
です。TCP/IPソケットは、プロセス間で接続し、メッセージを転送するために使用されます。クラスタマネージャーが異なるトランスポートを提供することも可能です。
Distributed.ClusterManager
— TypeClusterManager
クラスターを制御するワーカープロセスのためのクラスのスーパークラス。クラスター管理者は、ワーカーを追加、削除、通信する方法を実装します。SSHManager
と LocalManager
はこのサブタイプです。
Distributed.WorkerConfig
— TypeWorkerConfig
ClusterManager
によって使用されるタイプで、クラスターに追加されたワーカーを制御します。いくつかのフィールドは、すべてのクラスター管理者がホストにアクセスするために使用します:
io
– ワーカーにアクセスするために使用される接続(IO
のサブタイプまたはNothing
)host
– ホストアドレス(String
またはNothing
)port
– ワーカーに接続するためにホスト上で使用されるポート(Int
またはNothing
)
いくつかは、クラスター管理者がすでに初期化されたホストにワーカーを追加するために使用されます:
count
– ホスト上で起動されるワーカーの数exename
– ホスト上のJulia実行可能ファイルへのパス、デフォルトは"$(Sys.BINDIR)/julia"
または"$(Sys.BINDIR)/julia-debug"
exeflags
– Juliaをリモートで起動する際に使用するフラグ
userdata
フィールドは、外部管理者によって各ワーカーの情報を保存するために使用されます。
いくつかのフィールドはSSHManager
や類似の管理者によって使用されます:
tunnel
–true
(トンネリングを使用)、false
(トンネリングを使用しない)、またはnothing
(管理者のデフォルトを使用)multiplex
–true
(トンネリングのためにSSHマルチプレクシングを使用)またはfalse
forward
– sshの-L
オプションに使用される転送オプションbind_addr
– リモートホストでバインドするアドレスsshflags
– SSH接続を確立する際に使用するフラグmax_parallel
– ホスト上で並行して接続する最大ワーカー数
いくつかのフィールドはLocalManager
とSSHManager
の両方によって使用されます:
connect_at
– これはワーカー間接続かドライバーからワーカーへのセットアップコールかを決定しますprocess
– 接続されるプロセス(通常、管理者はaddprocs
中にこれを割り当てます)ospid
– ホストOSに従ったプロセスID、ワーカープロセスを中断するために使用environ
– Local/SSH管理者によって一時的な情報を保存するために使用されるプライベート辞書ident
–ClusterManager
によって識別されたワーカーconnect_idents
– カスタムトポロジーを使用する場合にワーカーが接続しなければならないワーカーIDのリストenable_threaded_blas
–true
、false
、またはnothing
、ワーカーでスレッド化されたBLASを使用するかどうか
Distributed.launch
— Functionlaunch(manager::ClusterManager, params::Dict, launched::Array, launch_ntfy::Condition)
クラスターマネージャによって実装されます。この関数によって起動されたすべてのJuliaワーカーについて、launched
にWorkerConfig
エントリを追加し、launch_ntfy
に通知する必要があります。この関数は、manager
によって要求されたすべてのワーカーが起動された時点で終了しなければなりません。params
は、addprocs
が呼び出されたすべてのキーワード引数の辞書です。
Distributed.manage
— Functionmanage(manager::ClusterManager, id::Integer, config::WorkerConfig. op::Symbol)
クラスターマネージャーによって実装されます。これは、ワーカーのライフタイム中にマスタープロセスで呼び出され、適切な op
値が使用されます:
- ワーカーがJuliaワーカープールに追加/削除されるときに
:register
/:deregister
とともに。 interrupt(workers)
が呼び出されたときに:interrupt
とともに。ClusterManager
は適切なワーカーに割り込み信号を送信する必要があります。- クリーンアップ目的で
:finalize
とともに。
Base.kill
— Methodkill(manager::ClusterManager, pid::Int, config::WorkerConfig)
クラスターマネージャによって実装されます。これは、rmprocs
によってマスタープロセスで呼び出されます。これは、pid
で指定されたリモートワーカーを終了させるべきです。kill(manager::ClusterManager.....)
は、pid
でリモートのexit()
を実行します。
Sockets.connect
— Methodconnect(manager::ClusterManager, pid::Int, config::WorkerConfig) -> (instrm::IO, outstrm::IO)
クラスターマネージャーによってカスタムトランスポートを使用して実装されます。これは、config
で指定されたpid
のワーカーとの論理的な接続を確立し、IO
オブジェクトのペアを返す必要があります。pid
から現在のプロセスへのメッセージはinstrm
から読み取られ、pid
に送信されるメッセージはoutstrm
に書き込まれます。カスタムトランスポートの実装は、メッセージが完全にかつ順序通りに配信され、受信されることを保証しなければなりません。connect(manager::ClusterManager.....)
は、ワーカー間のTCP/IPソケット接続を設定します。
Distributed.init_worker
— Functioninit_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager())
カスタムトランスポートを実装するクラスターマネージャーによって呼び出されます。新しく起動されたプロセスをワーカーとして初期化します。コマンドライン引数 --worker[=<cookie>]
は、TCP/IPソケットを使用してプロセスをワーカーとして初期化する効果があります。cookie
は cluster_cookie
です。
Distributed.start_worker
— Functionstart_worker([out::IO=stdout], cookie::AbstractString=readline(stdin); close_stdin::Bool=true, stderr_to_stdout::Bool=true)
start_worker
は、TCP/IP 経由で接続するワーカープロセスのデフォルトのエントリポイントである内部関数です。プロセスを Julia クラスターのワーカーとして設定します。
ホスト:ポート情報はストリーム out
に書き込まれます(デフォルトは stdout です)。
この関数は、必要に応じて stdin からクッキーを読み取り、空いているポートでリッスンします(または指定された場合は --bind-to
コマンドラインオプションのポート)し、受信した TCP 接続とリクエストを処理するタスクをスケジュールします。また、(オプションで)stdin を閉じ、stderr を stdout にリダイレクトします。
戻り値はありません。
Distributed.process_messages
— Functionprocess_messages(r_stream::IO, w_stream::IO, incoming::Bool=true)
クラスター管理者によってカスタムトランスポートを使用して呼び出されます。これは、カスタムトランスポートの実装がリモートワーカーからの最初のメッセージを受信したときに呼び出されるべきです。カスタムトランスポートは、リモートワーカーへの論理接続を管理し、受信メッセージ用の IO
オブジェクトとリモートワーカー宛てのメッセージ用の IO
オブジェクトの2つを提供する必要があります。incoming
が true
の場合、リモートピアが接続を開始しました。接続を開始したペアのいずれかが、クラスタークッキーとそのJuliaバージョン番号を送信して認証ハンドシェイクを行います。
詳細は cluster_cookie
を参照してください。
Distributed.default_addprocs_params
— Functiondefault_addprocs_params(mgr::ClusterManager) -> Dict{Symbol, Any}
クラスターマネージャによって実装されます。addprocs(mgr)
を呼び出すときに渡されるデフォルトのキーワードパラメータです。最小限のオプションセットはdefault_addprocs_params()
を呼び出すことで利用可能です。