Distributed Computing

Distributed.addprocsFunction
addprocs(manager::ClusterManager; kwargs...) -> Liste des identifiants de processus

Lance des processus de travail via le gestionnaire de cluster spécifié.

Par exemple, les clusters Beowulf sont pris en charge via un gestionnaire de cluster personnalisé implémenté dans le package ClusterManagers.jl.

Le nombre de secondes qu'un nouveau travailleur attend pour établir une connexion avec le maître peut être spécifié via la variable JULIA_WORKER_TIMEOUT dans l'environnement du processus de travailleur. Pertinent uniquement lors de l'utilisation de TCP/IP comme transport.

Pour lancer des travailleurs sans bloquer le REPL, ou la fonction contenant si vous lancez des travailleurs de manière programmatique, exécutez addprocs dans sa propre tâche.

Exemples

# Sur des clusters occupés, appelez `addprocs` de manière asynchrone
t = @async addprocs(...)
# Utilisez les travailleurs au fur et à mesure qu'ils se connectent
if nprocs() > 1   # Assurez-vous qu'au moins un nouveau travailleur est disponible
   ....   # effectuez une exécution distribuée
end
# Récupérez les ID des travailleurs nouvellement lancés, ou tout message d'erreur
if istaskdone(t)   # Vérifiez si `addprocs` a été complété pour s'assurer que `fetch` ne bloque pas
    if nworkers() == N
        new_pids = fetch(t)
    else
        fetch(t)
    end
end
source
addprocs(machines; tunnel=false, sshflags=``, max_parallel=10, kwargs...) -> Liste des identifiants de processus

Ajoutez des processus de travail sur des machines distantes via SSH. La configuration se fait avec des arguments de mots-clés (voir ci-dessous). En particulier, le mot-clé exename peut être utilisé pour spécifier le chemin vers le binaire julia sur la ou les machines distantes.

machines est un vecteur de "spécifications de machine" qui sont données sous forme de chaînes du type [user@]host[:port] [bind_addr[:port]]. user par défaut est l'utilisateur actuel et port le port SSH standard. Si [bind_addr[:port]] est spécifié, d'autres travailleurs se connecteront à ce travailleur à l'adresse bind_addr et au port spécifiés.

Il est possible de lancer plusieurs processus sur un hôte distant en utilisant un tuple dans le vecteur machines ou la forme (machine_spec, count), où count est le nombre de travailleurs à lancer sur l'hôte spécifié. Passer :auto comme nombre de travailleurs lancera autant de travailleurs que le nombre de threads CPU sur l'hôte distant.

Exemples:

addprocs([
    "remote1",               # un travailleur sur 'remote1' se connectant avec le nom d'utilisateur actuel
    "user@remote2",          # un travailleur sur 'remote2' se connectant avec le nom d'utilisateur 'user'
    "user@remote3:2222",     # spécifiant le port SSH à '2222' pour 'remote3'
    ("user@remote4", 4),     # lancer 4 travailleurs sur 'remote4'
    ("user@remote5", :auto), # lancer autant de travailleurs que de threads CPU sur 'remote5'
])

