Distributed Computing

Distributed.addprocsFunction
addprocs(manager::ClusterManager; kwargs...) -> プロセス識別子のリスト

指定されたクラスターマネージャーを介してワーカープロセスを起動します。

例えば、Beowulfクラスタは、パッケージClusterManagers.jlに実装されたカスタムクラスターマネージャーを介してサポートされています。

新しく起動されたワーカーがマスターからの接続確立を待つ秒数は、ワーカープロセスの環境内の変数JULIA_WORKER_TIMEOUTを介して指定できます。TCP/IPをトランスポートとして使用する場合にのみ関連します。

REPLをブロックせずにワーカーを起動するには、またはプログラム的にワーカーを起動する場合は、addprocsを独自のタスクで実行します。

# 忙しいクラスタでは、`addprocs`を非同期に呼び出す
t = @async addprocs(...)
# ワーカーがオンラインになるときに利用する
if nprocs() > 1   # 少なくとも1つの新しいワーカーが利用可能であることを確認
   ....   # 分散実行を行う
end
# 新しく起動されたワーカーIDまたはエラーメッセージを取得する
if istaskdone(t)   # `addprocs`が完了したか確認し、`fetch`がブロックしないようにする
    if nworkers() == N
        new_pids = fetch(t)
    else
        fetch(t)
    end
end
source
addprocs(machines; tunnel=false, sshflags=``, max_parallel=10, kwargs...) -> プロセス識別子のリスト

リモートマシン上にSSHを介してワーカープロセスを追加します。構成はキーワード引数で行います(下記参照)。特に、exenameキーワードを使用して、リモートマシン上のjuliaバイナリへのパスを指定できます。

machinesは、[user@]host[:port] [bind_addr[:port]]の形式の文字列として与えられる「マシンスペック」のベクターです。userは現在のユーザーにデフォルト設定され、portは標準SSHポートに設定されます。[bind_addr[:port]]が指定されている場合、他のワーカーは指定されたbind_addrportでこのワーカーに接続します。

machinesベクター内でタプルを使用するか、(machine_spec, count)の形式を使用することで、リモートホスト上で複数のプロセスを起動することが可能です。ここで、countは指定されたホスト上で起動するワーカーの数です。ワーカー数に:autoを渡すと、リモートホスト上のCPUスレッドの数と同じ数のワーカーが起動します。

:

addprocs([
    "remote1",               # 現在のユーザー名でログインして'remote1'に1つのワーカー
    "user@remote2",          # 'user'ユーザー名でログインして'remote2'に1つのワーカー
    "user@remote3:2222",     # 'remote3'のSSHポートを'2222'に指定
    ("user@remote4", 4),     # 'remote4'に4つのワーカーを起動
    ("user@remote5", :auto), # 'remote5'のCPUスレッド数と同じ数のワーカーを起動
])

キーワード引数:

  • tunnel: trueの場合、マスタープロセスからワーカーに接続するためにSSHトンネリングが使用されます。デフォルトはfalseです。

  • multiplex: trueの場合、SSHトンネリングにSSHマルチプレクシングが使用されます。デフォルトはfalseです。

  • ssh: ワーカーを起動するために使用されるSSHクライアント実行可能ファイルの名前またはパス。デフォルトは"ssh"です。

  • sshflags: 追加のsshオプションを指定します。例: sshflags=`-i /home/foo/bar.pem`

  • max_parallel: ホストに並行して接続する最大ワーカー数を指定します。デフォルトは10です。

  • shell: ワーカー上でsshが接続するシェルの種類を指定します。

    • shell=:posix: POSIX互換のUnix/Linuxシェル(sh、ksh、bash、dash、zshなど)。デフォルト。
    • shell=:csh: Unix Cシェル(csh、tcsh)。
    • shell=:wincmd: Microsoft Windows cmd.exe
  • dir: ワーカー上の作業ディレクトリを指定します。デフォルトはホストの現在のディレクトリ(pwd()で見つけたもの)です。

  • enable_threaded_blas: trueの場合、追加されたプロセスでBLASが複数のスレッドで実行されます。デフォルトはfalseです。

  • exename: julia実行可能ファイルの名前。デフォルトは"$(Sys.BINDIR)/julia"または"$(Sys.BINDIR)/julia-debug"です。すべてのリモートマシンで共通のJuliaバージョンを使用することが推奨されます。そうしないと、シリアル化やコード配布が失敗する可能性があります。

  • exeflags: ワーカープロセスに渡される追加のフラグ。

  • topology: ワーカーが互いに接続する方法を指定します。接続されていないワーカー間でメッセージを送信するとエラーが発生します。

    • topology=:all_to_all: すべてのプロセスが互いに接続されています。デフォルト。
    • topology=:master_worker: ドライバープロセス、すなわちpid 1のみがワーカーに接続します。ワーカー同士は接続しません。
    • topology=:custom: クラスターマネージャのlaunchメソッドがWorkerConfig内のidentおよびconnect_identsフィールドを介して接続トポロジーを指定します。クラスターマネージャのID identを持つワーカーは、connect_identsで指定されたすべてのワーカーに接続します。
  • lazy: topology=:all_to_allの場合のみ適用されます。trueの場合、ワーカー間の接続は遅延して設定されます。すなわち、ワーカー間のリモート呼び出しの最初のインスタンスで設定されます。デフォルトはtrueです。

  • env: env=["JULIA_DEPOT_PATH"=>"/depot"]のような文字列ペアの配列を提供して、リモートマシン上で環境変数が設定されるように要求します。デフォルトでは、環境変数JULIA_WORKER_TIMEOUTのみが自動的にローカルからリモート環境に渡されます。

  • cmdline_cookie: --workerコマンドラインオプションを介して認証クッキーを渡します。クッキーをssh stdioを介して渡す(より安全な)デフォルトの動作は、古い(ConPTY以前の)JuliaまたはWindowsバージョンを使用するWindowsワーカーでハングする可能性があるため、その場合はcmdline_cookie=trueがワークアラウンドを提供します。

