Distributed Computing
Distributed — ModuleTools für verteilte parallele Verarbeitung.
Distributed.addprocs — Functionaddprocs(manager::ClusterManager; kwargs...) -> Liste der ProzessidentifikatorenStartet Arbeitsprozesse über den angegebenen Cluster-Manager.
Zum Beispiel werden Beowulf-Cluster über einen benutzerdefinierten Cluster-Manager unterstützt, der im Paket ClusterManagers.jl implementiert ist.
Die Anzahl der Sekunden, die ein neu gestarteter Arbeiter auf die Verbindungsherstellung vom Master wartet, kann über die Variable JULIA_WORKER_TIMEOUT in der Umgebung des Arbeiterprozesses angegeben werden. Relevant nur bei Verwendung von TCP/IP als Transport.
Um Arbeiter zu starten, ohne die REPL oder die umgebende Funktion zu blockieren, wenn Arbeiter programmgesteuert gestartet werden, führen Sie addprocs in seiner eigenen Aufgabe aus.
Beispiele
# In beschäftigten Clustern `addprocs` asynchron aufrufen
t = @async addprocs(...)# Arbeiter nutzen, sobald sie online sind
if nprocs() > 1 # Sicherstellen, dass mindestens ein neuer Arbeiter verfügbar ist
.... # verteilte Ausführung durchführen
end# Neu gestartete Arbeiter-IDs oder Fehlermeldungen abrufen
if istaskdone(t) # Überprüfen, ob `addprocs` abgeschlossen ist, um sicherzustellen, dass `fetch` nicht blockiert
if nworkers() == N
new_pids = fetch(t)
else
fetch(t)
end
endaddprocs(machines; tunnel=false, sshflags=``, max_parallel=10, kwargs...) -> Liste der ProzessidentifikatorenFügen Sie Arbeitsprozesse auf Remote-Maschinen über SSH hinzu. Die Konfiguration erfolgt über Schlüsselwortargumente (siehe unten). Insbesondere kann das Schlüsselwort exename verwendet werden, um den Pfad zur julia-Binärdatei auf der Remote-Maschine(n) anzugeben.
machines ist ein Vektor von "Maschinenspezifikationen", die als Strings der Form [user@]host[:port] [bind_addr[:port]] angegeben werden. user ist standardmäßig der aktuelle Benutzer und port der Standard-SSH-Port. Wenn [bind_addr[:port]] angegeben ist, stellen andere Arbeiter eine Verbindung zu diesem Arbeiter über die angegebene bind_addr und port her.
Es ist möglich, mehrere Prozesse auf einem Remote-Host zu starten, indem ein Tupel im machines-Vektor oder die Form (machine_spec, count) verwendet wird, wobei count die Anzahl der zu startenden Arbeiter auf dem angegebenen Host ist. Das Übergeben von :auto als Arbeiteranzahl startet so viele Arbeiter wie CPU-Threads auf dem Remote-Host.
Beispiele:
addprocs([
"remote1", # ein Arbeiter auf 'remote1', der sich mit dem aktuellen Benutzernamen anmeldet
"user@remote2", # ein Arbeiter auf 'remote2', der sich mit dem Benutzernamen 'user' anmeldet
"user@remote3:2222", # SSH-Port auf '2222' für 'remote3' angeben
("user@remote4", 4), # 4 Arbeiter auf 'remote4' starten
("user@remote5", :auto), # so viele Arbeiter wie CPU-Threads auf 'remote5' starten
])Schlüsselwortargumente:
tunnel: wenntrue, wird SSH-Tunneling verwendet, um sich vom Master-Prozess mit dem Arbeiter zu verbinden. Standard istfalse.multiplex: wenntrue, wird SSH-Multiplexing für SSH-Tunneling verwendet. Standard istfalse.ssh: der Name oder Pfad des SSH-Client-Executables, das zum Starten der Arbeiter verwendet wird. Standard ist"ssh".sshflags: gibt zusätzliche SSH-Optionen an, z.B.sshflags=`-i /home/foo/bar.pem`max_parallel: gibt die maximale Anzahl von Arbeitern an, die parallel zu einem Host verbunden sind. Standardmäßig 10.shell: gibt den Typ der Shell an, mit der SSH sich bei den Arbeitern verbindet.shell=:posix: eine POSIX-kompatible Unix/Linux-Shell (sh, ksh, bash, dash, zsh usw.). Der Standard.shell=:csh: eine Unix C-Shell (csh, tcsh).shell=:wincmd: Microsoft Windowscmd.exe.
dir: gibt das Arbeitsverzeichnis auf den Arbeitern an. Standardmäßig das aktuelle Verzeichnis des Hosts (wie vonpwd()gefunden).enable_threaded_blas: wenntrue, wird BLAS in mehreren Threads in hinzugefügten Prozessen ausgeführt. Standard istfalse.exename: Name derjulia-Executable. Standard ist"$(Sys.BINDIR)/julia"oder"$(Sys.BINDIR)/julia-debug"je nach Fall. Es wird empfohlen, auf allen Remote-Maschinen eine gemeinsame Julia-Version zu verwenden, da sonst die Serialisierung und der Codevertrieb fehlschlagen könnten.exeflags: zusätzliche Flags, die an die Arbeiterprozesse übergeben werden.topology: Gibt an, wie die Arbeiter miteinander verbunden sind. Das Senden einer Nachricht zwischen nicht verbundenen Arbeitern führt zu einem Fehler.topology=:all_to_all: Alle Prozesse sind miteinander verbunden. Der Standard.topology=:master_worker: Nur der Treiberprozess, d.h.pid1, verbindet sich mit den Arbeitern. Die Arbeiter verbinden sich nicht untereinander.topology=:custom: Dielaunch-Methode des Cluster-Managers gibt die Verbindungstopologie über die Felderidentundconnect_identsinWorkerConfigan. Ein Arbeiter mit einer Cluster-Manager-Identitätidentverbindet sich mit allen Arbeitern, die inconnect_identsangegeben sind.
lazy: Anwendbar nur mittopology=:all_to_all. Wenntrue, werden die Verbindungen zwischen Arbeitern faul eingerichtet, d.h. sie werden beim ersten Remote-Aufruf zwischen Arbeitern eingerichtet. Standard isttrue.env: Stellen Sie ein Array von String-Paaren wieenv=["JULIA_DEPOT_PATH"=>"/depot"]bereit, um zu verlangen, dass Umgebungsvariablen auf der Remote-Maschine gesetzt werden. Standardmäßig wird nur die UmgebungsvariableJULIA_WORKER_TIMEOUTautomatisch von der lokalen in die Remote-Umgebung übergeben.cmdline_cookie: Übergeben Sie das Authentifizierungscookie über die--worker-Befehlszeilenoption. Das (sicherere) Standardverhalten, das Cookie über SSH stdio zu übergeben, kann bei Windows-Arbeitern, die ältere (vor ConPTY) Julia- oder Windows-Versionen verwenden, hängen bleiben, in diesem Fall bietetcmdline_cookie=trueeine Umgehungslösung.
Die Schlüsselwortargumente ssh, shell, env und cmdline_cookie wurden in Julia 1.6 hinzugefügt.
Umgebungsvariablen:
Wenn der Master-Prozess innerhalb von 60,0 Sekunden keine Verbindung zu einem neu gestarteten Arbeiter herstellen kann, betrachtet der Arbeiter dies als eine fatale Situation und beendet sich. Dieses Timeout kann über die Umgebungsvariable JULIA_WORKER_TIMEOUT gesteuert werden. Der Wert von JULIA_WORKER_TIMEOUT im Master-Prozess gibt die Anzahl der Sekunden an, die ein neu gestarteter Arbeiter auf die Herstellung der Verbindung wartet.
addprocs(np::Integer=Sys.CPU_THREADS; restrict=true, kwargs...) -> Liste der Prozess-IdentifikatorenStarten Sie np Arbeiter auf dem lokalen Host mit dem integrierten LocalManager.
Lokale Arbeiter erben die aktuelle Paketumgebung (d.h. aktives Projekt, LOAD_PATH und DEPOT_PATH) vom Hauptprozess.
Beachten Sie, dass Arbeiter kein ~/.julia/config/startup.jl Startskript ausführen und ihren globalen Zustand (wie Befehlszeilenoptionen, globale Variablen, neue Methodendefinitionen und geladene Module) nicht mit anderen laufenden Prozessen synchronisieren.
Schlüsselwortargumente:
restrict::Bool: wenntrue(Standard) ist die Bindung auf127.0.0.1beschränkt.dir,exename,exeflags,env,topology,lazy,enable_threaded_blas: derselbe Effekt wie beiSSHManager, siehe Dokumentation füraddprocs(machines::AbstractVector).
Das Erben der Paketumgebung und das Schlüsselwortargument env wurden in Julia 1.9 hinzugefügt.
Distributed.nprocs — Functionnprocs()Holen Sie die Anzahl der verfügbaren Prozesse.
Beispiele
julia> nprocs()
3
julia> workers()
2-element Array{Int64,1}:
2
3Distributed.nworkers — Functionnworkers()Holen Sie die Anzahl der verfügbaren Arbeitsprozesse. Dies ist eins weniger als nprocs(). Gleich nprocs(), wenn nprocs() == 1.
Beispiele
$ julia -p 2
julia> nprocs()
3
julia> nworkers()
2Distributed.procs — Methodprocs()Gibt eine Liste aller Prozess-Identifikatoren zurück, einschließlich pid 1 (die nicht von workers() eingeschlossen ist).
Beispiele
$ julia -p 2
julia> procs()
3-element Array{Int64,1}:
1
2
3Distributed.procs — Methodprocs(pid::Integer)Gibt eine Liste aller Prozessidentifikatoren auf demselben physischen Knoten zurück. Insbesondere werden alle Arbeiter zurückgegeben, die an dieselbe IP-Adresse wie pid gebunden sind.
Distributed.workers — Functionworkers()Gibt eine Liste aller Arbeitsprozess-Identifikatoren zurück.
Beispiele
$ julia -p 2
julia> workers()
2-element Array{Int64,1}:
2
3Distributed.rmprocs — Functionrmprocs(pids...; waitfor=typemax(Int))Entfernen Sie die angegebenen Arbeiter. Beachten Sie, dass nur Prozess 1 Arbeiter hinzufügen oder entfernen kann.
Das Argument waitfor gibt an, wie lange gewartet werden soll, bis die Arbeiter heruntergefahren sind:
- Wenn nicht angegeben, wartet
rmprocs, bis alle angefordertenpidsentfernt sind. - Eine
ErrorExceptionwird ausgelöst, wenn nicht alle Arbeiter vor den angefordertenwaitforSekunden beendet werden können. - Mit einem
waitfor-Wert von 0 gibt der Aufruf sofort zurück, wobei die Arbeiter in einer anderen Aufgabe zur Entfernung geplant sind. Das geplanteTaskObjekt wird zurückgegeben. Der Benutzer solltewaitauf der Aufgabe aufrufen, bevor er andere parallele Aufrufe tätigt.
Beispiele
$ julia -p 5
julia> t = rmprocs(2, 3, waitfor=0)
Task (runnable) @0x0000000107c718d0
julia> wait(t)
julia> workers()
3-element Array{Int64,1}:
4
5
6Distributed.interrupt — Functioninterrupt(pids::Integer...)Unterbricht die derzeit ausgeführte Aufgabe auf den angegebenen Arbeitern. Dies entspricht dem Drücken von Ctrl-C auf dem lokalen Computer. Wenn keine Argumente angegeben sind, werden alle Arbeiter unterbrochen.
interrupt(pids::AbstractVector=workers())Unterbrechen Sie die aktuell ausgeführte Aufgabe auf den angegebenen Arbeitern. Dies entspricht dem Drücken von Ctrl-C auf dem lokalen Computer. Wenn keine Argumente angegeben sind, werden alle Arbeiter unterbrochen.
Distributed.myid — Functionmyid()Holen Sie sich die ID des aktuellen Prozesses.
Beispiele
julia> myid()
1
julia> remotecall_fetch(() -> myid(), 4)
4Distributed.pmap — Functionpmap(f, [::AbstractWorkerPool], c...; distributed=true, batch_size=1, on_error=nothing, retry_delays=[], retry_check=nothing) -> SammlungTransformiere die Sammlung c, indem f auf jedes Element unter Verwendung verfügbarer Arbeiter und Aufgaben angewendet wird.
Bei mehreren Sammlungsargumenten wird f elementweise angewendet.
Beachte, dass f allen Arbeiterprozessen zur Verfügung stehen muss; siehe Codeverfügbarkeit und Laden von Paketen für Details.
Wenn kein Arbeiterpool angegeben ist, werden alle verfügbaren Arbeiter über einen CachingPool verwendet.
Standardmäßig verteilt pmap die Berechnung über alle angegebenen Arbeiter. Um nur den lokalen Prozess zu verwenden und über Aufgaben zu verteilen, gib distributed=false an. Dies entspricht der Verwendung von asyncmap. Zum Beispiel ist pmap(f, c; distributed=false) äquivalent zu asyncmap(f,c; ntasks=()->nworkers())
pmap kann auch eine Mischung aus Prozessen und Aufgaben über das Argument batch_size verwenden. Bei Batchgrößen größer als 1 wird die Sammlung in mehreren Batches verarbeitet, die jeweils die Länge batch_size oder weniger haben. Ein Batch wird als einzelne Anfrage an einen freien Arbeiter gesendet, wo eine lokale asyncmap Elemente aus dem Batch mit mehreren gleichzeitigen Aufgaben verarbeitet.
Ein Fehler stoppt pmap daran, den Rest der Sammlung zu verarbeiten. Um dieses Verhalten zu überschreiben, kannst du eine Fehlerbehandlungsfunktion über das Argument on_error angeben, die ein einzelnes Argument, d.h. die Ausnahme, entgegennimmt. Die Funktion kann die Verarbeitung stoppen, indem sie den Fehler erneut auslöst, oder, um fortzufahren, einen beliebigen Wert zurückgeben, der dann inline mit den Ergebnissen an den Aufrufer zurückgegeben wird.
Betrachte die folgenden zwei Beispiele. Das erste gibt das Ausnahmeobjekt inline zurück, das zweite eine 0 anstelle einer Ausnahme:
julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=identity)
4-element Array{Any,1}:
1
ErrorException("foo")
3
ErrorException("foo")
julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=ex->0)
4-element Array{Int64,1}:
1
0
3
0Fehler können auch behandelt werden, indem fehlgeschlagene Berechnungen erneut versucht werden. Die Schlüsselwortargumente retry_delays und retry_check werden als Schlüsselwortargumente delays und check an retry weitergegeben. Wenn das Batching angegeben ist und ein gesamter Batch fehlschlägt, werden alle Elemente im Batch erneut versucht.
Beachte, dass wenn sowohl on_error als auch retry_delays angegeben sind, der on_error Hook vor dem erneuten Versuch aufgerufen wird. Wenn on_error keine Ausnahme auslöst (oder erneut auslöst), wird das Element nicht erneut versucht.
Beispiel: Bei Fehlern, versuche f auf einem Element maximal 3 Mal ohne Verzögerung zwischen den Versuchen.
pmap(f, c; retry_delays = zeros(3))Beispiel: Versuche f nur, wenn die Ausnahme nicht vom Typ InexactError ist, mit exponentiell zunehmenden Verzögerungen bis zu 3 Mal. Gib ein NaN anstelle aller InexactError-Vorkommen zurück.
pmap(f, c; on_error = e->(isa(e, InexactError) ? NaN : rethrow()), retry_delays = ExponentialBackOff(n = 3))Distributed.RemoteException — TypeRemoteException(captured)Ausnahmen bei Remote-Berechnungen werden erfasst und lokal erneut ausgelöst. Eine RemoteException umschließt die pid des Arbeiters und eine erfasste Ausnahme. Eine CapturedException erfasst die Remote-Ausnahme und eine serialisierbare Form des Aufrufstapels, als die Ausnahme ausgelöst wurde.
Distributed.ProcessExitedException — TypeProcessExitedException(worker_id::Int)Nachdem ein Client-Julia-Prozess beendet wurde, führen weitere Versuche, auf das tote Kind zuzugreifen, zu dieser Ausnahme.
Distributed.Future — TypeFuture(w::Int, rrid::RRID, v::Union{Some, Nothing}=nothing)Ein Future ist ein Platzhalter für eine einzelne Berechnung mit unbekanntem Abschlussstatus und -zeit. Für mehrere potenzielle Berechnungen siehe RemoteChannel. Siehe remoteref_id zur Identifizierung eines AbstractRemoteRef.
Distributed.RemoteChannel — TypeRemoteChannel(pid::Integer=myid())Erstellt eine Referenz zu einem Channel{Any}(1) im Prozess pid. Der Standardwert für pid ist der aktuelle Prozess.
RemoteChannel(f::Function, pid::Integer=myid())Erstellt Referenzen zu Remote-Kanälen einer bestimmten Größe und Art. f ist eine Funktion, die bei Ausführung auf pid eine Implementierung eines AbstractChannel zurückgeben muss.
Zum Beispiel wird RemoteChannel(()->Channel{Int}(10), pid) eine Referenz zu einem Kanal vom Typ Int und der Größe 10 auf pid zurückgeben.
Der Standardwert für pid ist der aktuelle Prozess.
Base.fetch — Methodfetch(x::Future)Warten Sie auf und erhalten Sie den Wert eines Future. Der abgerufene Wert wird lokal zwischengespeichert. Weitere Aufrufe von fetch auf demselben Verweis geben den zwischengespeicherten Wert zurück. Wenn der entfernte Wert eine Ausnahme ist, wird eine RemoteException ausgelöst, die die entfernte Ausnahme und den Rückverfolgungsstapel erfasst.
Base.fetch — Methodfetch(c::RemoteChannel)Warten Sie auf und erhalten Sie einen Wert von einem RemoteChannel. Die ausgelösten Ausnahmen sind die gleichen wie bei einem Future. Entfernt das abgerufene Element nicht.
fetch(x::Any)Gibt x zurück.
Distributed.remotecall — Methodremotecall(f, id::Integer, args...; kwargs...) -> FutureRufe eine Funktion f asynchron mit den angegebenen Argumenten auf dem angegebenen Prozess auf. Gibt ein Future zurück. Schlüsselwortargumente, falls vorhanden, werden an f weitergegeben.
Distributed.remotecall_wait — Methodremotecall_wait(f, id::Integer, args...; kwargs...)Führen Sie ein schnelleres wait(remotecall(...)) in einer Nachricht auf dem Worker aus, der durch die Worker-ID id angegeben ist. Schlüsselwortargumente, falls vorhanden, werden an f weitergegeben.
Siehe auch wait und remotecall. ```
Distributed.remotecall_fetch — Methodremotecall_fetch(f, id::Integer, args...; kwargs...)Führt fetch(remotecall(...)) in einer Nachricht aus. Schlüsselwortargumente, falls vorhanden, werden an f weitergegeben. Alle Remote-Ausnahmen werden in einer RemoteException erfasst und ausgelöst.
Siehe auch fetch und remotecall.
Beispiele
$ julia -p 2
julia> remotecall_fetch(sqrt, 2, 4)
2.0
julia> remotecall_fetch(sqrt, 2, -4)
ERROR: Auf Arbeiter 2:
DomainError mit -4.0:
sqrt wurde mit einem negativen reellen Argument aufgerufen, gibt jedoch nur ein komplexes Ergebnis zurück, wenn es mit einem komplexen Argument aufgerufen wird. Versuchen Sie sqrt(Complex(x)).
...Distributed.remote_do — Methodremote_do(f, id::Integer, args...; kwargs...) -> nothingFührt f asynchron auf dem Worker id aus. Im Gegensatz zu remotecall wird das Ergebnis der Berechnung nicht gespeichert, noch gibt es eine Möglichkeit, auf dessen Abschluss zu warten.
Ein erfolgreicher Aufruf zeigt an, dass die Anfrage zur Ausführung auf dem Remote-Knoten akzeptiert wurde.
Während aufeinanderfolgende remotecalls an denselben Worker in der Reihenfolge, in der sie aufgerufen werden, serialisiert werden, ist die Reihenfolge der Ausführungen auf dem Remote-Worker unbestimmt. Zum Beispiel wird remote_do(f1, 2); remotecall(f2, 2); remote_do(f3, 2) den Aufruf von f1 serialisieren, gefolgt von f2 und f3 in dieser Reihenfolge. Es ist jedoch nicht garantiert, dass f1 vor f3 auf Worker 2 ausgeführt wird.
Alle Ausnahmen, die von f ausgelöst werden, werden auf stderr des Remote-Workers ausgegeben.
Schlüsselwortargumente, falls vorhanden, werden an f weitergegeben.
Base.put! — Methodput!(rr::RemoteChannel, args...)Speichern Sie eine Menge von Werten im RemoteChannel. Wenn der Kanal voll ist, blockiert er, bis Platz verfügbar ist. Gibt das erste Argument zurück.
Base.put! — Methodput!(rr::Future, v)Speichern Sie einen Wert in einer Future rr. Futures sind einmal schreibbare Remote-Referenzen. Ein put! auf einer bereits gesetzten Future wirft eine Exception. Alle asynchronen Remote-Aufrufe geben Futures zurück und setzen den Wert auf den Rückgabewert des Aufrufs nach Abschluss.
Base.take! — Methodtake!(rr::RemoteChannel, args...)Werte von einem RemoteChannel rr abrufen und dabei die Werte entfernen.
Base.isready — Methodisready(rr::RemoteChannel, args...)Bestimmen Sie, ob ein RemoteChannel einen Wert gespeichert hat. Beachten Sie, dass diese Funktion zu Wettlaufbedingungen führen kann, da es sein kann, dass es nicht mehr zutrifft, bis Sie das Ergebnis erhalten. Es kann jedoch sicher auf einem Future verwendet werden, da diese nur einmal zugewiesen werden.
Base.isready — Methodisready(rr::Future)Bestimmen Sie, ob ein Future einen Wert gespeichert hat.
Wenn das Argument Future von einem anderen Knoten besessen wird, wird dieser Aufruf blockieren, um auf die Antwort zu warten. Es wird empfohlen, auf rr in einer separaten Aufgabe zu warten oder einen lokalen Channel als Proxy zu verwenden:
p = 1
f = Future(p)
errormonitor(@async put!(f, remotecall_fetch(long_computation, p)))
isready(f) # wird nicht blockierenDistributed.AbstractWorkerPool — TypeAbstractWorkerPoolSupertyp für Arbeiterpools wie WorkerPool und CachingPool. Ein AbstractWorkerPool sollte implementieren:
push!- einen neuen Arbeiter zum gesamten Pool hinzufügen (verfügbar + beschäftigt)put!- einen Arbeiter zurück in den verfügbaren Pool gebentake!- einen Arbeiter aus dem verfügbaren Pool nehmen (um ihn für die Ausführung von Remote-Funktionen zu verwenden)length- Anzahl der im gesamten Pool verfügbaren Arbeiterisready- false zurückgeben, wenn eintake!auf dem Pool blockieren würde, sonst true
Die Standardimplementierungen der oben genannten Methoden (auf einem AbstractWorkerPool) erfordern die Felder
channel::Channel{Int}workers::Set{Int}
wobei channel die freien Arbeiter-pids enthält und workers die Menge aller Arbeiter ist, die mit diesem Pool verbunden sind.
Distributed.WorkerPool — TypeWorkerPool(workers::Union{Vector{Int},AbstractRange{Int}})Erstellen Sie einen WorkerPool aus einem Vektor oder Bereich von Arbeiter-IDs.
Beispiele
$ julia -p 3
julia> WorkerPool([2, 3])
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([2, 3]), RemoteChannel{Channel{Any}}(1, 1, 6))
julia> WorkerPool(2:4)
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 7))Distributed.CachingPool — TypeCachingPool(workers::Vector{Int})Eine Implementierung eines AbstractWorkerPool. remote, remotecall_fetch, pmap (und andere Remoteaufrufe, die Funktionen remote ausführen) profitieren davon, die serialisierten/deserialisierten Funktionen auf den Arbeitsknoten zwischenzuspeichern, insbesondere Closures (die große Datenmengen erfassen können).
Der Remote-Cache wird für die Lebensdauer des zurückgegebenen CachingPool-Objekts aufrechterhalten. Um den Cache früher zu leeren, verwenden Sie clear!(pool).
Für globale Variablen werden nur die Bindungen in einem Closure erfasst, nicht die Daten. let-Blöcke können verwendet werden, um globale Daten zu erfassen.
Beispiele
const foo = rand(10^8);
wp = CachingPool(workers())
let foo = foo
pmap(i -> sum(foo) + i, wp, 1:100);
endDas obige würde foo nur einmal an jeden Arbeiter übertragen.
Distributed.default_worker_pool — Functiondefault_worker_pool()AbstractWorkerPool mit inaktiven workers - verwendet von remote(f) und pmap (standardmäßig). Es sei denn, es wird explizit über default_worker_pool!(pool) festgelegt, wird der Standard-Worker-Pool auf einen WorkerPool initialisiert.
Beispiele
$ julia -p 3
julia> default_worker_pool()
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:3), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 4))Distributed.clear! — Functionclear!(syms, pids=workers(); mod=Main)Löscht globale Bindungen in Modulen, indem sie auf nothing initialisiert werden. syms sollte vom Typ Symbol oder einer Sammlung von Symbols sein. pids und mod identifizieren die Prozesse und das Modul, in dem globale Variablen neu initialisiert werden sollen. Nur die Namen, die unter mod definiert sind, werden gelöscht.
Eine Ausnahme wird ausgelöst, wenn ein globaler Konstante angefordert wird, gelöscht zu werden.
clear!(pool::CachingPool) -> poolEntfernt alle zwischengespeicherten Funktionen von allen beteiligten Arbeitern.
Distributed.remote — Functionremote([p::AbstractWorkerPool], f) -> FunktionGibt eine anonyme Funktion zurück, die die Funktion f auf einem verfügbaren Arbeiter (aus dem WorkerPool p, falls angegeben) unter Verwendung von remotecall_fetch ausführt.
Distributed.remotecall — Methodremotecall(f, pool::AbstractWorkerPool, args...; kwargs...) -> FutureWorkerPool Variante von remotecall(f, pid, ....). Warten Sie auf und nehmen Sie einen freien Arbeiter aus pool und führen Sie einen remotecall darauf aus.
Beispiele
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> f = remotecall(maximum, wp, A)
Future(2, 1, 6, nothing)In diesem Beispiel wurde die Aufgabe auf pid 2 ausgeführt, aufgerufen von pid 1.
Distributed.remotecall_wait — Methodremotecall_wait(f, pool::AbstractWorkerPool, args...; kwargs...) -> FutureWorkerPool Variante von remotecall_wait(f, pid, ....). Warten Sie auf und nehmen Sie einen freien Arbeiter aus pool und führen Sie ein remotecall_wait darauf aus.
Beispiele
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> f = remotecall_wait(maximum, wp, A)
Future(3, 1, 9, nothing)
julia> fetch(f)
0.9995177101692958Distributed.remotecall_fetch — Methodremotecall_fetch(f, pool::AbstractWorkerPool, args...; kwargs...) -> resultWorkerPool Variante von remotecall_fetch(f, pid, ....). Wartet auf und nimmt einen freien Arbeiter aus pool und führt ein remotecall_fetch darauf aus.
Beispiele
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> remotecall_fetch(maximum, wp, A)
0.9995177101692958Distributed.remote_do — Methodremote_do(f, pool::AbstractWorkerPool, args...; kwargs...) -> nichtsWorkerPool Variante von remote_do(f, pid, ....). Warten Sie auf und nehmen Sie einen freien Arbeiter aus pool und führen Sie ein remote_do darauf aus.
Distributed.@spawn — Macro@spawn exprErstellt einen Closure um einen Ausdruck und führt ihn auf einem automatisch gewählten Prozess aus, wobei ein Future zum Ergebnis zurückgegeben wird. Dieses Makro ist veraltet; @spawnat :any expr sollte stattdessen verwendet werden.
Beispiele
julia> addprocs(3);
julia> f = @spawn myid()
Future(2, 1, 5, nichts)
julia> fetch(f)
2
julia> f = @spawn myid()
Future(3, 1, 7, nichts)
julia> fetch(f)
3Ab Julia 1.3 ist dieses Makro veraltet. Verwenden Sie stattdessen @spawnat :any.
Distributed.@spawnat — Macro@spawnat p exprErstellen Sie einen Closure um einen Ausdruck und führen Sie den Closure asynchron auf dem Prozess p aus. Geben Sie ein Future zum Ergebnis zurück. Wenn p das zitierte literale Symbol :any ist, wählt das System automatisch einen Prozessor aus.
Beispiele
julia> addprocs(3);
julia> f = @spawnat 2 myid()
Future(2, 1, 3, nothing)
julia> fetch(f)
2
julia> f = @spawnat :any myid()
Future(3, 1, 7, nothing)
julia> fetch(f)
3Das Argument :any ist seit Julia 1.3 verfügbar.
Distributed.@fetch — Macro@fetch exprEntspricht fetch(@spawnat :any expr). Siehe fetch und @spawnat.
Beispiele
julia> addprocs(3);
julia> @fetch myid()
2
julia> @fetch myid()
3
julia> @fetch myid()
4
julia> @fetch myid()
2Distributed.@fetchfrom — Macro@fetchfromEntspricht fetch(@spawnat p expr). Siehe fetch und @spawnat.
Beispiele
julia> addprocs(3);
julia> @fetchfrom 2 myid()
2
julia> @fetchfrom 4 myid()
4Distributed.@distributed — Macro@distributedEine verteilte Speicher-, parallele Schleife der Form:
@distributed [reducer] for var = range
body
endDer angegebene Bereich wird partitioniert und lokal über alle Arbeiter ausgeführt. Falls eine optionale Reduzierfunktion angegeben ist, führt @distributed lokale Reduzierungen auf jedem Arbeiter durch, gefolgt von einer endgültigen Reduzierung im aufrufenden Prozess.
Beachten Sie, dass @distributed ohne eine Reduzierfunktion asynchron ausgeführt wird, d.h. es startet unabhängige Aufgaben auf allen verfügbaren Arbeitern und gibt sofort zurück, ohne auf den Abschluss zu warten. Um auf den Abschluss zu warten, setzen Sie den Aufruf mit @sync in der Form:
@sync @distributed for var = range
body
endDistributed.@everywhere — Macro@everywhere [procs()] exprFühren Sie einen Ausdruck unter Main auf allen procs aus. Fehler in einem der Prozesse werden in eine CompositeException gesammelt und ausgelöst. Zum Beispiel:
@everywhere bar = 1definiert Main.bar auf allen aktuellen Prozessen. Alle später hinzugefügten Prozesse (zum Beispiel mit addprocs()) haben den Ausdruck nicht definiert.
Im Gegensatz zu @spawnat erfasst @everywhere keine lokalen Variablen. Stattdessen können lokale Variablen mit Interpolation übertragen werden:
foo = 1
@everywhere bar = $fooDas optionale Argument procs ermöglicht es, eine Teilmenge aller Prozesse anzugeben, die den Ausdruck ausführen sollen.
Ähnlich wie beim Aufruf von remotecall_eval(Main, procs, expr), jedoch mit zwei zusätzlichen Funktionen:
- `using` und `import` Anweisungen werden zuerst im aufrufenden Prozess ausgeführt, um sicherzustellen,
dass Pakete vorcompiliert sind.
- Der aktuelle Quelldateipfad, der von `include` verwendet wird, wird an andere Prozesse weitergegeben.Distributed.remoteref_id — Functionremoteref_id(r::AbstractRemoteRef) -> RRIDFutures und RemoteChannels werden durch Felder identifiziert:
where- bezieht sich auf den Knoten, an dem das zugrunde liegende Objekt/Speicher, auf das die Referenz verweist, tatsächlich existiert.whence- bezieht sich auf den Knoten, von dem die Remote-Referenz erstellt wurde. Beachten Sie, dass dies sich von dem Knoten unterscheidet, an dem das zugrunde liegende Objekt, auf das verwiesen wird, tatsächlich existiert. Zum Beispiel würde der Aufruf vonRemoteChannel(2)vom Master-Prozess zu einemwhere-Wert von 2 und einemwhence-Wert von 1 führen.idist einzigartig für alle Referenzen, die von dem Worker erstellt wurden, der durchwhenceangegeben ist.
Zusammen identifizieren whence und id eindeutig eine Referenz über alle Worker hinweg.
remoteref_id ist eine Low-Level-API, die ein RRID-Objekt zurückgibt, das die whence- und id-Werte einer Remote-Referenz umschließt.
Distributed.channel_from_id — Functionchannel_from_id(id) -> cEine Low-Level-API, die das zugrunde liegende AbstractChannel für eine id zurückgibt, die von remoteref_id zurückgegeben wird. Der Aufruf ist nur auf dem Knoten gültig, auf dem der zugrunde liegende Kanal existiert.
Distributed.worker_id_from_socket — Functionworker_id_from_socket(s) -> pidEine Low-Level-API, die, gegeben eine IO-Verbindung oder einen Worker, die pid des Workers zurückgibt, mit dem sie verbunden ist. Dies ist nützlich, wenn benutzerdefinierte serialize Methoden für einen Typ geschrieben werden, die die geschriebenen Daten je nach Prozess-ID des empfangenden Prozesses optimieren.
Distributed.cluster_cookie — Methodcluster_cookie() -> cookieGibt das Cluster-Cookie zurück.
Distributed.cluster_cookie — Methodcluster_cookie(cookie) -> cookieSetzt das übergebene Cookie als das Cluster-Cookie und gibt es dann zurück.
Cluster Manager Interface
Diese Schnittstelle bietet einen Mechanismus zum Starten und Verwalten von Julia-Arbeitern in verschiedenen Cluster-Umgebungen. Es gibt zwei Arten von Managern in Base: LocalManager, um zusätzliche Arbeiter auf demselben Host zu starten, und SSHManager, um auf entfernten Hosts über ssh zu starten. TCP/IP-Sockets werden verwendet, um Prozesse zu verbinden und Nachrichten zwischen ihnen zu transportieren. Es ist möglich, dass Cluster-Manager einen anderen Transport bereitstellen.
Distributed.ClusterManager — TypeClusterManagerSupertyp für Cluster-Manager, die Arbeitsprozesse als Cluster steuern. Cluster-Manager implementieren, wie Arbeiter hinzugefügt, entfernt und kommuniziert werden können. SSHManager und LocalManager sind Subtypen davon.
Distributed.WorkerConfig — TypeWorkerConfigTyp, der von ClusterManagers verwendet wird, um die zu ihren Clustern hinzugefügten Worker zu steuern. Einige Felder werden von allen Cluster-Managern verwendet, um auf einen Host zuzugreifen:
io– die Verbindung, die verwendet wird, um auf den Worker zuzugreifen (ein Subtyp vonIOoderNothing)host– die Hostadresse (entweder einStringoderNothing)port– der Port auf dem Host, der verwendet wird, um eine Verbindung zum Worker herzustellen (entweder einIntoderNothing)
Einige werden vom Cluster-Manager verwendet, um Worker zu einem bereits initialisierten Host hinzuzufügen:
count– die Anzahl der Worker, die auf dem Host gestartet werden sollenexename– der Pfad zur Julia-Ausführungsdatei auf dem Host, standardmäßig"$(Sys.BINDIR)/julia"oder"$(Sys.BINDIR)/julia-debug"exeflags– Flags, die beim Starten von Julia remote verwendet werden
Das Feld userdata wird verwendet, um Informationen für jeden Worker von externen Managern zu speichern.
Einige Felder werden von SSHManager und ähnlichen Managern verwendet:
tunnel–true(Tunneling verwenden),false(kein Tunneling verwenden) odernothing(Standard für den Manager verwenden)multiplex–true(SSH-Multiplexing für Tunneling verwenden) oderfalseforward– die Weiterleitungsoption, die für die-L-Option von ssh verwendet wirdbind_addr– die Adresse auf dem Remote-Host, an die gebunden werden sollsshflags– Flags, die beim Herstellen der SSH-Verbindung verwendet werdenmax_parallel– die maximale Anzahl von Workern, die parallel auf dem Host verbunden werden sollen
Einige Felder werden sowohl von LocalManagers als auch von SSHManagers verwendet:
connect_at– bestimmt, ob dies ein Worker-zu-Worker- oder Driver-zu-Worker-Setup-Aufruf istprocess– der Prozess, der verbunden werden soll (normalerweise weist der Manager dies währendaddprocszu)ospid– die Prozess-ID gemäß dem Host-OS, die verwendet wird, um Worker-Prozesse zu unterbrechenenviron– privates Wörterbuch, das von Local/SSH-Managern verwendet wird, um temporäre Informationen zu speichernident– Worker, wie er vomClusterManageridentifiziert wirdconnect_idents– Liste der Worker-IDs, mit denen der Worker verbunden werden muss, wenn eine benutzerdefinierte Topologie verwendet wirdenable_threaded_blas–true,falseodernothing, ob auf den Workern threaded BLAS verwendet werden soll oder nicht
Distributed.launch — Functionlaunch(manager::ClusterManager, params::Dict, launched::Array, launch_ntfy::Condition)Implementiert von Cluster-Managern. Für jeden Julia-Arbeiter, der von dieser Funktion gestartet wird, sollte ein WorkerConfig-Eintrag zu launched hinzugefügt und launch_ntfy benachrichtigt werden. Die Funktion MUSS beenden, sobald alle Arbeiter, die von manager angefordert wurden, gestartet wurden. params ist ein Wörterbuch aller Schlüsselargumente, mit denen addprocs aufgerufen wurde.
Distributed.manage — Functionmanage(manager::ClusterManager, id::Integer, config::WorkerConfig. op::Symbol)Implementiert von Cluster-Managern. Es wird im Master-Prozess während der Lebensdauer eines Workers mit den entsprechenden op-Werten aufgerufen:
- mit
:register/:deregister, wenn ein Worker zum Julia-Worker-Pool hinzugefügt / entfernt wird. - mit
:interrupt, wenninterrupt(workers)aufgerufen wird. DerClusterManagersollte den entsprechenden Worker mit einem Interrupt-Signal benachrichtigen. - mit
:finalizezu Reinigungszwecken.
Base.kill — Methodkill(manager::ClusterManager, pid::Int, config::WorkerConfig)Wird von Cluster-Managern implementiert. Es wird im Master-Prozess von rmprocs aufgerufen. Es sollte dazu führen, dass der entfernte Worker, der durch pid angegeben ist, beendet wird. kill(manager::ClusterManager.....) führt ein entferntes exit() auf pid aus.
Sockets.connect — Methodconnect(manager::ClusterManager, pid::Int, config::WorkerConfig) -> (instrm::IO, outstrm::IO)Implementiert von Cluster-Managern mit benutzerdefinierten Transporten. Es sollte eine logische Verbindung zum Worker mit der ID pid, die durch config angegeben ist, hergestellt werden und ein Paar von IO-Objekten zurückgeben. Nachrichten von pid an den aktuellen Prozess werden von instrm gelesen, während Nachrichten, die an pid gesendet werden sollen, in outstrm geschrieben werden. Die Implementierung des benutzerdefinierten Transports muss sicherstellen, dass Nachrichten vollständig und in der richtigen Reihenfolge zugestellt und empfangen werden. connect(manager::ClusterManager.....) richtet TCP/IP-Socket-Verbindungen zwischen den Workern ein.
Distributed.init_worker — Functioninit_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager())Aufgerufen von Cluster-Managern, die benutzerdefinierte Transports implementieren. Es initialisiert einen neu gestarteten Prozess als Arbeiter. Das Befehlszeilenargument --worker[=<cookie>] hat die Wirkung, einen Prozess als Arbeiter zu initialisieren, der TCP/IP-Sockets für den Transport verwendet. cookie ist ein cluster_cookie.
Distributed.start_worker — Functionstart_worker([out::IO=stdout], cookie::AbstractString=readline(stdin); close_stdin::Bool=true, stderr_to_stdout::Bool=true)start_worker ist eine interne Funktion, die den Standard-Einstiegspunkt für Arbeitsprozesse darstellt, die sich über TCP/IP verbinden. Sie richtet den Prozess als Julia-Cluster-Arbeiter ein.
Host:Port-Informationen werden in den Stream out geschrieben (standardmäßig stdout).
Die Funktion liest das Cookie von stdin, falls erforderlich, und hört auf einem freien Port (oder, falls angegeben, dem Port in der --bind-to-Befehlszeilenoption) und plant Aufgaben zur Verarbeitung eingehender TCP-Verbindungen und -Anfragen. Sie schließt auch (optional) stdin und leitet stderr an stdout um.
Sie gibt nichts zurück.
Distributed.process_messages — Functionprocess_messages(r_stream::IO, w_stream::IO, incoming::Bool=true)Wird von Cluster-Managern mit benutzerdefinierten Transporten aufgerufen. Es sollte aufgerufen werden, wenn die benutzerdefinierte Transportimplementierung die erste Nachricht von einem entfernten Worker erhält. Der benutzerdefinierte Transport muss eine logische Verbindung zum entfernten Worker verwalten und zwei IO-Objekte bereitstellen, eines für eingehende Nachrichten und das andere für an den entfernten Worker adressierte Nachrichten. Wenn incoming true ist, hat der entfernte Peer die Verbindung initiiert. Derjenige, der die Verbindung initiiert, sendet das Cluster-Cookie und seine Julia-Versionnummer, um den Authentifizierungs-Handschlag durchzuführen.
Siehe auch cluster_cookie.
Distributed.default_addprocs_params — Functiondefault_addprocs_params(mgr::ClusterManager) -> Dict{Symbol, Any}Implementiert von Cluster-Managern. Die Standard-Keyword-Parameter, die beim Aufruf von addprocs(mgr) übergeben werden. Die minimale Menge an Optionen ist verfügbar, indem default_addprocs_params() aufgerufen wird.