Distributed Computing

Distributed.addprocsFunction
addprocs(manager::ClusterManager; kwargs...) -> Liste der Prozessidentifikatoren

Startet Arbeitsprozesse über den angegebenen Cluster-Manager.

Zum Beispiel werden Beowulf-Cluster über einen benutzerdefinierten Cluster-Manager unterstützt, der im Paket ClusterManagers.jl implementiert ist.

Die Anzahl der Sekunden, die ein neu gestarteter Arbeiter auf die Verbindungsherstellung vom Master wartet, kann über die Variable JULIA_WORKER_TIMEOUT in der Umgebung des Arbeiterprozesses angegeben werden. Relevant nur bei Verwendung von TCP/IP als Transport.

Um Arbeiter zu starten, ohne die REPL oder die umgebende Funktion zu blockieren, wenn Arbeiter programmgesteuert gestartet werden, führen Sie addprocs in seiner eigenen Aufgabe aus.

Beispiele

# In beschäftigten Clustern `addprocs` asynchron aufrufen
t = @async addprocs(...)
# Arbeiter nutzen, sobald sie online sind
if nprocs() > 1   # Sicherstellen, dass mindestens ein neuer Arbeiter verfügbar ist
   ....   # verteilte Ausführung durchführen
end
# Neu gestartete Arbeiter-IDs oder Fehlermeldungen abrufen
if istaskdone(t)   # Überprüfen, ob `addprocs` abgeschlossen ist, um sicherzustellen, dass `fetch` nicht blockiert
    if nworkers() == N
        new_pids = fetch(t)
    else
        fetch(t)
    end
end
source
addprocs(machines; tunnel=false, sshflags=``, max_parallel=10, kwargs...) -> Liste der Prozessidentifikatoren

Fügen Sie Arbeitsprozesse auf Remote-Maschinen über SSH hinzu. Die Konfiguration erfolgt über Schlüsselwortargumente (siehe unten). Insbesondere kann das Schlüsselwort exename verwendet werden, um den Pfad zur julia-Binärdatei auf der Remote-Maschine(n) anzugeben.

machines ist ein Vektor von "Maschinenspezifikationen", die als Strings der Form [user@]host[:port] [bind_addr[:port]] angegeben werden. user ist standardmäßig der aktuelle Benutzer und port der Standard-SSH-Port. Wenn [bind_addr[:port]] angegeben ist, stellen andere Arbeiter eine Verbindung zu diesem Arbeiter über die angegebene bind_addr und port her.

Es ist möglich, mehrere Prozesse auf einem Remote-Host zu starten, indem ein Tupel im machines-Vektor oder die Form (machine_spec, count) verwendet wird, wobei count die Anzahl der zu startenden Arbeiter auf dem angegebenen Host ist. Das Übergeben von :auto als Arbeiteranzahl startet so viele Arbeiter wie CPU-Threads auf dem Remote-Host.

Beispiele:

addprocs([
    "remote1",               # ein Arbeiter auf 'remote1', der sich mit dem aktuellen Benutzernamen anmeldet
    "user@remote2",          # ein Arbeiter auf 'remote2', der sich mit dem Benutzernamen 'user' anmeldet
    "user@remote3:2222",     # SSH-Port auf '2222' für 'remote3' angeben
    ("user@remote4", 4),     # 4 Arbeiter auf 'remote4' starten
    ("user@remote5", :auto), # so viele Arbeiter wie CPU-Threads auf 'remote5' starten
])

Schlüsselwortargumente:

  • tunnel: wenn true, wird SSH-Tunneling verwendet, um sich vom Master-Prozess mit dem Arbeiter zu verbinden. Standard ist false.

  • multiplex: wenn true, wird SSH-Multiplexing für SSH-Tunneling verwendet. Standard ist false.

  • ssh: der Name oder Pfad des SSH-Client-Executables, das zum Starten der Arbeiter verwendet wird. Standard ist "ssh".

  • sshflags: gibt zusätzliche SSH-Optionen an, z.B. sshflags=`-i /home/foo/bar.pem`

  • max_parallel: gibt die maximale Anzahl von Arbeitern an, die parallel zu einem Host verbunden sind. Standardmäßig 10.

  • shell: gibt den Typ der Shell an, mit der SSH sich bei den Arbeitern verbindet.

    • shell=:posix: eine POSIX-kompatible Unix/Linux-Shell (sh, ksh, bash, dash, zsh usw.). Der Standard.
    • shell=:csh: eine Unix C-Shell (csh, tcsh).
    • shell=:wincmd: Microsoft Windows cmd.exe.
  • dir: gibt das Arbeitsverzeichnis auf den Arbeitern an. Standardmäßig das aktuelle Verzeichnis des Hosts (wie von pwd() gefunden).

  • enable_threaded_blas: wenn true, wird BLAS in mehreren Threads in hinzugefügten Prozessen ausgeführt. Standard ist false.

  • exename: Name der julia-Executable. Standard ist "$(Sys.BINDIR)/julia" oder "$(Sys.BINDIR)/julia-debug" je nach Fall. Es wird empfohlen, auf allen Remote-Maschinen eine gemeinsame Julia-Version zu verwenden, da sonst die Serialisierung und der Codevertrieb fehlschlagen könnten.

  • exeflags: zusätzliche Flags, die an die Arbeiterprozesse übergeben werden.

  • topology: Gibt an, wie die Arbeiter miteinander verbunden sind. Das Senden einer Nachricht zwischen nicht verbundenen Arbeitern führt zu einem Fehler.

    • topology=:all_to_all: Alle Prozesse sind miteinander verbunden. Der Standard.
    • topology=:master_worker: Nur der Treiberprozess, d.h. pid 1, verbindet sich mit den Arbeitern. Die Arbeiter verbinden sich nicht untereinander.
    • topology=:custom: Die launch-Methode des Cluster-Managers gibt die Verbindungstopologie über die Felder ident und connect_idents in WorkerConfig an. Ein Arbeiter mit einer Cluster-Manager-Identität ident verbindet sich mit allen Arbeitern, die in connect_idents angegeben sind.
  • lazy: Anwendbar nur mit topology=:all_to_all. Wenn true, werden die Verbindungen zwischen Arbeitern faul eingerichtet, d.h. sie werden beim ersten Remote-Aufruf zwischen Arbeitern eingerichtet. Standard ist true.

  • env: Stellen Sie ein Array von String-Paaren wie env=["JULIA_DEPOT_PATH"=>"/depot"] bereit, um zu verlangen, dass Umgebungsvariablen auf der Remote-Maschine gesetzt werden. Standardmäßig wird nur die Umgebungsvariable JULIA_WORKER_TIMEOUT automatisch von der lokalen in die Remote-Umgebung übergeben.

  • cmdline_cookie: Übergeben Sie das Authentifizierungscookie über die --worker-Befehlszeilenoption. Das (sicherere) Standardverhalten, das Cookie über SSH stdio zu übergeben, kann bei Windows-Arbeitern, die ältere (vor ConPTY) Julia- oder Windows-Versionen verwenden, hängen bleiben, in diesem Fall bietet cmdline_cookie=true eine Umgehungslösung.

