Distributed Computing
Distributed — Module分布式并行处理的工具。
Distributed.addprocs — Functionaddprocs(manager::ClusterManager; kwargs...) -> 进程标识符列表通过指定的集群管理器启动工作进程。
例如,通过在包 ClusterManagers.jl 中实现的自定义集群管理器支持 Beowulf 集群。
新启动的工作进程等待从主节点建立连接的秒数可以通过工作进程环境中的变量 JULIA_WORKER_TIMEOUT 指定。仅在使用 TCP/IP 作为传输时相关。
要在不阻塞 REPL 或者如果以编程方式启动工作进程的包含函数的情况下启动工作进程,请在其自己的任务中执行 addprocs。
示例
# 在繁忙的集群上,异步调用 `addprocs`
t = @async addprocs(...)# 利用工作进程在它们上线时
if nprocs() > 1 # 确保至少有一个新工作进程可用
.... # 执行分布式计算
end# 检索新启动的工作进程 ID 或任何错误消息
if istaskdone(t) # 检查 `addprocs` 是否已完成,以确保 `fetch` 不会阻塞
if nworkers() == N
new_pids = fetch(t)
else
fetch(t)
end
endaddprocs(machines; tunnel=false, sshflags=``, max_parallel=10, kwargs...) -> 进程标识符列表通过 SSH 在远程机器上添加工作进程。配置通过关键字参数完成(见下文)。特别是,exename 关键字可用于指定远程机器上的 julia 二进制文件的路径。
machines 是一个“机器规格”的向量,格式为 [user@]host[:port] [bind_addr[:port]] 的字符串。user 默认为当前用户,port 默认为标准 SSH 端口。如果指定了 [bind_addr[:port]],其他工作进程将通过指定的 bind_addr 和 port 连接到此工作进程。
可以通过在 machines 向量中使用元组或形式 (machine_spec, count) 来在远程主机上启动多个进程,其中 count 是要在指定主机上启动的工作进程数量。将 :auto 作为工作进程数量传递将启动与远程主机上的 CPU 线程数量相同的工作进程。
示例:
addprocs([
"remote1", # 在 'remote1' 上以当前用户名登录的一个工作进程
"user@remote2", # 在 'remote2' 上以 'user' 用户名登录的一个工作进程
"user@remote3:2222", # 为 'remote3' 指定 SSH 端口为 '2222'
("user@remote4", 4), # 在 'remote4' 上启动 4 个工作进程
("user@remote5", :auto), # 在 'remote5' 上启动与 CPU 线程数量相同的工作进程
])关键字参数:
tunnel:如果为true,则将使用 SSH 隧道从主进程连接到工作进程。默认值为false。multiplex:如果为true,则在 SSH 隧道中使用 SSH 复用。默认值为false。ssh:用于启动工作进程的 SSH 客户端可执行文件的名称或路径。默认值为"ssh"。sshflags:指定额外的 ssh 选项,例如sshflags=`-i /home/foo/bar.pem`max_parallel:指定在主机上并行连接的最大工作进程数量。默认值为 10。shell:指定 SSH 在工作进程上连接的 shell 类型。shell=:posix:兼容 POSIX 的 Unix/Linux shell(sh、ksh、bash、dash、zsh 等)。默认值。shell=:csh:Unix C shell(csh、tcsh)。shell=:wincmd:Microsoft Windowscmd.exe。
dir:指定工作进程上的工作目录。默认值为主机的当前目录(通过pwd()找到)。enable_threaded_blas:如果为true,则 BLAS 将在添加的进程中以多个线程运行。默认值为false。exename:julia可执行文件的名称。默认值为"$(Sys.BINDIR)/julia"或"$(Sys.BINDIR)/julia-debug",具体取决于情况。建议在所有远程机器上使用相同的 Julia 版本,因为否则序列化和代码分发可能会失败。exeflags:传递给工作进程的额外标志。topology:指定工作进程之间的连接方式。在未连接的工作进程之间发送消息会导致错误。topology=:all_to_all:所有进程彼此连接。默认值。topology=:master_worker:只有驱动进程,即pid1 连接到工作进程。工作进程之间不相互连接。topology=:custom:集群管理器的launch方法通过WorkerConfig中的ident和connect_idents字段指定连接拓扑。具有集群管理器身份ident的工作进程将连接到connect_idents中指定的所有工作进程。
lazy:仅适用于topology=:all_to_all。如果为true,则工作进程之间的连接是懒惰设置的,即在工作进程之间的第一次远程调用时设置。默认值为 true。env:提供一个字符串对数组,例如env=["JULIA_DEPOT_PATH"=>"/depot"],以请求在远程机器上设置环境变量。默认情况下,只有环境变量JULIA_WORKER_TIMEOUT会自动从本地传递到远程环境。cmdline_cookie:通过--worker命令行选项传递身份验证 cookie。通过 ssh stdio 传递 cookie 的(更安全的)默认行为可能会在使用较旧(预 ConPTY)Julia 或 Windows 版本的 Windows 工作进程中挂起,在这种情况下,cmdline_cookie=true提供了一种解决方法。
关键字参数 ssh、shell、env 和 cmdline_cookie 在 Julia 1.6 中添加。
环境变量:
如果主进程在 60.0 秒内未能与新启动的工作进程建立连接,则该工作进程将其视为致命情况并终止。此超时可以通过环境变量 JULIA_WORKER_TIMEOUT 控制。主进程上的 JULIA_WORKER_TIMEOUT 的值指定新启动的工作进程等待建立连接的秒数。
addprocs(np::Integer=Sys.CPU_THREADS; restrict=true, kwargs...) -> 进程标识符列表使用内置的 LocalManager 在本地主机上启动 np 个工作进程。
本地工作进程从主进程继承当前的包环境(即活动项目、LOAD_PATH 和 DEPOT_PATH)。
请注意,工作进程不会运行 ~/.julia/config/startup.jl 启动脚本,也不会与其他正在运行的进程同步其全局状态(例如命令行开关、全局变量、新方法定义和加载的模块)。
关键字参数:
restrict::Bool:如果为true(默认),绑定限制为127.0.0.1。dir、exename、exeflags、env、topology、lazy、enable_threaded_blas:与SSHManager的效果相同,参见addprocs(machines::AbstractVector)的文档。
包环境的继承和 env 关键字参数是在 Julia 1.9 中添加的。
Distributed.nprocs — Functionnprocs()获取可用进程的数量。
示例
julia> nprocs()
3
julia> workers()
2-element Array{Int64,1}:
2
3Distributed.nworkers — Functionnworkers()获取可用工作进程的数量。这比 nprocs() 少一个。如果 nprocs() == 1,则等于 nprocs()。
示例
$ julia -p 2
julia> nprocs()
3
julia> nworkers()
2Distributed.procs — Methodprocs()返回所有进程标识符的列表,包括 pid 1(该进程不包括在 workers() 中)。
示例
$ julia -p 2
julia> procs()
3-element Array{Int64,1}:
1
2
3Distributed.procs — Methodprocs(pid::Integer)返回同一物理节点上所有进程标识符的列表。具体来说,返回与 pid 绑定到相同 IP 地址的所有工作进程。
Distributed.workers — Functionworkers()返回所有工作进程标识符的列表。
示例
$ julia -p 2
julia> workers()
2-element Array{Int64,1}:
2
3Distributed.rmprocs — Functionrmprocs(pids...; waitfor=typemax(Int))移除指定的工作进程。请注意,只有进程 1 可以添加或移除工作进程。
参数 waitfor 指定等待工作进程关闭的时间:
- 如果未指定,
rmprocs将等待直到所有请求的pids被移除。 - 如果在请求的
waitfor秒之前无法终止所有工作进程,将引发ErrorException。 - 当
waitfor值为 0 时,调用将立即返回,工作进程将在不同的任务中安排移除。返回调度的Task对象。用户应在调用任何其他并行调用之前对任务调用wait。
示例
$ julia -p 5
julia> t = rmprocs(2, 3, waitfor=0)
Task (runnable) @0x0000000107c718d0
julia> wait(t)
julia> workers()
3-element Array{Int64,1}:
4
5
6Distributed.interrupt — Functioninterrupt(pids::Integer...)中断指定工作者上当前正在执行的任务。这相当于在本地机器上按下 Ctrl-C。如果没有给出参数,则所有工作者都会被中断。
interrupt(pids::AbstractVector=workers())中断指定工作者上当前正在执行的任务。这相当于在本地机器上按下 Ctrl-C。如果没有给出参数,则所有工作者都会被中断。
Distributed.myid — Functionmyid()获取当前进程的 ID。
示例
julia> myid()
1
julia> remotecall_fetch(() -> myid(), 4)
4Distributed.pmap — Functionpmap(f, [::AbstractWorkerPool], c...; distributed=true, batch_size=1, on_error=nothing, retry_delays=[], retry_check=nothing) -> collection通过对每个元素应用 f 来转换集合 c,使用可用的工作者和任务。
对于多个集合参数,逐元素应用 f。
请注意,f 必须对所有工作进程可用;有关详细信息,请参见 代码可用性和加载包。
如果未指定工作者池,将通过 CachingPool 使用所有可用的工作者。
默认情况下,pmap 在所有指定的工作者上分配计算。要仅使用本地进程并在任务之间分配,请指定 distributed=false。这相当于使用 asyncmap。例如,pmap(f, c; distributed=false) 相当于 asyncmap(f,c; ntasks=()->nworkers())
pmap 还可以通过 batch_size 参数使用进程和任务的混合。对于大于 1 的批量大小,集合将分成多个批次处理,每个批次的长度为 batch_size 或更少。一个批次作为单个请求发送到空闲工作者,在那里本地的 asyncmap 使用多个并发任务处理批次中的元素。
任何错误都会阻止 pmap 处理集合的其余部分。要覆盖此行为,您可以通过参数 on_error 指定一个错误处理函数,该函数接受一个参数,即异常。该函数可以通过重新抛出错误来停止处理,或者为了继续,返回任何值,然后与结果一起返回给调用者。
考虑以下两个示例。第一个示例在线返回异常对象,第二个示例在任何异常的位置返回 0:
julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=identity)
4-element Array{Any,1}:
1
ErrorException("foo")
3
ErrorException("foo")
julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=ex->0)
4-element Array{Int64,1}:
1
0
3
0错误也可以通过重试失败的计算来处理。关键字参数 retry_delays 和 retry_check 作为关键字参数 delays 和 check 传递给 retry。如果指定了批处理,并且整个批次失败,则批次中的所有项目都会被重试。
请注意,如果同时指定了 on_error 和 retry_delays,则在重试之前会调用 on_error 钩子。如果 on_error 不抛出(或重新抛出)异常,则该元素将不会被重试。
示例:在错误发生时,最多重试 f 对一个元素 3 次,重试之间没有任何延迟。
pmap(f, c; retry_delays = zeros(3))示例:仅在异常不是 InexactError 类型时重试 f,并以指数方式增加延迟,最多重试 3 次。对于所有 InexactError 的出现返回 NaN。
pmap(f, c; on_error = e->(isa(e, InexactError) ? NaN : rethrow()), retry_delays = ExponentialBackOff(n = 3))Distributed.RemoteException — TypeRemoteException(captured)在远程计算中捕获并重新抛出异常。RemoteException 包装了工作者的 pid 和一个捕获的异常。CapturedException 捕获远程异常以及在异常被抛出时调用栈的可序列化形式。
Distributed.ProcessExitedException — TypeProcessExitedException(worker_id::Int)在客户端 Julia 进程退出后,进一步尝试引用已死亡的子进程将抛出此异常。
Distributed.Future — TypeFuture(w::Int, rrid::RRID, v::Union{Some, Nothing}=nothing)Future 是一个占位符,用于表示一个未知终止状态和时间的单一计算。有关多个潜在计算的信息,请参见 RemoteChannel。有关识别 AbstractRemoteRef 的信息,请参见 remoteref_id。
Distributed.RemoteChannel — TypeRemoteChannel(pid::Integer=myid())在进程 pid 上创建对 Channel{Any}(1) 的引用。默认的 pid 是当前进程。
RemoteChannel(f::Function, pid::Integer=myid())创建对特定大小和类型的远程通道的引用。f 是一个函数,当在 pid 上执行时,必须返回一个 AbstractChannel 的实现。
例如,RemoteChannel(()->Channel{Int}(10), pid) 将返回对 pid 上类型为 Int 和大小为 10 的通道的引用。
默认的 pid 是当前进程。
Base.fetch — Methodfetch(x::Future)等待并获取Future的值。获取的值会被本地缓存。对同一引用的进一步调用fetch将返回缓存的值。如果远程值是一个异常,则抛出一个RemoteException,该异常捕获远程异常和回溯信息。
Base.fetch — Methodfetch(c::RemoteChannel)等待并从RemoteChannel获取一个值。引发的异常与Future相同。不会移除获取的项目。
fetch(x::Any)返回 x。
Distributed.remotecall — Methodremotecall(f, id::Integer, args...; kwargs...) -> Future在指定的进程上异步调用函数 f,使用给定的参数。返回一个 Future。如果有关键字参数,它们将传递给 f。
Distributed.remotecall_wait — Methodremotecall_wait(f, id::Integer, args...; kwargs...)在由工作者 ID id 指定的 Worker 上以单个消息执行更快的 wait(remotecall(...))。关键字参数(如果有)将传递给 f。
另见 wait 和 remotecall。
Distributed.remotecall_fetch — Methodremotecall_fetch(f, id::Integer, args...; kwargs...)在一条消息中执行 fetch(remotecall(...))。关键字参数(如果有)将传递给 f。任何远程异常都将被捕获在 RemoteException 中并抛出。
另请参见 fetch 和 remotecall。
示例
$ julia -p 2
julia> remotecall_fetch(sqrt, 2, 4)
2.0
julia> remotecall_fetch(sqrt, 2, -4)
ERROR: 在工作节点 2:
DomainError with -4.0:
sqrt 被调用时使用了负实数参数,但仅在使用复数参数时才会返回复数结果。尝试 sqrt(Complex(x))。
...Distributed.remote_do — Methodremote_do(f, id::Integer, args...; kwargs...) -> nothing在工作节点 id 上异步执行 f。与 remotecall 不同,它不存储计算结果,也没有等待其完成的方法。
成功的调用表示请求已被接受在远程节点上执行。
虽然对同一工作节点的连续 remotecall 是按调用顺序序列化的,但在远程工作节点上的执行顺序是未确定的。例如,remote_do(f1, 2); remotecall(f2, 2); remote_do(f3, 2) 将序列化对 f1 的调用,随后是 f2 和 f3。然而,并不能保证 f1 在工作节点 2 上的执行会在 f3 之前。
f 抛出的任何异常都会在远程工作节点的 stderr 上打印。
关键字参数(如果有的话)会传递给 f。
Base.put! — Methodput!(rr::RemoteChannel, args...)将一组值存储到 RemoteChannel。如果通道已满,则阻塞直到有空间可用。返回第一个参数。
Base.put! — Methodput!(rr::Future, v)将一个值存储到一个 Future rr。Future 是一次性写入的远程引用。对已经设置的 Future 进行 put! 会抛出一个 Exception。所有异步远程调用返回 Future,并在完成时将值设置为调用的返回值。
Base.take! — Methodtake!(rr::RemoteChannel, args...)从 RemoteChannel rr 中获取值,过程中移除该值。
Base.isready — Methodisready(rr::RemoteChannel, args...)确定一个 RemoteChannel 是否存储了一个值。请注意,这个函数可能会导致竞争条件,因为在你收到结果时,它可能不再成立。然而,它可以安全地用于 Future,因为它们只被赋值一次。
Base.isready — Methodisready(rr::Future)确定一个 Future 是否有值存储。
如果参数 Future 被不同的节点拥有,此调用将会阻塞以等待答案。建议在单独的任务中等待 rr,或者使用本地的 Channel 作为代理:
p = 1
f = Future(p)
errormonitor(@async put!(f, remotecall_fetch(long_computation, p)))
isready(f) # 不会阻塞Distributed.AbstractWorkerPool — TypeAbstractWorkerPool工人池的超类型,例如 WorkerPool 和 CachingPool。AbstractWorkerPool 应该实现:
push!- 将一个新工人添加到整体池中(可用 + 忙碌)put!- 将一个工人放回可用池中take!- 从可用池中取出一个工人(用于远程函数执行)length- 整体池中可用工人的数量isready- 如果对池的take!操作会阻塞,则返回 false,否则返回 true
上述的默认实现(在 AbstractWorkerPool 上)需要字段
channel::Channel{Int}workers::Set{Int}
其中 channel 包含空闲工人的 pid,workers 是与此池关联的所有工人的集合。
Distributed.WorkerPool — TypeWorkerPool(workers::Union{Vector{Int},AbstractRange{Int}})从一个工作者 ID 的向量或范围创建一个 WorkerPool。
示例
$ julia -p 3
julia> WorkerPool([2, 3])
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([2, 3]), RemoteChannel{Channel{Any}}(1, 1, 6))
julia> WorkerPool(2:4)
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 7))Distributed.CachingPool — TypeCachingPool(workers::Vector{Int})AbstractWorkerPool 的实现。 remote, remotecall_fetch, pmap(以及其他远程调用,这些调用远程执行函数)受益于在工作节点上缓存序列化/反序列化的函数,特别是闭包(可能捕获大量数据)。
远程缓存在返回的 CachingPool 对象的生命周期内维护。要提前清除缓存,请使用 clear!(pool)。
对于全局变量,仅在闭包中捕获绑定,而不是数据。可以使用 let 块来捕获全局数据。
示例
const foo = rand(10^8);
wp = CachingPool(workers())
let foo = foo
pmap(i -> sum(foo) + i, wp, 1:100);
end上述代码只会将 foo 转移到每个工作节点一次。
Distributed.default_worker_pool — Functiondefault_worker_pool()AbstractWorkerPool 包含空闲的 workers - 被 remote(f) 和 pmap(默认情况下)使用。除非通过 default_worker_pool!(pool) 显式设置,否则默认工作池初始化为 WorkerPool。
示例
$ julia -p 3
julia> default_worker_pool()
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:3), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 4))Distributed.clear! — Functionclear!(syms, pids=workers(); mod=Main)通过将全局绑定初始化为 nothing 来清除模块中的全局绑定。 syms 应该是 Symbol 类型或 Symbol 的集合。 pids 和 mod 用于标识要重新初始化全局变量的进程和模块。 只有在 mod 下定义的名称才会被清除。
如果请求清除全局常量,将引发异常。
clear!(pool::CachingPool) -> pool从所有参与的工作节点中移除所有缓存的函数。
Distributed.remote — Functionremote([p::AbstractWorkerPool], f) -> Function返回一个匿名函数,该函数在可用的工作者上执行函数 f(如果提供了,则从 WorkerPool p 中抽取),使用 remotecall_fetch。
Distributed.remotecall — Methodremotecall(f, pool::AbstractWorkerPool, args...; kwargs...) -> FutureWorkerPool 变体的 remotecall(f, pid, ....)。等待并从 pool 中获取一个空闲的工作者,并在其上执行 remotecall。
示例
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> f = remotecall(maximum, wp, A)
Future(2, 1, 6, nothing)在这个例子中,任务在 pid 2 上运行,从 pid 1 调用。
Distributed.remotecall_wait — Methodremotecall_wait(f, pool::AbstractWorkerPool, args...; kwargs...) -> FutureWorkerPool 变体的 remotecall_wait(f, pid, ....)。等待并从 pool 中获取一个空闲的工作者,并在其上执行 remotecall_wait。
示例
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> f = remotecall_wait(maximum, wp, A)
Future(3, 1, 9, nothing)
julia> fetch(f)
0.9995177101692958Distributed.remotecall_fetch — Methodremotecall_fetch(f, pool::AbstractWorkerPool, args...; kwargs...) -> resultWorkerPool 变体的 remotecall_fetch(f, pid, ....)。等待并从 pool 中获取一个空闲的工作者,并在其上执行 remotecall_fetch。
示例
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> remotecall_fetch(maximum, wp, A)
0.9995177101692958Distributed.remote_do — Methodremote_do(f, pool::AbstractWorkerPool, args...; kwargs...) -> nothingWorkerPool 变体的 remote_do(f, pid, ....)。等待并从 pool 中获取一个空闲的工作者,并在其上执行 remote_do。
Distributed.@spawn — Macro@spawn expr创建一个围绕表达式的闭包,并在自动选择的进程上运行它,返回一个 Future 结果。此宏已被弃用;应使用 @spawnat :any expr。
示例
julia> addprocs(3);
julia> f = @spawn myid()
Future(2, 1, 5, nothing)
julia> fetch(f)
2
julia> f = @spawn myid()
Future(3, 1, 7, nothing)
julia> fetch(f)
3从 Julia 1.3 开始,此宏已被弃用。请改用 @spawnat :any。
Distributed.@spawnat — Macro@spawnat p expr创建一个围绕表达式的闭包,并在进程 p 上异步运行该闭包。返回一个 Future 结果。如果 p 是引用的字面符号 :any,则系统将自动选择一个处理器来使用。
示例
julia> addprocs(3);
julia> f = @spawnat 2 myid()
Future(2, 1, 3, nothing)
julia> fetch(f)
2
julia> f = @spawnat :any myid()
Future(3, 1, 7, nothing)
julia> fetch(f)
3:any 参数自 Julia 1.3 起可用。
Distributed.@fetch — Macro@fetch expr等同于 fetch(@spawnat :any expr)。请参见 fetch 和 @spawnat。
示例
julia> addprocs(3);
julia> @fetch myid()
2
julia> @fetch myid()
3
julia> @fetch myid()
4
julia> @fetch myid()
2Distributed.@fetchfrom — Macro@fetchfrom相当于 fetch(@spawnat p expr)。请参见 fetch 和 @spawnat。
示例
julia> addprocs(3);
julia> @fetchfrom 2 myid()
2
julia> @fetchfrom 4 myid()
4Distributed.@distributed — Macro@distributed一种分布式内存的并行 for 循环形式:
@distributed [reducer] for var = range
body
end指定的范围被划分并在所有工作节点上本地执行。如果指定了可选的 reducer 函数,@distributed 会在每个工作节点上执行本地归约,并在调用进程上进行最终归约。
请注意,如果没有 reducer 函数,@distributed 会异步执行,即它会在所有可用工作节点上生成独立的任务,并立即返回而不等待完成。要等待完成,请在调用前加上 @sync,如:
@sync @distributed for var = range
body
endDistributed.@everywhere — Macro@everywhere [procs()] expr在所有 procs 上执行一个表达式。任何进程上的错误都会被收集到一个 CompositeException 中并抛出。例如:
@everywhere bar = 1将在所有当前进程中定义 Main.bar。任何稍后添加的进程(例如使用 addprocs())将不会定义该表达式。
与 @spawnat 不同,@everywhere 不捕获任何局部变量。相反,可以使用插值广播局部变量:
foo = 1
@everywhere bar = $foo可选参数 procs 允许指定一个子集的所有进程来执行该表达式。
类似于调用 remotecall_eval(Main, procs, expr),但有两个额外的功能:
- `using` 和 `import` 语句首先在调用进程上运行,以确保
包已预编译。
- `include` 使用的当前源文件路径会传播到其他进程。Distributed.remoteref_id — Functionremoteref_id(r::AbstractRemoteRef) -> RRIDFutures 和 RemoteChannels 通过以下字段进行标识:
where- 指的是底层对象/存储实际存在的节点。whence- 指的是创建远程引用的节点。请注意,这与底层对象实际存在的节点不同。例如,从主进程调用RemoteChannel(2)将导致where值为 2,whence值为 1。id在由whence指定的工作者创建的所有引用中是唯一的。
综合来看,whence 和 id 唯一标识了所有工作者中的一个引用。
remoteref_id 是一个低级 API,返回一个 RRID 对象,该对象封装了远程引用的 whence 和 id 值。
Distributed.channel_from_id — Functionchannel_from_id(id) -> c一个低级 API,返回由 remoteref_id 返回的 id 的后备 AbstractChannel。该调用仅在存在后备通道的节点上有效。
Distributed.worker_id_from_socket — Functionworker_id_from_socket(s) -> pid一个低级 API,给定一个 IO 连接或一个 Worker,返回它所连接的工作者的 pid。在为一个类型编写自定义 serialize 方法时,这非常有用,因为它根据接收进程的 id 优化写出的数据。
Distributed.cluster_cookie — Methodcluster_cookie() -> cookie返回集群 cookie。
Distributed.cluster_cookie — Methodcluster_cookie(cookie) -> cookie将传递的 cookie 设置为集群 cookie,然后返回它。
Cluster Manager Interface
此接口提供了一种机制,用于在不同的集群环境中启动和管理 Julia 工作进程。Base 中存在两种类型的管理器:LocalManager,用于在同一主机上启动额外的工作进程,以及 SSHManager,用于通过 ssh 在远程主机上启动工作进程。TCP/IP 套接字用于连接和传输进程之间的消息。集群管理器可以提供不同的传输方式。
Distributed.ClusterManager — TypeClusterManager集群管理器的超类型,控制作为集群的工作进程。集群管理器实现了如何添加、移除和与工作进程进行通信。SSHManager 和 LocalManager 是其子类型。
Distributed.WorkerConfig — TypeWorkerConfig由 ClusterManager 使用的类型,用于控制添加到其集群中的工作者。一些字段被所有集群管理器用于访问主机:
io– 用于访问工作者的连接(IO的子类型或Nothing)host– 主机地址(可以是String或Nothing)port– 用于连接工作者的主机端口(可以是Int或Nothing)
一些字段被集群管理器用于向已初始化的主机添加工作者:
count– 要在主机上启动的工作者数量exename– 主机上 Julia 可执行文件的路径,默认为"$(Sys.BINDIR)/julia"或"$(Sys.BINDIR)/julia-debug"exeflags– 启动远程 Julia 时使用的标志
userdata 字段用于外部管理器存储每个工作者的信息。
一些字段被 SSHManager 和类似的管理器使用:
tunnel–true(使用隧道),false(不使用隧道),或nothing(使用管理器的默认设置)multiplex–true(使用 SSH 复用进行隧道)或falseforward– 用于 ssh 的-L选项的转发选项bind_addr– 在远程主机上绑定的地址sshflags– 用于建立 SSH 连接的标志max_parallel– 在主机上并行连接的最大工作者数量
一些字段被 LocalManager 和 SSHManager 都使用:
connect_at– 确定这是工作者到工作者还是驱动程序到工作者的设置调用process– 将要连接的进程(通常管理器会在addprocs期间分配此项)ospid– 根据主机操作系统的进程 ID,用于中断工作者进程environ– 本地/SSH 管理器用于存储临时信息的私有字典ident– 由ClusterManager识别的工作者connect_idents– 如果使用自定义拓扑,工作者必须连接的工作者 ID 列表enable_threaded_blas–true、false或nothing,是否在工作者上使用线程化 BLAS
Distributed.launch — Functionlaunch(manager::ClusterManager, params::Dict, launched::Array, launch_ntfy::Condition)由集群管理器实现。对于通过此函数启动的每个Julia工作者,它应将一个WorkerConfig条目附加到launched中,并通知launch_ntfy。一旦所有由manager请求的工作者都已启动,该函数必须退出。params是调用时所有关键字参数的字典addprocs。
Distributed.manage — Functionmanage(manager::ClusterManager, id::Integer, config::WorkerConfig. op::Symbol)由集群管理器实现。它在主进程中被调用,在工作者的生命周期内,使用适当的 op 值:
- 当工作者被添加/移除到/从 Julia 工作者池时,使用
:register/:deregister。 - 当调用
interrupt(workers)时,使用:interrupt。ClusterManager应该向适当的工作者发送中断信号。 - 使用
:finalize进行清理。
Base.kill — Methodkill(manager::ClusterManager, pid::Int, config::WorkerConfig)由集群管理器实现。它在主进程中被 rmprocs 调用。它应该导致由 pid 指定的远程工作者退出。kill(manager::ClusterManager.....) 在 pid 上执行远程 exit()。
Sockets.connect — Methodconnect(manager::ClusterManager, pid::Int, config::WorkerConfig) -> (instrm::IO, outstrm::IO)由使用自定义传输的集群管理器实现。它应该建立与指定为 config 的 pid 的工作者的逻辑连接,并返回一对 IO 对象。从 pid 到当前进程的消息将从 instrm 中读取,而要发送到 pid 的消息将写入 outstrm。自定义传输实现必须确保消息被完整且按顺序传递和接收。connect(manager::ClusterManager.....) 在工作者之间设置 TCP/IP 套接字连接。
Distributed.init_worker — Functioninit_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager())由实现自定义传输的集群管理器调用。它将新启动的进程初始化为工作者。命令行参数 --worker[=<cookie>] 的效果是使用 TCP/IP 套接字将进程初始化为工作者。cookie 是一个 cluster_cookie。
Distributed.start_worker — Functionstart_worker([out::IO=stdout], cookie::AbstractString=readline(stdin); close_stdin::Bool=true, stderr_to_stdout::Bool=true)start_worker 是一个内部函数,是通过 TCP/IP 连接的工作进程的默认入口点。它将进程设置为 Julia 集群工作者。
主机:端口 信息被写入流 out(默认为 stdout)。
该函数在需要时从 stdin 读取 cookie,并在一个空闲端口上监听(或者如果指定,则在 --bind-to 命令行选项中指定的端口上)并调度任务以处理传入的 TCP 连接和请求。它还(可选地)关闭 stdin 并将 stderr 重定向到 stdout。
它不返回。
Distributed.process_messages — Functionprocess_messages(r_stream::IO, w_stream::IO, incoming::Bool=true)由集群管理器使用自定义传输调用。当自定义传输实现接收到来自远程工作者的第一条消息时,应调用此函数。自定义传输必须管理与远程工作者的逻辑连接,并提供两个 IO 对象,一个用于传入消息,另一个用于发往远程工作者的消息。如果 incoming 为 true,则表示远程对等方发起了连接。无论哪一方发起连接,都将发送集群 cookie 及其 Julia 版本号以执行身份验证握手。
另见 cluster_cookie.
Distributed.default_addprocs_params — Functiondefault_addprocs_params(mgr::ClusterManager) -> Dict{Symbol, Any}由集群管理器实现。调用 addprocs(mgr) 时传递的默认关键字参数。通过调用 default_addprocs_params() 可以获得最小选项集。