Arguments de mots-clés:

  • tunnel: si true, alors un tunnel SSH sera utilisé pour se connecter au travailleur depuis le processus maître. Par défaut, c'est false.

  • multiplex: si true, alors le multiplexage SSH est utilisé pour le tunnel SSH. Par défaut, c'est false.

  • ssh: le nom ou le chemin de l'exécutable du client SSH utilisé pour démarrer les travailleurs. Par défaut, c'est "ssh".

  • sshflags: spécifie des options ssh supplémentaires, par exemple sshflags=`-i /home/foo/bar.pem`

  • max_parallel: spécifie le nombre maximum de travailleurs connectés en parallèle à un hôte. Par défaut, c'est 10.

  • shell: spécifie le type de shell auquel ssh se connecte sur les travailleurs.

    • shell=:posix: un shell Unix/Linux compatible POSIX (sh, ksh, bash, dash, zsh, etc.). Par défaut.
    • shell=:csh: un shell C Unix (csh, tcsh).
    • shell=:wincmd: Microsoft Windows cmd.exe.
  • dir: spécifie le répertoire de travail sur les travailleurs. Par défaut, c'est le répertoire actuel de l'hôte (tel que trouvé par pwd())

  • enable_threaded_blas: si true, alors BLAS s'exécutera sur plusieurs threads dans les processus ajoutés. Par défaut, c'est false.

  • exename: nom de l'exécutable julia. Par défaut, c'est "$(Sys.BINDIR)/julia" ou "$(Sys.BINDIR)/julia-debug" selon le cas. Il est recommandé d'utiliser une version commune de Julia sur toutes les machines distantes car la sérialisation et la distribution de code pourraient échouer sinon.

  • exeflags: drapeaux supplémentaires passés aux processus de travail.

  • topology: Spécifie comment les travailleurs se connectent les uns aux autres. L'envoi d'un message entre des travailleurs non connectés entraîne une erreur.

    • topology=:all_to_all: Tous les processus sont connectés les uns aux autres. Par défaut.
    • topology=:master_worker: Seul le processus pilote, c'est-à-dire pid 1 se connecte aux travailleurs. Les travailleurs ne se connectent pas entre eux.
    • topology=:custom: La méthode launch du gestionnaire de cluster spécifie la topologie de connexion via les champs ident et connect_idents dans WorkerConfig. Un travailleur avec une identité de gestionnaire de cluster ident se connectera à tous les travailleurs spécifiés dans connect_idents.
  • lazy: Applicable uniquement avec topology=:all_to_all. Si true, les connexions travailleur-travailleur sont configurées de manière paresseuse, c'est-à-dire qu'elles sont configurées lors de la première instance d'un appel distant entre travailleurs. Par défaut, c'est vrai.

  • env: fournir un tableau de paires de chaînes telles que env=["JULIA_DEPOT_PATH"=>"/depot"] pour demander que des variables d'environnement soient définies sur la machine distante. Par défaut, seule la variable d'environnement JULIA_WORKER_TIMEOUT est passée automatiquement de l'environnement local à l'environnement distant.

  • cmdline_cookie: passez le cookie d'authentification via l'option de ligne de commande --worker. Le comportement par défaut (plus sécurisé) de passer le cookie via ssh stdio peut se bloquer avec des travailleurs Windows utilisant des versions Julia ou Windows plus anciennes (avant ConPTY), auquel cas cmdline_cookie=true offre une solution de contournement.

Julia 1.6

Les arguments de mots-clés ssh, shell, env et cmdline_cookie ont été ajoutés dans Julia 1.6.

Variables d'environnement:

Si le processus maître échoue à établir une connexion avec un travailleur nouvellement lancé dans les 60,0 secondes, le travailleur considère cela comme une situation fatale et se termine. Ce délai peut être contrôlé via la variable d'environnement JULIA_WORKER_TIMEOUT. La valeur de JULIA_WORKER_TIMEOUT sur le processus maître spécifie le nombre de secondes qu'un travailleur nouvellement lancé attend pour établir une connexion.

source
addprocs(np::Integer=Sys.CPU_THREADS; restrict=true, kwargs...) -> Liste des identifiants de processus

Lancez des travailleurs np sur l'hôte local en utilisant le LocalManager intégré.

Les travailleurs locaux héritent de l'environnement de package actuel (c'est-à-dire le projet actif, LOAD_PATH, et DEPOT_PATH) du processus principal.

Warning

Notez que les travailleurs n'exécutent pas de script de démarrage ~/.julia/config/startup.jl, ni ne synchronisent leur état global (tel que les options de ligne de commande, les variables globales, les définitions de nouvelles méthodes et les modules chargés) avec aucun des autres processus en cours d'exécution.

Arguments clés :

  • restrict::Bool : si true (par défaut), la liaison est restreinte à 127.0.0.1.
  • dir, exename, exeflags, env, topology, lazy, enable_threaded_blas : même effet que pour SSHManager, voir la documentation pour addprocs(machines::AbstractVector).
Julia 1.9

L'héritage de l'environnement de package et l'argument clé env ont été ajoutés dans Julia 1.9.

source
Distributed.nprocsFunction
nprocs()

Obtenez le nombre de processus disponibles.

Exemples

julia> nprocs()
3

julia> workers()
2-element Array{Int64,1}:
 2
 3
source
Distributed.nworkersFunction
nworkers()

Obtenez le nombre de processus de travail disponibles. Cela est un de moins que nprocs(). Égal à nprocs() si nprocs() == 1.

Exemples

$ julia -p 2

julia> nprocs()
3

julia> nworkers()
2
source
Distributed.procsMethod
procs()