Julia 1.6

Die Schlüsselwortargumente ssh, shell, env und cmdline_cookie wurden in Julia 1.6 hinzugefügt.

Umgebungsvariablen:

Wenn der Master-Prozess innerhalb von 60,0 Sekunden keine Verbindung zu einem neu gestarteten Arbeiter herstellen kann, betrachtet der Arbeiter dies als eine fatale Situation und beendet sich. Dieses Timeout kann über die Umgebungsvariable JULIA_WORKER_TIMEOUT gesteuert werden. Der Wert von JULIA_WORKER_TIMEOUT im Master-Prozess gibt die Anzahl der Sekunden an, die ein neu gestarteter Arbeiter auf die Herstellung der Verbindung wartet.

source
addprocs(np::Integer=Sys.CPU_THREADS; restrict=true, kwargs...) -> Liste der Prozess-Identifikatoren

Starten Sie np Arbeiter auf dem lokalen Host mit dem integrierten LocalManager.

Lokale Arbeiter erben die aktuelle Paketumgebung (d.h. aktives Projekt, LOAD_PATH und DEPOT_PATH) vom Hauptprozess.

Warning

Beachten Sie, dass Arbeiter kein ~/.julia/config/startup.jl Startskript ausführen und ihren globalen Zustand (wie Befehlszeilenoptionen, globale Variablen, neue Methodendefinitionen und geladene Module) nicht mit anderen laufenden Prozessen synchronisieren.

Schlüsselwortargumente:

  • restrict::Bool: wenn true (Standard) ist die Bindung auf 127.0.0.1 beschränkt.
  • dir, exename, exeflags, env, topology, lazy, enable_threaded_blas: derselbe Effekt wie bei SSHManager, siehe Dokumentation für addprocs(machines::AbstractVector).
Julia 1.9

Das Erben der Paketumgebung und das Schlüsselwortargument env wurden in Julia 1.9 hinzugefügt.

source
Distributed.nprocsFunction
nprocs()

Holen Sie die Anzahl der verfügbaren Prozesse.

Beispiele

julia> nprocs()
3

julia> workers()
2-element Array{Int64,1}:
 2
 3
source
Distributed.nworkersFunction
nworkers()

Holen Sie die Anzahl der verfügbaren Arbeitsprozesse. Dies ist eins weniger als nprocs(). Gleich nprocs(), wenn nprocs() == 1.

Beispiele

$ julia -p 2

julia> nprocs()
3

julia> nworkers()
2
source
Distributed.procsMethod
procs()

Gibt eine Liste aller Prozess-Identifikatoren zurück, einschließlich pid 1 (die nicht von workers() eingeschlossen ist).

Beispiele

$ julia -p 2

julia> procs()
3-element Array{Int64,1}:
 1
 2
 3
source
Distributed.procsMethod
procs(pid::Integer)

Gibt eine Liste aller Prozessidentifikatoren auf demselben physischen Knoten zurück. Insbesondere werden alle Arbeiter zurückgegeben, die an dieselbe IP-Adresse wie pid gebunden sind.

source
Distributed.workersFunction
workers()

Gibt eine Liste aller Arbeitsprozess-Identifikatoren zurück.

Beispiele

$ julia -p 2

julia> workers()
2-element Array{Int64,1}:
 2
 3
source
Distributed.rmprocsFunction
rmprocs(pids...; waitfor=typemax(Int))

Entfernen Sie die angegebenen Arbeiter. Beachten Sie, dass nur Prozess 1 Arbeiter hinzufügen oder entfernen kann.

