Distributed Computing
Distributed
— Module분산 병렬 처리를 위한 도구.
Distributed.addprocs
— Functionaddprocs(manager::ClusterManager; kwargs...) -> 프로세스 식별자 목록
지정된 클러스터 관리자를 통해 작업자 프로세스를 시작합니다.
예를 들어, Beowulf 클러스터는 패키지 ClusterManagers.jl
에 구현된 사용자 정의 클러스터 관리자를 통해 지원됩니다.
새로 시작된 작업자가 마스터로부터 연결 설정을 기다리는 초 수는 작업자 프로세스의 환경에서 변수 JULIA_WORKER_TIMEOUT
을 통해 지정할 수 있습니다. TCP/IP를 전송 수단으로 사용할 때만 관련이 있습니다.
REPL을 차단하지 않고 작업자를 시작하거나, 프로그래밍 방식으로 작업자를 시작하는 경우 포함된 함수에서 addprocs
를 자체 작업으로 실행합니다.
예제
# 바쁜 클러스터에서 비동기적으로 `addprocs` 호출
t = @async addprocs(...)
# 작업자가 온라인 상태가 될 때마다 활용
if nprocs() > 1 # 최소한 하나의 새로운 작업자가 사용 가능해야 함
.... # 분산 실행 수행
end
# 새로 시작된 작업자 ID 또는 오류 메시지 검색
if istaskdone(t) # `addprocs`가 완료되었는지 확인하여 `fetch`가 차단되지 않도록 함
if nworkers() == N
new_pids = fetch(t)
else
fetch(t)
end
end
addprocs(machines; tunnel=false, sshflags=``, max_parallel=10, kwargs...) -> 프로세스 식별자 목록
원격 머신에서 SSH를 통해 작업자 프로세스를 추가합니다. 구성은 키워드 인수로 수행됩니다(아래 참조). 특히, exename
키워드를 사용하여 원격 머신에서 julia
바이너리의 경로를 지정할 수 있습니다.
machines
는 "[user@]host[:port] [bindaddr[:port]]" 형식의 문자열로 주어진 "머신 사양"의 벡터입니다. user
는 현재 사용자로 기본 설정되며, port
는 표준 SSH 포트로 기본 설정됩니다. `[bindaddr[:port]]가 지정되면 다른 작업자는 지정된
bind_addr및
port`에서 이 작업자에 연결합니다.
machines
벡터에서 튜플을 사용하거나 (machine_spec, count)
형식을 사용하여 원격 호스트에서 여러 프로세스를 시작할 수 있습니다. 여기서 count
는 지정된 호스트에서 시작할 작업자의 수입니다. 작업자 수로 :auto
를 전달하면 원격 호스트의 CPU 스레드 수만큼 작업자가 시작됩니다.
예제:
addprocs([
"remote1", # 현재 사용자 이름으로 로그인하여 'remote1'에서 하나의 작업자
"user@remote2", # 'user' 사용자 이름으로 로그인하여 'remote2'에서 하나의 작업자
"user@remote3:2222", # 'remote3'에 대해 SSH 포트를 '2222'로 지정
("user@remote4", 4), # 'remote4'에서 4개의 작업자 시작
("user@remote5", :auto), # 'remote5'에서 CPU 스레드 수만큼 작업자 시작
])
키워드 인수:
tunnel
:true
인 경우 마스터 프로세스에서 작업자에 연결하기 위해 SSH 터널링이 사용됩니다. 기본값은false
입니다.multiplex
:true
인 경우 SSH 터널링을 위해 SSH 다중화가 사용됩니다. 기본값은false
입니다.ssh
: 작업자를 시작하는 데 사용되는 SSH 클라이언트 실행 파일의 이름 또는 경로입니다. 기본값은"ssh"
입니다.sshflags
: 추가 ssh 옵션을 지정합니다. 예:sshflags=`-i /home/foo/bar.pem`
max_parallel
: 호스트에서 동시에 연결할 수 있는 최대 작업자 수를 지정합니다. 기본값은 10입니다.shell
: 작업자에서 ssh가 연결되는 셸의 유형을 지정합니다.shell=:posix
: POSIX 호환 Unix/Linux 셸(sh, ksh, bash, dash, zsh 등). 기본값입니다.shell=:csh
: Unix C 셸(csh, tcsh).shell=:wincmd
: Microsoft Windowscmd.exe
.
dir
: 작업자에서의 작업 디렉토리를 지정합니다. 기본값은 호스트의 현재 디렉토리(pwd()
로 찾은)입니다.enable_threaded_blas
:true
인 경우 추가된 프로세스에서 BLAS가 여러 스레드에서 실행됩니다. 기본값은false
입니다.exename
:julia
실행 파일의 이름입니다. 기본값은"$(Sys.BINDIR)/julia"
또는"$(Sys.BINDIR)/julia-debug"
입니다. 모든 원격 머신에서 공통의 Julia 버전을 사용하는 것이 좋습니다. 그렇지 않으면 직렬화 및 코드 배포가 실패할 수 있습니다.exeflags
: 작업자 프로세스에 전달되는 추가 플래그입니다.topology
: 작업자가 서로 연결되는 방식을 지정합니다. 연결되지 않은 작업자 간의 메시지 전송은 오류를 발생시킵니다.topology=:all_to_all
: 모든 프로세스가 서로 연결되어 있습니다. 기본값입니다.topology=:master_worker
: 드라이버 프로세스, 즉pid
1만 작업자에 연결됩니다. 작업자는 서로 연결되지 않습니다.topology=:custom
: 클러스터 관리자의launch
메서드가WorkerConfig
의ident
및connect_idents
필드를 통해 연결 토폴로지를 지정합니다. 클러스터 관리자 IDident
를 가진 작업자는connect_idents
에 지정된 모든 작업자에 연결됩니다.
lazy
:topology=:all_to_all
에만 적용됩니다.true
인 경우 작업자 간의 연결이 지연 설정됩니다. 즉, 작업자 간의 원격 호출의 첫 번째 인스턴스에서 설정됩니다. 기본값은 true입니다.env
:env=["JULIA_DEPOT_PATH"=>"/depot"]
와 같은 문자열 쌍의 배열을 제공하여 원격 머신에서 환경 변수가 설정되도록 요청합니다. 기본적으로JULIA_WORKER_TIMEOUT
환경 변수만 로컬에서 원격 환경으로 자동으로 전달됩니다.cmdline_cookie
:--worker
명령줄 옵션을 통해 인증 쿠키를 전달합니다. SSH stdio를 통해 쿠키를 전달하는 (더 안전한) 기본 동작은 이전 (ConPTY 이전) Julia 또는 Windows 버전을 사용하는 Windows 작업자와 함께 중단될 수 있으며, 이 경우cmdline_cookie=true
가 해결 방법을 제공합니다.
키워드 인수 ssh
, shell
, env
및 cmdline_cookie
는 Julia 1.6에서 추가되었습니다.
환경 변수:
마스터 프로세스가 새로 시작된 작업자와 60.0초 이내에 연결을 설정하지 못하면 작업자는 이를 치명적인 상황으로 간주하고 종료합니다. 이 타임아웃은 환경 변수 JULIA_WORKER_TIMEOUT
를 통해 제어할 수 있습니다. 마스터 프로세스에서 JULIA_WORKER_TIMEOUT
의 값은 새로 시작된 작업자가 연결 설정을 기다리는 초 수를 지정합니다.
addprocs(np::Integer=Sys.CPU_THREADS; restrict=true, kwargs...) -> 프로세스 식별자 목록
로컬 호스트에서 내장된 LocalManager
를 사용하여 np
작업자를 시작합니다.
로컬 작업자는 현재 패키지 환경(즉, 활성 프로젝트, LOAD_PATH
, 및 DEPOT_PATH
)을 메인 프로세스에서 상속받습니다.
!!! 경고 작업자는 ~/.julia/config/startup.jl
시작 스크립트를 실행하지 않으며, 다른 실행 중인 프로세스와 전역 상태(예: 명령줄 스위치, 전역 변수, 새로운 메서드 정의 및 로드된 모듈)를 동기화하지 않습니다.
키워드 인수:
restrict::Bool
:true
(기본값)인 경우 바인딩이127.0.0.1
로 제한됩니다.dir
,exename
,exeflags
,env
,topology
,lazy
,enable_threaded_blas
:SSHManager
와 동일한 효과,addprocs(machines::AbstractVector)
문서를 참조하십시오.
!!! 호환성 "Julia 1.9" 패키지 환경의 상속과 env
키워드 인수는 Julia 1.9에서 추가되었습니다.
Distributed.nprocs
— Functionnprocs()
사용 가능한 프로세스의 수를 가져옵니다.
예제
julia> nprocs()
3
julia> workers()
2-element Array{Int64,1}:
2
3
Distributed.nworkers
— Functionnworkers()
사용 가능한 작업자 프로세스의 수를 가져옵니다. 이는 nprocs()
보다 하나 적습니다. nprocs() == 1
인 경우 nprocs()
와 같습니다.
예제
$ julia -p 2
julia> nprocs()
3
julia> nworkers()
2
Distributed.procs
— Methodprocs()
모든 프로세스 식별자 목록을 반환하며, 여기에는 pid 1도 포함됩니다(이는 workers()
에서 포함되지 않음).
예시
$ julia -p 2
julia> procs()
3-element Array{Int64,1}:
1
2
3
Distributed.procs
— Methodprocs(pid::Integer)
동일한 물리적 노드에 있는 모든 프로세스 식별자의 목록을 반환합니다. 구체적으로 pid
와 동일한 IP 주소에 바인딩된 모든 작업자가 반환됩니다.
Distributed.workers
— Functionworkers()
모든 작업자 프로세스 식별자의 목록을 반환합니다.
예제
$ julia -p 2
julia> workers()
2-element Array{Int64,1}:
2
3
Distributed.rmprocs
— Functionrmprocs(pids...; waitfor=typemax(Int))
지정된 작업자를 제거합니다. 프로세스 1만 작업자를 추가하거나 제거할 수 있습니다.
인수 waitfor
는 작업자가 종료될 때까지 기다리는 시간을 지정합니다:
- 지정하지 않으면,
rmprocs
는 요청된 모든pids
가 제거될 때까지 기다립니다. - 요청된
waitfor
초 이전에 모든 작업자를 종료할 수 없는 경우ErrorException
이 발생합니다. waitfor
값이 0이면, 호출은 즉시 반환되며 작업자는 다른 작업에서 제거될 예정입니다. 예약된Task
객체가 반환됩니다. 사용자는 다른 병렬 호출을 수행하기 전에 작업에서wait
를 호출해야 합니다.
예제
$ julia -p 5
julia> t = rmprocs(2, 3, waitfor=0)
Task (runnable) @0x0000000107c718d0
julia> wait(t)
julia> workers()
3-element Array{Int64,1}:
4
5
6
Distributed.interrupt
— Functioninterrupt(pids::Integer...)
지정된 작업자에서 현재 실행 중인 작업을 중단합니다. 이는 로컬 머신에서 Ctrl-C를 누르는 것과 동일합니다. 인수가 주어지지 않으면 모든 작업자가 중단됩니다.
interrupt(pids::AbstractVector=workers())
지정된 작업자에서 현재 실행 중인 작업을 중단합니다. 이는 로컬 머신에서 Ctrl-C를 누르는 것과 동일합니다. 인수가 주어지지 않으면 모든 작업자가 중단됩니다.
Distributed.myid
— Functionmyid()
현재 프로세스의 ID를 가져옵니다.
예제
julia> myid()
1
julia> remotecall_fetch(() -> myid(), 4)
4
Distributed.pmap
— Functionpmap(f, [::AbstractWorkerPool], c...; distributed=true, batch_size=1, on_error=nothing, retry_delays=[], retry_check=nothing) -> collection
컬렉션 c
를 변환하여 각 요소에 f
를 적용합니다. 사용 가능한 작업자와 작업을 사용합니다.
여러 컬렉션 인수가 있는 경우, f
를 요소별로 적용합니다.
f
는 모든 작업자 프로세스에서 사용할 수 있어야 합니다. 자세한 내용은 코드 가용성 및 패키지 로딩을 참조하십시오.
작업자 풀이 지정되지 않은 경우, 모든 사용 가능한 작업자가 CachingPool
을 통해 사용됩니다.
기본적으로 pmap
은 지정된 모든 작업자에게 계산을 분산합니다. 로컬 프로세스만 사용하고 작업에 분산하려면 distributed=false
를 지정하십시오. 이는 asyncmap
을 사용하는 것과 동일합니다. 예를 들어, pmap(f, c; distributed=false)
는 asyncmap(f,c; ntasks=()->nworkers())
와 동일합니다.
pmap
은 batch_size
인수를 통해 프로세스와 작업의 혼합을 사용할 수도 있습니다. 배치 크기가 1보다 큰 경우, 컬렉션은 여러 배치로 처리되며, 각 배치의 길이는 batch_size
이하입니다. 배치는 자유로운 작업자에게 단일 요청으로 전송되며, 로컬 asyncmap
이 여러 동시 작업을 사용하여 배치의 요소를 처리합니다.
오류가 발생하면 pmap
은 컬렉션의 나머지 부분을 처리하지 않습니다. 이 동작을 재정의하려면, 단일 인수(즉, 예외)를 받는 오류 처리 함수를 on_error
인수로 지정할 수 있습니다. 이 함수는 오류를 다시 발생시켜 처리를 중단하거나, 계속 진행하기 위해 어떤 값을 반환하여 결과와 함께 호출자에게 반환할 수 있습니다.
다음 두 가지 예를 고려하십시오. 첫 번째 예는 예외 객체를 인라인으로 반환하고, 두 번째 예는 예외 대신 0을 반환합니다:
julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=identity)
4-element Array{Any,1}:
1
ErrorException("foo")
3
ErrorException("foo")
julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=ex->0)
4-element Array{Int64,1}:
1
0
3
0
오류는 실패한 계산을 재시도하여 처리할 수도 있습니다. 키워드 인수 retry_delays
와 retry_check
는 각각 retry
로 키워드 인수 delays
와 check
로 전달됩니다. 배치가 지정된 경우, 전체 배치가 실패하면 배치의 모든 항목이 재시도됩니다.
on_error
와 retry_delays
가 모두 지정된 경우, 재시도하기 전에 on_error
후크가 호출됩니다. on_error
가 예외를 던지지 않거나 다시 발생시키지 않으면 해당 요소는 재시도되지 않습니다.
예: 오류가 발생할 경우, 요소에 대해 최대 3회 지연 없이 f
를 재시도합니다.
pmap(f, c; retry_delays = zeros(3))
예: 예외가 InexactError
유형이 아닐 경우에만 f
를 재시도하며, 최대 3회까지 지수적으로 증가하는 지연을 사용합니다. 모든 InexactError
발생에 대해 NaN
을 반환합니다.
pmap(f, c; on_error = e->(isa(e, InexactError) ? NaN : rethrow()), retry_delays = ExponentialBackOff(n = 3))
Distributed.RemoteException
— TypeRemoteException(captured)
원격 계산에서 발생한 예외는 캡처되어 로컬에서 다시 발생합니다. RemoteException
은 작업자의 pid
와 캡처된 예외를 래핑합니다. CapturedException
은 원격 예외와 예외가 발생했을 때의 호출 스택의 직렬화 가능한 형태를 캡처합니다.
Distributed.ProcessExitedException
— TypeProcessExitedException(worker_id::Int)
클라이언트 Julia 프로세스가 종료된 후, 죽은 자식을 참조하려는 추가 시도는 이 예외를 발생시킵니다.
Distributed.Future
— TypeFuture(w::Int, rrid::RRID, v::Union{Some, Nothing}=nothing)
Future
는 알 수 없는 종료 상태와 시간을 가진 단일 계산을 위한 자리 표시자입니다. 여러 잠재적 계산에 대해서는 RemoteChannel
을 참조하십시오. AbstractRemoteRef
를 식별하기 위해 remoteref_id
를 참조하십시오.
Distributed.RemoteChannel
— TypeRemoteChannel(pid::Integer=myid())
프로세스 pid
에서 Channel{Any}(1)
에 대한 참조를 만듭니다. 기본 pid
는 현재 프로세스입니다.
RemoteChannel(f::Function, pid::Integer=myid())
특정 크기와 유형의 원격 채널에 대한 참조를 생성합니다. f
는 pid
에서 실행될 때 AbstractChannel
의 구현을 반환해야 하는 함수입니다.
예를 들어, RemoteChannel(()->Channel{Int}(10), pid)
는 pid
에서 유형 Int
와 크기 10의 채널에 대한 참조를 반환합니다.
기본 pid
는 현재 프로세스입니다.
Base.fetch
— Methodfetch(x::Future)
Future
의 값을 기다리고 가져옵니다. 가져온 값은 로컬에 캐시됩니다. 동일한 참조에 대한 추가 fetch
호출은 캐시된 값을 반환합니다. 원격 값이 예외인 경우, 원격 예외와 백트레이스를 캡처하는 RemoteException
을 발생시킵니다.
Base.fetch
— Methodfetch(c::RemoteChannel)
RemoteChannel
에서 값을 기다리고 가져옵니다. 발생하는 예외는 Future
와 동일합니다. 가져온 항목을 제거하지 않습니다.
fetch(x::Any)
x
를 반환합니다.
Distributed.remotecall
— Methodremotecall(f, id::Integer, args...; kwargs...) -> Future
지정된 프로세스에서 주어진 인수로 함수 f
를 비동기적으로 호출합니다. Future
를 반환합니다. 키워드 인수가 있는 경우, f
에 전달됩니다.
Distributed.remotecall_wait
— Methodremotecall_wait(f, id::Integer, args...; kwargs...)
지정된 작업자 ID id
에 대해 하나의 메시지로 더 빠른 wait(remotecall(...))
을 수행합니다. 키워드 인수는 있는 경우 f
로 전달됩니다.
또한 wait
및 remotecall
를 참조하십시오.
Distributed.remotecall_fetch
— Methodremotecall_fetch(f, id::Integer, args...; kwargs...)
fetch(remotecall(...))
를 한 메시지에서 수행합니다. 키워드 인수는 있을 경우 f
로 전달됩니다. 모든 원격 예외는 RemoteException
으로 캡처되어 던져집니다.
자세한 내용은 fetch
및 remotecall
를 참조하세요.
예제
$ julia -p 2
julia> remotecall_fetch(sqrt, 2, 4)
2.0
julia> remotecall_fetch(sqrt, 2, -4)
ERROR: On worker 2:
DomainError with -4.0:
sqrt는 음의 실수 인수로 호출되었지만 복소수 인수로 호출될 경우에만 복소수 결과를 반환합니다. sqrt(Complex(x))를 시도해 보세요.
...
Distributed.remote_do
— Methodremote_do(f, id::Integer, args...; kwargs...) -> nothing
작업자 id
에서 비동기적으로 f
를 실행합니다. remotecall
과는 달리, 계산 결과를 저장하지 않으며 완료를 기다리는 방법도 없습니다.
성공적인 호출은 요청이 원격 노드에서 실행을 위해 수락되었음을 나타냅니다.
같은 작업자에 대한 연속적인 remotecall
은 호출된 순서대로 직렬화되지만, 원격 작업자에서의 실행 순서는 불확실합니다. 예를 들어, remote_do(f1, 2); remotecall(f2, 2); remote_do(f3, 2)
는 f1
에 대한 호출을 직렬화한 다음, 그 순서대로 f2
와 f3
를 실행합니다. 그러나 f1
이 작업자 2에서 f3
보다 먼저 실행된다는 보장은 없습니다.
f
에서 발생한 모든 예외는 원격 작업자의 stderr
에 출력됩니다.
키워드 인수는 있는 경우 f
로 전달됩니다.
Base.put!
— Methodput!(rr::RemoteChannel, args...)
RemoteChannel
에 값 집합을 저장합니다. 채널이 가득 차면 공간이 생길 때까지 차단됩니다. 첫 번째 인수를 반환합니다.
Base.put!
— Methodput!(rr::Future, v)
값을 Future
rr
에 저장합니다. Future
는 한 번만 쓸 수 있는 원격 참조입니다. 이미 설정된 Future
에 대해 put!
을 호출하면 Exception
이 발생합니다. 모든 비동기 원격 호출은 Future
를 반환하며, 완료 시 호출의 반환 값으로 값을 설정합니다.
Base.take!
— Methodtake!(rr::RemoteChannel, args...)
RemoteChannel
rr
에서 값(들)을 가져오고, 그 과정에서 값(들)을 제거합니다.
Base.isready
— Methodisready(rr::RemoteChannel, args...)
RemoteChannel
에 값이 저장되어 있는지 확인합니다. 이 함수는 결과를 받을 때까지 시간이 걸리기 때문에 경쟁 조건이 발생할 수 있습니다. 그러나 Future
에서는 한 번만 할당되므로 안전하게 사용할 수 있습니다.
Base.isready
— Methodisready(rr::Future)
Future
에 값이 저장되어 있는지 확인합니다.
인수 Future
가 다른 노드에 의해 소유되는 경우, 이 호출은 답변을 기다리기 위해 차단됩니다. 대신 rr
을 별도의 작업에서 기다리거나 로컬 Channel
를 프록시로 사용하는 것이 좋습니다:
p = 1
f = Future(p)
errormonitor(@async put!(f, remotecall_fetch(long_computation, p)))
isready(f) # 차단되지 않음
Distributed.AbstractWorkerPool
— TypeAbstractWorkerPool
워커 풀의 수퍼타입으로 WorkerPool
및 CachingPool
와 같은. AbstractWorkerPool
은 다음을 구현해야 합니다:
push!
- 전체 풀에 새로운 워커 추가 (사용 가능 + 바쁜)put!
- 워커를 사용 가능한 풀로 되돌리기take!
- 사용 가능한 풀에서 워커를 가져오기 (원격 함수 실행에 사용)length
- 전체 풀에서 사용 가능한 워커 수isready
- 풀에서take!
가 블록되면 false를 반환하고, 그렇지 않으면 true를 반환
위의 기본 구현( AbstractWorkerPool
에서)은 다음 필드를 요구합니다:
channel::Channel{Int}
workers::Set{Int}
여기서 channel
은 자유로운 워커 pid를 포함하고 workers
는 이 풀과 관련된 모든 워커의 집합입니다.
Distributed.WorkerPool
— TypeWorkerPool(workers::Union{Vector{Int},AbstractRange{Int}})
벡터 또는 작업자 ID의 범위에서 WorkerPool
을 생성합니다.
예제
$ julia -p 3
julia> WorkerPool([2, 3])
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([2, 3]), RemoteChannel{Channel{Any}}(1, 1, 6))
julia> WorkerPool(2:4)
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 7))
Distributed.CachingPool
— TypeCachingPool(workers::Vector{Int})
AbstractWorkerPool
의 구현입니다. remote
, remotecall_fetch
, pmap
(및 원격으로 함수를 실행하는 다른 원격 호출)은 특히 클로저(대량의 데이터를 캡처할 수 있음)에서 작업 노드에 직렬화/역직렬화된 함수를 캐싱하는 것의 이점을 누립니다.
원격 캐시는 반환된 CachingPool
객체의 수명 동안 유지됩니다. 캐시를 더 일찍 지우려면 clear!(pool)
을 사용하세요.
전역 변수의 경우, 클로저에서 캡처되는 것은 바인딩만이며 데이터는 아닙니다. 전역 데이터를 캡처하기 위해 let
블록을 사용할 수 있습니다.
예제
const foo = rand(10^8);
wp = CachingPool(workers())
let foo = foo
pmap(i -> sum(foo) + i, wp, 1:100);
end
위 코드는 foo
를 각 작업자에게 한 번만 전송합니다.
Distributed.default_worker_pool
— Functiondefault_worker_pool()
AbstractWorkerPool
는 유휴 workers
를 포함하고 있으며, remote(f)
및 pmap
(기본적으로)에서 사용됩니다. default_worker_pool!(pool)
을 통해 명시적으로 설정되지 않는 한, 기본 작업자 풀은 WorkerPool
로 초기화됩니다.
예제
$ julia -p 3
julia> default_worker_pool()
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:3), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 4))
Distributed.clear!
— Functionclear!(syms, pids=workers(); mod=Main)
모듈에서 전역 바인딩을 nothing
으로 초기화하여 지웁니다. syms
는 Symbol
유형이거나 Symbol
의 컬렉션이어야 합니다. pids
와 mod
는 전역 변수를 재초기화할 프로세스와 모듈을 식별합니다. mod
아래에 정의된 이름만 지워집니다.
전역 상수를 지우려고 할 경우 예외가 발생합니다.
clear!(pool::CachingPool) -> pool
모든 참여 작업자에서 모든 캐시된 함수를 제거합니다.
Distributed.remote
— Functionremote([p::AbstractWorkerPool], f) -> Function
사용 가능한 작업자에서 함수 f
를 실행하는 익명 함수를 반환합니다(제공된 경우 WorkerPool
p
에서 가져옴) remotecall_fetch
를 사용하여.
Distributed.remotecall
— Methodremotecall(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future
WorkerPool
변형의 remotecall(f, pid, ....)
. pool
에서 대기 중인 작업자를 가져와 remotecall
을 수행합니다.
예제
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> f = remotecall(maximum, wp, A)
Future(2, 1, 6, nothing)
이 예제에서 작업은 pid 2에서 실행되었고, pid 1에서 호출되었습니다.
Distributed.remotecall_wait
— Methodremotecall_wait(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future
WorkerPool
변형의 remotecall_wait(f, pid, ....)
. pool
에서 자유로운 작업자를 기다리고 가져와서 그 위에서 remotecall_wait
을 수행합니다.
예제
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> f = remotecall_wait(maximum, wp, A)
Future(3, 1, 9, nothing)
julia> fetch(f)
0.9995177101692958
Distributed.remotecall_fetch
— Methodremotecall_fetch(f, pool::AbstractWorkerPool, args...; kwargs...) -> result
WorkerPool
변형의 remotecall_fetch(f, pid, ....)
. pool
에서 유휴 작업자를 기다리고 가져와서 그 위에서 remotecall_fetch
를 수행합니다.
예제
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> remotecall_fetch(maximum, wp, A)
0.9995177101692958
Distributed.remote_do
— Methodremote_do(f, pool::AbstractWorkerPool, args...; kwargs...) -> nothing
WorkerPool
변형의 remote_do(f, pid, ....)
. pool
에서 자유로운 작업자를 기다리고 가져와서 그 위에서 remote_do
를 수행합니다.
Distributed.@spawn
— Macro@spawn expr
표현식 주위에 클로저를 생성하고 자동으로 선택된 프로세스에서 실행하여 결과에 대한 Future
를 반환합니다. 이 매크로는 더 이상 사용되지 않으며, 대신 @spawnat :any expr
를 사용해야 합니다.
예제
julia> addprocs(3);
julia> f = @spawn myid()
Future(2, 1, 5, nothing)
julia> fetch(f)
2
julia> f = @spawn myid()
Future(3, 1, 7, nothing)
julia> fetch(f)
3
Julia 1.3부터 이 매크로는 더 이상 사용되지 않습니다. 대신 @spawnat :any
를 사용하세요.
Distributed.@spawnat
— Macro@spawnat p expr
표현식 주위에 클로저를 생성하고 프로세스 p
에서 비동기적으로 클로저를 실행합니다. 결과에 대한 Future
를 반환합니다. p
가 인용된 리터럴 기호 :any
인 경우, 시스템은 자동으로 사용할 프로세서를 선택합니다.
예제
julia> addprocs(3);
julia> f = @spawnat 2 myid()
Future(2, 1, 3, nothing)
julia> fetch(f)
2
julia> f = @spawnat :any myid()
Future(3, 1, 7, nothing)
julia> fetch(f)
3
:any
인자는 Julia 1.3부터 사용할 수 있습니다.
Distributed.@fetch
— Macro@fetch expr
fetch(@spawnat :any expr)
와 동일합니다. fetch
및 @spawnat
를 참조하십시오.
예제
julia> addprocs(3);
julia> @fetch myid()
2
julia> @fetch myid()
3
julia> @fetch myid()
4
julia> @fetch myid()
2
Distributed.@fetchfrom
— Macro@fetchfrom
fetch(@spawnat p expr)
와 동일합니다. fetch
및 @spawnat
를 참조하십시오.
예제
julia> addprocs(3);
julia> @fetchfrom 2 myid()
2
julia> @fetchfrom 4 myid()
4
Distributed.@distributed
— Macro@distributed
분산 메모리, 병렬 for 루프의 형태:
@distributed [reducer] for var = range
body
end
지정된 범위는 분할되어 모든 작업자에서 로컬로 실행됩니다. 선택적 리듀서 함수가 지정된 경우, @distributed
는 각 작업자에서 로컬 축소를 수행하고 호출 프로세스에서 최종 축소를 수행합니다.
리듀서 함수 없이 @distributed
는 비동기적으로 실행되며, 즉 모든 사용 가능한 작업자에서 독립적인 작업을 생성하고 완료를 기다리지 않고 즉시 반환합니다. 완료를 기다리려면 호출을 @sync
로 접두어를 붙여야 합니다, 예를 들어:
@sync @distributed for var = range
body
end
Distributed.@everywhere
— Macro@everywhere [procs()] expr
모든 procs
에서 Main
아래에서 표현식을 실행합니다. 프로세스 중 하나에서 발생한 오류는 CompositeException
으로 수집되어 던져집니다. 예를 들어:
@everywhere bar = 1
는 모든 현재 프로세스에서 Main.bar
를 정의합니다. 나중에 추가된 프로세스(예: addprocs()
로 추가된 프로세스)에서는 표현식이 정의되지 않습니다.
@spawnat
와 달리, @everywhere
는 로컬 변수를 캡처하지 않습니다. 대신, 로컬 변수를 보간을 사용하여 브로드캐스트할 수 있습니다:
foo = 1
@everywhere bar = $foo
선택적 인수 procs
는 표현식을 실행할 프로세스의 하위 집합을 지정할 수 있습니다.
remotecall_eval(Main, procs, expr)
을 호출하는 것과 유사하지만 두 가지 추가 기능이 있습니다:
- `using` 및 `import` 문이 호출 프로세스에서 먼저 실행되어 패키지가 미리 컴파일되도록 합니다.
- `include`에 의해 사용되는 현재 소스 파일 경로가 다른 프로세스로 전파됩니다.
Distributed.remoteref_id
— Functionremoteref_id(r::AbstractRemoteRef) -> RRID
Future
s와 RemoteChannel
s는 다음 필드로 식별됩니다:
where
- 참조가 실제로 존재하는 기본 객체/저장소가 있는 노드를 나타냅니다.whence
- 원격 참조가 생성된 노드를 나타냅니다. 이는 참조된 기본 객체가 실제로 존재하는 노드와 다릅니다. 예를 들어, 마스터 프로세스에서RemoteChannel(2)
를 호출하면where
값은 2이고whence
값은 1이 됩니다.id
는whence
로 지정된 작업자에서 생성된 모든 참조에서 고유합니다.
함께 고려할 때, whence
와 id
는 모든 작업자에서 참조를 고유하게 식별합니다.
remoteref_id
는 원격 참조의 whence
및 id
값을 래핑하는 RRID
객체를 반환하는 저수준 API입니다.
Distributed.channel_from_id
— Functionchannel_from_id(id) -> c
remoteref_id
[@ref]로 반환된 id
에 대한 백업 AbstractChannel
을 반환하는 저수준 API입니다. 호출은 백업 채널이 존재하는 노드에서만 유효합니다.
Distributed.worker_id_from_socket
— Functionworker_id_from_socket(s) -> pid
주어진 IO
연결 또는 Worker
에 대해 연결된 작업자의 pid
를 반환하는 저수준 API입니다. 이는 수신 프로세스 ID에 따라 작성되는 데이터를 최적화하는 유형에 대한 사용자 정의 serialize
메서드를 작성할 때 유용합니다.
Distributed.cluster_cookie
— Methodcluster_cookie() -> cookie
클러스터 쿠키를 반환합니다.
Distributed.cluster_cookie
— Methodcluster_cookie(cookie) -> cookie
전달된 쿠키를 클러스터 쿠키로 설정한 다음, 이를 반환합니다.
Cluster Manager Interface
이 인터페이스는 다양한 클러스터 환경에서 Julia 작업자를 시작하고 관리하는 메커니즘을 제공합니다. Base에는 두 가지 유형의 관리자가 있습니다: 동일한 호스트에서 추가 작업자를 시작하기 위한 LocalManager
와 ssh
를 통해 원격 호스트에서 시작하기 위한 SSHManager
입니다. TCP/IP 소켓은 프로세스 간의 연결 및 메시지 전송에 사용됩니다. 클러스터 관리자가 다른 전송 방식을 제공하는 것도 가능합니다.
Distributed.ClusterManager
— TypeClusterManager
클러스터를 제어하는 작업자 프로세스에 대한 슈퍼타입입니다. 클러스터 관리자는 작업자를 추가, 제거 및 통신하는 방법을 구현합니다. SSHManager
와 LocalManager
는 이의 서브타입입니다.
Distributed.WorkerConfig
— TypeWorkerConfig
ClusterManager
에서 클러스터에 추가된 작업자를 제어하는 데 사용되는 유형입니다. 일부 필드는 모든 클러스터 관리자가 호스트에 접근하는 데 사용됩니다:
io
– 작업자에 접근하는 데 사용되는 연결 (하위 유형IO
또는Nothing
)host
– 호스트 주소 (문자열String
또는Nothing
)port
– 작업자에 연결하는 데 사용되는 호스트의 포트 (정수Int
또는Nothing
)
일부는 클러스터 관리자가 이미 초기화된 호스트에 작업자를 추가하는 데 사용됩니다:
count
– 호스트에서 시작할 작업자의 수exename
– 호스트에서의 Julia 실행 파일 경로, 기본값은"$(Sys.BINDIR)/julia"
또는"$(Sys.BINDIR)/julia-debug"
exeflags
– 원격으로 Julia를 시작할 때 사용할 플래그
userdata
필드는 외부 관리자가 각 작업자에 대한 정보를 저장하는 데 사용됩니다.
일부 필드는 SSHManager
및 유사한 관리자에서 사용됩니다:
tunnel
–true
(터널링 사용),false
(터널링 사용 안 함), 또는nothing
(관리자에 대한 기본값 사용)multiplex
–true
(터널링을 위한 SSH 다중화 사용) 또는false
forward
– ssh의-L
옵션에 사용되는 포워딩 옵션bind_addr
– 원격 호스트에서 바인딩할 주소sshflags
– SSH 연결을 설정할 때 사용할 플래그max_parallel
– 호스트에서 병렬로 연결할 수 있는 최대 작업자 수
일부 필드는 LocalManager
와 SSHManager
모두에서 사용됩니다:
connect_at
– 이것이 작업자 간의 설정 호출인지 드라이버에서 작업자로의 설정 호출인지 결정process
– 연결될 프로세스 (일반적으로 관리자가addprocs
중에 이를 할당)ospid
– 호스트 OS에 따른 프로세스 ID, 작업자 프로세스를 중단하는 데 사용environ
– Local/SSH 관리자가 임시 정보를 저장하는 데 사용하는 개인 사전ident
–ClusterManager
에 의해 식별된 작업자connect_idents
– 사용자 정의 토폴로지를 사용하는 경우 작업자가 연결해야 하는 작업자 ID 목록enable_threaded_blas
–true
,false
, 또는nothing
, 작업자에서 스레드화된 BLAS를 사용할지 여부
Distributed.launch
— Functionlaunch(manager::ClusterManager, params::Dict, launched::Array, launch_ntfy::Condition)
클러스터 관리자에 의해 구현됩니다. 이 함수에 의해 시작된 모든 Julia 작업자에 대해 launched
에 WorkerConfig
항목을 추가하고 launch_ntfy
를 알립니다. 이 함수는 manager
에 의해 요청된 모든 작업자가 시작되면 반드시 종료되어야 합니다. params
는 addprocs
호출 시 사용된 모든 키워드 인수의 사전입니다.
Distributed.manage
— Functionmanage(manager::ClusterManager, id::Integer, config::WorkerConfig. op::Symbol)
클러스터 관리자에 의해 구현됩니다. 작업자의 생애 동안 마스터 프로세스에서 호출되며, 적절한 op
값과 함께 호출됩니다:
- 작업자가 Julia 작업자 풀에 추가/제거될 때
:register
/:deregister
와 함께. interrupt(workers)
가 호출될 때:interrupt
와 함께.ClusterManager
는 적절한 작업자에게 인터럽트 신호를 보내야 합니다.- 정리 목적으로
:finalize
와 함께.
Base.kill
— Methodkill(manager::ClusterManager, pid::Int, config::WorkerConfig)
클러스터 관리자에 의해 구현됩니다. 이는 마스터 프로세스에서 rmprocs
에 의해 호출됩니다. 이는 pid
로 지정된 원격 작업자가 종료되도록 해야 합니다. kill(manager::ClusterManager.....)
는 pid
에서 원격 exit()
를 실행합니다.
Sockets.connect
— Methodconnect(manager::ClusterManager, pid::Int, config::WorkerConfig) -> (instrm::IO, outstrm::IO)
클러스터 관리자가 사용자 정의 전송을 사용하여 구현합니다. 이는 config
에 의해 지정된 pid
를 가진 작업자와의 논리적 연결을 설정하고 IO
객체 쌍을 반환해야 합니다. pid
에서 현재 프로세스로의 메시지는 instrm
에서 읽히고, pid
로 전송될 메시지는 outstrm
에 기록됩니다. 사용자 정의 전송 구현은 메시지가 완전하고 순서대로 전달되고 수신되도록 보장해야 합니다. connect(manager::ClusterManager.....)
는 작업자 간의 TCP/IP 소켓 연결을 설정합니다.
Distributed.init_worker
— Functioninit_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager())
사용자 정의 전송을 구현하는 클러스터 관리자가 호출합니다. 새로 시작된 프로세스를 작업자로 초기화합니다. 명령줄 인수 --worker[=<cookie>]
는 TCP/IP 소켓을 사용하여 프로세스를 작업자로 초기화하는 효과가 있습니다. cookie
는 cluster_cookie
입니다.
Distributed.start_worker
— Functionstart_worker([out::IO=stdout], cookie::AbstractString=readline(stdin); close_stdin::Bool=true, stderr_to_stdout::Bool=true)
start_worker
는 TCP/IP를 통해 연결되는 작업자 프로세스의 기본 진입점인 내부 함수입니다. 이 함수는 프로세스를 Julia 클러스터 작업자로 설정합니다.
호스트:포트 정보는 스트림 out
에 기록됩니다(기본값은 stdout).
이 함수는 필요할 경우 stdin에서 쿠키를 읽고, 사용 가능한 포트에서 수신 대기하며(또는 지정된 경우 --bind-to
명령줄 옵션의 포트에서) 들어오는 TCP 연결 및 요청을 처리할 작업을 예약합니다. 또한 (선택적으로) stdin을 닫고 stderr를 stdout으로 리디렉션합니다.
반환하지 않습니다.
Distributed.process_messages
— Functionprocess_messages(r_stream::IO, w_stream::IO, incoming::Bool=true)
클러스터 관리자가 사용자 정의 전송을 사용하여 호출합니다. 원격 작업자로부터 첫 번째 메시지를 수신할 때 사용자 정의 전송 구현에서 호출해야 합니다. 사용자 정의 전송은 원격 작업자에 대한 논리적 연결을 관리하고 수신 메시지용 IO
객체 하나와 원격 작업자에게 전송되는 메시지용 IO
객체 하나를 제공해야 합니다. incoming
이 true
인 경우 원격 피어가 연결을 시작한 것입니다. 쌍 중 어느 쪽이 연결을 시작하든 클러스터 쿠키와 해당 Julia 버전 번호를 전송하여 인증 핸드셰이크를 수행합니다.
자세한 내용은 cluster_cookie
를 참조하십시오.
Distributed.default_addprocs_params
— Functiondefault_addprocs_params(mgr::ClusterManager) -> Dict{Symbol, Any}
클러스터 관리자에 의해 구현됩니다. addprocs(mgr)
를 호출할 때 전달되는 기본 키워드 매개변수입니다. 최소한의 옵션 세트는 default_addprocs_params()
를 호출하여 사용할 수 있습니다.