Julia 1.6

キーワード引数sshshellenvおよびcmdline_cookieはJulia 1.6で追加されました。

環境変数:

マスタープロセスが新しく起動したワーカーとの接続を60.0秒以内に確立できない場合、ワーカーはそれを致命的な状況と見なし、終了します。このタイムアウトは環境変数JULIA_WORKER_TIMEOUTを介して制御できます。マスタープロセス上のJULIA_WORKER_TIMEOUTの値は、新しく起動したワーカーが接続確立を待つ秒数を指定します。

source
addprocs(np::Integer=Sys.CPU_THREADS; restrict=true, kwargs...) -> プロセス識別子のリスト

ローカルホストで np ワーカーを起動し、内蔵の LocalManager を使用します。

ローカルワーカーは、メインプロセスから現在のパッケージ環境(すなわち、アクティブプロジェクト、LOAD_PATH、および DEPOT_PATH)を継承します。

Warning

ワーカーは ~/.julia/config/startup.jl スタートアップスクリプトを実行せず、他の実行中のプロセスとグローバル状態(コマンドラインスイッチ、グローバル変数、新しいメソッド定義、読み込まれたモジュールなど)を同期しないことに注意してください。

キーワード引数:

  • restrict::Bool: true(デフォルト)の場合、バインディングは 127.0.0.1 に制限されます。
  • dir, exename, exeflags, env, topology, lazy, enable_threaded_blas: SSHManager と同様の効果があります。 addprocs(machines::AbstractVector) のドキュメントを参照してください。
Julia 1.9

パッケージ環境の継承と env キーワード引数は、Julia 1.9 で追加されました。

source
Distributed.nprocsFunction
nprocs()

利用可能なプロセスの数を取得します。

julia> nprocs()
3

julia> workers()
2-element Array{Int64,1}:
 2
 3
source
Distributed.nworkersFunction
nworkers()

利用可能なワーカープロセスの数を取得します。これはnprocs()より1少ないです。nprocs() == 1の場合はnprocs()と等しくなります。

$ julia -p 2

julia> nprocs()
3

julia> nworkers()
2
source
Distributed.procsMethod
procs()

プロセス識別子のリストを返します。pid 1(workers() には含まれていない)を含みます。

$ julia -p 2

julia> procs()
3-element Array{Int64,1}:
 1
 2
 3
source
Distributed.procsMethod
procs(pid::Integer)

同じ物理ノード上のすべてのプロセス識別子のリストを返します。具体的には、pid と同じ IP アドレスにバインドされているすべてのワーカーが返されます。

source
Distributed.workersFunction
workers()

すべてのワーカープロセス識別子のリストを返します。

$ julia -p 2

julia> workers()
2-element Array{Int64,1}:
 2
 3
source
Distributed.rmprocsFunction
rmprocs(pids...; waitfor=typemax(Int))

指定されたワーカーを削除します。ワーカーの追加または削除はプロセス1のみが行えます。