Das Argument waitfor gibt an, wie lange gewartet werden soll, bis die Arbeiter heruntergefahren sind:

  • Wenn nicht angegeben, wartet rmprocs, bis alle angeforderten pids entfernt sind.
  • Eine ErrorException wird ausgelöst, wenn nicht alle Arbeiter vor den angeforderten waitfor Sekunden beendet werden können.
  • Mit einem waitfor-Wert von 0 gibt der Aufruf sofort zurück, wobei die Arbeiter in einer anderen Aufgabe zur Entfernung geplant sind. Das geplante Task Objekt wird zurückgegeben. Der Benutzer sollte wait auf der Aufgabe aufrufen, bevor er andere parallele Aufrufe tätigt.

Beispiele

$ julia -p 5

julia> t = rmprocs(2, 3, waitfor=0)
Task (runnable) @0x0000000107c718d0

julia> wait(t)

julia> workers()
3-element Array{Int64,1}:
 4
 5
 6
source
Distributed.interruptFunction
interrupt(pids::Integer...)

Unterbricht die derzeit ausgeführte Aufgabe auf den angegebenen Arbeitern. Dies entspricht dem Drücken von Ctrl-C auf dem lokalen Computer. Wenn keine Argumente angegeben sind, werden alle Arbeiter unterbrochen.

source
interrupt(pids::AbstractVector=workers())

Unterbrechen Sie die aktuell ausgeführte Aufgabe auf den angegebenen Arbeitern. Dies entspricht dem Drücken von Ctrl-C auf dem lokalen Computer. Wenn keine Argumente angegeben sind, werden alle Arbeiter unterbrochen.

source
Distributed.myidFunction
myid()

Holen Sie sich die ID des aktuellen Prozesses.

Beispiele

julia> myid()
1

julia> remotecall_fetch(() -> myid(), 4)
4
source
Distributed.pmapFunction
pmap(f, [::AbstractWorkerPool], c...; distributed=true, batch_size=1, on_error=nothing, retry_delays=[], retry_check=nothing) -> Sammlung

Transformiere die Sammlung c, indem f auf jedes Element unter Verwendung verfügbarer Arbeiter und Aufgaben angewendet wird.

Bei mehreren Sammlungsargumenten wird f elementweise angewendet.

Beachte, dass f allen Arbeiterprozessen zur Verfügung stehen muss; siehe Codeverfügbarkeit und Laden von Paketen für Details.

Wenn kein Arbeiterpool angegeben ist, werden alle verfügbaren Arbeiter über einen CachingPool verwendet.

Standardmäßig verteilt pmap die Berechnung über alle angegebenen Arbeiter. Um nur den lokalen Prozess zu verwenden und über Aufgaben zu verteilen, gib distributed=false an. Dies entspricht der Verwendung von asyncmap. Zum Beispiel ist pmap(f, c; distributed=false) äquivalent zu asyncmap(f,c; ntasks=()->nworkers())

pmap kann auch eine Mischung aus Prozessen und Aufgaben über das Argument batch_size verwenden. Bei Batchgrößen größer als 1 wird die Sammlung in mehreren Batches verarbeitet, die jeweils die Länge batch_size oder weniger haben. Ein Batch wird als einzelne Anfrage an einen freien Arbeiter gesendet, wo eine lokale asyncmap Elemente aus dem Batch mit mehreren gleichzeitigen Aufgaben verarbeitet.

Ein Fehler stoppt pmap daran, den Rest der Sammlung zu verarbeiten. Um dieses Verhalten zu überschreiben, kannst du eine Fehlerbehandlungsfunktion über das Argument on_error angeben, die ein einzelnes Argument, d.h. die Ausnahme, entgegennimmt. Die Funktion kann die Verarbeitung stoppen, indem sie den Fehler erneut auslöst, oder, um fortzufahren, einen beliebigen Wert zurückgeben, der dann inline mit den Ergebnissen an den Aufrufer zurückgegeben wird.

Betrachte die folgenden zwei Beispiele. Das erste gibt das Ausnahmeobjekt inline zurück, das zweite eine 0 anstelle einer Ausnahme:

julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=identity)
4-element Array{Any,1}:
 1
  ErrorException("foo")
 3
  ErrorException("foo")

julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=ex->0)
4-element Array{Int64,1}:
 1
 0
 3
 0

Fehler können auch behandelt werden, indem fehlgeschlagene Berechnungen erneut versucht werden. Die Schlüsselwortargumente retry_delays und retry_check werden als Schlüsselwortargumente delays und check an retry weitergegeben. Wenn das Batching angegeben ist und ein gesamter Batch fehlschlägt, werden alle Elemente im Batch erneut versucht.

Beachte, dass wenn sowohl on_error als auch retry_delays angegeben sind, der on_error Hook vor dem erneuten Versuch aufgerufen wird. Wenn on_error keine Ausnahme auslöst (oder erneut auslöst), wird das Element nicht erneut versucht.

Beispiel: Bei Fehlern, versuche f auf einem Element maximal 3 Mal ohne Verzögerung zwischen den Versuchen.

pmap(f, c; retry_delays = zeros(3))

Beispiel: Versuche f nur, wenn die Ausnahme nicht vom Typ InexactError ist, mit exponentiell zunehmenden Verzögerungen bis zu 3 Mal. Gib ein NaN anstelle aller InexactError-Vorkommen zurück.

pmap(f, c; on_error = e->(isa(e, InexactError) ? NaN : rethrow()), retry_delays = ExponentialBackOff(n = 3))
source
Distributed.RemoteExceptionType
RemoteException(captured)