Renvoie une liste de tous les identifiants de processus, y compris le pid 1 (qui n'est pas inclus par workers()).

Exemples

$ julia -p 2

julia> procs()
3-element Array{Int64,1}:
 1
 2
 3
source
Distributed.procsMethod
procs(pid::Integer)

Renvoie une liste de tous les identifiants de processus sur le même nœud physique. En particulier, tous les travailleurs liés à la même adresse IP que pid sont renvoyés.

source
Distributed.workersFunction
workers()

Retourne une liste de tous les identifiants de processus de travail.

Exemples

$ julia -p 2

julia> workers()
2-element Array{Int64,1}:
 2
 3
source
Distributed.rmprocsFunction
rmprocs(pids...; waitfor=typemax(Int))

Supprime les travailleurs spécifiés. Notez que seul le processus 1 peut ajouter ou supprimer des travailleurs.

L'argument waitfor spécifie combien de temps attendre que les travailleurs s'arrêtent :

  • Si non spécifié, rmprocs attendra que tous les pids demandés soient supprimés.
  • Une ErrorException est levée si tous les travailleurs ne peuvent pas être terminés avant les secondes waitfor demandées.
  • Avec une valeur waitfor de 0, l'appel retourne immédiatement avec les travailleurs programmés pour suppression dans une tâche différente. L'objet Task programmé est retourné. L'utilisateur doit appeler wait sur la tâche avant d'invoquer d'autres appels parallèles.

Exemples

$ julia -p 5

julia> t = rmprocs(2, 3, waitfor=0)
Task (runnable) @0x0000000107c718d0

julia> wait(t)

julia> workers()
3-element Array{Int64,1}:
 4
 5
 6
source
Distributed.interruptFunction
interrupt(pids::Integer...)

Interrompt la tâche en cours d'exécution sur les travailleurs spécifiés. Cela équivaut à appuyer sur Ctrl-C sur la machine locale. Si aucun argument n'est donné, tous les travailleurs sont interrompus.

source
interrupt(pids::AbstractVector=workers())

Interrompt la tâche en cours d'exécution sur les travailleurs spécifiés. Cela équivaut à appuyer sur Ctrl-C sur la machine locale. Si aucun argument n'est donné, tous les travailleurs sont interrompus.

source
Distributed.myidFunction
myid()

Obtenez l'identifiant du processus actuel.

Exemples

julia> myid()
1

julia> remotecall_fetch(() -> myid(), 4)
4
source
Distributed.pmapFunction
pmap(f, [::AbstractWorkerPool], c...; distributed=true, batch_size=1, on_error=nothing, retry_delays=[], retry_check=nothing) -> collection

Transformez la collection c en appliquant f à chaque élément en utilisant les travailleurs et tâches disponibles.

Pour plusieurs arguments de collection, appliquez f élément par élément.

Notez que f doit être rendu disponible à tous les processus de travailleur ; voir Code Availability and Loading Packages pour plus de détails.

Si un pool de travailleurs n'est pas spécifié, tous les travailleurs disponibles seront utilisés via un CachingPool.

Par défaut, pmap distribue le calcul sur tous les travailleurs spécifiés. Pour utiliser uniquement le processus local et distribuer sur les tâches, spécifiez distributed=false. Cela équivaut à utiliser asyncmap. Par exemple, pmap(f, c; distributed=false) est équivalent à asyncmap(f,c; ntasks=()->nworkers())

pmap peut également utiliser un mélange de processus et de tâches via l'argument batch_size. Pour des tailles de lot supérieures à 1, la collection est traitée en plusieurs lots, chacun d'une longueur de batch_size ou moins. Un lot est envoyé comme une seule demande à un travailleur libre, où un asyncmap local traite les éléments du lot en utilisant plusieurs tâches concurrentes.

Toute erreur empêche pmap de traiter le reste de la collection. Pour remplacer ce comportement, vous pouvez spécifier une fonction de gestion des erreurs via l'argument on_error qui prend un seul argument, c'est-à-dire l'exception. La fonction peut arrêter le traitement en relançant l'erreur, ou, pour continuer, retourner toute valeur qui est ensuite renvoyée en ligne avec les résultats à l'appelant.

Considérez les deux exemples suivants. Le premier renvoie l'objet d'exception en ligne, le second un 0 à la place de toute exception :

julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=identity)
4-element Array{Any,1}:
 1
  ErrorException("foo")
 3
  ErrorException("foo")

julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=ex->0)
4-element Array{Int64,1}:
 1
 0
 3
 0

Les erreurs peuvent également être gérées en réessayant les calculs échoués. Les arguments de mot-clé retry_delays et retry_check sont transmis à retry en tant qu'arguments de mot-clé delays et check respectivement. Si le regroupement est spécifié, et qu'un lot entier échoue, tous les éléments du lot sont réessayés.

Notez que si à la fois on_error et retry_delays sont spécifiés, le crochet on_error est appelé avant de réessayer. Si on_error ne lance pas (ou ne relance pas) une exception, l'élément ne sera pas réessayé.

Exemple : En cas d'erreurs, réessayez f sur un élément un maximum de 3 fois sans aucun délai entre les réessais.

pmap(f, c; retry_delays = zeros(3))

Exemple : Réessayez f uniquement si l'exception n'est pas de type InexactError, avec des délais augmentant de manière exponentielle jusqu'à 3 fois. Retournez un NaN à la place de toutes les occurrences de InexactError.

