Distributed Computing
Distributed — ModuleOutils pour le traitement parallèle distribué.
Distributed.addprocs — Functionaddprocs(manager::ClusterManager; kwargs...) -> Liste des identifiants de processusLance des processus de travail via le gestionnaire de cluster spécifié.
Par exemple, les clusters Beowulf sont pris en charge via un gestionnaire de cluster personnalisé implémenté dans le package ClusterManagers.jl.
Le nombre de secondes qu'un nouveau travailleur attend pour établir une connexion avec le maître peut être spécifié via la variable JULIA_WORKER_TIMEOUT dans l'environnement du processus de travailleur. Pertinent uniquement lors de l'utilisation de TCP/IP comme transport.
Pour lancer des travailleurs sans bloquer le REPL, ou la fonction contenant si vous lancez des travailleurs de manière programmatique, exécutez addprocs dans sa propre tâche.
Exemples
# Sur des clusters occupés, appelez `addprocs` de manière asynchrone
t = @async addprocs(...)# Utilisez les travailleurs au fur et à mesure qu'ils se connectent
if nprocs() > 1 # Assurez-vous qu'au moins un nouveau travailleur est disponible
.... # effectuez une exécution distribuée
end# Récupérez les ID des travailleurs nouvellement lancés, ou tout message d'erreur
if istaskdone(t) # Vérifiez si `addprocs` a été complété pour s'assurer que `fetch` ne bloque pas
if nworkers() == N
new_pids = fetch(t)
else
fetch(t)
end
endaddprocs(machines; tunnel=false, sshflags=``, max_parallel=10, kwargs...) -> Liste des identifiants de processusAjoutez des processus de travail sur des machines distantes via SSH. La configuration se fait avec des arguments de mots-clés (voir ci-dessous). En particulier, le mot-clé exename peut être utilisé pour spécifier le chemin vers le binaire julia sur la ou les machines distantes.
machines est un vecteur de "spécifications de machine" qui sont données sous forme de chaînes du type [user@]host[:port] [bind_addr[:port]]. user par défaut est l'utilisateur actuel et port le port SSH standard. Si [bind_addr[:port]] est spécifié, d'autres travailleurs se connecteront à ce travailleur à l'adresse bind_addr et au port spécifiés.
Il est possible de lancer plusieurs processus sur un hôte distant en utilisant un tuple dans le vecteur machines ou la forme (machine_spec, count), où count est le nombre de travailleurs à lancer sur l'hôte spécifié. Passer :auto comme nombre de travailleurs lancera autant de travailleurs que le nombre de threads CPU sur l'hôte distant.
Exemples:
addprocs([
"remote1", # un travailleur sur 'remote1' se connectant avec le nom d'utilisateur actuel
"user@remote2", # un travailleur sur 'remote2' se connectant avec le nom d'utilisateur 'user'
"user@remote3:2222", # spécifiant le port SSH à '2222' pour 'remote3'
("user@remote4", 4), # lancer 4 travailleurs sur 'remote4'
("user@remote5", :auto), # lancer autant de travailleurs que de threads CPU sur 'remote5'
])Arguments de mots-clés:
tunnel: sitrue, alors un tunnel SSH sera utilisé pour se connecter au travailleur depuis le processus maître. Par défaut, c'estfalse.multiplex: sitrue, alors le multiplexage SSH est utilisé pour le tunnel SSH. Par défaut, c'estfalse.ssh: le nom ou le chemin de l'exécutable du client SSH utilisé pour démarrer les travailleurs. Par défaut, c'est"ssh".sshflags: spécifie des options ssh supplémentaires, par exemplesshflags=`-i /home/foo/bar.pem`max_parallel: spécifie le nombre maximum de travailleurs connectés en parallèle à un hôte. Par défaut, c'est 10.shell: spécifie le type de shell auquel ssh se connecte sur les travailleurs.shell=:posix: un shell Unix/Linux compatible POSIX (sh, ksh, bash, dash, zsh, etc.). Par défaut.shell=:csh: un shell C Unix (csh, tcsh).shell=:wincmd: Microsoft Windowscmd.exe.
dir: spécifie le répertoire de travail sur les travailleurs. Par défaut, c'est le répertoire actuel de l'hôte (tel que trouvé parpwd())enable_threaded_blas: sitrue, alors BLAS s'exécutera sur plusieurs threads dans les processus ajoutés. Par défaut, c'estfalse.exename: nom de l'exécutablejulia. Par défaut, c'est"$(Sys.BINDIR)/julia"ou"$(Sys.BINDIR)/julia-debug"selon le cas. Il est recommandé d'utiliser une version commune de Julia sur toutes les machines distantes car la sérialisation et la distribution de code pourraient échouer sinon.exeflags: drapeaux supplémentaires passés aux processus de travail.topology: Spécifie comment les travailleurs se connectent les uns aux autres. L'envoi d'un message entre des travailleurs non connectés entraîne une erreur.topology=:all_to_all: Tous les processus sont connectés les uns aux autres. Par défaut.topology=:master_worker: Seul le processus pilote, c'est-à-direpid1 se connecte aux travailleurs. Les travailleurs ne se connectent pas entre eux.topology=:custom: La méthodelaunchdu gestionnaire de cluster spécifie la topologie de connexion via les champsidentetconnect_identsdansWorkerConfig. Un travailleur avec une identité de gestionnaire de clusteridentse connectera à tous les travailleurs spécifiés dansconnect_idents.
lazy: Applicable uniquement avectopology=:all_to_all. Sitrue, les connexions travailleur-travailleur sont configurées de manière paresseuse, c'est-à-dire qu'elles sont configurées lors de la première instance d'un appel distant entre travailleurs. Par défaut, c'est vrai.env: fournir un tableau de paires de chaînes telles queenv=["JULIA_DEPOT_PATH"=>"/depot"]pour demander que des variables d'environnement soient définies sur la machine distante. Par défaut, seule la variable d'environnementJULIA_WORKER_TIMEOUTest passée automatiquement de l'environnement local à l'environnement distant.cmdline_cookie: passez le cookie d'authentification via l'option de ligne de commande--worker. Le comportement par défaut (plus sécurisé) de passer le cookie via ssh stdio peut se bloquer avec des travailleurs Windows utilisant des versions Julia ou Windows plus anciennes (avant ConPTY), auquel cascmdline_cookie=trueoffre une solution de contournement.
Les arguments de mots-clés ssh, shell, env et cmdline_cookie ont été ajoutés dans Julia 1.6.
Variables d'environnement:
Si le processus maître échoue à établir une connexion avec un travailleur nouvellement lancé dans les 60,0 secondes, le travailleur considère cela comme une situation fatale et se termine. Ce délai peut être contrôlé via la variable d'environnement JULIA_WORKER_TIMEOUT. La valeur de JULIA_WORKER_TIMEOUT sur le processus maître spécifie le nombre de secondes qu'un travailleur nouvellement lancé attend pour établir une connexion.
addprocs(np::Integer=Sys.CPU_THREADS; restrict=true, kwargs...) -> Liste des identifiants de processusLancez des travailleurs np sur l'hôte local en utilisant le LocalManager intégré.
Les travailleurs locaux héritent de l'environnement de package actuel (c'est-à-dire le projet actif, LOAD_PATH, et DEPOT_PATH) du processus principal.
Notez que les travailleurs n'exécutent pas de script de démarrage ~/.julia/config/startup.jl, ni ne synchronisent leur état global (tel que les options de ligne de commande, les variables globales, les définitions de nouvelles méthodes et les modules chargés) avec aucun des autres processus en cours d'exécution.
Arguments clés :
restrict::Bool: sitrue(par défaut), la liaison est restreinte à127.0.0.1.dir,exename,exeflags,env,topology,lazy,enable_threaded_blas: même effet que pourSSHManager, voir la documentation pouraddprocs(machines::AbstractVector).
L'héritage de l'environnement de package et l'argument clé env ont été ajoutés dans Julia 1.9.
Distributed.nprocs — Functionnprocs()Obtenez le nombre de processus disponibles.
Exemples
julia> nprocs()
3
julia> workers()
2-element Array{Int64,1}:
2
3Distributed.nworkers — Functionnworkers()Obtenez le nombre de processus de travail disponibles. Cela est un de moins que nprocs(). Égal à nprocs() si nprocs() == 1.
Exemples
$ julia -p 2
julia> nprocs()
3
julia> nworkers()
2Distributed.procs — Methodprocs()Renvoie une liste de tous les identifiants de processus, y compris le pid 1 (qui n'est pas inclus par workers()).
Exemples
$ julia -p 2
julia> procs()
3-element Array{Int64,1}:
1
2
3Distributed.procs — Methodprocs(pid::Integer)Renvoie une liste de tous les identifiants de processus sur le même nœud physique. En particulier, tous les travailleurs liés à la même adresse IP que pid sont renvoyés.
Distributed.workers — Functionworkers()Retourne une liste de tous les identifiants de processus de travail.
Exemples
$ julia -p 2
julia> workers()
2-element Array{Int64,1}:
2
3Distributed.rmprocs — Functionrmprocs(pids...; waitfor=typemax(Int))Supprime les travailleurs spécifiés. Notez que seul le processus 1 peut ajouter ou supprimer des travailleurs.
L'argument waitfor spécifie combien de temps attendre que les travailleurs s'arrêtent :
- Si non spécifié,
rmprocsattendra que tous lespidsdemandés soient supprimés. - Une
ErrorExceptionest levée si tous les travailleurs ne peuvent pas être terminés avant les secondeswaitfordemandées. - Avec une valeur
waitforde 0, l'appel retourne immédiatement avec les travailleurs programmés pour suppression dans une tâche différente. L'objetTaskprogrammé est retourné. L'utilisateur doit appelerwaitsur la tâche avant d'invoquer d'autres appels parallèles.
Exemples
$ julia -p 5
julia> t = rmprocs(2, 3, waitfor=0)
Task (runnable) @0x0000000107c718d0
julia> wait(t)
julia> workers()
3-element Array{Int64,1}:
4
5
6Distributed.interrupt — Functioninterrupt(pids::Integer...)Interrompt la tâche en cours d'exécution sur les travailleurs spécifiés. Cela équivaut à appuyer sur Ctrl-C sur la machine locale. Si aucun argument n'est donné, tous les travailleurs sont interrompus.
interrupt(pids::AbstractVector=workers())Interrompt la tâche en cours d'exécution sur les travailleurs spécifiés. Cela équivaut à appuyer sur Ctrl-C sur la machine locale. Si aucun argument n'est donné, tous les travailleurs sont interrompus.
Distributed.myid — Functionmyid()Obtenez l'identifiant du processus actuel.
Exemples
julia> myid()
1
julia> remotecall_fetch(() -> myid(), 4)
4Distributed.pmap — Functionpmap(f, [::AbstractWorkerPool], c...; distributed=true, batch_size=1, on_error=nothing, retry_delays=[], retry_check=nothing) -> collectionTransformez la collection c en appliquant f à chaque élément en utilisant les travailleurs et tâches disponibles.
Pour plusieurs arguments de collection, appliquez f élément par élément.
Notez que f doit être rendu disponible à tous les processus de travailleur ; voir Code Availability and Loading Packages pour plus de détails.
Si un pool de travailleurs n'est pas spécifié, tous les travailleurs disponibles seront utilisés via un CachingPool.
Par défaut, pmap distribue le calcul sur tous les travailleurs spécifiés. Pour utiliser uniquement le processus local et distribuer sur les tâches, spécifiez distributed=false. Cela équivaut à utiliser asyncmap. Par exemple, pmap(f, c; distributed=false) est équivalent à asyncmap(f,c; ntasks=()->nworkers())
pmap peut également utiliser un mélange de processus et de tâches via l'argument batch_size. Pour des tailles de lot supérieures à 1, la collection est traitée en plusieurs lots, chacun d'une longueur de batch_size ou moins. Un lot est envoyé comme une seule demande à un travailleur libre, où un asyncmap local traite les éléments du lot en utilisant plusieurs tâches concurrentes.
Toute erreur empêche pmap de traiter le reste de la collection. Pour remplacer ce comportement, vous pouvez spécifier une fonction de gestion des erreurs via l'argument on_error qui prend un seul argument, c'est-à-dire l'exception. La fonction peut arrêter le traitement en relançant l'erreur, ou, pour continuer, retourner toute valeur qui est ensuite renvoyée en ligne avec les résultats à l'appelant.
Considérez les deux exemples suivants. Le premier renvoie l'objet d'exception en ligne, le second un 0 à la place de toute exception :
julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=identity)
4-element Array{Any,1}:
1
ErrorException("foo")
3
ErrorException("foo")
julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=ex->0)
4-element Array{Int64,1}:
1
0
3
0Les erreurs peuvent également être gérées en réessayant les calculs échoués. Les arguments de mot-clé retry_delays et retry_check sont transmis à retry en tant qu'arguments de mot-clé delays et check respectivement. Si le regroupement est spécifié, et qu'un lot entier échoue, tous les éléments du lot sont réessayés.
Notez que si à la fois on_error et retry_delays sont spécifiés, le crochet on_error est appelé avant de réessayer. Si on_error ne lance pas (ou ne relance pas) une exception, l'élément ne sera pas réessayé.
Exemple : En cas d'erreurs, réessayez f sur un élément un maximum de 3 fois sans aucun délai entre les réessais.
pmap(f, c; retry_delays = zeros(3))Exemple : Réessayez f uniquement si l'exception n'est pas de type InexactError, avec des délais augmentant de manière exponentielle jusqu'à 3 fois. Retournez un NaN à la place de toutes les occurrences de InexactError.
pmap(f, c; on_error = e->(isa(e, InexactError) ? NaN : rethrow()), retry_delays = ExponentialBackOff(n = 3))Distributed.RemoteException — TypeRemoteException(captured)Les exceptions lors des calculs distants sont capturées et relancées localement. Un RemoteException enveloppe le pid du travailleur et une exception capturée. Une CapturedException capture l'exception distante et une forme sérialisable de la pile d'appels lorsque l'exception a été levée.
Distributed.ProcessExitedException — TypeProcessExitedException(worker_id::Int)Après qu'un processus client Julia ait quitté, toute tentative ultérieure de référencer l'enfant mort déclenchera cette exception.
Distributed.Future — TypeFuture(w::Int, rrid::RRID, v::Union{Some, Nothing}=nothing)Un Future est un espace réservé pour un calcul unique dont le statut de terminaison et le temps sont inconnus. Pour plusieurs calculs potentiels, voir RemoteChannel. Voir remoteref_id pour identifier un AbstractRemoteRef.
Distributed.RemoteChannel — TypeRemoteChannel(pid::Integer=myid())Faites référence à un Channel{Any}(1) sur le processus pid. Le pid par défaut est le processus actuel.
RemoteChannel(f::Function, pid::Integer=myid())Créez des références à des canaux distants d'une taille et d'un type spécifiques. f est une fonction qui, lorsqu'elle est exécutée sur pid, doit renvoyer une implémentation d'un AbstractChannel.
Par exemple, RemoteChannel(()->Channel{Int}(10), pid), renverra une référence à un canal de type Int et de taille 10 sur pid.
Le pid par défaut est le processus actuel.
Base.fetch — Methodfetch(x::Future)Attendez et obtenez la valeur d'un Future. La valeur récupérée est mise en cache localement. Les appels ultérieurs à fetch sur la même référence renvoient la valeur mise en cache. Si la valeur distante est une exception, cela lance une RemoteException qui capture l'exception distante et la trace de la pile.
Base.fetch — Methodfetch(c::RemoteChannel)Attendez et obtenez une valeur d'un RemoteChannel. Les exceptions levées sont les mêmes que pour un Future. Ne supprime pas l'élément récupéré.
fetch(x::Any)Retourne x.
Distributed.remotecall — Methodremotecall(f, id::Integer, args...; kwargs...) -> FutureAppelle une fonction f de manière asynchrone avec les arguments donnés sur le processus spécifié. Retourne un Future. Les arguments de mot-clé, le cas échéant, sont transmis à f.
Distributed.remotecall_wait — Methodremotecall_wait(f, id::Integer, args...; kwargs...)Effectuez un wait(remotecall(...)) plus rapide en un seul message sur le Worker spécifié par l'identifiant du worker id. Les arguments de mot-clé, le cas échéant, sont transmis à f.
Voir aussi wait et remotecall.
Distributed.remotecall_fetch — Methodremotecall_fetch(f, id::Integer, args...; kwargs...)Effectue fetch(remotecall(...)) en un seul message. Les arguments de mot-clé, le cas échéant, sont transmis à f. Toute exception distante est capturée dans une RemoteException et levée.
Voir aussi fetch et remotecall.
Exemples
$ julia -p 2
julia> remotecall_fetch(sqrt, 2, 4)
2.0
julia> remotecall_fetch(sqrt, 2, -4)
ERROR: Sur le worker 2 :
DomainError avec -4.0 :
sqrt a été appelé avec un argument réel négatif mais ne renverra qu'un résultat complexe s'il est appelé avec un argument complexe. Essayez sqrt(Complex(x)).
...Distributed.remote_do — Methodremote_do(f, id::Integer, args...; kwargs...) -> nothingExécute f sur le travailleur id de manière asynchrone. Contrairement à remotecall, il ne stocke pas le résultat du calcul, ni il n'y a de moyen d'attendre son achèvement.
Une invocation réussie indique que la demande a été acceptée pour exécution sur le nœud distant.
Alors que les appels consécutifs à remotecall vers le même travailleur sont sérialisés dans l'ordre où ils sont invoqués, l'ordre des exécutions sur le travailleur distant est indéterminé. Par exemple, remote_do(f1, 2); remotecall(f2, 2); remote_do(f3, 2) va sérialiser l'appel à f1, suivi de f2 et f3 dans cet ordre. Cependant, il n'est pas garanti que f1 soit exécuté avant f3 sur le travailleur 2.
Toutes les exceptions levées par f sont imprimées sur stderr sur le travailleur distant.
Les arguments de mot-clé, le cas échéant, sont transmis à f.
Base.put! — Methodput!(rr::RemoteChannel, args...)Stocke un ensemble de valeurs dans le RemoteChannel. Si le canal est plein, il bloque jusqu'à ce qu'un espace soit disponible. Retourne le premier argument.
Base.put! — Methodput!(rr::Future, v)Stockez une valeur dans un Future rr. Les Futures sont des références distantes en écriture unique. Un put! sur un Future déjà défini déclenche une Exception. Tous les appels distants asynchrones renvoient des Futures et définissent la valeur sur la valeur de retour de l'appel une fois terminé.
Base.take! — Methodtake!(rr::RemoteChannel, args...)Récupère la ou les valeur(s) d'un RemoteChannel rr, en supprimant la ou les valeur(s) dans le processus.
Base.isready — Methodisready(rr::RemoteChannel, args...)Déterminez si un RemoteChannel a une valeur stockée. Notez que cette fonction peut provoquer des conditions de concurrence, car au moment où vous recevez son résultat, cela peut ne plus être vrai. Cependant, elle peut être utilisée en toute sécurité sur un Future car ils ne sont assignés qu'une seule fois.
Base.isready — Methodisready(rr::Future)Déterminez si un Future a une valeur stockée.
Si l'argument Future est détenu par un nœud différent, cet appel bloquera en attendant la réponse. Il est recommandé d'attendre rr dans une tâche séparée ou d'utiliser un Channel local comme proxy :
p = 1
f = Future(p)
errormonitor(@async put!(f, remotecall_fetch(long_computation, p)))
isready(f) # ne bloquera pasDistributed.AbstractWorkerPool — TypeAbstractWorkerPoolSupertype pour les pools de travailleurs tels que WorkerPool et CachingPool. Un AbstractWorkerPool doit implémenter :
push!- ajouter un nouveau travailleur au pool global (disponible + occupé)put!- remettre un travailleur dans le pool disponibletake!- prendre un travailleur du pool disponible (à utiliser pour l'exécution de fonctions distantes)length- nombre de travailleurs disponibles dans le pool globalisready- retourner false si untake!sur le pool bloquerait, sinon true
Les implémentations par défaut de ce qui précède (sur un AbstractWorkerPool) nécessitent des champs
channel::Channel{Int}workers::Set{Int}
où channel contient les pids des travailleurs libres et workers est l'ensemble de tous les travailleurs associés à ce pool.
Distributed.WorkerPool — TypeWorkerPool(workers::Union{Vector{Int},AbstractRange{Int}})Créez un WorkerPool à partir d'un vecteur ou d'une plage d'identifiants de travailleurs.
Exemples
$ julia -p 3
julia> WorkerPool([2, 3])
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([2, 3]), RemoteChannel{Channel{Any}}(1, 1, 6))
julia> WorkerPool(2:4)
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 7))Distributed.CachingPool — TypeCachingPool(workers::Vector{Int})Une implémentation d'un AbstractWorkerPool. remote, remotecall_fetch, pmap (et d'autres appels distants qui exécutent des fonctions à distance) bénéficient de la mise en cache des fonctions sérialisées/désérialisées sur les nœuds de travail, en particulier les fermetures (qui peuvent capturer de grandes quantités de données).
Le cache distant est maintenu pendant la durée de vie de l'objet CachingPool retourné. Pour vider le cache plus tôt, utilisez clear!(pool).
Pour les variables globales, seules les liaisons sont capturées dans une fermeture, pas les données. Des blocs let peuvent être utilisés pour capturer des données globales.
Exemples
const foo = rand(10^8);
wp = CachingPool(workers())
let foo = foo
pmap(i -> sum(foo) + i, wp, 1:100);
endCe qui précède transférerait foo une seule fois à chaque travailleur.
Distributed.default_worker_pool — Functiondefault_worker_pool()AbstractWorkerPool contenant des workers inactifs - utilisé par remote(f) et pmap (par défaut). À moins qu'un ne soit explicitement défini via default_worker_pool!(pool), le pool de travailleurs par défaut est initialisé à un WorkerPool.
Exemples
$ julia -p 3
julia> default_worker_pool()
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:3), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 4))Distributed.clear! — Functionclear!(syms, pids=workers(); mod=Main)Efface les liaisons globales dans les modules en les initialisant à nothing. syms doit être de type Symbol ou une collection de Symbols. pids et mod identifient les processus et le module dans lequel les variables globales doivent être réinitialisées. Seuls les noms trouvés définis sous mod sont effacés.
Une exception est levée si une constante globale est demandée à être effacée.
clear!(pool::CachingPool) -> poolSupprime toutes les fonctions mises en cache de tous les travailleurs participants.
Distributed.remote — Functionremote([p::AbstractWorkerPool], f) -> FunctionRetourne une fonction anonyme qui exécute la fonction f sur un travailleur disponible (tiré de WorkerPool p si fourni) en utilisant remotecall_fetch.
Distributed.remotecall — Methodremotecall(f, pool::AbstractWorkerPool, args...; kwargs...) -> Futurevariante de WorkerPool de remotecall(f, pid, ....). Attendez et prenez un travailleur libre de pool et effectuez un remotecall dessus.
Exemples
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> f = remotecall(maximum, wp, A)
Future(2, 1, 6, nothing)Dans cet exemple, la tâche a été exécutée sur pid 2, appelée depuis pid 1.
Distributed.remotecall_wait — Methodremotecall_wait(f, pool::AbstractWorkerPool, args...; kwargs...) -> FutureWorkerPool variante de remotecall_wait(f, pid, ....). Attendez et prenez un travailleur libre de pool et effectuez un remotecall_wait dessus.
Exemples
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> f = remotecall_wait(maximum, wp, A)
Future(3, 1, 9, nothing)
julia> fetch(f)
0.9995177101692958Distributed.remotecall_fetch — Methodremotecall_fetch(f, pool::AbstractWorkerPool, args...; kwargs...) -> resultWorkerPool variante de remotecall_fetch(f, pid, ....). Attend et prend un travailleur libre de pool et effectue un remotecall_fetch dessus.
Exemples
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> remotecall_fetch(maximum, wp, A)
0.9995177101692958Distributed.remote_do — Methodremote_do(f, pool::AbstractWorkerPool, args...; kwargs...) -> rienWorkerPool variante de remote_do(f, pid, ....). Attendez et prenez un travailleur libre de pool et effectuez un remote_do dessus.
Distributed.@spawn — Macro@spawn exprCréez une fermeture autour d'une expression et exécutez-la sur un processus choisi automatiquement, renvoyant un Future vers le résultat. Ce macro est obsolète ; @spawnat :any expr doit être utilisé à la place.
Exemples
julia> addprocs(3);
julia> f = @spawn myid()
Future(2, 1, 5, nothing)
julia> fetch(f)
2
julia> f = @spawn myid()
Future(3, 1, 7, nothing)
julia> fetch(f)
3À partir de Julia 1.3, ce macro est obsolète. Utilisez @spawnat :any à la place.
Distributed.@spawnat — Macro@spawnat p exprCréez une fermeture autour d'une expression et exécutez la fermeture de manière asynchrone sur le processus p. Retournez un Future vers le résultat. Si p est le symbole littéral cité :any, alors le système choisira automatiquement un processeur à utiliser.
Exemples
julia> addprocs(3);
julia> f = @spawnat 2 myid()
Future(2, 1, 3, nothing)
julia> fetch(f)
2
julia> f = @spawnat :any myid()
Future(3, 1, 7, nothing)
julia> fetch(f)
3L'argument :any est disponible depuis Julia 1.3.
Distributed.@fetch — Macro@fetch exprÉquivalent à fetch(@spawnat :any expr). Voir fetch et @spawnat.
Exemples
julia> addprocs(3);
julia> @fetch myid()
2
julia> @fetch myid()
3
julia> @fetch myid()
4
julia> @fetch myid()
2Distributed.@fetchfrom — Macro@fetchfromÉquivalent à fetch(@spawnat p expr). Voir fetch et @spawnat.
Exemples
julia> addprocs(3);
julia> @fetchfrom 2 myid()
2
julia> @fetchfrom 4 myid()
4Distributed.@distributed — Macro@distributedUne boucle for parallèle en mémoire distribuée de la forme :
@distributed [réducteur] pour var = plage
corps
finLa plage spécifiée est partitionnée et exécutée localement sur tous les travailleurs. Dans le cas où une fonction réductrice optionnelle est spécifiée, @distributed effectue des réductions locales sur chaque travailleur avec une réduction finale sur le processus appelant.
Notez que sans fonction réductrice, @distributed s'exécute de manière asynchrone, c'est-à-dire qu'il génère des tâches indépendantes sur tous les travailleurs disponibles et retourne immédiatement sans attendre l'achèvement. Pour attendre l'achèvement, préfixez l'appel avec @sync, comme suit :
@sync @distributed pour var = plage
corps
finDistributed.@everywhere — Macro@everywhere [procs()] exprExécute une expression sous Main sur tous les procs. Les erreurs sur l'un des processus sont collectées dans une CompositeException et lancées. Par exemple :
@everywhere bar = 1définira Main.bar sur tous les processus actuels. Les processus ajoutés plus tard (par exemple avec addprocs()) n'auront pas l'expression définie.
Contrairement à @spawnat, @everywhere ne capture aucune variable locale. Au lieu de cela, les variables locales peuvent être diffusées en utilisant l'interpolation :
foo = 1
@everywhere bar = $fooL'argument optionnel procs permet de spécifier un sous-ensemble de tous les processus pour exécuter l'expression.
Semblable à l'appel de remotecall_eval(Main, procs, expr), mais avec deux fonctionnalités supplémentaires :
- Les instructions `using` et `import` s'exécutent d'abord sur le processus appelant, pour s'assurer que
les paquets sont précompilés.
- Le chemin du fichier source actuel utilisé par `include` est propagé aux autres processus.Distributed.remoteref_id — Functionremoteref_id(r::AbstractRemoteRef) -> RRIDLes Futures et les RemoteChannels sont identifiés par des champs :
where- fait référence au nœud où l'objet/de stockage sous-jacent auquel la référence fait référence existe réellement.whence- fait référence au nœud à partir duquel la référence distante a été créée. Notez que cela est différent du nœud où l'objet sous-jacent auquel il est fait référence existe réellement. Par exemple, appelerRemoteChannel(2)depuis le processus maître donnerait une valeurwherede 2 et une valeurwhencede 1.idest unique parmi toutes les références créées à partir du travailleur spécifié parwhence.
Ensemble, whence et id identifient de manière unique une référence parmi tous les travailleurs.
remoteref_id est une API de bas niveau qui renvoie un objet RRID qui encapsule les valeurs whence et id d'une référence distante.
Distributed.channel_from_id — Functionchannel_from_id(id) -> cUne API de bas niveau qui retourne le AbstractChannel de support pour un id retourné par remoteref_id. L'appel est valide uniquement sur le nœud où le canal de support existe.
Distributed.worker_id_from_socket — Functionworker_id_from_socket(s) -> pidUne API de bas niveau qui, étant donné une connexion IO ou un Worker, renvoie le pid du worker auquel il est connecté. Cela est utile lors de l'écriture de méthodes serialize personnalisées pour un type, qui optimisent les données écrites en fonction de l'identifiant du processus récepteur.
Distributed.cluster_cookie — Methodcluster_cookie() -> cookieRetourne le cookie de cluster.
Distributed.cluster_cookie — Methodcluster_cookie(cookie) -> cookieDéfinit le cookie passé comme le cookie de cluster, puis le retourne.
Cluster Manager Interface
Cette interface fournit un mécanisme pour lancer et gérer des travailleurs Julia sur différents environnements de cluster. Il existe deux types de gestionnaires présents dans Base : LocalManager, pour lancer des travailleurs supplémentaires sur le même hôte, et SSHManager, pour lancer sur des hôtes distants via ssh. Des sockets TCP/IP sont utilisés pour se connecter et transporter des messages entre les processus. Il est possible que les gestionnaires de cluster fournissent un transport différent.
Distributed.ClusterManager — TypeClusterManagerSupertype pour les gestionnaires de clusters, qui contrôlent les processus de travail en tant que cluster. Les gestionnaires de clusters mettent en œuvre comment les travailleurs peuvent être ajoutés, supprimés et communiqués. SSHManager et LocalManager sont des sous-types de cela.
Distributed.WorkerConfig — TypeWorkerConfigType utilisé par ClusterManagers pour contrôler les travailleurs ajoutés à leurs clusters. Certains champs sont utilisés par tous les gestionnaires de clusters pour accéder à un hôte :
io– la connexion utilisée pour accéder au travailleur (un sous-type deIOouNothing)host– l'adresse de l'hôte (soit uneStringouNothing)port– le port sur l'hôte utilisé pour se connecter au travailleur (soit unIntouNothing)
Certains sont utilisés par le gestionnaire de cluster pour ajouter des travailleurs à un hôte déjà initialisé :
count– le nombre de travailleurs à lancer sur l'hôteexename– le chemin vers l'exécutable Julia sur l'hôte, par défaut"$(Sys.BINDIR)/julia"ou"$(Sys.BINDIR)/julia-debug"exeflags– options à utiliser lors du lancement de Julia à distance
Le champ userdata est utilisé pour stocker des informations pour chaque travailleur par des gestionnaires externes.
Certains champs sont utilisés par SSHManager et des gestionnaires similaires :
tunnel–true(utiliser le tunneling),false(ne pas utiliser le tunneling), ounothing(utiliser la valeur par défaut pour le gestionnaire)multiplex–true(utiliser le multiplexage SSH pour le tunneling) oufalseforward– l'option de transfert utilisée pour l'option-Lde sshbind_addr– l'adresse sur l'hôte distant à laquelle se liersshflags– options à utiliser pour établir la connexion SSHmax_parallel– le nombre maximum de travailleurs à connecter en parallèle sur l'hôte
Certains champs sont utilisés à la fois par les LocalManagers et les SSHManagers :
connect_at– détermine s'il s'agit d'un appel de configuration travailleur-à-travailleur ou pilote-à-travailleurprocess– le processus qui sera connecté (généralement le gestionnaire assignera cela lors deaddprocs)ospid– l'ID de processus selon le système d'exploitation de l'hôte, utilisé pour interrompre les processus des travailleursenviron– dictionnaire privé utilisé pour stocker des informations temporaires par les gestionnaires Local/SSHident– travailleur tel qu'identifié par leClusterManagerconnect_idents– liste des identifiants de travailleurs auxquels le travailleur doit se connecter s'il utilise une topologie personnaliséeenable_threaded_blas–true,false, ounothing, s'il faut utiliser BLAS multithreadé ou non sur les travailleurs
Distributed.launch — Functionlaunch(manager::ClusterManager, params::Dict, launched::Array, launch_ntfy::Condition)Implémenté par les gestionnaires de cluster. Pour chaque travailleur Julia lancé par cette fonction, il doit ajouter une entrée WorkerConfig à launched et notifier launch_ntfy. La fonction DOIT se terminer une fois que tous les travailleurs, demandés par manager, ont été lancés. params est un dictionnaire de tous les arguments de mot-clé avec lesquels addprocs a été appelé.
Distributed.manage — Functionmanage(manager::ClusterManager, id::Integer, config::WorkerConfig. op::Symbol)Implémenté par les gestionnaires de cluster. Il est appelé sur le processus maître, pendant la durée de vie d'un travailleur, avec des valeurs op appropriées :
- avec
:register/:deregisterlorsqu'un travailleur est ajouté / retiré du pool de travailleurs Julia. - avec
:interruptlorsqueinterrupt(workers)est appelé. LeClusterManagerdoit signaler au travailleur approprié avec un signal d'interruption. - avec
:finalizeà des fins de nettoyage.
Base.kill — Methodkill(manager::ClusterManager, pid::Int, config::WorkerConfig)Implémenté par les gestionnaires de cluster. Il est appelé sur le processus maître, par rmprocs. Cela devrait amener le travailleur distant spécifié par pid à quitter. kill(manager::ClusterManager.....) exécute un exit() distant sur pid.
Sockets.connect — Methodconnect(manager::ClusterManager, pid::Int, config::WorkerConfig) -> (instrm::IO, outstrm::IO)Implémenté par des gestionnaires de cluster utilisant des transports personnalisés. Cela doit établir une connexion logique avec le travailleur ayant l'identifiant pid, spécifié par config, et retourner une paire d'objets IO. Les messages de pid vers le processus actuel seront lus à partir de instrm, tandis que les messages à envoyer à pid seront écrits dans outstrm. L'implémentation du transport personnalisé doit garantir que les messages sont livrés et reçus complètement et dans l'ordre. connect(manager::ClusterManager.....) configure des connexions de socket TCP/IP entre les travailleurs.
Distributed.init_worker — Functioninit_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager())Appelé par les gestionnaires de cluster implémentant des transports personnalisés. Il initialise un processus nouvellement lancé en tant que travailleur. L'argument de ligne de commande --worker[=<cookie>] a pour effet d'initialiser un processus en tant que travailleur utilisant des sockets TCP/IP pour le transport. cookie est un cluster_cookie.
Distributed.start_worker — Functionstart_worker([out::IO=stdout], cookie::AbstractString=readline(stdin); close_stdin::Bool=true, stderr_to_stdout::Bool=true)start_worker est une fonction interne qui est le point d'entrée par défaut pour les processus de travail se connectant via TCP/IP. Elle configure le processus en tant que travailleur du cluster Julia.
Les informations host:port sont écrites dans le flux out (par défaut stdout).
La fonction lit le cookie depuis stdin si nécessaire, et écoute sur un port libre (ou si spécifié, le port dans l'option de ligne de commande --bind-to) et planifie des tâches pour traiter les connexions TCP entrantes et les demandes. Elle ferme également (optionnellement) stdin et redirige stderr vers stdout.
Elle ne retourne pas.
Distributed.process_messages — Functionprocess_messages(r_stream::IO, w_stream::IO, incoming::Bool=true)Appelé par les gestionnaires de cluster utilisant des transports personnalisés. Il doit être appelé lorsque l'implémentation du transport personnalisé reçoit le premier message d'un travailleur distant. Le transport personnalisé doit gérer une connexion logique au travailleur distant et fournir deux objets IO, un pour les messages entrants et l'autre pour les messages adressés au travailleur distant. Si incoming est true, le pair distant a initié la connexion. Celui des deux qui initie la connexion envoie le cookie du cluster et son numéro de version Julia pour effectuer la poignée de main d'authentification.
Voir aussi cluster_cookie.
Distributed.default_addprocs_params — Functiondefault_addprocs_params(mgr::ClusterManager) -> Dict{Symbol, Any}Implémenté par les gestionnaires de cluster. Les paramètres de mot-clé par défaut passés lors de l'appel de addprocs(mgr). L'ensemble minimal d'options est disponible en appelant default_addprocs_params().