Distributed Computing
Distributed
— ModuleTools für verteilte parallele Verarbeitung.
Distributed.addprocs
— Functionaddprocs(manager::ClusterManager; kwargs...) -> Liste der Prozessidentifikatoren
Startet Arbeitsprozesse über den angegebenen Cluster-Manager.
Zum Beispiel werden Beowulf-Cluster über einen benutzerdefinierten Cluster-Manager unterstützt, der im Paket ClusterManagers.jl
implementiert ist.
Die Anzahl der Sekunden, die ein neu gestarteter Arbeiter auf die Verbindungsherstellung vom Master wartet, kann über die Variable JULIA_WORKER_TIMEOUT
in der Umgebung des Arbeiterprozesses angegeben werden. Relevant nur bei Verwendung von TCP/IP als Transport.
Um Arbeiter zu starten, ohne die REPL oder die umgebende Funktion zu blockieren, wenn Arbeiter programmgesteuert gestartet werden, führen Sie addprocs
in seiner eigenen Aufgabe aus.
Beispiele
# In beschäftigten Clustern `addprocs` asynchron aufrufen
t = @async addprocs(...)
# Arbeiter nutzen, sobald sie online sind
if nprocs() > 1 # Sicherstellen, dass mindestens ein neuer Arbeiter verfügbar ist
.... # verteilte Ausführung durchführen
end
# Neu gestartete Arbeiter-IDs oder Fehlermeldungen abrufen
if istaskdone(t) # Überprüfen, ob `addprocs` abgeschlossen ist, um sicherzustellen, dass `fetch` nicht blockiert
if nworkers() == N
new_pids = fetch(t)
else
fetch(t)
end
end
addprocs(machines; tunnel=false, sshflags=``, max_parallel=10, kwargs...) -> Liste der Prozessidentifikatoren
Fügen Sie Arbeitsprozesse auf Remote-Maschinen über SSH hinzu. Die Konfiguration erfolgt über Schlüsselwortargumente (siehe unten). Insbesondere kann das Schlüsselwort exename
verwendet werden, um den Pfad zur julia
-Binärdatei auf der Remote-Maschine(n) anzugeben.
machines
ist ein Vektor von "Maschinenspezifikationen", die als Strings der Form [user@]host[:port] [bind_addr[:port]]
angegeben werden. user
ist standardmäßig der aktuelle Benutzer und port
der Standard-SSH-Port. Wenn [bind_addr[:port]]
angegeben ist, stellen andere Arbeiter eine Verbindung zu diesem Arbeiter über die angegebene bind_addr
und port
her.
Es ist möglich, mehrere Prozesse auf einem Remote-Host zu starten, indem ein Tupel im machines
-Vektor oder die Form (machine_spec, count)
verwendet wird, wobei count
die Anzahl der zu startenden Arbeiter auf dem angegebenen Host ist. Das Übergeben von :auto
als Arbeiteranzahl startet so viele Arbeiter wie CPU-Threads auf dem Remote-Host.
Beispiele:
addprocs([
"remote1", # ein Arbeiter auf 'remote1', der sich mit dem aktuellen Benutzernamen anmeldet
"user@remote2", # ein Arbeiter auf 'remote2', der sich mit dem Benutzernamen 'user' anmeldet
"user@remote3:2222", # SSH-Port auf '2222' für 'remote3' angeben
("user@remote4", 4), # 4 Arbeiter auf 'remote4' starten
("user@remote5", :auto), # so viele Arbeiter wie CPU-Threads auf 'remote5' starten
])
Schlüsselwortargumente:
tunnel
: wenntrue
, wird SSH-Tunneling verwendet, um sich vom Master-Prozess mit dem Arbeiter zu verbinden. Standard istfalse
.multiplex
: wenntrue
, wird SSH-Multiplexing für SSH-Tunneling verwendet. Standard istfalse
.ssh
: der Name oder Pfad des SSH-Client-Executables, das zum Starten der Arbeiter verwendet wird. Standard ist"ssh"
.sshflags
: gibt zusätzliche SSH-Optionen an, z.B.sshflags=`-i /home/foo/bar.pem`
max_parallel
: gibt die maximale Anzahl von Arbeitern an, die parallel zu einem Host verbunden sind. Standardmäßig 10.shell
: gibt den Typ der Shell an, mit der SSH sich bei den Arbeitern verbindet.shell=:posix
: eine POSIX-kompatible Unix/Linux-Shell (sh, ksh, bash, dash, zsh usw.). Der Standard.shell=:csh
: eine Unix C-Shell (csh, tcsh).shell=:wincmd
: Microsoft Windowscmd.exe
.
dir
: gibt das Arbeitsverzeichnis auf den Arbeitern an. Standardmäßig das aktuelle Verzeichnis des Hosts (wie vonpwd()
gefunden).enable_threaded_blas
: wenntrue
, wird BLAS in mehreren Threads in hinzugefügten Prozessen ausgeführt. Standard istfalse
.exename
: Name derjulia
-Executable. Standard ist"$(Sys.BINDIR)/julia"
oder"$(Sys.BINDIR)/julia-debug"
je nach Fall. Es wird empfohlen, auf allen Remote-Maschinen eine gemeinsame Julia-Version zu verwenden, da sonst die Serialisierung und der Codevertrieb fehlschlagen könnten.exeflags
: zusätzliche Flags, die an die Arbeiterprozesse übergeben werden.topology
: Gibt an, wie die Arbeiter miteinander verbunden sind. Das Senden einer Nachricht zwischen nicht verbundenen Arbeitern führt zu einem Fehler.topology=:all_to_all
: Alle Prozesse sind miteinander verbunden. Der Standard.topology=:master_worker
: Nur der Treiberprozess, d.h.pid
1, verbindet sich mit den Arbeitern. Die Arbeiter verbinden sich nicht untereinander.topology=:custom
: Dielaunch
-Methode des Cluster-Managers gibt die Verbindungstopologie über die Felderident
undconnect_idents
inWorkerConfig
an. Ein Arbeiter mit einer Cluster-Manager-Identitätident
verbindet sich mit allen Arbeitern, die inconnect_idents
angegeben sind.
lazy
: Anwendbar nur mittopology=:all_to_all
. Wenntrue
, werden die Verbindungen zwischen Arbeitern faul eingerichtet, d.h. sie werden beim ersten Remote-Aufruf zwischen Arbeitern eingerichtet. Standard isttrue
.env
: Stellen Sie ein Array von String-Paaren wieenv=["JULIA_DEPOT_PATH"=>"/depot"]
bereit, um zu verlangen, dass Umgebungsvariablen auf der Remote-Maschine gesetzt werden. Standardmäßig wird nur die UmgebungsvariableJULIA_WORKER_TIMEOUT
automatisch von der lokalen in die Remote-Umgebung übergeben.cmdline_cookie
: Übergeben Sie das Authentifizierungscookie über die--worker
-Befehlszeilenoption. Das (sicherere) Standardverhalten, das Cookie über SSH stdio zu übergeben, kann bei Windows-Arbeitern, die ältere (vor ConPTY) Julia- oder Windows-Versionen verwenden, hängen bleiben, in diesem Fall bietetcmdline_cookie=true
eine Umgehungslösung.
Die Schlüsselwortargumente ssh
, shell
, env
und cmdline_cookie
wurden in Julia 1.6 hinzugefügt.
Umgebungsvariablen:
Wenn der Master-Prozess innerhalb von 60,0 Sekunden keine Verbindung zu einem neu gestarteten Arbeiter herstellen kann, betrachtet der Arbeiter dies als eine fatale Situation und beendet sich. Dieses Timeout kann über die Umgebungsvariable JULIA_WORKER_TIMEOUT
gesteuert werden. Der Wert von JULIA_WORKER_TIMEOUT
im Master-Prozess gibt die Anzahl der Sekunden an, die ein neu gestarteter Arbeiter auf die Herstellung der Verbindung wartet.
addprocs(np::Integer=Sys.CPU_THREADS; restrict=true, kwargs...) -> Liste der Prozess-Identifikatoren
Starten Sie np
Arbeiter auf dem lokalen Host mit dem integrierten LocalManager
.
Lokale Arbeiter erben die aktuelle Paketumgebung (d.h. aktives Projekt, LOAD_PATH
und DEPOT_PATH
) vom Hauptprozess.
Beachten Sie, dass Arbeiter kein ~/.julia/config/startup.jl
Startskript ausführen und ihren globalen Zustand (wie Befehlszeilenoptionen, globale Variablen, neue Methodendefinitionen und geladene Module) nicht mit anderen laufenden Prozessen synchronisieren.
Schlüsselwortargumente:
restrict::Bool
: wenntrue
(Standard) ist die Bindung auf127.0.0.1
beschränkt.dir
,exename
,exeflags
,env
,topology
,lazy
,enable_threaded_blas
: derselbe Effekt wie beiSSHManager
, siehe Dokumentation füraddprocs(machines::AbstractVector)
.
Das Erben der Paketumgebung und das Schlüsselwortargument env
wurden in Julia 1.9 hinzugefügt.
Distributed.nprocs
— Functionnprocs()
Holen Sie die Anzahl der verfügbaren Prozesse.
Beispiele
julia> nprocs()
3
julia> workers()
2-element Array{Int64,1}:
2
3
Distributed.nworkers
— Functionnworkers()
Holen Sie die Anzahl der verfügbaren Arbeitsprozesse. Dies ist eins weniger als nprocs()
. Gleich nprocs()
, wenn nprocs() == 1
.
Beispiele
$ julia -p 2
julia> nprocs()
3
julia> nworkers()
2
Distributed.procs
— Methodprocs()
Gibt eine Liste aller Prozess-Identifikatoren zurück, einschließlich pid 1 (die nicht von workers()
eingeschlossen ist).
Beispiele
$ julia -p 2
julia> procs()
3-element Array{Int64,1}:
1
2
3
Distributed.procs
— Methodprocs(pid::Integer)
Gibt eine Liste aller Prozessidentifikatoren auf demselben physischen Knoten zurück. Insbesondere werden alle Arbeiter zurückgegeben, die an dieselbe IP-Adresse wie pid
gebunden sind.
Distributed.workers
— Functionworkers()
Gibt eine Liste aller Arbeitsprozess-Identifikatoren zurück.
Beispiele
$ julia -p 2
julia> workers()
2-element Array{Int64,1}:
2
3
Distributed.rmprocs
— Functionrmprocs(pids...; waitfor=typemax(Int))
Entfernen Sie die angegebenen Arbeiter. Beachten Sie, dass nur Prozess 1 Arbeiter hinzufügen oder entfernen kann.
Das Argument waitfor
gibt an, wie lange gewartet werden soll, bis die Arbeiter heruntergefahren sind:
- Wenn nicht angegeben, wartet
rmprocs
, bis alle angefordertenpids
entfernt sind. - Eine
ErrorException
wird ausgelöst, wenn nicht alle Arbeiter vor den angefordertenwaitfor
Sekunden beendet werden können. - Mit einem
waitfor
-Wert von 0 gibt der Aufruf sofort zurück, wobei die Arbeiter in einer anderen Aufgabe zur Entfernung geplant sind. Das geplanteTask
Objekt wird zurückgegeben. Der Benutzer solltewait
auf der Aufgabe aufrufen, bevor er andere parallele Aufrufe tätigt.
Beispiele
$ julia -p 5
julia> t = rmprocs(2, 3, waitfor=0)
Task (runnable) @0x0000000107c718d0
julia> wait(t)
julia> workers()
3-element Array{Int64,1}:
4
5
6
Distributed.interrupt
— Functioninterrupt(pids::Integer...)
Unterbricht die derzeit ausgeführte Aufgabe auf den angegebenen Arbeitern. Dies entspricht dem Drücken von Ctrl-C auf dem lokalen Computer. Wenn keine Argumente angegeben sind, werden alle Arbeiter unterbrochen.
interrupt(pids::AbstractVector=workers())
Unterbrechen Sie die aktuell ausgeführte Aufgabe auf den angegebenen Arbeitern. Dies entspricht dem Drücken von Ctrl-C auf dem lokalen Computer. Wenn keine Argumente angegeben sind, werden alle Arbeiter unterbrochen.
Distributed.myid
— Functionmyid()
Holen Sie sich die ID des aktuellen Prozesses.
Beispiele
julia> myid()
1
julia> remotecall_fetch(() -> myid(), 4)
4
Distributed.pmap
— Functionpmap(f, [::AbstractWorkerPool], c...; distributed=true, batch_size=1, on_error=nothing, retry_delays=[], retry_check=nothing) -> Sammlung
Transformiere die Sammlung c
, indem f
auf jedes Element unter Verwendung verfügbarer Arbeiter und Aufgaben angewendet wird.
Bei mehreren Sammlungsargumenten wird f
elementweise angewendet.
Beachte, dass f
allen Arbeiterprozessen zur Verfügung stehen muss; siehe Codeverfügbarkeit und Laden von Paketen für Details.
Wenn kein Arbeiterpool angegeben ist, werden alle verfügbaren Arbeiter über einen CachingPool
verwendet.
Standardmäßig verteilt pmap
die Berechnung über alle angegebenen Arbeiter. Um nur den lokalen Prozess zu verwenden und über Aufgaben zu verteilen, gib distributed=false
an. Dies entspricht der Verwendung von asyncmap
. Zum Beispiel ist pmap(f, c; distributed=false)
äquivalent zu asyncmap(f,c; ntasks=()->nworkers())
pmap
kann auch eine Mischung aus Prozessen und Aufgaben über das Argument batch_size
verwenden. Bei Batchgrößen größer als 1 wird die Sammlung in mehreren Batches verarbeitet, die jeweils die Länge batch_size
oder weniger haben. Ein Batch wird als einzelne Anfrage an einen freien Arbeiter gesendet, wo eine lokale asyncmap
Elemente aus dem Batch mit mehreren gleichzeitigen Aufgaben verarbeitet.
Ein Fehler stoppt pmap
daran, den Rest der Sammlung zu verarbeiten. Um dieses Verhalten zu überschreiben, kannst du eine Fehlerbehandlungsfunktion über das Argument on_error
angeben, die ein einzelnes Argument, d.h. die Ausnahme, entgegennimmt. Die Funktion kann die Verarbeitung stoppen, indem sie den Fehler erneut auslöst, oder, um fortzufahren, einen beliebigen Wert zurückgeben, der dann inline mit den Ergebnissen an den Aufrufer zurückgegeben wird.
Betrachte die folgenden zwei Beispiele. Das erste gibt das Ausnahmeobjekt inline zurück, das zweite eine 0 anstelle einer Ausnahme:
julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=identity)
4-element Array{Any,1}:
1
ErrorException("foo")
3
ErrorException("foo")
julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=ex->0)
4-element Array{Int64,1}:
1
0
3
0
Fehler können auch behandelt werden, indem fehlgeschlagene Berechnungen erneut versucht werden. Die Schlüsselwortargumente retry_delays
und retry_check
werden als Schlüsselwortargumente delays
und check
an retry
weitergegeben. Wenn das Batching angegeben ist und ein gesamter Batch fehlschlägt, werden alle Elemente im Batch erneut versucht.
Beachte, dass wenn sowohl on_error
als auch retry_delays
angegeben sind, der on_error
Hook vor dem erneuten Versuch aufgerufen wird. Wenn on_error
keine Ausnahme auslöst (oder erneut auslöst), wird das Element nicht erneut versucht.
Beispiel: Bei Fehlern, versuche f
auf einem Element maximal 3 Mal ohne Verzögerung zwischen den Versuchen.
pmap(f, c; retry_delays = zeros(3))
Beispiel: Versuche f
nur, wenn die Ausnahme nicht vom Typ InexactError
ist, mit exponentiell zunehmenden Verzögerungen bis zu 3 Mal. Gib ein NaN
anstelle aller InexactError
-Vorkommen zurück.
pmap(f, c; on_error = e->(isa(e, InexactError) ? NaN : rethrow()), retry_delays = ExponentialBackOff(n = 3))
Distributed.RemoteException
— TypeRemoteException(captured)
Ausnahmen bei Remote-Berechnungen werden erfasst und lokal erneut ausgelöst. Eine RemoteException
umschließt die pid
des Arbeiters und eine erfasste Ausnahme. Eine CapturedException
erfasst die Remote-Ausnahme und eine serialisierbare Form des Aufrufstapels, als die Ausnahme ausgelöst wurde.
Distributed.ProcessExitedException
— TypeProcessExitedException(worker_id::Int)
Nachdem ein Client-Julia-Prozess beendet wurde, führen weitere Versuche, auf das tote Kind zuzugreifen, zu dieser Ausnahme.
Distributed.Future
— TypeFuture(w::Int, rrid::RRID, v::Union{Some, Nothing}=nothing)
Ein Future
ist ein Platzhalter für eine einzelne Berechnung mit unbekanntem Abschlussstatus und -zeit. Für mehrere potenzielle Berechnungen siehe RemoteChannel
. Siehe remoteref_id
zur Identifizierung eines AbstractRemoteRef
.
Distributed.RemoteChannel
— TypeRemoteChannel(pid::Integer=myid())
Erstellt eine Referenz zu einem Channel{Any}(1)
im Prozess pid
. Der Standardwert für pid
ist der aktuelle Prozess.
RemoteChannel(f::Function, pid::Integer=myid())
Erstellt Referenzen zu Remote-Kanälen einer bestimmten Größe und Art. f
ist eine Funktion, die bei Ausführung auf pid
eine Implementierung eines AbstractChannel
zurückgeben muss.
Zum Beispiel wird RemoteChannel(()->Channel{Int}(10), pid)
eine Referenz zu einem Kanal vom Typ Int
und der Größe 10 auf pid
zurückgeben.
Der Standardwert für pid
ist der aktuelle Prozess.
Base.fetch
— Methodfetch(x::Future)
Warten Sie auf und erhalten Sie den Wert eines Future
. Der abgerufene Wert wird lokal zwischengespeichert. Weitere Aufrufe von fetch
auf demselben Verweis geben den zwischengespeicherten Wert zurück. Wenn der entfernte Wert eine Ausnahme ist, wird eine RemoteException
ausgelöst, die die entfernte Ausnahme und den Rückverfolgungsstapel erfasst.
Base.fetch
— Methodfetch(c::RemoteChannel)
Warten Sie auf und erhalten Sie einen Wert von einem RemoteChannel
. Die ausgelösten Ausnahmen sind die gleichen wie bei einem Future
. Entfernt das abgerufene Element nicht.
fetch(x::Any)
Gibt x
zurück.
Distributed.remotecall
— Methodremotecall(f, id::Integer, args...; kwargs...) -> Future
Rufe eine Funktion f
asynchron mit den angegebenen Argumenten auf dem angegebenen Prozess auf. Gibt ein Future
zurück. Schlüsselwortargumente, falls vorhanden, werden an f
weitergegeben.
Distributed.remotecall_wait
— Methodremotecall_wait(f, id::Integer, args...; kwargs...)
Führen Sie ein schnelleres wait(remotecall(...))
in einer Nachricht auf dem Worker
aus, der durch die Worker-ID id
angegeben ist. Schlüsselwortargumente, falls vorhanden, werden an f
weitergegeben.
Siehe auch wait
und remotecall
. ```
Distributed.remotecall_fetch
— Methodremotecall_fetch(f, id::Integer, args...; kwargs...)
Führt fetch(remotecall(...))
in einer Nachricht aus. Schlüsselwortargumente, falls vorhanden, werden an f
weitergegeben. Alle Remote-Ausnahmen werden in einer RemoteException
erfasst und ausgelöst.
Siehe auch fetch
und remotecall
.
Beispiele
$ julia -p 2
julia> remotecall_fetch(sqrt, 2, 4)
2.0
julia> remotecall_fetch(sqrt, 2, -4)
ERROR: Auf Arbeiter 2:
DomainError mit -4.0:
sqrt wurde mit einem negativen reellen Argument aufgerufen, gibt jedoch nur ein komplexes Ergebnis zurück, wenn es mit einem komplexen Argument aufgerufen wird. Versuchen Sie sqrt(Complex(x)).
...
Distributed.remote_do
— Methodremote_do(f, id::Integer, args...; kwargs...) -> nothing
Führt f
asynchron auf dem Worker id
aus. Im Gegensatz zu remotecall
wird das Ergebnis der Berechnung nicht gespeichert, noch gibt es eine Möglichkeit, auf dessen Abschluss zu warten.
Ein erfolgreicher Aufruf zeigt an, dass die Anfrage zur Ausführung auf dem Remote-Knoten akzeptiert wurde.
Während aufeinanderfolgende remotecall
s an denselben Worker in der Reihenfolge, in der sie aufgerufen werden, serialisiert werden, ist die Reihenfolge der Ausführungen auf dem Remote-Worker unbestimmt. Zum Beispiel wird remote_do(f1, 2); remotecall(f2, 2); remote_do(f3, 2)
den Aufruf von f1
serialisieren, gefolgt von f2
und f3
in dieser Reihenfolge. Es ist jedoch nicht garantiert, dass f1
vor f3
auf Worker 2 ausgeführt wird.
Alle Ausnahmen, die von f
ausgelöst werden, werden auf stderr
des Remote-Workers ausgegeben.
Schlüsselwortargumente, falls vorhanden, werden an f
weitergegeben.
Base.put!
— Methodput!(rr::RemoteChannel, args...)
Speichern Sie eine Menge von Werten im RemoteChannel
. Wenn der Kanal voll ist, blockiert er, bis Platz verfügbar ist. Gibt das erste Argument zurück.
Base.put!
— Methodput!(rr::Future, v)
Speichern Sie einen Wert in einer Future
rr
. Future
s sind einmal schreibbare Remote-Referenzen. Ein put!
auf einer bereits gesetzten Future
wirft eine Exception
. Alle asynchronen Remote-Aufrufe geben Future
s zurück und setzen den Wert auf den Rückgabewert des Aufrufs nach Abschluss.
Base.take!
— Methodtake!(rr::RemoteChannel, args...)
Werte von einem RemoteChannel
rr
abrufen und dabei die Werte entfernen.
Base.isready
— Methodisready(rr::RemoteChannel, args...)
Bestimmen Sie, ob ein RemoteChannel
einen Wert gespeichert hat. Beachten Sie, dass diese Funktion zu Wettlaufbedingungen führen kann, da es sein kann, dass es nicht mehr zutrifft, bis Sie das Ergebnis erhalten. Es kann jedoch sicher auf einem Future
verwendet werden, da diese nur einmal zugewiesen werden.
Base.isready
— Methodisready(rr::Future)
Bestimmen Sie, ob ein Future
einen Wert gespeichert hat.
Wenn das Argument Future
von einem anderen Knoten besessen wird, wird dieser Aufruf blockieren, um auf die Antwort zu warten. Es wird empfohlen, auf rr
in einer separaten Aufgabe zu warten oder einen lokalen Channel
als Proxy zu verwenden:
p = 1
f = Future(p)
errormonitor(@async put!(f, remotecall_fetch(long_computation, p)))
isready(f) # wird nicht blockieren
Distributed.AbstractWorkerPool
— TypeAbstractWorkerPool
Supertyp für Arbeiterpools wie WorkerPool
und CachingPool
. Ein AbstractWorkerPool
sollte implementieren:
push!
- einen neuen Arbeiter zum gesamten Pool hinzufügen (verfügbar + beschäftigt)put!
- einen Arbeiter zurück in den verfügbaren Pool gebentake!
- einen Arbeiter aus dem verfügbaren Pool nehmen (um ihn für die Ausführung von Remote-Funktionen zu verwenden)length
- Anzahl der im gesamten Pool verfügbaren Arbeiterisready
- false zurückgeben, wenn eintake!
auf dem Pool blockieren würde, sonst true
Die Standardimplementierungen der oben genannten Methoden (auf einem AbstractWorkerPool
) erfordern die Felder
channel::Channel{Int}
workers::Set{Int}
wobei channel
die freien Arbeiter-pids enthält und workers
die Menge aller Arbeiter ist, die mit diesem Pool verbunden sind.
Distributed.WorkerPool
— TypeWorkerPool(workers::Union{Vector{Int},AbstractRange{Int}})
Erstellen Sie einen WorkerPool
aus einem Vektor oder Bereich von Arbeiter-IDs.
Beispiele
$ julia -p 3
julia> WorkerPool([2, 3])
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([2, 3]), RemoteChannel{Channel{Any}}(1, 1, 6))
julia> WorkerPool(2:4)
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 7))
Distributed.CachingPool
— TypeCachingPool(workers::Vector{Int})
Eine Implementierung eines AbstractWorkerPool
. remote
, remotecall_fetch
, pmap
(und andere Remoteaufrufe, die Funktionen remote ausführen) profitieren davon, die serialisierten/deserialisierten Funktionen auf den Arbeitsknoten zwischenzuspeichern, insbesondere Closures (die große Datenmengen erfassen können).
Der Remote-Cache wird für die Lebensdauer des zurückgegebenen CachingPool
-Objekts aufrechterhalten. Um den Cache früher zu leeren, verwenden Sie clear!(pool)
.
Für globale Variablen werden nur die Bindungen in einem Closure erfasst, nicht die Daten. let
-Blöcke können verwendet werden, um globale Daten zu erfassen.
Beispiele
const foo = rand(10^8);
wp = CachingPool(workers())
let foo = foo
pmap(i -> sum(foo) + i, wp, 1:100);
end
Das obige würde foo
nur einmal an jeden Arbeiter übertragen.
Distributed.default_worker_pool
— Functiondefault_worker_pool()
AbstractWorkerPool
mit inaktiven workers
- verwendet von remote(f)
und pmap
(standardmäßig). Es sei denn, es wird explizit über default_worker_pool!(pool)
festgelegt, wird der Standard-Worker-Pool auf einen WorkerPool
initialisiert.
Beispiele
$ julia -p 3
julia> default_worker_pool()
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:3), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 4))
Distributed.clear!
— Functionclear!(syms, pids=workers(); mod=Main)
Löscht globale Bindungen in Modulen, indem sie auf nothing
initialisiert werden. syms
sollte vom Typ Symbol
oder einer Sammlung von Symbol
s sein. pids
und mod
identifizieren die Prozesse und das Modul, in dem globale Variablen neu initialisiert werden sollen. Nur die Namen, die unter mod
definiert sind, werden gelöscht.
Eine Ausnahme wird ausgelöst, wenn ein globaler Konstante angefordert wird, gelöscht zu werden.
clear!(pool::CachingPool) -> pool
Entfernt alle zwischengespeicherten Funktionen von allen beteiligten Arbeitern.
Distributed.remote
— Functionremote([p::AbstractWorkerPool], f) -> Funktion
Gibt eine anonyme Funktion zurück, die die Funktion f
auf einem verfügbaren Arbeiter (aus dem WorkerPool
p
, falls angegeben) unter Verwendung von remotecall_fetch
ausführt.
Distributed.remotecall
— Methodremotecall(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future
WorkerPool
Variante von remotecall(f, pid, ....)
. Warten Sie auf und nehmen Sie einen freien Arbeiter aus pool
und führen Sie einen remotecall
darauf aus.
Beispiele
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> f = remotecall(maximum, wp, A)
Future(2, 1, 6, nothing)
In diesem Beispiel wurde die Aufgabe auf pid 2 ausgeführt, aufgerufen von pid 1.
Distributed.remotecall_wait
— Methodremotecall_wait(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future
WorkerPool
Variante von remotecall_wait(f, pid, ....)
. Warten Sie auf und nehmen Sie einen freien Arbeiter aus pool
und führen Sie ein remotecall_wait
darauf aus.
Beispiele
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> f = remotecall_wait(maximum, wp, A)
Future(3, 1, 9, nothing)
julia> fetch(f)
0.9995177101692958
Distributed.remotecall_fetch
— Methodremotecall_fetch(f, pool::AbstractWorkerPool, args...; kwargs...) -> result
WorkerPool
Variante von remotecall_fetch(f, pid, ....)
. Wartet auf und nimmt einen freien Arbeiter aus pool
und führt ein remotecall_fetch
darauf aus.
Beispiele
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> remotecall_fetch(maximum, wp, A)
0.9995177101692958
Distributed.remote_do
— Methodremote_do(f, pool::AbstractWorkerPool, args...; kwargs...) -> nichts
WorkerPool
Variante von remote_do(f, pid, ....)
. Warten Sie auf und nehmen Sie einen freien Arbeiter aus pool
und führen Sie ein remote_do
darauf aus.
Distributed.@spawn
— Macro@spawn expr
Erstellt einen Closure um einen Ausdruck und führt ihn auf einem automatisch gewählten Prozess aus, wobei ein Future
zum Ergebnis zurückgegeben wird. Dieses Makro ist veraltet; @spawnat :any expr
sollte stattdessen verwendet werden.
Beispiele
julia> addprocs(3);
julia> f = @spawn myid()
Future(2, 1, 5, nichts)
julia> fetch(f)
2
julia> f = @spawn myid()
Future(3, 1, 7, nichts)
julia> fetch(f)
3
Ab Julia 1.3 ist dieses Makro veraltet. Verwenden Sie stattdessen @spawnat :any
.
Distributed.@spawnat
— Macro@spawnat p expr
Erstellen Sie einen Closure um einen Ausdruck und führen Sie den Closure asynchron auf dem Prozess p
aus. Geben Sie ein Future
zum Ergebnis zurück. Wenn p
das zitierte literale Symbol :any
ist, wählt das System automatisch einen Prozessor aus.
Beispiele
julia> addprocs(3);
julia> f = @spawnat 2 myid()
Future(2, 1, 3, nothing)
julia> fetch(f)
2
julia> f = @spawnat :any myid()
Future(3, 1, 7, nothing)
julia> fetch(f)
3
Das Argument :any
ist seit Julia 1.3 verfügbar.
Distributed.@fetch
— Macro@fetch expr
Entspricht fetch(@spawnat :any expr)
. Siehe fetch
und @spawnat
.
Beispiele
julia> addprocs(3);
julia> @fetch myid()
2
julia> @fetch myid()
3
julia> @fetch myid()
4
julia> @fetch myid()
2
Distributed.@fetchfrom
— Macro@fetchfrom
Entspricht fetch(@spawnat p expr)
. Siehe fetch
und @spawnat
.
Beispiele
julia> addprocs(3);
julia> @fetchfrom 2 myid()
2
julia> @fetchfrom 4 myid()
4
Distributed.@distributed
— Macro@distributed
Eine verteilte Speicher-, parallele Schleife der Form:
@distributed [reducer] for var = range
body
end
Der angegebene Bereich wird partitioniert und lokal über alle Arbeiter ausgeführt. Falls eine optionale Reduzierfunktion angegeben ist, führt @distributed
lokale Reduzierungen auf jedem Arbeiter durch, gefolgt von einer endgültigen Reduzierung im aufrufenden Prozess.
Beachten Sie, dass @distributed
ohne eine Reduzierfunktion asynchron ausgeführt wird, d.h. es startet unabhängige Aufgaben auf allen verfügbaren Arbeitern und gibt sofort zurück, ohne auf den Abschluss zu warten. Um auf den Abschluss zu warten, setzen Sie den Aufruf mit @sync
in der Form:
@sync @distributed for var = range
body
end
Distributed.@everywhere
— Macro@everywhere [procs()] expr
Führen Sie einen Ausdruck unter Main
auf allen procs
aus. Fehler in einem der Prozesse werden in eine CompositeException
gesammelt und ausgelöst. Zum Beispiel:
@everywhere bar = 1
definiert Main.bar
auf allen aktuellen Prozessen. Alle später hinzugefügten Prozesse (zum Beispiel mit addprocs()
) haben den Ausdruck nicht definiert.
Im Gegensatz zu @spawnat
erfasst @everywhere
keine lokalen Variablen. Stattdessen können lokale Variablen mit Interpolation übertragen werden:
foo = 1
@everywhere bar = $foo
Das optionale Argument procs
ermöglicht es, eine Teilmenge aller Prozesse anzugeben, die den Ausdruck ausführen sollen.
Ähnlich wie beim Aufruf von remotecall_eval(Main, procs, expr)
, jedoch mit zwei zusätzlichen Funktionen:
- `using` und `import` Anweisungen werden zuerst im aufrufenden Prozess ausgeführt, um sicherzustellen,
dass Pakete vorcompiliert sind.
- Der aktuelle Quelldateipfad, der von `include` verwendet wird, wird an andere Prozesse weitergegeben.
Distributed.remoteref_id
— Functionremoteref_id(r::AbstractRemoteRef) -> RRID
Future
s und RemoteChannel
s werden durch Felder identifiziert:
where
- bezieht sich auf den Knoten, an dem das zugrunde liegende Objekt/Speicher, auf das die Referenz verweist, tatsächlich existiert.whence
- bezieht sich auf den Knoten, von dem die Remote-Referenz erstellt wurde. Beachten Sie, dass dies sich von dem Knoten unterscheidet, an dem das zugrunde liegende Objekt, auf das verwiesen wird, tatsächlich existiert. Zum Beispiel würde der Aufruf vonRemoteChannel(2)
vom Master-Prozess zu einemwhere
-Wert von 2 und einemwhence
-Wert von 1 führen.id
ist einzigartig für alle Referenzen, die von dem Worker erstellt wurden, der durchwhence
angegeben ist.
Zusammen identifizieren whence
und id
eindeutig eine Referenz über alle Worker hinweg.
remoteref_id
ist eine Low-Level-API, die ein RRID
-Objekt zurückgibt, das die whence
- und id
-Werte einer Remote-Referenz umschließt.
Distributed.channel_from_id
— Functionchannel_from_id(id) -> c
Eine Low-Level-API, die das zugrunde liegende AbstractChannel
für eine id
zurückgibt, die von remoteref_id
zurückgegeben wird. Der Aufruf ist nur auf dem Knoten gültig, auf dem der zugrunde liegende Kanal existiert.
Distributed.worker_id_from_socket
— Functionworker_id_from_socket(s) -> pid
Eine Low-Level-API, die, gegeben eine IO
-Verbindung oder einen Worker
, die pid
des Workers zurückgibt, mit dem sie verbunden ist. Dies ist nützlich, wenn benutzerdefinierte serialize
Methoden für einen Typ geschrieben werden, die die geschriebenen Daten je nach Prozess-ID des empfangenden Prozesses optimieren.
Distributed.cluster_cookie
— Methodcluster_cookie() -> cookie
Gibt das Cluster-Cookie zurück.
Distributed.cluster_cookie
— Methodcluster_cookie(cookie) -> cookie
Setzt das übergebene Cookie als das Cluster-Cookie und gibt es dann zurück.
Cluster Manager Interface
Diese Schnittstelle bietet einen Mechanismus zum Starten und Verwalten von Julia-Arbeitern in verschiedenen Cluster-Umgebungen. Es gibt zwei Arten von Managern in Base: LocalManager
, um zusätzliche Arbeiter auf demselben Host zu starten, und SSHManager
, um auf entfernten Hosts über ssh
zu starten. TCP/IP-Sockets werden verwendet, um Prozesse zu verbinden und Nachrichten zwischen ihnen zu transportieren. Es ist möglich, dass Cluster-Manager einen anderen Transport bereitstellen.
Distributed.ClusterManager
— TypeClusterManager
Supertyp für Cluster-Manager, die Arbeitsprozesse als Cluster steuern. Cluster-Manager implementieren, wie Arbeiter hinzugefügt, entfernt und kommuniziert werden können. SSHManager
und LocalManager
sind Subtypen davon.
Distributed.WorkerConfig
— TypeWorkerConfig
Typ, der von ClusterManager
s verwendet wird, um die zu ihren Clustern hinzugefügten Worker zu steuern. Einige Felder werden von allen Cluster-Managern verwendet, um auf einen Host zuzugreifen:
io
– die Verbindung, die verwendet wird, um auf den Worker zuzugreifen (ein Subtyp vonIO
oderNothing
)host
– die Hostadresse (entweder einString
oderNothing
)port
– der Port auf dem Host, der verwendet wird, um eine Verbindung zum Worker herzustellen (entweder einInt
oderNothing
)
Einige werden vom Cluster-Manager verwendet, um Worker zu einem bereits initialisierten Host hinzuzufügen:
count
– die Anzahl der Worker, die auf dem Host gestartet werden sollenexename
– der Pfad zur Julia-Ausführungsdatei auf dem Host, standardmäßig"$(Sys.BINDIR)/julia"
oder"$(Sys.BINDIR)/julia-debug"
exeflags
– Flags, die beim Starten von Julia remote verwendet werden
Das Feld userdata
wird verwendet, um Informationen für jeden Worker von externen Managern zu speichern.
Einige Felder werden von SSHManager
und ähnlichen Managern verwendet:
tunnel
–true
(Tunneling verwenden),false
(kein Tunneling verwenden) odernothing
(Standard für den Manager verwenden)multiplex
–true
(SSH-Multiplexing für Tunneling verwenden) oderfalse
forward
– die Weiterleitungsoption, die für die-L
-Option von ssh verwendet wirdbind_addr
– die Adresse auf dem Remote-Host, an die gebunden werden sollsshflags
– Flags, die beim Herstellen der SSH-Verbindung verwendet werdenmax_parallel
– die maximale Anzahl von Workern, die parallel auf dem Host verbunden werden sollen
Einige Felder werden sowohl von LocalManager
s als auch von SSHManager
s verwendet:
connect_at
– bestimmt, ob dies ein Worker-zu-Worker- oder Driver-zu-Worker-Setup-Aufruf istprocess
– der Prozess, der verbunden werden soll (normalerweise weist der Manager dies währendaddprocs
zu)ospid
– die Prozess-ID gemäß dem Host-OS, die verwendet wird, um Worker-Prozesse zu unterbrechenenviron
– privates Wörterbuch, das von Local/SSH-Managern verwendet wird, um temporäre Informationen zu speichernident
– Worker, wie er vomClusterManager
identifiziert wirdconnect_idents
– Liste der Worker-IDs, mit denen der Worker verbunden werden muss, wenn eine benutzerdefinierte Topologie verwendet wirdenable_threaded_blas
–true
,false
odernothing
, ob auf den Workern threaded BLAS verwendet werden soll oder nicht
Distributed.launch
— Functionlaunch(manager::ClusterManager, params::Dict, launched::Array, launch_ntfy::Condition)
Implementiert von Cluster-Managern. Für jeden Julia-Arbeiter, der von dieser Funktion gestartet wird, sollte ein WorkerConfig
-Eintrag zu launched
hinzugefügt und launch_ntfy
benachrichtigt werden. Die Funktion MUSS beenden, sobald alle Arbeiter, die von manager
angefordert wurden, gestartet wurden. params
ist ein Wörterbuch aller Schlüsselargumente, mit denen addprocs
aufgerufen wurde.
Distributed.manage
— Functionmanage(manager::ClusterManager, id::Integer, config::WorkerConfig. op::Symbol)
Implementiert von Cluster-Managern. Es wird im Master-Prozess während der Lebensdauer eines Workers mit den entsprechenden op
-Werten aufgerufen:
- mit
:register
/:deregister
, wenn ein Worker zum Julia-Worker-Pool hinzugefügt / entfernt wird. - mit
:interrupt
, wenninterrupt(workers)
aufgerufen wird. DerClusterManager
sollte den entsprechenden Worker mit einem Interrupt-Signal benachrichtigen. - mit
:finalize
zu Reinigungszwecken.
Base.kill
— Methodkill(manager::ClusterManager, pid::Int, config::WorkerConfig)
Wird von Cluster-Managern implementiert. Es wird im Master-Prozess von rmprocs
aufgerufen. Es sollte dazu führen, dass der entfernte Worker, der durch pid
angegeben ist, beendet wird. kill(manager::ClusterManager.....)
führt ein entferntes exit()
auf pid
aus.
Sockets.connect
— Methodconnect(manager::ClusterManager, pid::Int, config::WorkerConfig) -> (instrm::IO, outstrm::IO)
Implementiert von Cluster-Managern mit benutzerdefinierten Transporten. Es sollte eine logische Verbindung zum Worker mit der ID pid
, die durch config
angegeben ist, hergestellt werden und ein Paar von IO
-Objekten zurückgeben. Nachrichten von pid
an den aktuellen Prozess werden von instrm
gelesen, während Nachrichten, die an pid
gesendet werden sollen, in outstrm
geschrieben werden. Die Implementierung des benutzerdefinierten Transports muss sicherstellen, dass Nachrichten vollständig und in der richtigen Reihenfolge zugestellt und empfangen werden. connect(manager::ClusterManager.....)
richtet TCP/IP-Socket-Verbindungen zwischen den Workern ein.
Distributed.init_worker
— Functioninit_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager())
Aufgerufen von Cluster-Managern, die benutzerdefinierte Transports implementieren. Es initialisiert einen neu gestarteten Prozess als Arbeiter. Das Befehlszeilenargument --worker[=<cookie>]
hat die Wirkung, einen Prozess als Arbeiter zu initialisieren, der TCP/IP-Sockets für den Transport verwendet. cookie
ist ein cluster_cookie
.
Distributed.start_worker
— Functionstart_worker([out::IO=stdout], cookie::AbstractString=readline(stdin); close_stdin::Bool=true, stderr_to_stdout::Bool=true)
start_worker
ist eine interne Funktion, die den Standard-Einstiegspunkt für Arbeitsprozesse darstellt, die sich über TCP/IP verbinden. Sie richtet den Prozess als Julia-Cluster-Arbeiter ein.
Host:Port-Informationen werden in den Stream out
geschrieben (standardmäßig stdout).
Die Funktion liest das Cookie von stdin, falls erforderlich, und hört auf einem freien Port (oder, falls angegeben, dem Port in der --bind-to
-Befehlszeilenoption) und plant Aufgaben zur Verarbeitung eingehender TCP-Verbindungen und -Anfragen. Sie schließt auch (optional) stdin und leitet stderr an stdout um.
Sie gibt nichts zurück.
Distributed.process_messages
— Functionprocess_messages(r_stream::IO, w_stream::IO, incoming::Bool=true)
Wird von Cluster-Managern mit benutzerdefinierten Transporten aufgerufen. Es sollte aufgerufen werden, wenn die benutzerdefinierte Transportimplementierung die erste Nachricht von einem entfernten Worker erhält. Der benutzerdefinierte Transport muss eine logische Verbindung zum entfernten Worker verwalten und zwei IO
-Objekte bereitstellen, eines für eingehende Nachrichten und das andere für an den entfernten Worker adressierte Nachrichten. Wenn incoming
true
ist, hat der entfernte Peer die Verbindung initiiert. Derjenige, der die Verbindung initiiert, sendet das Cluster-Cookie und seine Julia-Versionnummer, um den Authentifizierungs-Handschlag durchzuführen.
Siehe auch cluster_cookie
.
Distributed.default_addprocs_params
— Functiondefault_addprocs_params(mgr::ClusterManager) -> Dict{Symbol, Any}
Implementiert von Cluster-Managern. Die Standard-Keyword-Parameter, die beim Aufruf von addprocs(mgr)
übergeben werden. Die minimale Menge an Optionen ist verfügbar, indem default_addprocs_params()
aufgerufen wird.