pmap(f, c; on_error = e->(isa(e, InexactError) ? NaN : rethrow()), retry_delays = ExponentialBackOff(n = 3))
source
Distributed.RemoteExceptionType
RemoteException(captured)

Les exceptions lors des calculs distants sont capturées et relancées localement. Un RemoteException enveloppe le pid du travailleur et une exception capturée. Une CapturedException capture l'exception distante et une forme sérialisable de la pile d'appels lorsque l'exception a été levée.

source
Distributed.ProcessExitedExceptionType
ProcessExitedException(worker_id::Int)

Après qu'un processus client Julia ait quitté, toute tentative ultérieure de référencer l'enfant mort déclenchera cette exception.

source
Distributed.FutureType
Future(w::Int, rrid::RRID, v::Union{Some, Nothing}=nothing)

Un Future est un espace réservé pour un calcul unique dont le statut de terminaison et le temps sont inconnus. Pour plusieurs calculs potentiels, voir RemoteChannel. Voir remoteref_id pour identifier un AbstractRemoteRef.

source
Distributed.RemoteChannelType
RemoteChannel(pid::Integer=myid())

Faites référence à un Channel{Any}(1) sur le processus pid. Le pid par défaut est le processus actuel.

RemoteChannel(f::Function, pid::Integer=myid())

Créez des références à des canaux distants d'une taille et d'un type spécifiques. f est une fonction qui, lorsqu'elle est exécutée sur pid, doit renvoyer une implémentation d'un AbstractChannel.

Par exemple, RemoteChannel(()->Channel{Int}(10), pid), renverra une référence à un canal de type Int et de taille 10 sur pid.

Le pid par défaut est le processus actuel.

source
Base.fetchMethod
fetch(x::Future)

Attendez et obtenez la valeur d'un Future. La valeur récupérée est mise en cache localement. Les appels ultérieurs à fetch sur la même référence renvoient la valeur mise en cache. Si la valeur distante est une exception, cela lance une RemoteException qui capture l'exception distante et la trace de la pile.

source
Base.fetchMethod
fetch(c::RemoteChannel)

Attendez et obtenez une valeur d'un RemoteChannel. Les exceptions levées sont les mêmes que pour un Future. Ne supprime pas l'élément récupéré.

source
fetch(x::Any)

Retourne x.

source
Distributed.remotecallMethod
remotecall(f, id::Integer, args...; kwargs...) -> Future

Appelle une fonction f de manière asynchrone avec les arguments donnés sur le processus spécifié. Retourne un Future. Les arguments de mot-clé, le cas échéant, sont transmis à f.

source
Distributed.remotecall_waitMethod
remotecall_wait(f, id::Integer, args...; kwargs...)

Effectuez un wait(remotecall(...)) plus rapide en un seul message sur le Worker spécifié par l'identifiant du worker id. Les arguments de mot-clé, le cas échéant, sont transmis à f.

Voir aussi wait et remotecall.

source
Distributed.remotecall_fetchMethod
remotecall_fetch(f, id::Integer, args...; kwargs...)

Effectue fetch(remotecall(...)) en un seul message. Les arguments de mot-clé, le cas échéant, sont transmis à f. Toute exception distante est capturée dans une RemoteException et levée.

Voir aussi fetch et remotecall.

Exemples

$ julia -p 2

julia> remotecall_fetch(sqrt, 2, 4)
2.0

julia> remotecall_fetch(sqrt, 2, -4)
ERROR: Sur le worker 2 :
DomainError avec -4.0 :
sqrt a été appelé avec un argument réel négatif mais ne renverra qu'un résultat complexe s'il est appelé avec un argument complexe. Essayez sqrt(Complex(x)).
...
source
Distributed.remote_doMethod
remote_do(f, id::Integer, args...; kwargs...) -> nothing

Exécute f sur le travailleur id de manière asynchrone. Contrairement à remotecall, il ne stocke pas le résultat du calcul, ni il n'y a de moyen d'attendre son achèvement.

Une invocation réussie indique que la demande a été acceptée pour exécution sur le nœud distant.

Alors que les appels consécutifs à remotecall vers le même travailleur sont sérialisés dans l'ordre où ils sont invoqués, l'ordre des exécutions sur le travailleur distant est indéterminé. Par exemple, remote_do(f1, 2); remotecall(f2, 2); remote_do(f3, 2) va sérialiser l'appel à f1, suivi de f2 et f3 dans cet ordre. Cependant, il n'est pas garanti que f1 soit exécuté avant f3 sur le travailleur 2.

Toutes les exceptions levées par f sont imprimées sur stderr sur le travailleur distant.

Les arguments de mot-clé, le cas échéant, sont transmis à f.

source
Base.put!Method
put!(rr::RemoteChannel, args...)

Stocke un ensemble de valeurs dans le RemoteChannel. Si le canal est plein, il bloque jusqu'à ce qu'un espace soit disponible. Retourne le premier argument.