引数 waitfor は、ワーカーがシャットダウンするまでの待機時間を指定します:

  • 指定しない場合、rmprocs は要求されたすべての pids が削除されるまで待機します。
  • 要求された waitfor 秒の前にすべてのワーカーを終了できない場合、ErrorException が発生します。
  • waitfor の値が0の場合、呼び出しは即座に戻り、ワーカーは別のタスクで削除される予定です。スケジュールされた Task オブジェクトが返されます。ユーザーは、他の並列呼び出しを行う前にタスクに対して wait を呼び出す必要があります。

$ julia -p 5

julia> t = rmprocs(2, 3, waitfor=0)
Task (runnable) @0x0000000107c718d0

julia> wait(t)

julia> workers()
3-element Array{Int64,1}:
 4
 5
 6
source
Distributed.interruptFunction
interrupt(pids::Integer...)

指定されたワーカー上で現在実行中のタスクを中断します。これは、ローカルマシンでCtrl-Cを押すことに相当します。引数が指定されていない場合、すべてのワーカーが中断されます。

source
interrupt(pids::AbstractVector=workers())

指定されたワーカー上で現在実行中のタスクを中断します。これは、ローカルマシンでCtrl-Cを押すのと同等です。引数が指定されていない場合、すべてのワーカーが中断されます。

source
Distributed.myidFunction
myid()

現在のプロセスのIDを取得します。

julia> myid()
1

julia> remotecall_fetch(() -> myid(), 4)
4
source
Distributed.pmapFunction
pmap(f, [::AbstractWorkerPool], c...; distributed=true, batch_size=1, on_error=nothing, retry_delays=[], retry_check=nothing) -> collection

コレクション c を変換し、利用可能なワーカーとタスクを使用して各要素に f を適用します。

複数のコレクション引数がある場合、要素ごとに f を適用します。

f はすべてのワーカープロセスで利用可能でなければなりません。詳細については、コードの可用性とパッケージの読み込みを参照してください。

ワーカープールが指定されていない場合、すべての利用可能なワーカーが CachingPool を介して使用されます。

デフォルトでは、pmap は指定されたすべてのワーカーに計算を分散します。ローカルプロセスのみを使用し、タスクに分散させるには、distributed=false を指定します。これは asyncmap を使用するのと同等です。例えば、pmap(f, c; distributed=false)asyncmap(f,c; ntasks=()->nworkers()) と同等です。

pmapbatch_size 引数を介してプロセスとタスクの混合を使用することもできます。バッチサイズが1より大きい場合、コレクションは複数のバッチで処理され、それぞれの長さは batch_size 以下です。バッチは、空いているワーカーに対して単一のリクエストとして送信され、ローカルの asyncmap が複数の同時タスクを使用してバッチの要素を処理します。

エラーが発生すると、pmap はコレクションの残りの部分の処理を停止します。この動作をオーバーライドするには、引数 on_error を介してエラーハンドリング関数を指定できます。この関数は単一の引数、すなわち例外を受け取ります。関数はエラーを再スローすることで処理を停止することができます。また、処理を続行するために、呼び出し元に結果とインラインで返される任意の値を返すことができます。

次の2つの例を考えてみましょう。最初の例は例外オブジェクトをインラインで返し、2番目の例は例外の代わりに0を返します:

julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=identity)
4-element Array{Any,1}:
 1
  ErrorException("foo")
 3
  ErrorException("foo")

julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=ex->0)
4-element Array{Int64,1}:
 1
 0
 3
 0

エラーは、失敗した計算を再試行することで処理することもできます。キーワード引数 retry_delaysretry_check は、それぞれキーワード引数 delayscheck として retry に渡されます。バッチ処理が指定されている場合、バッチ全体が失敗した場合、バッチ内のすべてのアイテムが再試行されます。

on_errorretry_delays の両方が指定されている場合、再試行の前に on_error フックが呼び出されることに注意してください。on_error が例外をスロー(または再スロー)しない場合、その要素は再試行されません。

例:エラーが発生した場合、要素に対して最大3回 f を再試行し、再試行の間に遅延を設けない。

pmap(f, c; retry_delays = zeros(3))

例:例外が InexactError 型でない場合にのみ f を再試行し、最大3回まで指数的に増加する遅延を設ける。すべての InexactError の発生に対して NaN を返す。

pmap(f, c; on_error = e->(isa(e, InexactError) ? NaN : rethrow()), retry_delays = ExponentialBackOff(n = 3))
source
Distributed.RemoteExceptionType
RemoteException(captured)

リモート計算での例外はキャプチャされ、ローカルで再スローされます。RemoteExceptionは、ワーカーのpidとキャプチャされた例外をラップします。CapturedExceptionは、リモート例外と例外が発生したときのコールスタックのシリアライズ可能な形式をキャプチャします。