Ausnahmen bei Remote-Berechnungen werden erfasst und lokal erneut ausgelöst. Eine RemoteException umschließt die pid des Arbeiters und eine erfasste Ausnahme. Eine CapturedException erfasst die Remote-Ausnahme und eine serialisierbare Form des Aufrufstapels, als die Ausnahme ausgelöst wurde.

source
Distributed.ProcessExitedExceptionType
ProcessExitedException(worker_id::Int)

Nachdem ein Client-Julia-Prozess beendet wurde, führen weitere Versuche, auf das tote Kind zuzugreifen, zu dieser Ausnahme.

source
Distributed.FutureType
Future(w::Int, rrid::RRID, v::Union{Some, Nothing}=nothing)

Ein Future ist ein Platzhalter für eine einzelne Berechnung mit unbekanntem Abschlussstatus und -zeit. Für mehrere potenzielle Berechnungen siehe RemoteChannel. Siehe remoteref_id zur Identifizierung eines AbstractRemoteRef.

source
Distributed.RemoteChannelType
RemoteChannel(pid::Integer=myid())

Erstellt eine Referenz zu einem Channel{Any}(1) im Prozess pid. Der Standardwert für pid ist der aktuelle Prozess.

RemoteChannel(f::Function, pid::Integer=myid())

Erstellt Referenzen zu Remote-Kanälen einer bestimmten Größe und Art. f ist eine Funktion, die bei Ausführung auf pid eine Implementierung eines AbstractChannel zurückgeben muss.

Zum Beispiel wird RemoteChannel(()->Channel{Int}(10), pid) eine Referenz zu einem Kanal vom Typ Int und der Größe 10 auf pid zurückgeben.

Der Standardwert für pid ist der aktuelle Prozess.

source
Base.fetchMethod
fetch(x::Future)

Warten Sie auf und erhalten Sie den Wert eines Future. Der abgerufene Wert wird lokal zwischengespeichert. Weitere Aufrufe von fetch auf demselben Verweis geben den zwischengespeicherten Wert zurück. Wenn der entfernte Wert eine Ausnahme ist, wird eine RemoteException ausgelöst, die die entfernte Ausnahme und den Rückverfolgungsstapel erfasst.

source
Base.fetchMethod
fetch(c::RemoteChannel)

Warten Sie auf und erhalten Sie einen Wert von einem RemoteChannel. Die ausgelösten Ausnahmen sind die gleichen wie bei einem Future. Entfernt das abgerufene Element nicht.

source
fetch(x::Any)

Gibt x zurück.

source
Distributed.remotecallMethod
remotecall(f, id::Integer, args...; kwargs...) -> Future

Rufe eine Funktion f asynchron mit den angegebenen Argumenten auf dem angegebenen Prozess auf. Gibt ein Future zurück. Schlüsselwortargumente, falls vorhanden, werden an f weitergegeben.

source
Distributed.remotecall_waitMethod
remotecall_wait(f, id::Integer, args...; kwargs...)

Führen Sie ein schnelleres wait(remotecall(...)) in einer Nachricht auf dem Worker aus, der durch die Worker-ID id angegeben ist. Schlüsselwortargumente, falls vorhanden, werden an f weitergegeben.

