Distributed Computing
Distributed
— ModuleOutils pour le traitement parallèle distribué.
Distributed.addprocs
— Functionaddprocs(manager::ClusterManager; kwargs...) -> Liste des identifiants de processus
Lance des processus de travail via le gestionnaire de cluster spécifié.
Par exemple, les clusters Beowulf sont pris en charge via un gestionnaire de cluster personnalisé implémenté dans le package ClusterManagers.jl
.
Le nombre de secondes qu'un nouveau travailleur attend pour établir une connexion avec le maître peut être spécifié via la variable JULIA_WORKER_TIMEOUT
dans l'environnement du processus de travailleur. Pertinent uniquement lors de l'utilisation de TCP/IP comme transport.
Pour lancer des travailleurs sans bloquer le REPL, ou la fonction contenant si vous lancez des travailleurs de manière programmatique, exécutez addprocs
dans sa propre tâche.
Exemples
# Sur des clusters occupés, appelez `addprocs` de manière asynchrone
t = @async addprocs(...)
# Utilisez les travailleurs au fur et à mesure qu'ils se connectent
if nprocs() > 1 # Assurez-vous qu'au moins un nouveau travailleur est disponible
.... # effectuez une exécution distribuée
end
# Récupérez les ID des travailleurs nouvellement lancés, ou tout message d'erreur
if istaskdone(t) # Vérifiez si `addprocs` a été complété pour s'assurer que `fetch` ne bloque pas
if nworkers() == N
new_pids = fetch(t)
else
fetch(t)
end
end
addprocs(machines; tunnel=false, sshflags=``, max_parallel=10, kwargs...) -> Liste des identifiants de processus
Ajoutez des processus de travail sur des machines distantes via SSH. La configuration se fait avec des arguments de mots-clés (voir ci-dessous). En particulier, le mot-clé exename
peut être utilisé pour spécifier le chemin vers le binaire julia
sur la ou les machines distantes.
machines
est un vecteur de "spécifications de machine" qui sont données sous forme de chaînes du type [user@]host[:port] [bind_addr[:port]]
. user
par défaut est l'utilisateur actuel et port
le port SSH standard. Si [bind_addr[:port]]
est spécifié, d'autres travailleurs se connecteront à ce travailleur à l'adresse bind_addr
et au port spécifiés.
Il est possible de lancer plusieurs processus sur un hôte distant en utilisant un tuple dans le vecteur machines
ou la forme (machine_spec, count)
, où count
est le nombre de travailleurs à lancer sur l'hôte spécifié. Passer :auto
comme nombre de travailleurs lancera autant de travailleurs que le nombre de threads CPU sur l'hôte distant.
Exemples:
addprocs([
"remote1", # un travailleur sur 'remote1' se connectant avec le nom d'utilisateur actuel
"user@remote2", # un travailleur sur 'remote2' se connectant avec le nom d'utilisateur 'user'
"user@remote3:2222", # spécifiant le port SSH à '2222' pour 'remote3'
("user@remote4", 4), # lancer 4 travailleurs sur 'remote4'
("user@remote5", :auto), # lancer autant de travailleurs que de threads CPU sur 'remote5'
])
Arguments de mots-clés:
tunnel
: sitrue
, alors un tunnel SSH sera utilisé pour se connecter au travailleur depuis le processus maître. Par défaut, c'estfalse
.multiplex
: sitrue
, alors le multiplexage SSH est utilisé pour le tunnel SSH. Par défaut, c'estfalse
.ssh
: le nom ou le chemin de l'exécutable du client SSH utilisé pour démarrer les travailleurs. Par défaut, c'est"ssh"
.sshflags
: spécifie des options ssh supplémentaires, par exemplesshflags=`-i /home/foo/bar.pem`
max_parallel
: spécifie le nombre maximum de travailleurs connectés en parallèle à un hôte. Par défaut, c'est 10.shell
: spécifie le type de shell auquel ssh se connecte sur les travailleurs.shell=:posix
: un shell Unix/Linux compatible POSIX (sh, ksh, bash, dash, zsh, etc.). Par défaut.shell=:csh
: un shell C Unix (csh, tcsh).shell=:wincmd
: Microsoft Windowscmd.exe
.
dir
: spécifie le répertoire de travail sur les travailleurs. Par défaut, c'est le répertoire actuel de l'hôte (tel que trouvé parpwd()
)enable_threaded_blas
: sitrue
, alors BLAS s'exécutera sur plusieurs threads dans les processus ajoutés. Par défaut, c'estfalse
.exename
: nom de l'exécutablejulia
. Par défaut, c'est"$(Sys.BINDIR)/julia"
ou"$(Sys.BINDIR)/julia-debug"
selon le cas. Il est recommandé d'utiliser une version commune de Julia sur toutes les machines distantes car la sérialisation et la distribution de code pourraient échouer sinon.exeflags
: drapeaux supplémentaires passés aux processus de travail.topology
: Spécifie comment les travailleurs se connectent les uns aux autres. L'envoi d'un message entre des travailleurs non connectés entraîne une erreur.topology=:all_to_all
: Tous les processus sont connectés les uns aux autres. Par défaut.topology=:master_worker
: Seul le processus pilote, c'est-à-direpid
1 se connecte aux travailleurs. Les travailleurs ne se connectent pas entre eux.topology=:custom
: La méthodelaunch
du gestionnaire de cluster spécifie la topologie de connexion via les champsident
etconnect_idents
dansWorkerConfig
. Un travailleur avec une identité de gestionnaire de clusterident
se connectera à tous les travailleurs spécifiés dansconnect_idents
.
lazy
: Applicable uniquement avectopology=:all_to_all
. Sitrue
, les connexions travailleur-travailleur sont configurées de manière paresseuse, c'est-à-dire qu'elles sont configurées lors de la première instance d'un appel distant entre travailleurs. Par défaut, c'est vrai.env
: fournir un tableau de paires de chaînes telles queenv=["JULIA_DEPOT_PATH"=>"/depot"]
pour demander que des variables d'environnement soient définies sur la machine distante. Par défaut, seule la variable d'environnementJULIA_WORKER_TIMEOUT
est passée automatiquement de l'environnement local à l'environnement distant.cmdline_cookie
: passez le cookie d'authentification via l'option de ligne de commande--worker
. Le comportement par défaut (plus sécurisé) de passer le cookie via ssh stdio peut se bloquer avec des travailleurs Windows utilisant des versions Julia ou Windows plus anciennes (avant ConPTY), auquel cascmdline_cookie=true
offre une solution de contournement.
Les arguments de mots-clés ssh
, shell
, env
et cmdline_cookie
ont été ajoutés dans Julia 1.6.
Variables d'environnement:
Si le processus maître échoue à établir une connexion avec un travailleur nouvellement lancé dans les 60,0 secondes, le travailleur considère cela comme une situation fatale et se termine. Ce délai peut être contrôlé via la variable d'environnement JULIA_WORKER_TIMEOUT
. La valeur de JULIA_WORKER_TIMEOUT
sur le processus maître spécifie le nombre de secondes qu'un travailleur nouvellement lancé attend pour établir une connexion.
addprocs(np::Integer=Sys.CPU_THREADS; restrict=true, kwargs...) -> Liste des identifiants de processus
Lancez des travailleurs np
sur l'hôte local en utilisant le LocalManager
intégré.
Les travailleurs locaux héritent de l'environnement de package actuel (c'est-à-dire le projet actif, LOAD_PATH
, et DEPOT_PATH
) du processus principal.
Notez que les travailleurs n'exécutent pas de script de démarrage ~/.julia/config/startup.jl
, ni ne synchronisent leur état global (tel que les options de ligne de commande, les variables globales, les définitions de nouvelles méthodes et les modules chargés) avec aucun des autres processus en cours d'exécution.
Arguments clés :
restrict::Bool
: sitrue
(par défaut), la liaison est restreinte à127.0.0.1
.dir
,exename
,exeflags
,env
,topology
,lazy
,enable_threaded_blas
: même effet que pourSSHManager
, voir la documentation pouraddprocs(machines::AbstractVector)
.
L'héritage de l'environnement de package et l'argument clé env
ont été ajoutés dans Julia 1.9.
Distributed.nprocs
— Functionnprocs()
Obtenez le nombre de processus disponibles.
Exemples
julia> nprocs()
3
julia> workers()
2-element Array{Int64,1}:
2
3
Distributed.nworkers
— Functionnworkers()
Obtenez le nombre de processus de travail disponibles. Cela est un de moins que nprocs()
. Égal à nprocs()
si nprocs() == 1
.
Exemples
$ julia -p 2
julia> nprocs()
3
julia> nworkers()
2
Distributed.procs
— Methodprocs()
Renvoie une liste de tous les identifiants de processus, y compris le pid 1 (qui n'est pas inclus par workers()
).
Exemples
$ julia -p 2
julia> procs()
3-element Array{Int64,1}:
1
2
3
Distributed.procs
— Methodprocs(pid::Integer)
Renvoie une liste de tous les identifiants de processus sur le même nœud physique. En particulier, tous les travailleurs liés à la même adresse IP que pid
sont renvoyés.
Distributed.workers
— Functionworkers()
Retourne une liste de tous les identifiants de processus de travail.
Exemples
$ julia -p 2
julia> workers()
2-element Array{Int64,1}:
2
3
Distributed.rmprocs
— Functionrmprocs(pids...; waitfor=typemax(Int))
Supprime les travailleurs spécifiés. Notez que seul le processus 1 peut ajouter ou supprimer des travailleurs.
L'argument waitfor
spécifie combien de temps attendre que les travailleurs s'arrêtent :
- Si non spécifié,
rmprocs
attendra que tous lespids
demandés soient supprimés. - Une
ErrorException
est levée si tous les travailleurs ne peuvent pas être terminés avant les secondeswaitfor
demandées. - Avec une valeur
waitfor
de 0, l'appel retourne immédiatement avec les travailleurs programmés pour suppression dans une tâche différente. L'objetTask
programmé est retourné. L'utilisateur doit appelerwait
sur la tâche avant d'invoquer d'autres appels parallèles.
Exemples
$ julia -p 5
julia> t = rmprocs(2, 3, waitfor=0)
Task (runnable) @0x0000000107c718d0
julia> wait(t)
julia> workers()
3-element Array{Int64,1}:
4
5
6
Distributed.interrupt
— Functioninterrupt(pids::Integer...)
Interrompt la tâche en cours d'exécution sur les travailleurs spécifiés. Cela équivaut à appuyer sur Ctrl-C sur la machine locale. Si aucun argument n'est donné, tous les travailleurs sont interrompus.
interrupt(pids::AbstractVector=workers())
Interrompt la tâche en cours d'exécution sur les travailleurs spécifiés. Cela équivaut à appuyer sur Ctrl-C sur la machine locale. Si aucun argument n'est donné, tous les travailleurs sont interrompus.
Distributed.myid
— Functionmyid()
Obtenez l'identifiant du processus actuel.
Exemples
julia> myid()
1
julia> remotecall_fetch(() -> myid(), 4)
4
Distributed.pmap
— Functionpmap(f, [::AbstractWorkerPool], c...; distributed=true, batch_size=1, on_error=nothing, retry_delays=[], retry_check=nothing) -> collection
Transformez la collection c
en appliquant f
à chaque élément en utilisant les travailleurs et tâches disponibles.
Pour plusieurs arguments de collection, appliquez f
élément par élément.
Notez que f
doit être rendu disponible à tous les processus de travailleur ; voir Code Availability and Loading Packages pour plus de détails.
Si un pool de travailleurs n'est pas spécifié, tous les travailleurs disponibles seront utilisés via un CachingPool
.
Par défaut, pmap
distribue le calcul sur tous les travailleurs spécifiés. Pour utiliser uniquement le processus local et distribuer sur les tâches, spécifiez distributed=false
. Cela équivaut à utiliser asyncmap
. Par exemple, pmap(f, c; distributed=false)
est équivalent à asyncmap(f,c; ntasks=()->nworkers())
pmap
peut également utiliser un mélange de processus et de tâches via l'argument batch_size
. Pour des tailles de lot supérieures à 1, la collection est traitée en plusieurs lots, chacun d'une longueur de batch_size
ou moins. Un lot est envoyé comme une seule demande à un travailleur libre, où un asyncmap
local traite les éléments du lot en utilisant plusieurs tâches concurrentes.
Toute erreur empêche pmap
de traiter le reste de la collection. Pour remplacer ce comportement, vous pouvez spécifier une fonction de gestion des erreurs via l'argument on_error
qui prend un seul argument, c'est-à-dire l'exception. La fonction peut arrêter le traitement en relançant l'erreur, ou, pour continuer, retourner toute valeur qui est ensuite renvoyée en ligne avec les résultats à l'appelant.
Considérez les deux exemples suivants. Le premier renvoie l'objet d'exception en ligne, le second un 0 à la place de toute exception :
julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=identity)
4-element Array{Any,1}:
1
ErrorException("foo")
3
ErrorException("foo")
julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=ex->0)
4-element Array{Int64,1}:
1
0
3
0
Les erreurs peuvent également être gérées en réessayant les calculs échoués. Les arguments de mot-clé retry_delays
et retry_check
sont transmis à retry
en tant qu'arguments de mot-clé delays
et check
respectivement. Si le regroupement est spécifié, et qu'un lot entier échoue, tous les éléments du lot sont réessayés.
Notez que si à la fois on_error
et retry_delays
sont spécifiés, le crochet on_error
est appelé avant de réessayer. Si on_error
ne lance pas (ou ne relance pas) une exception, l'élément ne sera pas réessayé.
Exemple : En cas d'erreurs, réessayez f
sur un élément un maximum de 3 fois sans aucun délai entre les réessais.
pmap(f, c; retry_delays = zeros(3))
Exemple : Réessayez f
uniquement si l'exception n'est pas de type InexactError
, avec des délais augmentant de manière exponentielle jusqu'à 3 fois. Retournez un NaN
à la place de toutes les occurrences de InexactError
.
pmap(f, c; on_error = e->(isa(e, InexactError) ? NaN : rethrow()), retry_delays = ExponentialBackOff(n = 3))
Distributed.RemoteException
— TypeRemoteException(captured)
Les exceptions lors des calculs distants sont capturées et relancées localement. Un RemoteException
enveloppe le pid
du travailleur et une exception capturée. Une CapturedException
capture l'exception distante et une forme sérialisable de la pile d'appels lorsque l'exception a été levée.
Distributed.ProcessExitedException
— TypeProcessExitedException(worker_id::Int)
Après qu'un processus client Julia ait quitté, toute tentative ultérieure de référencer l'enfant mort déclenchera cette exception.
Distributed.Future
— TypeFuture(w::Int, rrid::RRID, v::Union{Some, Nothing}=nothing)
Un Future
est un espace réservé pour un calcul unique dont le statut de terminaison et le temps sont inconnus. Pour plusieurs calculs potentiels, voir RemoteChannel
. Voir remoteref_id
pour identifier un AbstractRemoteRef
.
Distributed.RemoteChannel
— TypeRemoteChannel(pid::Integer=myid())
Faites référence à un Channel{Any}(1)
sur le processus pid
. Le pid
par défaut est le processus actuel.
RemoteChannel(f::Function, pid::Integer=myid())
Créez des références à des canaux distants d'une taille et d'un type spécifiques. f
est une fonction qui, lorsqu'elle est exécutée sur pid
, doit renvoyer une implémentation d'un AbstractChannel
.
Par exemple, RemoteChannel(()->Channel{Int}(10), pid)
, renverra une référence à un canal de type Int
et de taille 10 sur pid
.
Le pid
par défaut est le processus actuel.
Base.fetch
— Methodfetch(x::Future)
Attendez et obtenez la valeur d'un Future
. La valeur récupérée est mise en cache localement. Les appels ultérieurs à fetch
sur la même référence renvoient la valeur mise en cache. Si la valeur distante est une exception, cela lance une RemoteException
qui capture l'exception distante et la trace de la pile.
Base.fetch
— Methodfetch(c::RemoteChannel)
Attendez et obtenez une valeur d'un RemoteChannel
. Les exceptions levées sont les mêmes que pour un Future
. Ne supprime pas l'élément récupéré.
fetch(x::Any)
Retourne x
.
Distributed.remotecall
— Methodremotecall(f, id::Integer, args...; kwargs...) -> Future
Appelle une fonction f
de manière asynchrone avec les arguments donnés sur le processus spécifié. Retourne un Future
. Les arguments de mot-clé, le cas échéant, sont transmis à f
.
Distributed.remotecall_wait
— Methodremotecall_wait(f, id::Integer, args...; kwargs...)
Effectuez un wait(remotecall(...))
plus rapide en un seul message sur le Worker
spécifié par l'identifiant du worker id
. Les arguments de mot-clé, le cas échéant, sont transmis à f
.
Voir aussi wait
et remotecall
.
Distributed.remotecall_fetch
— Methodremotecall_fetch(f, id::Integer, args...; kwargs...)
Effectue fetch(remotecall(...))
en un seul message. Les arguments de mot-clé, le cas échéant, sont transmis à f
. Toute exception distante est capturée dans une RemoteException
et levée.
Voir aussi fetch
et remotecall
.
Exemples
$ julia -p 2
julia> remotecall_fetch(sqrt, 2, 4)
2.0
julia> remotecall_fetch(sqrt, 2, -4)
ERROR: Sur le worker 2 :
DomainError avec -4.0 :
sqrt a été appelé avec un argument réel négatif mais ne renverra qu'un résultat complexe s'il est appelé avec un argument complexe. Essayez sqrt(Complex(x)).
...
Distributed.remote_do
— Methodremote_do(f, id::Integer, args...; kwargs...) -> nothing
Exécute f
sur le travailleur id
de manière asynchrone. Contrairement à remotecall
, il ne stocke pas le résultat du calcul, ni il n'y a de moyen d'attendre son achèvement.
Une invocation réussie indique que la demande a été acceptée pour exécution sur le nœud distant.
Alors que les appels consécutifs à remotecall
vers le même travailleur sont sérialisés dans l'ordre où ils sont invoqués, l'ordre des exécutions sur le travailleur distant est indéterminé. Par exemple, remote_do(f1, 2); remotecall(f2, 2); remote_do(f3, 2)
va sérialiser l'appel à f1
, suivi de f2
et f3
dans cet ordre. Cependant, il n'est pas garanti que f1
soit exécuté avant f3
sur le travailleur 2.
Toutes les exceptions levées par f
sont imprimées sur stderr
sur le travailleur distant.
Les arguments de mot-clé, le cas échéant, sont transmis à f
.
Base.put!
— Methodput!(rr::RemoteChannel, args...)
Stocke un ensemble de valeurs dans le RemoteChannel
. Si le canal est plein, il bloque jusqu'à ce qu'un espace soit disponible. Retourne le premier argument.
Base.put!
— Methodput!(rr::Future, v)
Stockez une valeur dans un Future
rr
. Les Future
s sont des références distantes en écriture unique. Un put!
sur un Future
déjà défini déclenche une Exception
. Tous les appels distants asynchrones renvoient des Future
s et définissent la valeur sur la valeur de retour de l'appel une fois terminé.
Base.take!
— Methodtake!(rr::RemoteChannel, args...)
Récupère la ou les valeur(s) d'un RemoteChannel
rr
, en supprimant la ou les valeur(s) dans le processus.
Base.isready
— Methodisready(rr::RemoteChannel, args...)
Déterminez si un RemoteChannel
a une valeur stockée. Notez que cette fonction peut provoquer des conditions de concurrence, car au moment où vous recevez son résultat, cela peut ne plus être vrai. Cependant, elle peut être utilisée en toute sécurité sur un Future
car ils ne sont assignés qu'une seule fois.
Base.isready
— Methodisready(rr::Future)
Déterminez si un Future
a une valeur stockée.
Si l'argument Future
est détenu par un nœud différent, cet appel bloquera en attendant la réponse. Il est recommandé d'attendre rr
dans une tâche séparée ou d'utiliser un Channel
local comme proxy :
p = 1
f = Future(p)
errormonitor(@async put!(f, remotecall_fetch(long_computation, p)))
isready(f) # ne bloquera pas
Distributed.AbstractWorkerPool
— TypeAbstractWorkerPool
Supertype pour les pools de travailleurs tels que WorkerPool
et CachingPool
. Un AbstractWorkerPool
doit implémenter :
push!
- ajouter un nouveau travailleur au pool global (disponible + occupé)put!
- remettre un travailleur dans le pool disponibletake!
- prendre un travailleur du pool disponible (à utiliser pour l'exécution de fonctions distantes)length
- nombre de travailleurs disponibles dans le pool globalisready
- retourner false si untake!
sur le pool bloquerait, sinon true
Les implémentations par défaut de ce qui précède (sur un AbstractWorkerPool
) nécessitent des champs
channel::Channel{Int}
workers::Set{Int}
où channel
contient les pids des travailleurs libres et workers
est l'ensemble de tous les travailleurs associés à ce pool.
Distributed.WorkerPool
— TypeWorkerPool(workers::Union{Vector{Int},AbstractRange{Int}})
Créez un WorkerPool
à partir d'un vecteur ou d'une plage d'identifiants de travailleurs.
Exemples
$ julia -p 3
julia> WorkerPool([2, 3])
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([2, 3]), RemoteChannel{Channel{Any}}(1, 1, 6))
julia> WorkerPool(2:4)
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 7))
Distributed.CachingPool
— TypeCachingPool(workers::Vector{Int})
Une implémentation d'un AbstractWorkerPool
. remote
, remotecall_fetch
, pmap
(et d'autres appels distants qui exécutent des fonctions à distance) bénéficient de la mise en cache des fonctions sérialisées/désérialisées sur les nœuds de travail, en particulier les fermetures (qui peuvent capturer de grandes quantités de données).
Le cache distant est maintenu pendant la durée de vie de l'objet CachingPool
retourné. Pour vider le cache plus tôt, utilisez clear!(pool)
.
Pour les variables globales, seules les liaisons sont capturées dans une fermeture, pas les données. Des blocs let
peuvent être utilisés pour capturer des données globales.
Exemples
const foo = rand(10^8);
wp = CachingPool(workers())
let foo = foo
pmap(i -> sum(foo) + i, wp, 1:100);
end
Ce qui précède transférerait foo
une seule fois à chaque travailleur.
Distributed.default_worker_pool
— Functiondefault_worker_pool()
AbstractWorkerPool
contenant des workers
inactifs - utilisé par remote(f)
et pmap
(par défaut). À moins qu'un ne soit explicitement défini via default_worker_pool!(pool)
, le pool de travailleurs par défaut est initialisé à un WorkerPool
.
Exemples
$ julia -p 3
julia> default_worker_pool()
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:3), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 4))
Distributed.clear!
— Functionclear!(syms, pids=workers(); mod=Main)
Efface les liaisons globales dans les modules en les initialisant à nothing
. syms
doit être de type Symbol
ou une collection de Symbol
s. pids
et mod
identifient les processus et le module dans lequel les variables globales doivent être réinitialisées. Seuls les noms trouvés définis sous mod
sont effacés.
Une exception est levée si une constante globale est demandée à être effacée.
clear!(pool::CachingPool) -> pool
Supprime toutes les fonctions mises en cache de tous les travailleurs participants.
Distributed.remote
— Functionremote([p::AbstractWorkerPool], f) -> Function
Retourne une fonction anonyme qui exécute la fonction f
sur un travailleur disponible (tiré de WorkerPool
p
si fourni) en utilisant remotecall_fetch
.
Distributed.remotecall
— Methodremotecall(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future
variante de WorkerPool
de remotecall(f, pid, ....)
. Attendez et prenez un travailleur libre de pool
et effectuez un remotecall
dessus.
Exemples
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> f = remotecall(maximum, wp, A)
Future(2, 1, 6, nothing)
Dans cet exemple, la tâche a été exécutée sur pid 2, appelée depuis pid 1.
Distributed.remotecall_wait
— Methodremotecall_wait(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future
WorkerPool
variante de remotecall_wait(f, pid, ....)
. Attendez et prenez un travailleur libre de pool
et effectuez un remotecall_wait
dessus.
Exemples
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> f = remotecall_wait(maximum, wp, A)
Future(3, 1, 9, nothing)
julia> fetch(f)
0.9995177101692958
Distributed.remotecall_fetch
— Methodremotecall_fetch(f, pool::AbstractWorkerPool, args...; kwargs...) -> result
WorkerPool
variante de remotecall_fetch(f, pid, ....)
. Attend et prend un travailleur libre de pool
et effectue un remotecall_fetch
dessus.
Exemples
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> remotecall_fetch(maximum, wp, A)
0.9995177101692958
Distributed.remote_do
— Methodremote_do(f, pool::AbstractWorkerPool, args...; kwargs...) -> rien
WorkerPool
variante de remote_do(f, pid, ....)
. Attendez et prenez un travailleur libre de pool
et effectuez un remote_do
dessus.
Distributed.@spawn
— Macro@spawn expr
Créez une fermeture autour d'une expression et exécutez-la sur un processus choisi automatiquement, renvoyant un Future
vers le résultat. Ce macro est obsolète ; @spawnat :any expr
doit être utilisé à la place.
Exemples
julia> addprocs(3);
julia> f = @spawn myid()
Future(2, 1, 5, nothing)
julia> fetch(f)
2
julia> f = @spawn myid()
Future(3, 1, 7, nothing)
julia> fetch(f)
3
À partir de Julia 1.3, ce macro est obsolète. Utilisez @spawnat :any
à la place.
Distributed.@spawnat
— Macro@spawnat p expr
Créez une fermeture autour d'une expression et exécutez la fermeture de manière asynchrone sur le processus p
. Retournez un Future
vers le résultat. Si p
est le symbole littéral cité :any
, alors le système choisira automatiquement un processeur à utiliser.
Exemples
julia> addprocs(3);
julia> f = @spawnat 2 myid()
Future(2, 1, 3, nothing)
julia> fetch(f)
2
julia> f = @spawnat :any myid()
Future(3, 1, 7, nothing)
julia> fetch(f)
3
L'argument :any
est disponible depuis Julia 1.3.
Distributed.@fetch
— Macro@fetch expr
Équivalent à fetch(@spawnat :any expr)
. Voir fetch
et @spawnat
.
Exemples
julia> addprocs(3);
julia> @fetch myid()
2
julia> @fetch myid()
3
julia> @fetch myid()
4
julia> @fetch myid()
2
Distributed.@fetchfrom
— Macro@fetchfrom
Équivalent à fetch(@spawnat p expr)
. Voir fetch
et @spawnat
.
Exemples
julia> addprocs(3);
julia> @fetchfrom 2 myid()
2
julia> @fetchfrom 4 myid()
4
Distributed.@distributed
— Macro@distributed
Une boucle for parallèle en mémoire distribuée de la forme :
@distributed [réducteur] pour var = plage
corps
fin
La plage spécifiée est partitionnée et exécutée localement sur tous les travailleurs. Dans le cas où une fonction réductrice optionnelle est spécifiée, @distributed
effectue des réductions locales sur chaque travailleur avec une réduction finale sur le processus appelant.
Notez que sans fonction réductrice, @distributed
s'exécute de manière asynchrone, c'est-à-dire qu'il génère des tâches indépendantes sur tous les travailleurs disponibles et retourne immédiatement sans attendre l'achèvement. Pour attendre l'achèvement, préfixez l'appel avec @sync
, comme suit :
@sync @distributed pour var = plage
corps
fin
Distributed.@everywhere
— Macro@everywhere [procs()] expr
Exécute une expression sous Main
sur tous les procs
. Les erreurs sur l'un des processus sont collectées dans une CompositeException
et lancées. Par exemple :
@everywhere bar = 1
définira Main.bar
sur tous les processus actuels. Les processus ajoutés plus tard (par exemple avec addprocs()
) n'auront pas l'expression définie.
Contrairement à @spawnat
, @everywhere
ne capture aucune variable locale. Au lieu de cela, les variables locales peuvent être diffusées en utilisant l'interpolation :
foo = 1
@everywhere bar = $foo
L'argument optionnel procs
permet de spécifier un sous-ensemble de tous les processus pour exécuter l'expression.
Semblable à l'appel de remotecall_eval(Main, procs, expr)
, mais avec deux fonctionnalités supplémentaires :
- Les instructions `using` et `import` s'exécutent d'abord sur le processus appelant, pour s'assurer que
les paquets sont précompilés.
- Le chemin du fichier source actuel utilisé par `include` est propagé aux autres processus.
Distributed.remoteref_id
— Functionremoteref_id(r::AbstractRemoteRef) -> RRID
Les Future
s et les RemoteChannel
s sont identifiés par des champs :
where
- fait référence au nœud où l'objet/de stockage sous-jacent auquel la référence fait référence existe réellement.whence
- fait référence au nœud à partir duquel la référence distante a été créée. Notez que cela est différent du nœud où l'objet sous-jacent auquel il est fait référence existe réellement. Par exemple, appelerRemoteChannel(2)
depuis le processus maître donnerait une valeurwhere
de 2 et une valeurwhence
de 1.id
est unique parmi toutes les références créées à partir du travailleur spécifié parwhence
.
Ensemble, whence
et id
identifient de manière unique une référence parmi tous les travailleurs.
remoteref_id
est une API de bas niveau qui renvoie un objet RRID
qui encapsule les valeurs whence
et id
d'une référence distante.
Distributed.channel_from_id
— Functionchannel_from_id(id) -> c
Une API de bas niveau qui retourne le AbstractChannel
de support pour un id
retourné par remoteref_id
. L'appel est valide uniquement sur le nœud où le canal de support existe.
Distributed.worker_id_from_socket
— Functionworker_id_from_socket(s) -> pid
Une API de bas niveau qui, étant donné une connexion IO
ou un Worker
, renvoie le pid
du worker auquel il est connecté. Cela est utile lors de l'écriture de méthodes serialize
personnalisées pour un type, qui optimisent les données écrites en fonction de l'identifiant du processus récepteur.
Distributed.cluster_cookie
— Methodcluster_cookie() -> cookie
Retourne le cookie de cluster.
Distributed.cluster_cookie
— Methodcluster_cookie(cookie) -> cookie
Définit le cookie passé comme le cookie de cluster, puis le retourne.
Cluster Manager Interface
Cette interface fournit un mécanisme pour lancer et gérer des travailleurs Julia sur différents environnements de cluster. Il existe deux types de gestionnaires présents dans Base : LocalManager
, pour lancer des travailleurs supplémentaires sur le même hôte, et SSHManager
, pour lancer sur des hôtes distants via ssh
. Des sockets TCP/IP sont utilisés pour se connecter et transporter des messages entre les processus. Il est possible que les gestionnaires de cluster fournissent un transport différent.
Distributed.ClusterManager
— TypeClusterManager
Supertype pour les gestionnaires de clusters, qui contrôlent les processus de travail en tant que cluster. Les gestionnaires de clusters mettent en œuvre comment les travailleurs peuvent être ajoutés, supprimés et communiqués. SSHManager
et LocalManager
sont des sous-types de cela.
Distributed.WorkerConfig
— TypeWorkerConfig
Type utilisé par ClusterManager
s pour contrôler les travailleurs ajoutés à leurs clusters. Certains champs sont utilisés par tous les gestionnaires de clusters pour accéder à un hôte :
io
– la connexion utilisée pour accéder au travailleur (un sous-type deIO
ouNothing
)host
– l'adresse de l'hôte (soit uneString
ouNothing
)port
– le port sur l'hôte utilisé pour se connecter au travailleur (soit unInt
ouNothing
)
Certains sont utilisés par le gestionnaire de cluster pour ajouter des travailleurs à un hôte déjà initialisé :
count
– le nombre de travailleurs à lancer sur l'hôteexename
– le chemin vers l'exécutable Julia sur l'hôte, par défaut"$(Sys.BINDIR)/julia"
ou"$(Sys.BINDIR)/julia-debug"
exeflags
– options à utiliser lors du lancement de Julia à distance
Le champ userdata
est utilisé pour stocker des informations pour chaque travailleur par des gestionnaires externes.
Certains champs sont utilisés par SSHManager
et des gestionnaires similaires :
tunnel
–true
(utiliser le tunneling),false
(ne pas utiliser le tunneling), ounothing
(utiliser la valeur par défaut pour le gestionnaire)multiplex
–true
(utiliser le multiplexage SSH pour le tunneling) oufalse
forward
– l'option de transfert utilisée pour l'option-L
de sshbind_addr
– l'adresse sur l'hôte distant à laquelle se liersshflags
– options à utiliser pour établir la connexion SSHmax_parallel
– le nombre maximum de travailleurs à connecter en parallèle sur l'hôte
Certains champs sont utilisés à la fois par les LocalManager
s et les SSHManager
s :
connect_at
– détermine s'il s'agit d'un appel de configuration travailleur-à-travailleur ou pilote-à-travailleurprocess
– le processus qui sera connecté (généralement le gestionnaire assignera cela lors deaddprocs
)ospid
– l'ID de processus selon le système d'exploitation de l'hôte, utilisé pour interrompre les processus des travailleursenviron
– dictionnaire privé utilisé pour stocker des informations temporaires par les gestionnaires Local/SSHident
– travailleur tel qu'identifié par leClusterManager
connect_idents
– liste des identifiants de travailleurs auxquels le travailleur doit se connecter s'il utilise une topologie personnaliséeenable_threaded_blas
–true
,false
, ounothing
, s'il faut utiliser BLAS multithreadé ou non sur les travailleurs
Distributed.launch
— Functionlaunch(manager::ClusterManager, params::Dict, launched::Array, launch_ntfy::Condition)
Implémenté par les gestionnaires de cluster. Pour chaque travailleur Julia lancé par cette fonction, il doit ajouter une entrée WorkerConfig
à launched
et notifier launch_ntfy
. La fonction DOIT se terminer une fois que tous les travailleurs, demandés par manager
, ont été lancés. params
est un dictionnaire de tous les arguments de mot-clé avec lesquels addprocs
a été appelé.
Distributed.manage
— Functionmanage(manager::ClusterManager, id::Integer, config::WorkerConfig. op::Symbol)
Implémenté par les gestionnaires de cluster. Il est appelé sur le processus maître, pendant la durée de vie d'un travailleur, avec des valeurs op
appropriées :
- avec
:register
/:deregister
lorsqu'un travailleur est ajouté / retiré du pool de travailleurs Julia. - avec
:interrupt
lorsqueinterrupt(workers)
est appelé. LeClusterManager
doit signaler au travailleur approprié avec un signal d'interruption. - avec
:finalize
à des fins de nettoyage.
Base.kill
— Methodkill(manager::ClusterManager, pid::Int, config::WorkerConfig)
Implémenté par les gestionnaires de cluster. Il est appelé sur le processus maître, par rmprocs
. Cela devrait amener le travailleur distant spécifié par pid
à quitter. kill(manager::ClusterManager.....)
exécute un exit()
distant sur pid
.
Sockets.connect
— Methodconnect(manager::ClusterManager, pid::Int, config::WorkerConfig) -> (instrm::IO, outstrm::IO)
Implémenté par des gestionnaires de cluster utilisant des transports personnalisés. Cela doit établir une connexion logique avec le travailleur ayant l'identifiant pid
, spécifié par config
, et retourner une paire d'objets IO
. Les messages de pid
vers le processus actuel seront lus à partir de instrm
, tandis que les messages à envoyer à pid
seront écrits dans outstrm
. L'implémentation du transport personnalisé doit garantir que les messages sont livrés et reçus complètement et dans l'ordre. connect(manager::ClusterManager.....)
configure des connexions de socket TCP/IP entre les travailleurs.
Distributed.init_worker
— Functioninit_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager())
Appelé par les gestionnaires de cluster implémentant des transports personnalisés. Il initialise un processus nouvellement lancé en tant que travailleur. L'argument de ligne de commande --worker[=<cookie>]
a pour effet d'initialiser un processus en tant que travailleur utilisant des sockets TCP/IP pour le transport. cookie
est un cluster_cookie
.
Distributed.start_worker
— Functionstart_worker([out::IO=stdout], cookie::AbstractString=readline(stdin); close_stdin::Bool=true, stderr_to_stdout::Bool=true)
start_worker
est une fonction interne qui est le point d'entrée par défaut pour les processus de travail se connectant via TCP/IP. Elle configure le processus en tant que travailleur du cluster Julia.
Les informations host:port sont écrites dans le flux out
(par défaut stdout).
La fonction lit le cookie depuis stdin si nécessaire, et écoute sur un port libre (ou si spécifié, le port dans l'option de ligne de commande --bind-to
) et planifie des tâches pour traiter les connexions TCP entrantes et les demandes. Elle ferme également (optionnellement) stdin et redirige stderr vers stdout.
Elle ne retourne pas.
Distributed.process_messages
— Functionprocess_messages(r_stream::IO, w_stream::IO, incoming::Bool=true)
Appelé par les gestionnaires de cluster utilisant des transports personnalisés. Il doit être appelé lorsque l'implémentation du transport personnalisé reçoit le premier message d'un travailleur distant. Le transport personnalisé doit gérer une connexion logique au travailleur distant et fournir deux objets IO
, un pour les messages entrants et l'autre pour les messages adressés au travailleur distant. Si incoming
est true
, le pair distant a initié la connexion. Celui des deux qui initie la connexion envoie le cookie du cluster et son numéro de version Julia pour effectuer la poignée de main d'authentification.
Voir aussi cluster_cookie
.
Distributed.default_addprocs_params
— Functiondefault_addprocs_params(mgr::ClusterManager) -> Dict{Symbol, Any}
Implémenté par les gestionnaires de cluster. Les paramètres de mot-clé par défaut passés lors de l'appel de addprocs(mgr)
. L'ensemble minimal d'options est disponible en appelant default_addprocs_params()
.