source
Distributed.ProcessExitedExceptionType
ProcessExitedException(worker_id::Int)

クライアントのJuliaプロセスが終了した後、死んだ子プロセスを参照しようとすると、この例外がスローされます。

source
Distributed.FutureType
Future(w::Int, rrid::RRID, v::Union{Some, Nothing}=nothing)

Futureは、終了ステータスと時間が不明な単一の計算のプレースホルダーです。複数の潜在的な計算については、RemoteChannelを参照してください。AbstractRemoteRefを識別するにはremoteref_idを参照してください。

source
Distributed.RemoteChannelType
RemoteChannel(pid::Integer=myid())

プロセス pidChannel{Any}(1) への参照を作成します。デフォルトの pid は現在のプロセスです。

RemoteChannel(f::Function, pid::Integer=myid())

特定のサイズと型のリモートチャネルへの参照を作成します。fpid で実行されるときに AbstractChannel の実装を返さなければならない関数です。

例えば、RemoteChannel(()->Channel{Int}(10), pid) は、pid 上の型 Int でサイズ 10 のチャネルへの参照を返します。

デフォルトの pid は現在のプロセスです。

source
Base.fetchMethod
fetch(x::Future)

Future の値を待機して取得します。取得した値はローカルにキャッシュされます。同じ参照に対するさらなる fetch の呼び出しはキャッシュされた値を返します。リモート値が例外である場合、リモート例外とバックトレースをキャプチャする RemoteException をスローします。

source
Base.fetchMethod
fetch(c::RemoteChannel)

RemoteChannel から値を待って取得します。発生する例外は Future と同じです。取得したアイテムは削除されません。

source
fetch(x::Any)

xを返します。

source
Distributed.remotecallMethod
remotecall(f, id::Integer, args...; kwargs...) -> Future

指定されたプロセスで与えられた引数に対して関数 f を非同期に呼び出します。 Future を返します。キーワード引数がある場合、それらは f に渡されます。

source
Distributed.remotecall_waitMethod
remotecall_wait(f, id::Integer, args...; kwargs...)

指定されたワーカーID idWorker に対して、1つのメッセージでより高速な wait(remotecall(...)) を実行します。キーワード引数がある場合は、f に渡されます。

wait および remotecall も参照してください。

source
Distributed.remotecall_fetchMethod
remotecall_fetch(f, id::Integer, args...; kwargs...)

fetch(remotecall(...))を1つのメッセージで実行します。キーワード引数がある場合、それらはfに渡されます。リモート例外はRemoteExceptionにキャッチされ、スローされます。

他にfetchremotecallも参照してください。

$ julia -p 2

julia> remotecall_fetch(sqrt, 2, 4)
2.0

julia> remotecall_fetch(sqrt, 2, -4)
ERROR: On worker 2:
DomainError with -4.0:
sqrtは負の実引数で呼び出されましたが、複素引数で呼び出された場合にのみ複素結果を返します。sqrt(Complex(x))を試してください。
...
source
Distributed.remote_doMethod
remote_do(f, id::Integer, args...; kwargs...) -> nothing

ワーカー id で非同期に f を実行します。 remotecall とは異なり、計算の結果を保存せず、完了を待つ方法もありません。

成功した呼び出しは、リモートノードでの実行のためにリクエストが受け入れられたことを示します。

同じワーカーへの連続した remotecall は、呼び出された順序で直列化されますが、リモートワーカーでの実行の順序は不確定です。たとえば、remote_do(f1, 2); remotecall(f2, 2); remote_do(f3, 2) は、f1 への呼び出しを直列化し、その後に f2f3 をその順序で続けます。しかし、f1 がワーカー 2 で f3 よりも先に実行されることは保証されていません。

f によってスローされた例外は、リモートワーカーの stderr に印刷されます。

キーワード引数がある場合、それらは f に渡されます。

source
Base.put!Method
put!(rr::RemoteChannel, args...)

RemoteChannelに値のセットを格納します。チャネルが満杯の場合、空きが出るまでブロックします。最初の引数を返します。

source
Base.put!Method
put!(rr::Future, v)

値をFuture rrに格納します。Futureは一度だけ書き込むことができるリモート参照です。すでに設定されたFutureに対してput!を実行すると、Exceptionがスローされます。すべての非同期リモート呼び出しはFutureを返し、完了時に呼び出しの戻り値に値を設定します。

source
Base.take!Method
take!(rr::RemoteChannel, args...)

RemoteChannel rr から値を取得し、プロセスの中で値を削除します。

source
Base.isreadyMethod
isready(rr::RemoteChannel, args...)