source
Base.put!Method
put!(rr::Future, v)

Stockez une valeur dans un Future rr. Les Futures sont des références distantes en écriture unique. Un put! sur un Future déjà défini déclenche une Exception. Tous les appels distants asynchrones renvoient des Futures et définissent la valeur sur la valeur de retour de l'appel une fois terminé.

source
Base.take!Method
take!(rr::RemoteChannel, args...)

Récupère la ou les valeur(s) d'un RemoteChannel rr, en supprimant la ou les valeur(s) dans le processus.

source
Base.isreadyMethod
isready(rr::RemoteChannel, args...)

Déterminez si un RemoteChannel a une valeur stockée. Notez que cette fonction peut provoquer des conditions de concurrence, car au moment où vous recevez son résultat, cela peut ne plus être vrai. Cependant, elle peut être utilisée en toute sécurité sur un Future car ils ne sont assignés qu'une seule fois.

source
Base.isreadyMethod
isready(rr::Future)

Déterminez si un Future a une valeur stockée.

Si l'argument Future est détenu par un nœud différent, cet appel bloquera en attendant la réponse. Il est recommandé d'attendre rr dans une tâche séparée ou d'utiliser un Channel local comme proxy :

p = 1
f = Future(p)
errormonitor(@async put!(f, remotecall_fetch(long_computation, p)))
isready(f)  # ne bloquera pas
source
Distributed.AbstractWorkerPoolType
AbstractWorkerPool

Supertype pour les pools de travailleurs tels que WorkerPool et CachingPool. Un AbstractWorkerPool doit implémenter :

  • push! - ajouter un nouveau travailleur au pool global (disponible + occupé)
  • put! - remettre un travailleur dans le pool disponible
  • take! - prendre un travailleur du pool disponible (à utiliser pour l'exécution de fonctions distantes)
  • length - nombre de travailleurs disponibles dans le pool global
  • isready - retourner false si un take! sur le pool bloquerait, sinon true

Les implémentations par défaut de ce qui précède (sur un AbstractWorkerPool) nécessitent des champs

  • channel::Channel{Int}
  • workers::Set{Int}

channel contient les pids des travailleurs libres et workers est l'ensemble de tous les travailleurs associés à ce pool.

source
Distributed.WorkerPoolType
WorkerPool(workers::Union{Vector{Int},AbstractRange{Int}})

Créez un WorkerPool à partir d'un vecteur ou d'une plage d'identifiants de travailleurs.

Exemples

$ julia -p 3

julia> WorkerPool([2, 3])
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([2, 3]), RemoteChannel{Channel{Any}}(1, 1, 6))

julia> WorkerPool(2:4)
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 7))
source
Distributed.CachingPoolType
CachingPool(workers::Vector{Int})

Une implémentation d'un AbstractWorkerPool. remote, remotecall_fetch, pmap (et d'autres appels distants qui exécutent des fonctions à distance) bénéficient de la mise en cache des fonctions sérialisées/désérialisées sur les nœuds de travail, en particulier les fermetures (qui peuvent capturer de grandes quantités de données).

Le cache distant est maintenu pendant la durée de vie de l'objet CachingPool retourné. Pour vider le cache plus tôt, utilisez clear!(pool).

Pour les variables globales, seules les liaisons sont capturées dans une fermeture, pas les données. Des blocs let peuvent être utilisés pour capturer des données globales.

Exemples

const foo = rand(10^8);
wp = CachingPool(workers())
let foo = foo
    pmap(i -> sum(foo) + i, wp, 1:100);
end

Ce qui précède transférerait foo une seule fois à chaque travailleur.

source
Distributed.default_worker_poolFunction
default_worker_pool()

AbstractWorkerPool contenant des workers inactifs - utilisé par remote(f) et pmap (par défaut). À moins qu'un ne soit explicitement défini via default_worker_pool!(pool), le pool de travailleurs par défaut est initialisé à un WorkerPool.

Exemples

$ julia -p 3

julia> default_worker_pool()
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:3), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 4))
source
Distributed.clear!Function
clear!(syms, pids=workers(); mod=Main)

Efface les liaisons globales dans les modules en les initialisant à nothing. syms doit être de type Symbol ou une collection de Symbols. pids et mod identifient les processus et le module dans lequel les variables globales doivent être réinitialisées. Seuls les noms trouvés définis sous mod sont effacés.

Une exception est levée si une constante globale est demandée à être effacée.

source
clear!(pool::CachingPool) -> pool

Supprime toutes les fonctions mises en cache de tous les travailleurs participants.