Siehe auch wait und remotecall. ```

source
Distributed.remotecall_fetchMethod
remotecall_fetch(f, id::Integer, args...; kwargs...)

Führt fetch(remotecall(...)) in einer Nachricht aus. Schlüsselwortargumente, falls vorhanden, werden an f weitergegeben. Alle Remote-Ausnahmen werden in einer RemoteException erfasst und ausgelöst.

Siehe auch fetch und remotecall.

Beispiele

$ julia -p 2

julia> remotecall_fetch(sqrt, 2, 4)
2.0

julia> remotecall_fetch(sqrt, 2, -4)
ERROR: Auf Arbeiter 2:
DomainError mit -4.0:
sqrt wurde mit einem negativen reellen Argument aufgerufen, gibt jedoch nur ein komplexes Ergebnis zurück, wenn es mit einem komplexen Argument aufgerufen wird. Versuchen Sie sqrt(Complex(x)).
...
source
Distributed.remote_doMethod
remote_do(f, id::Integer, args...; kwargs...) -> nothing

Führt f asynchron auf dem Worker id aus. Im Gegensatz zu remotecall wird das Ergebnis der Berechnung nicht gespeichert, noch gibt es eine Möglichkeit, auf dessen Abschluss zu warten.

Ein erfolgreicher Aufruf zeigt an, dass die Anfrage zur Ausführung auf dem Remote-Knoten akzeptiert wurde.

Während aufeinanderfolgende remotecalls an denselben Worker in der Reihenfolge, in der sie aufgerufen werden, serialisiert werden, ist die Reihenfolge der Ausführungen auf dem Remote-Worker unbestimmt. Zum Beispiel wird remote_do(f1, 2); remotecall(f2, 2); remote_do(f3, 2) den Aufruf von f1 serialisieren, gefolgt von f2 und f3 in dieser Reihenfolge. Es ist jedoch nicht garantiert, dass f1 vor f3 auf Worker 2 ausgeführt wird.

Alle Ausnahmen, die von f ausgelöst werden, werden auf stderr des Remote-Workers ausgegeben.

Schlüsselwortargumente, falls vorhanden, werden an f weitergegeben.

source
Base.put!Method
put!(rr::RemoteChannel, args...)

Speichern Sie eine Menge von Werten im RemoteChannel. Wenn der Kanal voll ist, blockiert er, bis Platz verfügbar ist. Gibt das erste Argument zurück.

source
Base.put!Method
put!(rr::Future, v)

Speichern Sie einen Wert in einer Future rr. Futures sind einmal schreibbare Remote-Referenzen. Ein put! auf einer bereits gesetzten Future wirft eine Exception. Alle asynchronen Remote-Aufrufe geben Futures zurück und setzen den Wert auf den Rückgabewert des Aufrufs nach Abschluss.

source
Base.isreadyMethod
isready(rr::RemoteChannel, args...)

Bestimmen Sie, ob ein RemoteChannel einen Wert gespeichert hat. Beachten Sie, dass diese Funktion zu Wettlaufbedingungen führen kann, da es sein kann, dass es nicht mehr zutrifft, bis Sie das Ergebnis erhalten. Es kann jedoch sicher auf einem Future verwendet werden, da diese nur einmal zugewiesen werden.

source
Base.isreadyMethod
isready(rr::Future)

Bestimmen Sie, ob ein Future einen Wert gespeichert hat.

Wenn das Argument Future von einem anderen Knoten besessen wird, wird dieser Aufruf blockieren, um auf die Antwort zu warten. Es wird empfohlen, auf rr in einer separaten Aufgabe zu warten oder einen lokalen Channel als Proxy zu verwenden:

p = 1
f = Future(p)
errormonitor(@async put!(f, remotecall_fetch(long_computation, p)))
isready(f)  # wird nicht blockieren
source
Distributed.AbstractWorkerPoolType
AbstractWorkerPool

Supertyp für Arbeiterpools wie WorkerPool und CachingPool. Ein AbstractWorkerPool sollte implementieren:

  • push! - einen neuen Arbeiter zum gesamten Pool hinzufügen (verfügbar + beschäftigt)
  • put! - einen Arbeiter zurück in den verfügbaren Pool geben
  • take! - einen Arbeiter aus dem verfügbaren Pool nehmen (um ihn für die Ausführung von Remote-Funktionen zu verwenden)
  • length - Anzahl der im gesamten Pool verfügbaren Arbeiter
  • isready - false zurückgeben, wenn ein take! auf dem Pool blockieren würde, sonst true

Die Standardimplementierungen der oben genannten Methoden (auf einem AbstractWorkerPool) erfordern die Felder

  • channel::Channel{Int}
  • workers::Set{Int}

wobei channel die freien Arbeiter-pids enthält und workers die Menge aller Arbeiter ist, die mit diesem Pool verbunden sind.

source
Distributed.WorkerPoolType
WorkerPool(workers::Union{Vector{Int},AbstractRange{Int}})

Erstellen Sie einen WorkerPool aus einem Vektor oder Bereich von Arbeiter-IDs.

Beispiele

$ julia -p 3

julia> WorkerPool([2, 3])
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([2, 3]), RemoteChannel{Channel{Any}}(1, 1, 6))

julia> WorkerPool(2:4)
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 7))
source
Distributed.CachingPoolType
CachingPool(workers::Vector{Int})

Eine Implementierung eines AbstractWorkerPool. remote, remotecall_fetch, pmap (und andere Remoteaufrufe, die Funktionen remote ausführen) profitieren davon, die serialisierten/deserialisierten Funktionen auf den Arbeitsknoten zwischenzuspeichern, insbesondere Closures (die große Datenmengen erfassen können).

Der Remote-Cache wird für die Lebensdauer des zurückgegebenen CachingPool-Objekts aufrechterhalten. Um den Cache früher zu leeren, verwenden Sie clear!(pool).

Für globale Variablen werden nur die Bindungen in einem Closure erfasst, nicht die Daten. let-Blöcke können verwendet werden, um globale Daten zu erfassen.

Beispiele

const foo = rand(10^8);
wp = CachingPool(workers())
let foo = foo
    pmap(i -> sum(foo) + i, wp, 1:100);
end

Das obige würde foo nur einmal an jeden Arbeiter übertragen.

source
Distributed.default_worker_poolFunction
default_worker_pool()

AbstractWorkerPool mit inaktiven workers - verwendet von remote(f) und pmap (standardmäßig). Es sei denn, es wird explizit über default_worker_pool!(pool) festgelegt, wird der Standard-Worker-Pool auf einen WorkerPool initialisiert.

Beispiele

$ julia -p 3

julia> default_worker_pool()
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:3), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 4))
source
Distributed.clear!Function
clear!(syms, pids=workers(); mod=Main)

Löscht globale Bindungen in Modulen, indem sie auf nothing initialisiert werden. syms sollte vom Typ Symbol oder einer Sammlung von Symbols sein. pids und mod identifizieren die Prozesse und das Modul, in dem globale Variablen neu initialisiert werden sollen. Nur die Namen, die unter mod definiert sind, werden gelöscht.

Eine Ausnahme wird ausgelöst, wenn ein globaler Konstante angefordert wird, gelöscht zu werden.

source
clear!(pool::CachingPool) -> pool

Entfernt alle zwischengespeicherten Funktionen von allen beteiligten Arbeitern.

source
Distributed.remoteFunction
remote([p::AbstractWorkerPool], f) -> Funktion

Gibt eine anonyme Funktion zurück, die die Funktion f auf einem verfügbaren Arbeiter (aus dem WorkerPool p, falls angegeben) unter Verwendung von remotecall_fetch ausführt.

source
Distributed.remotecallMethod
remotecall(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future

WorkerPool Variante von remotecall(f, pid, ....). Warten Sie auf und nehmen Sie einen freien Arbeiter aus pool und führen Sie einen remotecall darauf aus.

Beispiele

$ julia -p 3

julia> wp = WorkerPool([2, 3]);

julia> A = rand(3000);

julia> f = remotecall(maximum, wp, A)
Future(2, 1, 6, nothing)

In diesem Beispiel wurde die Aufgabe auf pid 2 ausgeführt, aufgerufen von pid 1.

source
Distributed.remotecall_waitMethod
remotecall_wait(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future

WorkerPool Variante von remotecall_wait(f, pid, ....). Warten Sie auf und nehmen Sie einen freien Arbeiter aus pool und führen Sie ein remotecall_wait darauf aus.

Beispiele

$ julia -p 3

julia> wp = WorkerPool([2, 3]);

julia> A = rand(3000);

julia> f = remotecall_wait(maximum, wp, A)
Future(3, 1, 9, nothing)

julia> fetch(f)
0.9995177101692958
source
Distributed.remotecall_fetchMethod
remotecall_fetch(f, pool::AbstractWorkerPool, args...; kwargs...) -> result

WorkerPool Variante von remotecall_fetch(f, pid, ....). Wartet auf und nimmt einen freien Arbeiter aus pool und führt ein remotecall_fetch darauf aus.

Beispiele

$ julia -p 3

julia> wp = WorkerPool([2, 3]);

julia> A = rand(3000);

julia> remotecall_fetch(maximum, wp, A)
0.9995177101692958
source
Distributed.remote_doMethod
remote_do(f, pool::AbstractWorkerPool, args...; kwargs...) -> nichts

WorkerPool Variante von remote_do(f, pid, ....). Warten Sie auf und nehmen Sie einen freien Arbeiter aus pool und führen Sie ein remote_do darauf aus.

source
Distributed.@spawnMacro
@spawn expr

Erstellt einen Closure um einen Ausdruck und führt ihn auf einem automatisch gewählten Prozess aus, wobei ein Future zum Ergebnis zurückgegeben wird. Dieses Makro ist veraltet; @spawnat :any expr sollte stattdessen verwendet werden.

Beispiele

julia> addprocs(3);

julia> f = @spawn myid()
Future(2, 1, 5, nichts)

julia> fetch(f)
2

julia> f = @spawn myid()
Future(3, 1, 7, nichts)

julia> fetch(f)
3
Julia 1.3

Ab Julia 1.3 ist dieses Makro veraltet. Verwenden Sie stattdessen @spawnat :any.

source
Distributed.@spawnatMacro
@spawnat p expr

Erstellen Sie einen Closure um einen Ausdruck und führen Sie den Closure asynchron auf dem Prozess p aus. Geben Sie ein Future zum Ergebnis zurück. Wenn p das zitierte literale Symbol :any ist, wählt das System automatisch einen Prozessor aus.

Beispiele

julia> addprocs(3);

julia> f = @spawnat 2 myid()
Future(2, 1, 3, nothing)

julia> fetch(f)
2

julia> f = @spawnat :any myid()
Future(3, 1, 7, nothing)

julia> fetch(f)
3
Julia 1.3

Das Argument :any ist seit Julia 1.3 verfügbar.

source
Distributed.@fetchMacro
@fetch expr

Entspricht fetch(@spawnat :any expr). Siehe fetch und @spawnat.

Beispiele

julia> addprocs(3);

julia> @fetch myid()
2

julia> @fetch myid()
3

julia> @fetch myid()
4

julia> @fetch myid()
2
source
Distributed.@distributedMacro
@distributed

Eine verteilte Speicher-, parallele Schleife der Form:

@distributed [reducer] for var = range
    body
end

Der angegebene Bereich wird partitioniert und lokal über alle Arbeiter ausgeführt. Falls eine optionale Reduzierfunktion angegeben ist, führt @distributed lokale Reduzierungen auf jedem Arbeiter durch, gefolgt von einer endgültigen Reduzierung im aufrufenden Prozess.

Beachten Sie, dass @distributed ohne eine Reduzierfunktion asynchron ausgeführt wird, d.h. es startet unabhängige Aufgaben auf allen verfügbaren Arbeitern und gibt sofort zurück, ohne auf den Abschluss zu warten. Um auf den Abschluss zu warten, setzen Sie den Aufruf mit @sync in der Form:

@sync @distributed for var = range
    body
end
source
Distributed.@everywhereMacro
@everywhere [procs()] expr

Führen Sie einen Ausdruck unter Main auf allen procs aus. Fehler in einem der Prozesse werden in eine CompositeException gesammelt und ausgelöst. Zum Beispiel:

@everywhere bar = 1

definiert Main.bar auf allen aktuellen Prozessen. Alle später hinzugefügten Prozesse (zum Beispiel mit addprocs()) haben den Ausdruck nicht definiert.

Im Gegensatz zu @spawnat erfasst @everywhere keine lokalen Variablen. Stattdessen können lokale Variablen mit Interpolation übertragen werden:

foo = 1
@everywhere bar = $foo

Das optionale Argument procs ermöglicht es, eine Teilmenge aller Prozesse anzugeben, die den Ausdruck ausführen sollen.

Ähnlich wie beim Aufruf von remotecall_eval(Main, procs, expr), jedoch mit zwei zusätzlichen Funktionen:

- `using` und `import` Anweisungen werden zuerst im aufrufenden Prozess ausgeführt, um sicherzustellen,
  dass Pakete vorcompiliert sind.
- Der aktuelle Quelldateipfad, der von `include` verwendet wird, wird an andere Prozesse weitergegeben.
source
Distributed.remoteref_idFunction
remoteref_id(r::AbstractRemoteRef) -> RRID

Futures und RemoteChannels werden durch Felder identifiziert:

  • where - bezieht sich auf den Knoten, an dem das zugrunde liegende Objekt/Speicher, auf das die Referenz verweist, tatsächlich existiert.
  • whence - bezieht sich auf den Knoten, von dem die Remote-Referenz erstellt wurde. Beachten Sie, dass dies sich von dem Knoten unterscheidet, an dem das zugrunde liegende Objekt, auf das verwiesen wird, tatsächlich existiert. Zum Beispiel würde der Aufruf von RemoteChannel(2) vom Master-Prozess zu einem where-Wert von 2 und einem whence-Wert von 1 führen.
  • id ist einzigartig für alle Referenzen, die von dem Worker erstellt wurden, der durch whence angegeben ist.

Zusammen identifizieren whence und id eindeutig eine Referenz über alle Worker hinweg.

remoteref_id ist eine Low-Level-API, die ein RRID-Objekt zurückgibt, das die whence- und id-Werte einer Remote-Referenz umschließt.

source
Distributed.channel_from_idFunction
channel_from_id(id) -> c

Eine Low-Level-API, die das zugrunde liegende AbstractChannel für eine id zurückgibt, die von remoteref_id zurückgegeben wird. Der Aufruf ist nur auf dem Knoten gültig, auf dem der zugrunde liegende Kanal existiert.

source
Distributed.worker_id_from_socketFunction
worker_id_from_socket(s) -> pid

Eine Low-Level-API, die, gegeben eine IO-Verbindung oder einen Worker, die pid des Workers zurückgibt, mit dem sie verbunden ist. Dies ist nützlich, wenn benutzerdefinierte serialize Methoden für einen Typ geschrieben werden, die die geschriebenen Daten je nach Prozess-ID des empfangenden Prozesses optimieren.

source

Cluster Manager Interface

Diese Schnittstelle bietet einen Mechanismus zum Starten und Verwalten von Julia-Arbeitern in verschiedenen Cluster-Umgebungen. Es gibt zwei Arten von Managern in Base: LocalManager, um zusätzliche Arbeiter auf demselben Host zu starten, und SSHManager, um auf entfernten Hosts über ssh zu starten. TCP/IP-Sockets werden verwendet, um Prozesse zu verbinden und Nachrichten zwischen ihnen zu transportieren. Es ist möglich, dass Cluster-Manager einen anderen Transport bereitstellen.

Distributed.ClusterManagerType
ClusterManager

Supertyp für Cluster-Manager, die Arbeitsprozesse als Cluster steuern. Cluster-Manager implementieren, wie Arbeiter hinzugefügt, entfernt und kommuniziert werden können. SSHManager und LocalManager sind Subtypen davon.

source
Distributed.WorkerConfigType
WorkerConfig

Typ, der von ClusterManagers verwendet wird, um die zu ihren Clustern hinzugefügten Worker zu steuern. Einige Felder werden von allen Cluster-Managern verwendet, um auf einen Host zuzugreifen:

  • io – die Verbindung, die verwendet wird, um auf den Worker zuzugreifen (ein Subtyp von IO oder Nothing)
  • host – die Hostadresse (entweder ein String oder Nothing)
  • port – der Port auf dem Host, der verwendet wird, um eine Verbindung zum Worker herzustellen (entweder ein Int oder Nothing)

Einige werden vom Cluster-Manager verwendet, um Worker zu einem bereits initialisierten Host hinzuzufügen:

  • count – die Anzahl der Worker, die auf dem Host gestartet werden sollen
  • exename – der Pfad zur Julia-Ausführungsdatei auf dem Host, standardmäßig "$(Sys.BINDIR)/julia" oder "$(Sys.BINDIR)/julia-debug"
  • exeflags – Flags, die beim Starten von Julia remote verwendet werden

Das Feld userdata wird verwendet, um Informationen für jeden Worker von externen Managern zu speichern.

Einige Felder werden von SSHManager und ähnlichen Managern verwendet:

  • tunneltrue (Tunneling verwenden), false (kein Tunneling verwenden) oder nothing (Standard für den Manager verwenden)
  • multiplextrue (SSH-Multiplexing für Tunneling verwenden) oder false
  • forward – die Weiterleitungsoption, die für die -L-Option von ssh verwendet wird
  • bind_addr – die Adresse auf dem Remote-Host, an die gebunden werden soll
  • sshflags – Flags, die beim Herstellen der SSH-Verbindung verwendet werden
  • max_parallel – die maximale Anzahl von Workern, die parallel auf dem Host verbunden werden sollen

Einige Felder werden sowohl von LocalManagers als auch von SSHManagers verwendet:

  • connect_at – bestimmt, ob dies ein Worker-zu-Worker- oder Driver-zu-Worker-Setup-Aufruf ist
  • process – der Prozess, der verbunden werden soll (normalerweise weist der Manager dies während addprocs zu)
  • ospid – die Prozess-ID gemäß dem Host-OS, die verwendet wird, um Worker-Prozesse zu unterbrechen
  • environ – privates Wörterbuch, das von Local/SSH-Managern verwendet wird, um temporäre Informationen zu speichern
  • ident – Worker, wie er vom ClusterManager identifiziert wird
  • connect_idents – Liste der Worker-IDs, mit denen der Worker verbunden werden muss, wenn eine benutzerdefinierte Topologie verwendet wird
  • enable_threaded_blastrue, false oder nothing, ob auf den Workern threaded BLAS verwendet werden soll oder nicht
source
Distributed.launchFunction
launch(manager::ClusterManager, params::Dict, launched::Array, launch_ntfy::Condition)

Implementiert von Cluster-Managern. Für jeden Julia-Arbeiter, der von dieser Funktion gestartet wird, sollte ein WorkerConfig-Eintrag zu launched hinzugefügt und launch_ntfy benachrichtigt werden. Die Funktion MUSS beenden, sobald alle Arbeiter, die von manager angefordert wurden, gestartet wurden. params ist ein Wörterbuch aller Schlüsselargumente, mit denen addprocs aufgerufen wurde.

source
Distributed.manageFunction
manage(manager::ClusterManager, id::Integer, config::WorkerConfig. op::Symbol)

Implementiert von Cluster-Managern. Es wird im Master-Prozess während der Lebensdauer eines Workers mit den entsprechenden op-Werten aufgerufen:

  • mit :register/:deregister, wenn ein Worker zum Julia-Worker-Pool hinzugefügt / entfernt wird.
  • mit :interrupt, wenn interrupt(workers) aufgerufen wird. Der ClusterManager sollte den entsprechenden Worker mit einem Interrupt-Signal benachrichtigen.
  • mit :finalize zu Reinigungszwecken.
source
Base.killMethod
kill(manager::ClusterManager, pid::Int, config::WorkerConfig)

Wird von Cluster-Managern implementiert. Es wird im Master-Prozess von rmprocs aufgerufen. Es sollte dazu führen, dass der entfernte Worker, der durch pid angegeben ist, beendet wird. kill(manager::ClusterManager.....) führt ein entferntes exit() auf pid aus.

source
Sockets.connectMethod
connect(manager::ClusterManager, pid::Int, config::WorkerConfig) -> (instrm::IO, outstrm::IO)

Implementiert von Cluster-Managern mit benutzerdefinierten Transporten. Es sollte eine logische Verbindung zum Worker mit der ID pid, die durch config angegeben ist, hergestellt werden und ein Paar von IO-Objekten zurückgeben. Nachrichten von pid an den aktuellen Prozess werden von instrm gelesen, während Nachrichten, die an pid gesendet werden sollen, in outstrm geschrieben werden. Die Implementierung des benutzerdefinierten Transports muss sicherstellen, dass Nachrichten vollständig und in der richtigen Reihenfolge zugestellt und empfangen werden. connect(manager::ClusterManager.....) richtet TCP/IP-Socket-Verbindungen zwischen den Workern ein.

source
Distributed.init_workerFunction
init_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager())

Aufgerufen von Cluster-Managern, die benutzerdefinierte Transports implementieren. Es initialisiert einen neu gestarteten Prozess als Arbeiter. Das Befehlszeilenargument --worker[=<cookie>] hat die Wirkung, einen Prozess als Arbeiter zu initialisieren, der TCP/IP-Sockets für den Transport verwendet. cookie ist ein cluster_cookie.

source
Distributed.start_workerFunction
start_worker([out::IO=stdout], cookie::AbstractString=readline(stdin); close_stdin::Bool=true, stderr_to_stdout::Bool=true)

start_worker ist eine interne Funktion, die den Standard-Einstiegspunkt für Arbeitsprozesse darstellt, die sich über TCP/IP verbinden. Sie richtet den Prozess als Julia-Cluster-Arbeiter ein.

Host:Port-Informationen werden in den Stream out geschrieben (standardmäßig stdout).

Die Funktion liest das Cookie von stdin, falls erforderlich, und hört auf einem freien Port (oder, falls angegeben, dem Port in der --bind-to-Befehlszeilenoption) und plant Aufgaben zur Verarbeitung eingehender TCP-Verbindungen und -Anfragen. Sie schließt auch (optional) stdin und leitet stderr an stdout um.

Sie gibt nichts zurück.

source
Distributed.process_messagesFunction
process_messages(r_stream::IO, w_stream::IO, incoming::Bool=true)

Wird von Cluster-Managern mit benutzerdefinierten Transporten aufgerufen. Es sollte aufgerufen werden, wenn die benutzerdefinierte Transportimplementierung die erste Nachricht von einem entfernten Worker erhält. Der benutzerdefinierte Transport muss eine logische Verbindung zum entfernten Worker verwalten und zwei IO-Objekte bereitstellen, eines für eingehende Nachrichten und das andere für an den entfernten Worker adressierte Nachrichten. Wenn incoming true ist, hat der entfernte Peer die Verbindung initiiert. Derjenige, der die Verbindung initiiert, sendet das Cluster-Cookie und seine Julia-Versionnummer, um den Authentifizierungs-Handschlag durchzuführen.

Siehe auch cluster_cookie.

source
Distributed.default_addprocs_paramsFunction
default_addprocs_params(mgr::ClusterManager) -> Dict{Symbol, Any}

Implementiert von Cluster-Managern. Die Standard-Keyword-Parameter, die beim Aufruf von addprocs(mgr) übergeben werden. Die minimale Menge an Optionen ist verfügbar, indem default_addprocs_params() aufgerufen wird.

source