RemoteChannel に値が格納されているかどうかを判断します。この関数はレースコンディションを引き起こす可能性があることに注意してください。結果を受け取る時点で、もはや真でない可能性があります。しかし、Future に対しては一度だけ割り当てられるため、安全に使用できます。

source
Base.isreadyMethod
isready(rr::Future)

Future に値が格納されているかどうかを判断します。

引数 Future が別のノードによって所有されている場合、この呼び出しは回答を待つためにブロックします。rr を別のタスクで待機するか、ローカルの Channel をプロキシとして使用することをお勧めします:

p = 1
f = Future(p)
errormonitor(@async put!(f, remotecall_fetch(long_computation, p)))
isready(f)  # ブロックしない
source
Distributed.AbstractWorkerPoolType
AbstractWorkerPool

WorkerPoolCachingPoolなどのワーカープールのスーパタイプです。AbstractWorkerPoolは以下を実装する必要があります:

  • push! - 新しいワーカーを全体のプールに追加する(利用可能 + ビジー)
  • put! - ワーカーを利用可能なプールに戻す
  • take! - 利用可能なプールからワーカーを取得する(リモート関数実行に使用するため)
  • length - 全体のプールに利用可能なワーカーの数
  • isready - プールでtake!がブロックされる場合はfalseを返し、そうでなければtrueを返す

上記のデフォルト実装(AbstractWorkerPool上)は以下のフィールドを必要とします:

  • channel::Channel{Int}
  • workers::Set{Int}

ここで、channelは自由なワーカーピッドを含み、workersはこのプールに関連付けられたすべてのワーカーのセットです。

source
Distributed.WorkerPoolType
WorkerPool(workers::Union{Vector{Int},AbstractRange{Int}})

ベクターまたはワーカーIDの範囲から WorkerPool を作成します。

$ julia -p 3

julia> WorkerPool([2, 3])
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([2, 3]), RemoteChannel{Channel{Any}}(1, 1, 6))

julia> WorkerPool(2:4)
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 7))
source
Distributed.CachingPoolType
CachingPool(workers::Vector{Int})

AbstractWorkerPoolの実装です。 remote, remotecall_fetch, pmap(および関数をリモートで実行する他のリモート呼び出し)は、特にクロージャ(大量のデータをキャプチャする可能性がある)において、ワーカーノード上でシリアライズ/デシリアライズされた関数をキャッシュすることで恩恵を受けます。

リモートキャッシュは、返されたCachingPoolオブジェクトのライフタイムの間維持されます。キャッシュを早期にクリアするには、clear!(pool)を使用します。

グローバル変数については、データではなくバインディングのみがクロージャにキャプチャされます。グローバルデータをキャプチャするにはletブロックを使用できます。

const foo = rand(10^8);
wp = CachingPool(workers())
let foo = foo
    pmap(i -> sum(foo) + i, wp, 1:100);
end

上記は、fooを各ワーカーに一度だけ転送します。

source
Distributed.default_worker_poolFunction
default_worker_pool()

AbstractWorkerPool はアイドル状態の workers を含んでおり、remote(f) および pmap(デフォルトで)によって使用されます。default_worker_pool!(pool) を介して明示的に設定されない限り、デフォルトのワーカープールは WorkerPool に初期化されます。

$ julia -p 3

julia> default_worker_pool()
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:3), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 4))
source
Distributed.clear!Function
clear!(syms, pids=workers(); mod=Main)

モジュール内のグローバルバインディングを nothing に初期化することでクリアします。 symsSymbol 型であるか、Symbol のコレクションである必要があります。 pidsmod は、グローバル変数を再初期化するプロセスとモジュールを特定します。 mod の下で定義されていると見なされる名前のみがクリアされます。

グローバル定数をクリアしようとすると例外が発生します。

source
clear!(pool::CachingPool) -> pool

すべての参加ワーカーからすべてのキャッシュされた関数を削除します。

source
Distributed.remoteFunction
remote([p::AbstractWorkerPool], f) -> Function

利用可能なワーカーで関数 f を実行する無名関数を返します(提供されている場合は WorkerPool p から取得) remotecall_fetch を使用して。