source
Distributed.remotecallMethod
remotecall(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future

variante de WorkerPool de remotecall(f, pid, ....). Attendez et prenez un travailleur libre de pool et effectuez un remotecall dessus.

Exemples

$ julia -p 3

julia> wp = WorkerPool([2, 3]);

julia> A = rand(3000);

julia> f = remotecall(maximum, wp, A)
Future(2, 1, 6, nothing)

Dans cet exemple, la tâche a été exécutée sur pid 2, appelée depuis pid 1.

source
Distributed.remotecall_waitMethod
remotecall_wait(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future

WorkerPool variante de remotecall_wait(f, pid, ....). Attendez et prenez un travailleur libre de pool et effectuez un remotecall_wait dessus.

Exemples

$ julia -p 3

julia> wp = WorkerPool([2, 3]);

julia> A = rand(3000);

julia> f = remotecall_wait(maximum, wp, A)
Future(3, 1, 9, nothing)

julia> fetch(f)
0.9995177101692958
source
Distributed.remotecall_fetchMethod
remotecall_fetch(f, pool::AbstractWorkerPool, args...; kwargs...) -> result

WorkerPool variante de remotecall_fetch(f, pid, ....). Attend et prend un travailleur libre de pool et effectue un remotecall_fetch dessus.

Exemples

$ julia -p 3

julia> wp = WorkerPool([2, 3]);

julia> A = rand(3000);

julia> remotecall_fetch(maximum, wp, A)
0.9995177101692958
source
Distributed.remote_doMethod
remote_do(f, pool::AbstractWorkerPool, args...; kwargs...) -> rien

WorkerPool variante de remote_do(f, pid, ....). Attendez et prenez un travailleur libre de pool et effectuez un remote_do dessus.

source
Distributed.@spawnMacro
@spawn expr

Créez une fermeture autour d'une expression et exécutez-la sur un processus choisi automatiquement, renvoyant un Future vers le résultat. Ce macro est obsolète ; @spawnat :any expr doit être utilisé à la place.

Exemples

julia> addprocs(3);

julia> f = @spawn myid()
Future(2, 1, 5, nothing)

julia> fetch(f)
2

julia> f = @spawn myid()
Future(3, 1, 7, nothing)

julia> fetch(f)
3
Julia 1.3

À partir de Julia 1.3, ce macro est obsolète. Utilisez @spawnat :any à la place.

source
Distributed.@spawnatMacro
@spawnat p expr

Créez une fermeture autour d'une expression et exécutez la fermeture de manière asynchrone sur le processus p. Retournez un Future vers le résultat. Si p est le symbole littéral cité :any, alors le système choisira automatiquement un processeur à utiliser.

Exemples

julia> addprocs(3);

julia> f = @spawnat 2 myid()
Future(2, 1, 3, nothing)

julia> fetch(f)
2

julia> f = @spawnat :any myid()
Future(3, 1, 7, nothing)

julia> fetch(f)
3
Julia 1.3

L'argument :any est disponible depuis Julia 1.3.

source
Distributed.@fetchMacro
@fetch expr

Équivalent à fetch(@spawnat :any expr). Voir fetch et @spawnat.

Exemples

julia> addprocs(3);

julia> @fetch myid()
2

julia> @fetch myid()
3

julia> @fetch myid()
4

julia> @fetch myid()
2
source
Distributed.@distributedMacro
@distributed

Une boucle for parallèle en mémoire distribuée de la forme :

@distributed [réducteur] pour var = plage
    corps
fin

La plage spécifiée est partitionnée et exécutée localement sur tous les travailleurs. Dans le cas où une fonction réductrice optionnelle est spécifiée, @distributed effectue des réductions locales sur chaque travailleur avec une réduction finale sur le processus appelant.

Notez que sans fonction réductrice, @distributed s'exécute de manière asynchrone, c'est-à-dire qu'il génère des tâches indépendantes sur tous les travailleurs disponibles et retourne immédiatement sans attendre l'achèvement. Pour attendre l'achèvement, préfixez l'appel avec @sync, comme suit :

@sync @distributed pour var = plage
    corps
fin
source
Distributed.@everywhereMacro
@everywhere [procs()] expr

Exécute une expression sous Main sur tous les procs. Les erreurs sur l'un des processus sont collectées dans une CompositeException et lancées. Par exemple :

@everywhere bar = 1

définira Main.bar sur tous les processus actuels. Les processus ajoutés plus tard (par exemple avec addprocs()) n'auront pas l'expression définie.

Contrairement à @spawnat, @everywhere ne capture aucune variable locale. Au lieu de cela, les variables locales peuvent être diffusées en utilisant l'interpolation :

foo = 1
@everywhere bar = $foo

L'argument optionnel procs permet de spécifier un sous-ensemble de tous les processus pour exécuter l'expression.

Semblable à l'appel de remotecall_eval(Main, procs, expr), mais avec deux fonctionnalités supplémentaires :

- Les instructions `using` et `import` s'exécutent d'abord sur le processus appelant, pour s'assurer que
  les paquets sont précompilés.
- Le chemin du fichier source actuel utilisé par `include` est propagé aux autres processus.
source
Distributed.remoteref_idFunction
remoteref_id(r::AbstractRemoteRef) -> RRID

Les Futures et les RemoteChannels sont identifiés par des champs :

  • where - fait référence au nœud où l'objet/de stockage sous-jacent auquel la référence fait référence existe réellement.
  • whence - fait référence au nœud à partir duquel la référence distante a été créée. Notez que cela est différent du nœud où l'objet sous-jacent auquel il est fait référence existe réellement. Par exemple, appeler RemoteChannel(2) depuis le processus maître donnerait une valeur where de 2 et une valeur whence de 1.
  • id est unique parmi toutes les références créées à partir du travailleur spécifié par whence.

Ensemble, whence et id identifient de manière unique une référence parmi tous les travailleurs.

remoteref_id est une API de bas niveau qui renvoie un objet RRID qui encapsule les valeurs whence et id d'une référence distante.

source
Distributed.channel_from_idFunction
channel_from_id(id) -> c

Une API de bas niveau qui retourne le AbstractChannel de support pour un id retourné par remoteref_id. L'appel est valide uniquement sur le nœud où le canal de support existe.

source
Distributed.worker_id_from_socketFunction
worker_id_from_socket(s) -> pid

Une API de bas niveau qui, étant donné une connexion IO ou un Worker, renvoie le pid du worker auquel il est connecté. Cela est utile lors de l'écriture de méthodes serialize personnalisées pour un type, qui optimisent les données écrites en fonction de l'identifiant du processus récepteur.

source

Cluster Manager Interface

Cette interface fournit un mécanisme pour lancer et gérer des travailleurs Julia sur différents environnements de cluster. Il existe deux types de gestionnaires présents dans Base : LocalManager, pour lancer des travailleurs supplémentaires sur le même hôte, et SSHManager, pour lancer sur des hôtes distants via ssh. Des sockets TCP/IP sont utilisés pour se connecter et transporter des messages entre les processus. Il est possible que les gestionnaires de cluster fournissent un transport différent.

Distributed.ClusterManagerType
ClusterManager

Supertype pour les gestionnaires de clusters, qui contrôlent les processus de travail en tant que cluster. Les gestionnaires de clusters mettent en œuvre comment les travailleurs peuvent être ajoutés, supprimés et communiqués. SSHManager et LocalManager sont des sous-types de cela.

source
Distributed.WorkerConfigType
WorkerConfig

Type utilisé par ClusterManagers pour contrôler les travailleurs ajoutés à leurs clusters. Certains champs sont utilisés par tous les gestionnaires de clusters pour accéder à un hôte :

  • io – la connexion utilisée pour accéder au travailleur (un sous-type de IO ou Nothing)
  • host – l'adresse de l'hôte (soit une String ou Nothing)
  • port – le port sur l'hôte utilisé pour se connecter au travailleur (soit un Int ou Nothing)

Certains sont utilisés par le gestionnaire de cluster pour ajouter des travailleurs à un hôte déjà initialisé :

  • count – le nombre de travailleurs à lancer sur l'hôte
  • exename – le chemin vers l'exécutable Julia sur l'hôte, par défaut "$(Sys.BINDIR)/julia" ou "$(Sys.BINDIR)/julia-debug"
  • exeflags – options à utiliser lors du lancement de Julia à distance

Le champ userdata est utilisé pour stocker des informations pour chaque travailleur par des gestionnaires externes.

Certains champs sont utilisés par SSHManager et des gestionnaires similaires :

  • tunneltrue (utiliser le tunneling), false (ne pas utiliser le tunneling), ou nothing (utiliser la valeur par défaut pour le gestionnaire)
  • multiplextrue (utiliser le multiplexage SSH pour le tunneling) ou false
  • forward – l'option de transfert utilisée pour l'option -L de ssh
  • bind_addr – l'adresse sur l'hôte distant à laquelle se lier
  • sshflags – options à utiliser pour établir la connexion SSH
  • max_parallel – le nombre maximum de travailleurs à connecter en parallèle sur l'hôte

Certains champs sont utilisés à la fois par les LocalManagers et les SSHManagers :

  • connect_at – détermine s'il s'agit d'un appel de configuration travailleur-à-travailleur ou pilote-à-travailleur
  • process – le processus qui sera connecté (généralement le gestionnaire assignera cela lors de addprocs)
  • ospid – l'ID de processus selon le système d'exploitation de l'hôte, utilisé pour interrompre les processus des travailleurs
  • environ – dictionnaire privé utilisé pour stocker des informations temporaires par les gestionnaires Local/SSH
  • ident – travailleur tel qu'identifié par le ClusterManager
  • connect_idents – liste des identifiants de travailleurs auxquels le travailleur doit se connecter s'il utilise une topologie personnalisée
  • enable_threaded_blastrue, false, ou nothing, s'il faut utiliser BLAS multithreadé ou non sur les travailleurs
source
Distributed.launchFunction
launch(manager::ClusterManager, params::Dict, launched::Array, launch_ntfy::Condition)

Implémenté par les gestionnaires de cluster. Pour chaque travailleur Julia lancé par cette fonction, il doit ajouter une entrée WorkerConfig à launched et notifier launch_ntfy. La fonction DOIT se terminer une fois que tous les travailleurs, demandés par manager, ont été lancés. params est un dictionnaire de tous les arguments de mot-clé avec lesquels addprocs a été appelé.

source
Distributed.manageFunction
manage(manager::ClusterManager, id::Integer, config::WorkerConfig. op::Symbol)

Implémenté par les gestionnaires de cluster. Il est appelé sur le processus maître, pendant la durée de vie d'un travailleur, avec des valeurs op appropriées :

  • avec :register/:deregister lorsqu'un travailleur est ajouté / retiré du pool de travailleurs Julia.
  • avec :interrupt lorsque interrupt(workers) est appelé. Le ClusterManager doit signaler au travailleur approprié avec un signal d'interruption.
  • avec :finalize à des fins de nettoyage.
source
Base.killMethod
kill(manager::ClusterManager, pid::Int, config::WorkerConfig)

Implémenté par les gestionnaires de cluster. Il est appelé sur le processus maître, par rmprocs. Cela devrait amener le travailleur distant spécifié par pid à quitter. kill(manager::ClusterManager.....) exécute un exit() distant sur pid.

source
Sockets.connectMethod
connect(manager::ClusterManager, pid::Int, config::WorkerConfig) -> (instrm::IO, outstrm::IO)

Implémenté par des gestionnaires de cluster utilisant des transports personnalisés. Cela doit établir une connexion logique avec le travailleur ayant l'identifiant pid, spécifié par config, et retourner une paire d'objets IO. Les messages de pid vers le processus actuel seront lus à partir de instrm, tandis que les messages à envoyer à pid seront écrits dans outstrm. L'implémentation du transport personnalisé doit garantir que les messages sont livrés et reçus complètement et dans l'ordre. connect(manager::ClusterManager.....) configure des connexions de socket TCP/IP entre les travailleurs.

source
Distributed.init_workerFunction
init_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager())

Appelé par les gestionnaires de cluster implémentant des transports personnalisés. Il initialise un processus nouvellement lancé en tant que travailleur. L'argument de ligne de commande --worker[=<cookie>] a pour effet d'initialiser un processus en tant que travailleur utilisant des sockets TCP/IP pour le transport. cookie est un cluster_cookie.

source
Distributed.start_workerFunction
start_worker([out::IO=stdout], cookie::AbstractString=readline(stdin); close_stdin::Bool=true, stderr_to_stdout::Bool=true)

start_worker est une fonction interne qui est le point d'entrée par défaut pour les processus de travail se connectant via TCP/IP. Elle configure le processus en tant que travailleur du cluster Julia.

Les informations host:port sont écrites dans le flux out (par défaut stdout).

La fonction lit le cookie depuis stdin si nécessaire, et écoute sur un port libre (ou si spécifié, le port dans l'option de ligne de commande --bind-to) et planifie des tâches pour traiter les connexions TCP entrantes et les demandes. Elle ferme également (optionnellement) stdin et redirige stderr vers stdout.

Elle ne retourne pas.

source
Distributed.process_messagesFunction
process_messages(r_stream::IO, w_stream::IO, incoming::Bool=true)

Appelé par les gestionnaires de cluster utilisant des transports personnalisés. Il doit être appelé lorsque l'implémentation du transport personnalisé reçoit le premier message d'un travailleur distant. Le transport personnalisé doit gérer une connexion logique au travailleur distant et fournir deux objets IO, un pour les messages entrants et l'autre pour les messages adressés au travailleur distant. Si incoming est true, le pair distant a initié la connexion. Celui des deux qui initie la connexion envoie le cookie du cluster et son numéro de version Julia pour effectuer la poignée de main d'authentification.

Voir aussi cluster_cookie.

source
Distributed.default_addprocs_paramsFunction
default_addprocs_params(mgr::ClusterManager) -> Dict{Symbol, Any}

Implémenté par les gestionnaires de cluster. Les paramètres de mot-clé par défaut passés lors de l'appel de addprocs(mgr). L'ensemble minimal d'options est disponible en appelant default_addprocs_params().

source