source
Distributed.remotecallMethod
remotecall(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future

WorkerPoolremotecall(f, pid, ....) のバリアントです。pool から空いているワーカーを待って取得し、それに対して remotecall を実行します。

$ julia -p 3

julia> wp = WorkerPool([2, 3]);

julia> A = rand(3000);

julia> f = remotecall(maximum, wp, A)
Future(2, 1, 6, nothing)

この例では、タスクは pid 2 で実行され、pid 1 から呼び出されました。

source
Distributed.remotecall_waitMethod
remotecall_wait(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future

WorkerPoolremotecall_wait(f, pid, ....) のバリアント。pool から空いているワーカーを待機して取得し、それに対して remotecall_wait を実行します。

$ julia -p 3

julia> wp = WorkerPool([2, 3]);

julia> A = rand(3000);

julia> f = remotecall_wait(maximum, wp, A)
Future(3, 1, 9, nothing)

julia> fetch(f)
0.9995177101692958
source
Distributed.remotecall_fetchMethod
remotecall_fetch(f, pool::AbstractWorkerPool, args...; kwargs...) -> result

WorkerPoolremotecall_fetch(f, pid, ....) のバリアントです。pool から空いているワーカーを待って取得し、それに対して remotecall_fetch を実行します。

$ julia -p 3

julia> wp = WorkerPool([2, 3]);

julia> A = rand(3000);

julia> remotecall_fetch(maximum, wp, A)
0.9995177101692958
source
Distributed.remote_doMethod
remote_do(f, pool::AbstractWorkerPool, args...; kwargs...) -> nothing

WorkerPool のバリアントである remote_do(f, pid, ....)pool から空いているワーカーを待機して取得し、それに対して remote_do を実行します。

source
Distributed.@spawnMacro
@spawn expr

式の周りにクロージャを作成し、自動的に選択されたプロセスで実行し、結果へのFutureを返します。このマクロは非推奨です; 代わりに@spawnat :any exprを使用するべきです。

julia> addprocs(3);

julia> f = @spawn myid()
Future(2, 1, 5, nothing)

julia> fetch(f)
2

julia> f = @spawn myid()
Future(3, 1, 7, nothing)

julia> fetch(f)
3
Julia 1.3

Julia 1.3以降、このマクロは非推奨です。代わりに@spawnat :anyを使用してください。

source
Distributed.@spawnatMacro
@spawnat p expr

式の周りにクロージャを作成し、プロセス p で非同期にクロージャを実行します。結果への Future を返します。p が引用されたリテラルシンボル :any の場合、システムは自動的に使用するプロセッサを選択します。

julia> addprocs(3);

julia> f = @spawnat 2 myid()
Future(2, 1, 3, nothing)

julia> fetch(f)
2

julia> f = @spawnat :any myid()
Future(3, 1, 7, nothing)

julia> fetch(f)
3
Julia 1.3

:any 引数は Julia 1.3 以降で利用可能です。

source
Distributed.@fetchMacro
@fetch expr

fetch(@spawnat :any expr) と同等です。 fetch@spawnat を参照してください。

julia> addprocs(3);

julia> @fetch myid()
2

julia> @fetch myid()
3

julia> @fetch myid()
4

julia> @fetch myid()
2
source
Distributed.@fetchfromMacro
@fetchfrom

fetch(@spawnat p expr) と同等です。 fetch@spawnat を参照してください。

julia> addprocs(3);

julia> @fetchfrom 2 myid()
2

julia> @fetchfrom 4 myid()
4
source
Distributed.@distributedMacro
@distributed

分散メモリの並列forループの形式:

@distributed [reducer] for var = range
    body
end

指定された範囲は分割され、すべてのワーカーでローカルに実行されます。オプションのリデューサ関数が指定されている場合、@distributedは各ワーカーでローカルなリデュースを行い、呼び出しプロセスで最終的なリデュースを行います。

リデューサ関数がない場合、@distributedは非同期に実行されることに注意してください。つまり、すべての利用可能なワーカーで独立したタスクを生成し、完了を待たずにすぐに戻ります。完了を待つには、呼び出しの前に@syncを付けます。例えば:

@sync @distributed for var = range
    body
end
source
Distributed.@everywhereMacro
@everywhere [procs()] expr

すべての procsMain の下で式を実行します。プロセスのいずれかでエラーが発生した場合、それらは CompositeException に収集され、スローされます。例えば:

@everywhere bar = 1

は、すべての現在のプロセスで Main.bar を定義します。後で追加されたプロセス(例えば addprocs() を使用して)は、式が定義されません。

@spawnat とは異なり、@everywhere はローカル変数をキャプチャしません。代わりに、ローカル変数は補間を使用してブロードキャストできます:

foo = 1
@everywhere bar = $foo

オプションの引数 procs は、式を実行するプロセスのサブセットを指定することを可能にします。

remotecall_eval(Main, procs, expr) を呼び出すのと似ていますが、2つの追加機能があります:

- `using` と `import` ステートメントは、最初に呼び出しプロセスで実行され、パッケージが事前コンパイルされることを保証します。
- `include` によって使用される現在のソースファイルパスが他のプロセスに伝播されます。
source
Distributed.remoteref_idFunction
remoteref_id(r::AbstractRemoteRef) -> RRID

Futures と RemoteChannels は次のフィールドによって識別されます:

  • where - 参照によって指される基盤となるオブジェクト/ストレージが実際に存在するノードを指します。
  • whence - リモート参照が作成されたノードを指します。これは、実際に存在する基盤となるオブジェクトが指されているノードとは異なることに注意してください。たとえば、マスタープロセスから RemoteChannel(2) を呼び出すと、where の値は 2 になり、whence の値は 1 になります。
  • id は、whence で指定されたワーカーから作成されたすべての参照の中で一意です。

これらを合わせると、whenceid はすべてのワーカーにわたって参照を一意に識別します。

remoteref_id は低レベルの API で、リモート参照の whenceid の値をラップした RRID オブジェクトを返します。

source
Distributed.channel_from_idFunction
channel_from_id(id) -> c

remoteref_id[@ref]によって返されたidのバックエンドAbstractChannelを返す低レベルAPIです。この呼び出しは、バックエンドチャネルが存在するノードでのみ有効です。

source
Distributed.worker_id_from_socketFunction
worker_id_from_socket(s) -> pid

IO接続またはWorkerを与えると、それに接続されているワーカーのpidを返す低レベルAPIです。これは、受信プロセスIDに応じて書き出されるデータを最適化するために、型のカスタムserializeメソッドを書く際に便利です。

source
Distributed.cluster_cookieMethod
cluster_cookie(cookie) -> cookie

渡されたクッキーをクラスタークッキーとして設定し、それを返します。

source

Cluster Manager Interface

このインターフェースは、異なるクラスタ環境でJuliaワーカーを起動および管理するためのメカニズムを提供します。Baseには2種類のマネージャーが存在します:同じホスト上で追加のワーカーを起動するためのLocalManagerと、sshを介してリモートホストで起動するためのSSHManagerです。TCP/IPソケットは、プロセス間で接続し、メッセージを転送するために使用されます。クラスタマネージャーが異なるトランスポートを提供することも可能です。

Distributed.ClusterManagerType
ClusterManager

クラスターを制御するワーカープロセスのためのクラスのスーパークラス。クラスター管理者は、ワーカーを追加、削除、通信する方法を実装します。SSHManagerLocalManager はこのサブタイプです。

source
Distributed.WorkerConfigType
WorkerConfig

ClusterManagerによって使用されるタイプで、クラスターに追加されたワーカーを制御します。いくつかのフィールドは、すべてのクラスター管理者がホストにアクセスするために使用します:

  • io – ワーカーにアクセスするために使用される接続(IOのサブタイプまたはNothing
  • host – ホストアドレス(StringまたはNothing
  • port – ワーカーに接続するためにホスト上で使用されるポート(IntまたはNothing

いくつかは、クラスター管理者がすでに初期化されたホストにワーカーを追加するために使用されます:

  • count – ホスト上で起動されるワーカーの数
  • exename – ホスト上のJulia実行可能ファイルへのパス、デフォルトは"$(Sys.BINDIR)/julia"または"$(Sys.BINDIR)/julia-debug"
  • exeflags – Juliaをリモートで起動する際に使用するフラグ

userdataフィールドは、外部管理者によって各ワーカーの情報を保存するために使用されます。

いくつかのフィールドはSSHManagerや類似の管理者によって使用されます:

  • tunneltrue(トンネリングを使用)、false(トンネリングを使用しない)、またはnothing(管理者のデフォルトを使用)
  • multiplextrue(トンネリングのためにSSHマルチプレクシングを使用)またはfalse
  • forward – sshの-Lオプションに使用される転送オプション
  • bind_addr – リモートホストでバインドするアドレス
  • sshflags – SSH接続を確立する際に使用するフラグ
  • max_parallel – ホスト上で並行して接続する最大ワーカー数

いくつかのフィールドはLocalManagerSSHManagerの両方によって使用されます:

  • connect_at – これはワーカー間接続かドライバーからワーカーへのセットアップコールかを決定します
  • process – 接続されるプロセス(通常、管理者はaddprocs中にこれを割り当てます)
  • ospid – ホストOSに従ったプロセスID、ワーカープロセスを中断するために使用
  • environ – Local/SSH管理者によって一時的な情報を保存するために使用されるプライベート辞書
  • identClusterManagerによって識別されたワーカー
  • connect_idents – カスタムトポロジーを使用する場合にワーカーが接続しなければならないワーカーIDのリスト
  • enable_threaded_blastruefalse、またはnothing、ワーカーでスレッド化されたBLASを使用するかどうか
source
Distributed.launchFunction
launch(manager::ClusterManager, params::Dict, launched::Array, launch_ntfy::Condition)

クラスターマネージャによって実装されます。この関数によって起動されたすべてのJuliaワーカーについて、launchedWorkerConfigエントリを追加し、launch_ntfyに通知する必要があります。この関数は、managerによって要求されたすべてのワーカーが起動された時点で終了しなければなりません。paramsは、addprocsが呼び出されたすべてのキーワード引数の辞書です。

source
Distributed.manageFunction
manage(manager::ClusterManager, id::Integer, config::WorkerConfig. op::Symbol)

クラスターマネージャーによって実装されます。これは、ワーカーのライフタイム中にマスタープロセスで呼び出され、適切な op 値が使用されます:

  • ワーカーがJuliaワーカープールに追加/削除されるときに :register/:deregister とともに。
  • interrupt(workers) が呼び出されたときに :interrupt とともに。ClusterManager は適切なワーカーに割り込み信号を送信する必要があります。
  • クリーンアップ目的で :finalize とともに。
source
Base.killMethod
kill(manager::ClusterManager, pid::Int, config::WorkerConfig)

クラスターマネージャによって実装されます。これは、rmprocsによってマスタープロセスで呼び出されます。これは、pidで指定されたリモートワーカーを終了させるべきです。kill(manager::ClusterManager.....)は、pidでリモートのexit()を実行します。

source
Sockets.connectMethod
connect(manager::ClusterManager, pid::Int, config::WorkerConfig) -> (instrm::IO, outstrm::IO)

クラスターマネージャーによってカスタムトランスポートを使用して実装されます。これは、configで指定されたpidのワーカーとの論理的な接続を確立し、IOオブジェクトのペアを返す必要があります。pidから現在のプロセスへのメッセージはinstrmから読み取られ、pidに送信されるメッセージはoutstrmに書き込まれます。カスタムトランスポートの実装は、メッセージが完全にかつ順序通りに配信され、受信されることを保証しなければなりません。connect(manager::ClusterManager.....)は、ワーカー間のTCP/IPソケット接続を設定します。

source
Distributed.init_workerFunction
init_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager())

カスタムトランスポートを実装するクラスターマネージャーによって呼び出されます。新しく起動されたプロセスをワーカーとして初期化します。コマンドライン引数 --worker[=<cookie>] は、TCP/IPソケットを使用してプロセスをワーカーとして初期化する効果があります。cookiecluster_cookie です。

source
Distributed.start_workerFunction
start_worker([out::IO=stdout], cookie::AbstractString=readline(stdin); close_stdin::Bool=true, stderr_to_stdout::Bool=true)

start_worker は、TCP/IP 経由で接続するワーカープロセスのデフォルトのエントリポイントである内部関数です。プロセスを Julia クラスターのワーカーとして設定します。

ホスト:ポート情報はストリーム out に書き込まれます(デフォルトは stdout です)。

この関数は、必要に応じて stdin からクッキーを読み取り、空いているポートでリッスンします(または指定された場合は --bind-to コマンドラインオプションのポート)し、受信した TCP 接続とリクエストを処理するタスクをスケジュールします。また、(オプションで)stdin を閉じ、stderr を stdout にリダイレクトします。

戻り値はありません。

source
Distributed.process_messagesFunction
process_messages(r_stream::IO, w_stream::IO, incoming::Bool=true)

クラスター管理者によってカスタムトランスポートを使用して呼び出されます。これは、カスタムトランスポートの実装がリモートワーカーからの最初のメッセージを受信したときに呼び出されるべきです。カスタムトランスポートは、リモートワーカーへの論理接続を管理し、受信メッセージ用の IO オブジェクトとリモートワーカー宛てのメッセージ用の IO オブジェクトの2つを提供する必要があります。incomingtrue の場合、リモートピアが接続を開始しました。接続を開始したペアのいずれかが、クラスタークッキーとそのJuliaバージョン番号を送信して認証ハンドシェイクを行います。

詳細は cluster_cookie を参照してください。

source
Distributed.default_addprocs_paramsFunction
default_addprocs_params(mgr::ClusterManager) -> Dict{Symbol, Any}

クラスターマネージャによって実装されます。addprocs(mgr)を呼び出すときに渡されるデフォルトのキーワードパラメータです。最小限のオプションセットはdefault_addprocs_params()を呼び出すことで利用可能です。

source