Multi-processing and Distributed Computing
Une implémentation de calcul parallèle en mémoire distribuée est fournie par le module Distributed
dans le cadre de la bibliothèque standard livrée avec Julia.
La plupart des ordinateurs modernes possèdent plus d'un CPU, et plusieurs ordinateurs peuvent être combinés dans un cluster. Exploiter la puissance de ces multiples CPU permet d'effectuer de nombreux calculs plus rapidement. Il y a deux facteurs majeurs qui influencent la performance : la vitesse des CPU eux-mêmes et la vitesse de leur accès à la mémoire. Dans un cluster, il est assez évident qu'un CPU donné aura le plus rapide accès à la RAM au sein du même ordinateur (nœud). Peut-être plus surprenant, des problèmes similaires sont pertinents sur un ordinateur portable multicœur typique, en raison des différences de vitesse de la mémoire principale et du cache. Par conséquent, un bon environnement de multiprocessus devrait permettre de contrôler la "propriété" d'un morceau de mémoire par un CPU particulier. Julia fournit un environnement de multiprocessus basé sur le passage de messages pour permettre aux programmes de s'exécuter sur plusieurs processus dans des domaines de mémoire séparés en même temps.
L'implémentation du passage de messages de Julia est différente des autres environnements tels que MPI[1]. La communication en Julia est généralement "unilatérale", ce qui signifie que le programmeur doit gérer explicitement un seul processus dans une opération à deux processus. De plus, ces opérations ne ressemblent généralement pas à "l'envoi de message" et "la réception de message", mais ressemblent plutôt à des opérations de niveau supérieur comme des appels à des fonctions utilisateur.
La programmation distribuée en Julia repose sur deux primitives : références distantes et appels distants. Une référence distante est un objet qui peut être utilisé par n'importe quel processus pour faire référence à un objet stocké sur un processus particulier. Un appel distant est une demande d'un processus pour appeler une certaine fonction avec certains arguments sur un autre processus (éventuellement le même).
Les références distantes se présentent sous deux formes : Future
et RemoteChannel
.
Un appel distant renvoie un Future
à son résultat. Les appels distants retournent immédiatement ; le processus qui a effectué l'appel passe à son opération suivante pendant que l'appel distant se produit ailleurs. Vous pouvez attendre qu'un appel distant se termine en appelant wait
sur le 4d61726b646f776e2e436f64652822222c20224675747572652229_407265662044697374726962757465642e467574757265
retourné, et vous pouvez obtenir la valeur complète du résultat en utilisant fetch
.
D'autre part, RemoteChannel
s sont réécrits. Par exemple, plusieurs processus peuvent coordonner leur traitement en faisant référence au même Channel
distant.
Chaque processus a un identifiant associé. Le processus fournissant l'invite interactive de Julia a toujours un id
égal à 1. Les processus utilisés par défaut pour les opérations parallèles sont appelés "travailleurs". Lorsqu'il n'y a qu'un seul processus, le processus 1 est considéré comme un travailleur. Sinon, les travailleurs sont considérés comme tous les processus autres que le processus 1. En conséquence, l'ajout de 2 processus ou plus est nécessaire pour tirer parti des méthodes de traitement parallèle comme pmap
. L'ajout d'un seul processus est bénéfique si vous souhaitez simplement faire d'autres choses dans le processus principal pendant qu'un long calcul s'exécute sur le travailleur.
Commençons par cela. Démarrer avec julia -p n
fournit n
processus de travail sur la machine locale. En général, il est logique que n
soit égal au nombre de threads CPU (cœurs logiques) sur la machine. Notez que l'argument -p
charge implicitement le module Distributed
.
$ julia -p 2
julia> r = remotecall(rand, 2, 2, 2)
Future(2, 1, 4, nothing)
julia> s = @spawnat 2 1 .+ fetch(r)
Future(2, 1, 5, nothing)
julia> fetch(s)
2×2 Array{Float64,2}:
1.18526 1.50912
1.16296 1.60607
Le premier argument de remotecall
est la fonction à appeler. La plupart de la programmation parallèle en Julia ne fait pas référence à des processus spécifiques ou au nombre de processus disponibles, mais 4d61726b646f776e2e436f64652822222c202272656d6f746563616c6c2229_40726566
est considéré comme une interface de bas niveau offrant un contrôle plus précis. Le deuxième argument de 4d61726b646f776e2e436f64652822222c202272656d6f746563616c6c2229_40726566
est l'id
du processus qui effectuera le travail, et les arguments restants seront passés à la fonction appelée.
Comme vous pouvez le voir, dans la première ligne, nous avons demandé au processus 2 de construire une matrice aléatoire 2 par 2, et dans la deuxième ligne, nous lui avons demandé d'y ajouter 1. Le résultat des deux calculs est disponible dans les deux futurs, r
et s
. La macro @spawnat
évalue l'expression dans le deuxième argument sur le processus spécifié par le premier argument.
Occasionnellement, vous pourriez vouloir une valeur calculée à distance immédiatement. Cela se produit généralement lorsque vous lisez à partir d'un objet distant pour obtenir des données nécessaires à la prochaine opération locale. La fonction remotecall_fetch
existe à cet effet. Elle est équivalente à fetch(remotecall(...))
mais est plus efficace.
julia> remotecall_fetch(r-> fetch(r)[1, 1], 2, r)
0.18526337335308085
Cela récupère le tableau sur le travailleur 2 et renvoie la première valeur. Notez que fetch
ne déplace aucune donnée dans ce cas, car il est exécuté sur le travailleur qui possède le tableau. On peut également écrire :
julia> remotecall_fetch(getindex, 2, r, 1, 1)
0.10824216411304866
Rappelez-vous que getindex(r,1,1)
est equivalent à r[1,1]
, donc cet appel récupère le premier élément du futur r
.
Pour faciliter les choses, le symbole :any
peut être passé à @spawnat
, qui choisit où effectuer l'opération pour vous :
julia> r = @spawnat :any rand(2,2)
Future(2, 1, 4, nothing)
julia> s = @spawnat :any 1 .+ fetch(r)
Future(3, 1, 5, nothing)
julia> fetch(s)
2×2 Array{Float64,2}:
1.38854 1.9098
1.20939 1.57158
Notez que nous avons utilisé 1 .+ fetch(r)
au lieu de 1 .+ r
. Cela est dû au fait que nous ne savons pas où le code sera exécuté, donc en général un fetch
pourrait être nécessaire pour déplacer r
vers le processus effectuant l'addition. Dans ce cas, @spawnat
est suffisamment intelligent pour effectuer le calcul sur le processus qui possède r
, donc le 4d61726b646f776e2e436f64652822222c202266657463682229_40726566
sera une opération nulle (aucun travail n'est effectué).
(Il est à noter que @spawnat
n'est pas intégré mais défini en Julia comme un macro. Il est possible de définir vos propres constructions de ce type.)
Une chose importante à retenir est qu'une fois récupéré, un Future
mettra en cache sa valeur localement. D'autres appels fetch
n'entraînent pas de saut réseau. Une fois que toutes les références à 4d61726b646f776e2e436f64652822222c20224675747572652229_407265662044697374726962757465642e467574757265
ont été récupérées, la valeur stockée à distance est supprimée.
@async
est similaire à @spawnat
, mais exécute uniquement des tâches sur le processus local. Nous l'utilisons pour créer une tâche "alimenteuse" pour chaque processus. Chaque tâche prend l'index suivant qui doit être calculé, puis attend que son processus se termine, puis répète jusqu'à ce que nous manquions d'indices. Notez que les tâches alimentatrices ne commencent pas à s'exécuter tant que la tâche principale n'atteint pas la fin du bloc @sync
, à quel point elle abandonne le contrôle et attend que toutes les tâches locales soient terminées avant de revenir de la fonction. En ce qui concerne v0.7 et au-delà, les tâches alimentatrices peuvent partager l'état via nextidx
car elles s'exécutent toutes sur le même processus. Même si les Tasks
sont planifiées de manière coopérative, un verrouillage peut encore être nécessaire dans certains contextes, comme dans asynchronous I/O. Cela signifie que les changements de contexte ne se produisent qu'à des points bien définis : dans ce cas, lorsque remotecall_fetch
est appelé. C'est l'état actuel de l'implémentation et il peut changer pour les futures versions de Julia, car il est destiné à rendre possible l'exécution de jusqu'à N Tasks
sur M Process
, alias M:N Threading. Ensuite, un modèle d'acquisition/libération de verrou pour nextidx
sera nécessaire, car il n'est pas sûr de laisser plusieurs processus lire-écrire une ressource en même temps.
Code Availability and Loading Packages
Votre code doit être disponible sur tout processus qui l'exécute. Par exemple, tapez ce qui suit dans l'invite Julia :
julia> function rand2(dims...)
return 2*rand(dims...)
end
julia> rand2(2,2)
2×2 Array{Float64,2}:
0.153756 0.368514
1.15119 0.918912
julia> fetch(@spawnat :any rand2(2,2))
ERROR: RemoteException(2, CapturedException(UndefVarError(Symbol("#rand2"))))
Stacktrace:
[...]
Le processus 1 connaissait la fonction rand2
, mais le processus 2 ne la connaissait pas.
La plupart du temps, vous chargerez du code à partir de fichiers ou de packages, et vous avez une flexibilité considérable pour contrôler quels processus chargent le code. Considérez un fichier, DummyModule.jl
, contenant le code suivant :
module DummyModule
export MyType, f
mutable struct MyType
a::Int
end
f(x) = x^2+1
println("loaded")
end
Pour faire référence à MyType
dans tous les processus, DummyModule.jl
doit être chargé sur chaque processus. Appeler include("DummyModule.jl")
ne le charge que sur un seul processus. Pour le charger sur chaque processus, utilisez le macro @everywhere
(en démarrant Julia avec julia -p 2
) :
julia> @everywhere include("DummyModule.jl")
loaded
From worker 3: loaded
From worker 2: loaded
Comme d'habitude, cela ne met pas DummyModule
dans le champ d'application de l'un des processus, ce qui nécessite using
ou import
. De plus, lorsque DummyModule
est mis dans le champ d'application d'un processus, il ne l'est pas dans les autres :
julia> using .DummyModule
julia> MyType(7)
MyType(7)
julia> fetch(@spawnat 2 MyType(7))
ERROR: On worker 2:
UndefVarError: `MyType` not defined in `Main`
⋮
julia> fetch(@spawnat 2 DummyModule.MyType(7))
MyType(7)
Cependant, il est toujours possible, par exemple, d'envoyer un MyType
à un processus qui a chargé DummyModule
, même s'il n'est pas dans le champ d'application :
julia> put!(RemoteChannel(2), MyType(7))
RemoteChannel{Channel{Any}}(2, 1, 13)
Un fichier peut également être préchargé sur plusieurs processus au démarrage avec le drapeau -L
, et un script de pilote peut être utilisé pour piloter le calcul :
julia -p <n> -L file1.jl -L file2.jl driver.jl
Le processus Julia exécutant le script du pilote dans l'exemple ci-dessus a un id
égal à 1, tout comme un processus fournissant un invite interactive.
Enfin, si DummyModule.jl
n'est pas un fichier autonome mais un package, alors using DummyModule
va charger DummyModule.jl
sur tous les processus, mais ne l'apportera dans le scope que sur le processus où using
a été appelé.
Starting and managing worker processes
L'installation de base de Julia prend en charge deux types de clusters :
- Un cluster local spécifié avec l'option
-p
comme indiqué ci-dessus. - Un cluster s'étendant sur des machines en utilisant l'option
--machine-file
. Cela utilise une connexionssh
sans mot de passe pour démarrer des processus de travail Julia (à partir du même chemin que l'hôte actuel) sur les machines spécifiées. Chaque définition de machine prend la forme[count*][user@]host[:port] [bind_addr[:port]]
.user
par défaut est l'utilisateur actuel,port
est le port ssh standard.count
est le nombre de travailleurs à lancer sur le nœud, et par défaut, il est de 1. L'optionbind-to bind_addr[:port]
spécifie l'adresse IP et le port que les autres travailleurs doivent utiliser pour se connecter à ce travailleur.
Bien que Julia s'efforce généralement de maintenir la compatibilité ascendante, la distribution de code aux processus de travail repose sur Serialization.serialize
. Comme indiqué dans la documentation correspondante, cela ne peut pas être garanti de fonctionner à travers différentes versions de Julia, il est donc conseillé que tous les travailleurs sur toutes les machines utilisent la même version.
Les fonctions addprocs
, rmprocs
, workers
, et d'autres sont disponibles comme un moyen programmatique d'ajouter, de supprimer et de interroger les processus dans un cluster.
julia> using Distributed
julia> addprocs(2)
2-element Array{Int64,1}:
2
3
Le module Distributed
doit être explicitement chargé sur le processus maître avant d'invoquer addprocs
. Il est automatiquement mis à disposition sur les processus de travail.
Notez que les travailleurs ne exécutent pas de script de démarrage ~/.julia/config/startup.jl
, ni ne synchronisent leur état global (comme les options de ligne de commande, les variables globales, les définitions de nouvelles méthodes et les modules chargés) avec les autres processus en cours d'exécution. Vous pouvez utiliser addprocs(exeflags="--project")
pour initialiser un travailleur avec un environnement particulier, puis @everywhere using <modulename>
ou @everywhere include("file.jl")
.
D'autres types de clusters peuvent être pris en charge en écrivant votre propre ClusterManager
, comme décrit ci-dessous dans la section ClusterManagers.
Data Movement
L'envoi de messages et le transfert de données constituent la majeure partie de la surcharge dans un programme distribué. Réduire le nombre de messages et la quantité de données envoyées est essentiel pour atteindre des performances et une évolutivité optimales. Dans ce but, il est important de comprendre le mouvement des données effectué par les différents constructs de programmation distribuée de Julia.
fetch
peut être considéré comme une opération de mouvement de données explicite, car il demande directement qu'un objet soit déplacé vers la machine locale. @spawnat
(et quelques constructions connexes) déplace également des données, mais ce n'est pas aussi évident, donc on peut l'appeler une opération de mouvement de données implicite. Considérez ces deux approches pour construire et élever au carré une matrice aléatoire :
Méthode 1 :
julia> A = rand(1000,1000);
julia> Bref = @spawnat :any A^2;
[...]
julia> fetch(Bref);
Méthode 2 :
julia> Bref = @spawnat :any rand(1000,1000)^2;
[...]
julia> fetch(Bref);
La différence semble triviale, mais en fait elle est assez significative en raison du comportement de @spawnat
. Dans la première méthode, une matrice aléatoire est construite localement, puis envoyée à un autre processus où elle est mise au carré. Dans la deuxième méthode, une matrice aléatoire est à la fois construite et mise au carré sur un autre processus. Par conséquent, la deuxième méthode envoie beaucoup moins de données que la première.
Dans cet exemple simplifié, les deux méthodes sont faciles à distinguer et à choisir. Cependant, dans un programme réel, la conception du mouvement des données peut nécessiter plus de réflexion et probablement quelques mesures. Par exemple, si le premier processus a besoin de la matrice A
, alors la première méthode pourrait être meilleure. Ou, si le calcul de A
est coûteux et que seul le processus actuel l'a, alors le déplacer vers un autre processus pourrait être inévitable. Ou, si le processus actuel a très peu à faire entre le @spawnat
et fetch(Bref)
, il pourrait être préférable d'éliminer complètement le parallélisme. Ou imaginez que rand(1000,1000)
soit remplacé par une opération plus coûteuse. Dans ce cas, il pourrait être judicieux d'ajouter une autre instruction 4d61726b646f776e2e436f64652822222c202240737061776e61742229_40726566
juste pour cette étape.
Global variables
Les expressions exécutées à distance via @spawnat
, ou les fermetures spécifiées pour une exécution à distance en utilisant remotecall
peuvent faire référence à des variables globales. Les liaisons globales sous le module Main
sont traitées un peu différemment par rapport aux liaisons globales dans d'autres modules. Considérez le code suivant :
A = rand(10,10)
remotecall_fetch(()->sum(A), 2)
Dans ce cas, sum
DOIT être défini dans le processus distant. Notez que A
est une variable globale définie dans l'espace de travail local. Le travailleur 2 n'a pas de variable appelée A
sous Main
. L'acte d'expédier la fermeture ()->sum(A)
au travailleur 2 entraîne la définition de Main.A
sur 2. Main.A
continue d'exister sur le travailleur 2 même après que l'appel remotecall_fetch
retourne. Les appels distants avec des références globales intégrées (sous le module Main
uniquement) gèrent les globals comme suit :
De nouveaux liens globaux sont créés sur les travailleurs de destination s'ils sont référencés dans le cadre d'un appel à distance.
Les constantes globales sont déclarées comme constantes sur les nœuds distants également.
Les globals sont renvoyés à un travailleur de destination uniquement dans le contexte d'un appel à distance, et seulement si leur valeur a changé. De plus, le cluster ne synchronise pas les liaisons globales entre les nœuds. Par exemple :
A = rand(10,10) remotecall_fetch(()->sum(A), 2) # worker 2 A = rand(10,10) remotecall_fetch(()->sum(A), 3) # worker 3 A = nothing
L'exécution du snippet ci-dessus entraîne que
Main.A
sur le worker 2 a une valeur différente deMain.A
sur le worker 3, tandis que la valeur deMain.A
sur le nœud 1 est définie surnothing
.
Comme vous l'avez peut-être réalisé, bien que la mémoire associée aux variables globales puisse être collectée lorsqu'elles sont réaffectées sur le maître, aucune action de ce type n'est effectuée sur les travailleurs car les liaisons continuent d'être valides. clear!
peut être utilisé pour réaffecter manuellement des variables globales spécifiques sur des nœuds distants à nothing
une fois qu'elles ne sont plus nécessaires. Cela libérera toute mémoire qui leur est associée dans le cadre d'un cycle de collecte des ordures régulier.
Ainsi, les programmes doivent être prudents lorsqu'ils font référence à des variables globales dans des appels distants. En fait, il est préférable de les éviter complètement si possible. Si vous devez faire référence à des variables globales, envisagez d'utiliser des blocs let
pour localiser les variables globales.
Pour exemple :
julia> A = rand(10,10);
julia> remotecall_fetch(()->A, 2);
julia> B = rand(10,10);
julia> let B = B
remotecall_fetch(()->B, 2)
end;
julia> @fetchfrom 2 InteractiveUtils.varinfo()
name size summary
––––––––– ––––––––– ––––––––––––––––––––––
A 800 bytes 10×10 Array{Float64,2}
Base Module
Core Module
Main Module
Comme on peut le voir, la variable globale A
est définie sur le travailleur 2, mais B
est capturée en tant que variable locale et donc un lien pour B
n'existe pas sur le travailleur 2.
Parallel Map and Loops
Heureusement, de nombreux calculs parallèles utiles ne nécessitent pas de déplacement de données. Un exemple courant est une simulation de Monte Carlo, où plusieurs processus peuvent gérer des essais de simulation indépendants simultanément. Nous pouvons utiliser @spawnat
pour lancer des pièces sur deux processus. Tout d'abord, écrivez la fonction suivante dans count_heads.jl
:
function count_heads(n)
c::Int = 0
for i = 1:n
c += rand(Bool)
end
c
end
La fonction count_heads
additionne simplement n
bits aléatoires. Voici comment nous pouvons effectuer quelques essais sur deux machines et additionner les résultats :
julia> @everywhere include_string(Main, $(read("count_heads.jl", String)), "count_heads.jl")
julia> a = @spawnat :any count_heads(100000000)
Future(2, 1, 6, nothing)
julia> b = @spawnat :any count_heads(100000000)
Future(3, 1, 7, nothing)
julia> fetch(a)+fetch(b)
100001564
Cet exemple démontre un modèle de programmation parallèle puissant et souvent utilisé. De nombreuses itérations s'exécutent indépendamment sur plusieurs processus, puis leurs résultats sont combinés à l'aide d'une fonction. Le processus de combinaison est appelé réduction, car il réduit généralement le rang du tenseur : un vecteur de nombres est réduit à un seul nombre, ou une matrice est réduite à une seule ligne ou colonne, etc. Dans le code, cela ressemble généralement au modèle x = f(x,v[i])
, où x
est l'accumulateur, f
est la fonction de réduction, et les v[i]
sont les éléments à réduire. Il est souhaitable que f
soit associative, de sorte que l'ordre dans lequel les opérations sont effectuées n'ait pas d'importance.
Remarquez que notre utilisation de ce modèle avec count_heads
peut être généralisée. Nous avons utilisé deux déclarations explicites @spawnat
, ce qui limite le parallélisme à deux processus. Pour exécuter sur n'importe quel nombre de processus, nous pouvons utiliser une boucle for parallèle, fonctionnant en mémoire distribuée, qui peut être écrite en Julia en utilisant @distributed
comme ceci :
nheads = @distributed (+) for i = 1:200000000
Int(rand(Bool))
end
Cette construction implémente le modèle d'attribution d'itérations à plusieurs processus et de combinaison de celles-ci avec une réduction spécifiée (dans ce cas, (+)
). Le résultat de chaque itération est pris comme la valeur de la dernière expression à l'intérieur de la boucle. L'ensemble de l'expression de boucle parallèle elle-même évalue à la réponse finale.
Notez que bien que les boucles for parallèles ressemblent à des boucles for sérielles, leur comportement est radicalement différent. En particulier, les itérations ne se déroulent pas dans un ordre spécifié, et les écritures dans des variables ou des tableaux ne seront pas visibles globalement puisque les itérations s'exécutent sur différents processus. Toutes les variables utilisées à l'intérieur de la boucle parallèle seront copiées et diffusées à chaque processus.
Par exemple, le code suivant ne fonctionnera pas comme prévu :
a = zeros(100000)
@distributed for i = 1:100000
a[i] = i
end
Ce code n'initialisera pas tout a
, car chaque processus aura une copie séparée de celui-ci. Les boucles parallèles comme celles-ci doivent être évitées. Heureusement, Shared Arrays peut être utilisé pour contourner cette limitation :
using SharedArrays
a = SharedArray{Float64}(10)
@distributed for i = 1:10
a[i] = i
end
Utiliser des variables "externes" dans des boucles parallèles est tout à fait raisonnable si les variables sont en lecture seule :
a = randn(1000)
@distributed (+) for i = 1:100000
f(a[rand(1:end)])
end
Ici, chaque itération applique f
à un échantillon choisi au hasard d'un vecteur a
partagé par tous les processus.
Comme vous pouvez le voir, l'opérateur de réduction peut être omis s'il n'est pas nécessaire. Dans ce cas, la boucle s'exécute de manière asynchrone, c'est-à-dire qu'elle génère des tâches indépendantes sur tous les travailleurs disponibles et renvoie immédiatement un tableau de Future
sans attendre la fin. L'appelant peut attendre les complétions de 4d61726b646f776e2e436f64652822222c20224675747572652229_407265662044697374726962757465642e467574757265
à un moment ultérieur en appelant fetch
sur celles-ci, ou attendre la fin à la fin de la boucle en le préfixant avec @sync
, comme @sync @distributed for
.
Dans certains cas, aucun opérateur de réduction n'est nécessaire, et nous souhaitons simplement appliquer une fonction à tous les entiers dans une certaine plage (ou, plus généralement, à tous les éléments d'une collection). C'est une autre opération utile appelée carte parallèle, implémentée en Julia sous la forme de la fonction pmap
. Par exemple, nous pourrions calculer les valeurs singulières de plusieurs grandes matrices aléatoires en parallèle comme suit :
julia> M = Matrix{Float64}[rand(1000,1000) for i = 1:10];
julia> pmap(svdvals, M);
Le pmap
de Julia est conçu pour les cas où chaque appel de fonction effectue une grande quantité de travail. En revanche, @distributed for
peut gérer des situations où chaque itération est minuscule, peut-être simplement la somme de deux nombres. Seuls les processus de travail sont utilisés par 4d61726b646f776e2e436f64652822222c2022706d61702229_40726566
et @distributed for
pour le calcul parallèle. Dans le cas de @distributed for
, la réduction finale est effectuée sur le processus appelant.
Remote References and AbstractChannels
Les références distantes font toujours référence à une implémentation d'un AbstractChannel
.
Une implémentation concrète d'un AbstractChannel
(comme Channel
), doit implémenter put!
, take!
, fetch
, isready
et wait
. L'objet distant référencé par un Future
est stocké dans un Channel{Any}(1)
, c'est-à-dire un Channel
de taille 1 capable de contenir des objets de type Any
.
RemoteChannel
, qui est réécrivable, peut pointer vers tout type et taille de canaux, ou toute autre implémentation d'un AbstractChannel
.
Le constructeur RemoteChannel(f::Function, pid)()
nous permet de construire des références à des canaux contenant plus d'une valeur d'un type spécifique. f
est une fonction exécutée sur pid
et elle doit retourner un AbstractChannel
.
Par exemple, RemoteChannel(()->Channel{Int}(10), pid)
, renverra une référence à un canal de type Int
et de taille 10. Le canal existe sur le travailleur pid
.
Les méthodes put!
, take!
, fetch
, isready
et wait
sur un RemoteChannel
sont proxy sur le magasin de données dans le processus distant.
RemoteChannel
peut donc être utilisé pour faire référence aux objets AbstractChannel
implémentés par l'utilisateur. Un exemple simple de cela est le DictChannel
suivant qui utilise un dictionnaire comme son stockage distant :
julia> struct DictChannel{T} <: AbstractChannel{T}
d::Dict
cond_take::Threads.Condition # waiting for data to become available
DictChannel{T}() where {T} = new(Dict(), Threads.Condition())
DictChannel() = DictChannel{Any}()
end
julia> begin
function Base.put!(D::DictChannel, k, v)
@lock D.cond_take begin
D.d[k] = v
notify(D.cond_take)
end
return D
end
function Base.take!(D::DictChannel, k)
@lock D.cond_take begin
v = fetch(D, k)
delete!(D.d, k)
return v
end
end
Base.isready(D::DictChannel) = @lock D.cond_take !isempty(D.d)
Base.isready(D::DictChannel, k) = @lock D.cond_take haskey(D.d, k)
function Base.fetch(D::DictChannel, k)
@lock D.cond_take begin
wait(D, k)
return D.d[k]
end
end
function Base.wait(D::DictChannel, k)
@lock D.cond_take begin
while !isready(D, k)
wait(D.cond_take)
end
end
end
end;
julia> d = DictChannel();
julia> isready(d)
false
julia> put!(d, :k, :v);
julia> isready(d, :k)
true
julia> fetch(d, :k)
:v
julia> wait(d, :k)
julia> take!(d, :k)
:v
julia> isready(d, :k)
false
Channels and RemoteChannels
- Un
Channel
est local à un processus. Le travailleur 2 ne peut pas se référer directement à un4d61726b646f776e2e436f64652822222c20224368616e6e656c2229_40726566
sur le travailleur 3 et vice-versa. UnRemoteChannel
, cependant, peut mettre et prendre des valeurs entre les travailleurs. - Un
RemoteChannel
peut être considéré comme un handle pour unChannel
. - L'identifiant de processus,
pid
, associé à unRemoteChannel
identifie le processus où le magasin de sauvegarde, c'est-à-dire leChannel
existe. - Tout processus faisant référence à un
RemoteChannel
peut mettre et prendre des éléments du canal. Les données sont automatiquement envoyées à (ou récupérées depuis) le processus auquel un4d61726b646f776e2e436f64652822222c202252656d6f74654368616e6e656c2229_40726566
est associé. - La sérialisation d'un
Channel
sérialise également toutes les données présentes dans le canal. La désérialisation en fait donc effectivement une copie de l'objet original. - D'autre part, la sérialisation d'un
RemoteChannel
implique uniquement la sérialisation d'un identifiant qui identifie l'emplacement et l'instance deChannel
référencé par le handle. Un objet4d61726b646f776e2e436f64652822222c202252656d6f74654368616e6e656c2229_40726566
désérialisé (sur n'importe quel worker) pointe donc également vers le même magasin de données que l'original.
L'exemple de canaux ci-dessus peut être modifié pour la communication interprocessus, comme indiqué ci-dessous.
Nous démarrons 4 travailleurs pour traiter un seul canal distant jobs
. Les travaux, identifiés par un id (job_id
), sont écrits dans le canal. Chaque tâche exécutée à distance dans cette simulation lit un job_id
, attend un temps aléatoire et écrit un tuple de job_id
, temps pris et son propre pid
dans le canal des résultats. Enfin, tous les results
sont imprimés dans le processus maître.
julia> addprocs(4); # add worker processes
julia> const jobs = RemoteChannel(()->Channel{Int}(32));
julia> const results = RemoteChannel(()->Channel{Tuple}(32));
julia> @everywhere function do_work(jobs, results) # define work function everywhere
while true
job_id = take!(jobs)
exec_time = rand()
sleep(exec_time) # simulates elapsed time doing actual work
put!(results, (job_id, exec_time, myid()))
end
end
julia> function make_jobs(n)
for i in 1:n
put!(jobs, i)
end
end;
julia> n = 12;
julia> errormonitor(@async make_jobs(n)); # feed the jobs channel with "n" jobs
julia> for p in workers() # start tasks on the workers to process requests in parallel
remote_do(do_work, p, jobs, results)
end
julia> @elapsed while n > 0 # print out results
job_id, exec_time, where = take!(results)
println("$job_id finished in $(round(exec_time; digits=2)) seconds on worker $where")
global n = n - 1
end
1 finished in 0.18 seconds on worker 4
2 finished in 0.26 seconds on worker 5
6 finished in 0.12 seconds on worker 4
7 finished in 0.18 seconds on worker 4
5 finished in 0.35 seconds on worker 5
4 finished in 0.68 seconds on worker 2
3 finished in 0.73 seconds on worker 3
11 finished in 0.01 seconds on worker 3
12 finished in 0.02 seconds on worker 3
9 finished in 0.26 seconds on worker 5
8 finished in 0.57 seconds on worker 4
10 finished in 0.58 seconds on worker 2
0.055971741
Remote References and Distributed Garbage Collection
Les objets référencés par des références distantes ne peuvent être libérés que lorsque toutes les références détenues dans le cluster sont supprimées.
Le nœud où la valeur est stockée garde une trace de quels travailleurs ont une référence à celle-ci. Chaque fois qu'un RemoteChannel
ou un Future
(non récupéré) est sérialisé vers un travailleur, le nœud pointé par la référence est notifié. Et chaque fois qu'un 4d61726b646f776e2e436f64652822222c202252656d6f74654368616e6e656c2229_40726566
ou un 4d61726b646f776e2e436f64652822222c20224675747572652229_407265662044697374726962757465642e467574757265
(non récupéré) est collecté comme déchet localement, le nœud possédant la valeur est à nouveau notifié. Cela est implémenté dans un sérialiseur interne conscient du cluster. Les références distantes ne sont valides que dans le contexte d'un cluster en cours d'exécution. La sérialisation et la désérialisation de références vers et depuis des objets IO
réguliers ne sont pas prises en charge.
Les notifications se font par l'envoi de messages de "suivi" – un message "ajouter référence" lorsqu'une référence est sérialisée vers un autre processus et un message "supprimer référence" lorsqu'une référence est collectée localement par le ramasse-miettes.
Puisque Future
sont en écriture unique et mis en cache localement, l'acte de fetch
un 4d61726b646f776e2e436f64652822222c20224675747572652229_407265662044697374726962757465642e467574757265
met également à jour les informations de suivi de référence sur le nœud possédant la valeur.
Le nœud qui possède la valeur la libère une fois que toutes les références à celle-ci sont supprimées.
Avec Future
, la sérialisation d'un 4d61726b646f776e2e436f64652822222c20224675747572652229_407265662044697374726962757465642e467574757265
déjà récupéré vers un nœud différent envoie également la valeur, car le magasin distant d'origine peut avoir collecté la valeur d'ici là.
Il est important de noter que quand un objet est collecté par le ramasse-miettes localement dépend de la taille de l'objet et de la pression mémoire actuelle dans le système.
En cas de références distantes, la taille de l'objet de référence local est assez petite, tandis que la valeur stockée sur le nœud distant peut être assez grande. Étant donné que l'objet local peut ne pas être collecté immédiatement, il est bon de pratiquer d'appeler explicitement finalize
sur les instances locales d'un RemoteChannel
, ou sur des Future
non récupérées. Étant donné que l'appel de fetch
sur un 4d61726b646f776e2e436f64652822222c20224675747572652229_407265662044697374726962757465642e467574757265
supprime également sa référence du magasin distant, cela n'est pas nécessaire sur les 4d61726b646f776e2e436f64652822222c20224675747572652229_407265662044697374726962757465642e467574757265
récupérés. L'appel explicite de 4d61726b646f776e2e436f64652822222c202266696e616c697a652229_40726566
entraîne l'envoi immédiat d'un message au nœud distant pour procéder à la suppression de sa référence à la valeur.
Une fois finalisée, une référence devient invalide et ne peut plus être utilisée dans d'autres appels.
Local invocations
Les données doivent nécessairement être copiées sur le nœud distant pour exécution. C'est le cas à la fois pour les appels distants et lorsque les données sont stockées dans un RemoteChannel
/ Future
sur un nœud différent. Comme prévu, cela entraîne une copie des objets sérialisés sur le nœud distant. Cependant, lorsque le nœud de destination est le nœud local, c'est-à-dire que l'identifiant du processus appelant est le même que l'identifiant du nœud distant, il est exécuté comme un appel local. Il est généralement (mais pas toujours) exécuté dans une tâche différente - mais il n'y a pas de sérialisation/désérialisation des données. Par conséquent, l'appel fait référence aux mêmes instances d'objet que celles passées - aucune copie n'est créée. Ce comportement est mis en évidence ci-dessous :
julia> using Distributed;
julia> rc = RemoteChannel(()->Channel(3)); # RemoteChannel created on local node
julia> v = [0];
julia> for i in 1:3
v[1] = i # Reusing `v`
put!(rc, v)
end;
julia> result = [take!(rc) for _ in 1:3];
julia> println(result);
Array{Int64,1}[[3], [3], [3]]
julia> println("Num Unique objects : ", length(unique(map(objectid, result))));
Num Unique objects : 1
julia> addprocs(1);
julia> rc = RemoteChannel(()->Channel(3), workers()[1]); # RemoteChannel created on remote node
julia> v = [0];
julia> for i in 1:3
v[1] = i
put!(rc, v)
end;
julia> result = [take!(rc) for _ in 1:3];
julia> println(result);
Array{Int64,1}[[1], [2], [3]]
julia> println("Num Unique objects : ", length(unique(map(objectid, result))));
Num Unique objects : 3
Comme on peut le voir, put!
sur un RemoteChannel
appartenant localement avec le même objet v
modifié entre les appels résulte en la même instance d'objet unique stockée. Contrairement à des copies de v
étant créées lorsque le nœud possédant rc
est un nœud différent.
Il convient de noter que cela n'est généralement pas un problème. C'est quelque chose à prendre en compte uniquement si l'objet est à la fois stocké localement et modifié après l'appel. Dans de tels cas, il peut être approprié de stocker un deepcopy
de l'objet.
Ceci est également vrai pour les appels distants sur le nœud local comme le montre l'exemple suivant :
julia> using Distributed; addprocs(1);
julia> v = [0];
julia> v2 = remotecall_fetch(x->(x[1] = 1; x), myid(), v); # Executed on local node
julia> println("v=$v, v2=$v2, ", v === v2);
v=[1], v2=[1], true
julia> v = [0];
julia> v2 = remotecall_fetch(x->(x[1] = 1; x), workers()[1], v); # Executed on remote node
julia> println("v=$v, v2=$v2, ", v === v2);
v=[0], v2=[1], false
Comme on peut le voir une fois de plus, un appel distant sur le nœud local se comporte exactement comme une invocation directe. L'appel modifie les objets locaux passés en arguments. Dans l'invocation distante, il opère sur une copie des arguments.
Pour répéter, en général, ce n'est pas un problème. Si le nœud local est également utilisé comme nœud de calcul, et que les arguments utilisés après l'appel, ce comportement doit être pris en compte et, si nécessaire, des copies profondes des arguments doivent être transmises à l'appel invoqué sur le nœud local. Les appels sur des nœuds distants fonctionneront toujours sur des copies des arguments.
Shared Arrays
Les tableaux partagés utilisent la mémoire partagée du système pour mapper le même tableau à travers plusieurs processus. Un SharedArray
est un bon choix lorsque vous souhaitez avoir une grande quantité de données accessibles conjointement à deux processus ou plus sur la même machine. Le support des tableaux partagés est disponible via le module SharedArrays
, qui doit être explicitement chargé sur tous les travailleurs participants.
Une structure de données complémentaire est fournie par le package externe DistributedArrays.jl
sous la forme d'un DArray
. Bien qu'il existe certaines similitudes avec un SharedArray
, le comportement d'un DArray
est assez différent. Dans un 4d61726b646f776e2e436f64652822222c202253686172656441727261792229_40726566
, chaque processus "participant" a accès à l'ensemble du tableau ; en revanche, dans un 4d61726b646f776e2e436f64652822222c20224441727261792229_68747470733a2f2f6769746875622e636f6d2f4a756c6961506172616c6c656c2f44697374726962757465644172726179732e6a6c
, chaque processus a un accès local à juste un morceau des données, et aucun des deux processus ne partage le même morceau.
SharedArray
l'indexation (attribution et accès aux valeurs) fonctionne exactement comme avec des tableaux réguliers, et est efficace car la mémoire sous-jacente est disponible pour le processus local. Par conséquent, la plupart des algorithmes fonctionnent naturellement sur 4d61726b646f776e2e436f64652822222c202253686172656441727261792229_40726566
, bien que dans un mode à processus unique. Dans les cas où un algorithme insiste sur une entrée Array
, le tableau sous-jacent peut être récupéré d'un 4d61726b646f776e2e436f64652822222c202253686172656441727261792229_40726566
en appelant sdata
. Pour d'autres types AbstractArray
, 4d61726b646f776e2e436f64652822222c202273646174612229_40726566
renvoie simplement l'objet lui-même, il est donc sûr d'utiliser 4d61726b646f776e2e436f64652822222c202273646174612229_40726566
sur tout objet de type Array
.
Le constructeur pour un tableau partagé est de la forme :
SharedArray{T,N}(dims::NTuple; init=false, pids=Int[])
qui crée un tableau partagé N
-dimensionnel de type T
de bits et de taille dims
à travers les processus spécifiés par pids
. Contrairement aux tableaux distribués, un tableau partagé n'est accessible que par les travailleurs participants spécifiés par l'argument nommé pids
(et le processus créateur également, s'il est sur le même hôte). Notez que seuls les éléments qui sont isbits
sont pris en charge dans un SharedArray.
Si une fonction init
, de signature initfn(S::SharedArray)
, est spécifiée, elle est appelée sur tous les travailleurs participants. Vous pouvez spécifier que chaque travailleur exécute la fonction init
sur une portion distincte du tableau, parallélisant ainsi l'initialisation.
Voici un bref exemple :
julia> using Distributed
julia> addprocs(3)
3-element Array{Int64,1}:
2
3
4
julia> @everywhere using SharedArrays
julia> S = SharedArray{Int,2}((3,4), init = S -> S[localindices(S)] = repeat([myid()], length(localindices(S))))
3×4 SharedArray{Int64,2}:
2 2 3 4
2 3 3 4
2 3 4 4
julia> S[3,2] = 7
7
julia> S
3×4 SharedArray{Int64,2}:
2 2 3 4
2 3 3 4
2 7 4 4
SharedArrays.localindices
fournit des plages d'indices unidimensionnelles disjointes, et il est parfois pratique de diviser les tâches entre les processus. Vous pouvez, bien sûr, diviser le travail comme vous le souhaitez :
julia> S = SharedArray{Int,2}((3,4), init = S -> S[indexpids(S):length(procs(S)):length(S)] = repeat([myid()], length( indexpids(S):length(procs(S)):length(S))))
3×4 SharedArray{Int64,2}:
2 2 2 2
3 3 3 3
4 4 4 4
Puisque tous les processus ont accès aux données sous-jacentes, vous devez faire attention à ne pas créer de conflits. Par exemple :
@sync begin
for p in procs(S)
@async begin
remotecall_wait(fill!, p, S, p)
end
end
end
cela entraînerait un comportement indéfini. Parce que chaque processus remplit le total du tableau avec son propre pid
, quel que soit le processus qui s'exécute en dernier (pour un élément particulier de S
), son pid
sera conservé.
En tant qu'exemple plus étendu et complexe, envisagez d'exécuter le "noyau" suivant en parallèle :
q[i,j,t+1] = q[i,j,t] + u[i,j,t]
Dans ce cas, si nous essayons de diviser le travail en utilisant un index unidimensionnel, nous risquons de rencontrer des problèmes : si q[i,j,t]
est proche de la fin du bloc attribué à un travailleur et q[i,j,t+1]
est proche du début du bloc attribué à un autre, il est très probable que q[i,j,t]
ne soit pas prêt au moment où il est nécessaire pour le calcul de q[i,j,t+1]
. Dans de tels cas, il est préférable de diviser manuellement le tableau. Divisons le long de la deuxième dimension. Définissez une fonction qui renvoie les indices (irange, jrange)
attribués à ce travailleur :
julia> @everywhere function myrange(q::SharedArray)
idx = indexpids(q)
if idx == 0 # This worker is not assigned a piece
return 1:0, 1:0
end
nchunks = length(procs(q))
splits = [round(Int, s) for s in range(0, stop=size(q,2), length=nchunks+1)]
1:size(q,1), splits[idx]+1:splits[idx+1]
end
Ensuite, définissez le noyau :
julia> @everywhere function advection_chunk!(q, u, irange, jrange, trange)
@show (irange, jrange, trange) # display so we can see what's happening
for t in trange, j in jrange, i in irange
q[i,j,t+1] = q[i,j,t] + u[i,j,t]
end
q
end
Nous définissons également un wrapper de commodité pour une implémentation de SharedArray
.
julia> @everywhere advection_shared_chunk!(q, u) =
advection_chunk!(q, u, myrange(q)..., 1:size(q,3)-1)
Maintenant, comparons trois versions différentes, dont une qui s'exécute dans un seul processus :
julia> advection_serial!(q, u) = advection_chunk!(q, u, 1:size(q,1), 1:size(q,2), 1:size(q,3)-1);
un qui utilise @distributed
:
julia> function advection_parallel!(q, u)
for t = 1:size(q,3)-1
@sync @distributed for j = 1:size(q,2)
for i = 1:size(q,1)
q[i,j,t+1]= q[i,j,t] + u[i,j,t]
end
end
end
q
end;
et un qui délègue par morceaux :
julia> function advection_shared!(q, u)
@sync begin
for p in procs(q)
@async remotecall_wait(advection_shared_chunk!, p, q, u)
end
end
q
end;
Si nous créons des SharedArray
s et chronométrons ces fonctions, nous obtenons les résultats suivants (avec julia -p 4
) :
julia> q = SharedArray{Float64,3}((500,500,500));
julia> u = SharedArray{Float64,3}((500,500,500));
Exécutez les fonctions une fois pour les compiler en JIT et @time
les lors de la deuxième exécution :
julia> @time advection_serial!(q, u);
(irange,jrange,trange) = (1:500,1:500,1:499)
830.220 milliseconds (216 allocations: 13820 bytes)
julia> @time advection_parallel!(q, u);
2.495 seconds (3999 k allocations: 289 MB, 2.09% gc time)
julia> @time advection_shared!(q,u);
From worker 2: (irange,jrange,trange) = (1:500,1:125,1:499)
From worker 4: (irange,jrange,trange) = (1:500,251:375,1:499)
From worker 3: (irange,jrange,trange) = (1:500,126:250,1:499)
From worker 5: (irange,jrange,trange) = (1:500,376:500,1:499)
238.119 milliseconds (2264 allocations: 169 KB)
Le plus grand avantage de advection_shared!
est qu'il minimise le trafic entre les travailleurs, permettant à chacun de calculer pendant une période prolongée sur la partie assignée.
Shared Arrays and Distributed Garbage Collection
Comme les références distantes, les tableaux partagés dépendent également de la collecte des ordures sur le nœud créateur pour libérer les références de tous les travailleurs participants. Le code qui crée de nombreux objets de tableau partagé à courte durée de vie bénéficierait d'une finalisation explicite de ces objets dès que possible. Cela permet de libérer plus tôt à la fois la mémoire et les descripteurs de fichiers mappant le segment partagé.
ClusterManagers
Le lancement, la gestion et le mise en réseau des processus Julia dans un cluster logique se font via des gestionnaires de cluster. Un ClusterManager
est responsable de
- lancement de processus de travail dans un environnement de cluster
- gérer les événements pendant la durée de vie de chaque travailleur
- optionnellement, fournissant le transport de données
Un cluster Julia a les caractéristiques suivantes :
- Le processus Julia initial, également appelé
maître
, est spécial et a unid
de 1. - Seul le processus
master
peut ajouter ou supprimer des processus de travail. - Tous les processus peuvent communiquer directement entre eux.
Les connexions entre les travailleurs (en utilisant le transport TCP/IP intégré) sont établies de la manière suivante :
addprocs
est appelé sur le processus maître avec un objetClusterManager
.addprocs
appelle la méthode appropriéelaunch
qui génère le nombre requis de processus de travail sur les machines appropriées.- Chaque travailleur commence à écouter sur un port libre et écrit ses informations d'hôte et de port dans
stdout
. - Le gestionnaire de cluster capture le
stdout
de chaque travailleur et le rend disponible au processus maître. - Le processus maître analyse ces informations et établit des connexions TCP/IP avec chaque travailleur.
- Chaque travailleur est également informé des autres travailleurs dans le cluster.
- Chaque travailleur se connecte à tous les travailleurs dont l'
id
est inférieur à son propreid
. - De cette manière, un réseau maillé est établi, dans lequel chaque travailleur est directement connecté à chaque autre travailleur.
Bien que la couche de transport par défaut utilise TCPSocket
en clair, il est possible pour un cluster Julia de fournir son propre transport.
Julia fournit deux gestionnaires de clusters intégrés :
LocalManager
, utilisé lorsqueaddprocs()
ouaddprocs(np::Integer)
sont appelésSSHManager
, utilisé lorsqueaddprocs(hostnames::Array)
est appelé avec une liste de noms d'hôtes.
LocalManager
est utilisé pour lancer des travailleurs supplémentaires sur le même hôte, tirant ainsi parti du matériel multi-cœur et multi-processeur.
Ainsi, un gestionnaire de cluster minimal devrait :
- être un sous-type de l'abstrait
ClusterManager
- implémenter
launch
, une méthode responsable du lancement de nouveaux travailleurs - implémenter
manage
, qui est appelé à divers événements au cours de la vie d'un travailleur (par exemple, l'envoi d'un signal d'interruption)
addprocs(manager::FooManager)
nécessite que FooManager
implémente :
function launch(manager::FooManager, params::Dict, launched::Array, c::Condition)
[...]
end
function manage(manager::FooManager, id::Integer, config::WorkerConfig, op::Symbol)
[...]
end
En tant qu'exemple, voyons comment le LocalManager
, le gestionnaire responsable du démarrage des travailleurs sur le même hôte, est implémenté :
struct LocalManager <: ClusterManager
np::Integer
end
function launch(manager::LocalManager, params::Dict, launched::Array, c::Condition)
[...]
end
function manage(manager::LocalManager, id::Integer, config::WorkerConfig, op::Symbol)
[...]
end
La méthode launch
prend les arguments suivants :
manager::ClusterManager
: le gestionnaire de cluster qui est appelé avecaddprocs
params::Dict
: tous les arguments de mot-clé passés àaddprocs
launched::Array
: le tableau auquel ajouter un ou plusieurs objetsWorkerConfig
c::Condition
: la variable de condition à notifier lorsque des travailleurs sont lancés.
La méthode launch
est appelée de manière asynchrone dans une tâche séparée. La terminaison de cette tâche signale que tous les travailleurs demandés ont été lancés. Par conséquent, la fonction 4d61726b646f776e2e436f64652822222c20226c61756e63682229_40726566
DOIT se terminer dès que tous les travailleurs demandés ont été lancés.
Les travailleurs nouvellement lancés sont connectés les uns aux autres et au processus maître de manière tous à tous. La spécification de l'argument de ligne de commande --worker[=<cookie>]
entraîne l'initialisation des processus lancés en tant que travailleurs et l'établissement de connexions via des sockets TCP/IP.
Tous les travailleurs dans un cluster partagent le même cookie que le maître. Lorsque le cookie n'est pas spécifié, c'est-à-dire avec l'option --worker
, le travailleur essaie de le lire à partir de son entrée standard. LocalManager
et SSHManager
transmettent tous deux le cookie aux travailleurs nouvellement lancés via leurs entrées standard.
Par défaut, un worker écoutera sur un port libre à l'adresse retournée par un appel à getipaddr()
. Une adresse spécifique sur laquelle écouter peut être spécifiée par l'argument optionnel --bind-to bind_addr[:port]
. Cela est utile pour les hôtes multi-homés.
En tant qu'exemple d'un transport non-TCP/IP, une implémentation peut choisir d'utiliser MPI, auquel cas --worker
ne doit PAS être spécifié. Au lieu de cela, les travailleurs nouvellement lancés doivent appeler init_worker(cookie)
avant d'utiliser l'un des constructs parallèles.
Pour chaque travailleur lancé, la méthode launch
doit ajouter un objet WorkerConfig
(avec les champs appropriés initialisés) à launched
mutable struct WorkerConfig
# Common fields relevant to all cluster managers
io::Union{IO, Nothing}
host::Union{AbstractString, Nothing}
port::Union{Integer, Nothing}
# Used when launching additional workers at a host
count::Union{Int, Symbol, Nothing}
exename::Union{AbstractString, Cmd, Nothing}
exeflags::Union{Cmd, Nothing}
# External cluster managers can use this to store information at a per-worker level
# Can be a dict if multiple fields need to be stored.
userdata::Any
# SSHManager / SSH tunnel connections to workers
tunnel::Union{Bool, Nothing}
bind_addr::Union{AbstractString, Nothing}
sshflags::Union{Cmd, Nothing}
max_parallel::Union{Integer, Nothing}
# Used by Local/SSH managers
connect_at::Any
[...]
end
La plupart des champs dans WorkerConfig
sont utilisés par les gestionnaires intégrés. Les gestionnaires de cluster personnalisés spécifieraient généralement uniquement io
ou host
/ port
:
Si
io
est spécifié, il est utilisé pour lire les informations d'hôte/port. Un travailleur Julia imprime son adresse de liaison et son port au démarrage. Cela permet aux travailleurs Julia d'écouter sur n'importe quel port libre disponible au lieu d'exiger que les ports des travailleurs soient configurés manuellement.Si
io
n'est pas spécifié,host
etport
sont utilisés pour se connecter.count
,exename
etexeflags
sont pertinents pour lancer des travailleurs supplémentaires à partir d'un travailleur. Par exemple, un gestionnaire de cluster peut lancer un seul travailleur par nœud et l'utiliser pour lancer des travailleurs supplémentaires.count
avec une valeur entièren
lancera un total den
travailleurs.count
avec une valeur de:auto
lancera autant de travailleurs que le nombre de threads CPU (cœurs logiques) sur cette machine.exename
est le nom de l'exécutablejulia
, y compris le chemin complet.exeflags
doit être défini sur les arguments de ligne de commande requis pour les nouveaux travailleurs.
tunnel
,bind_addr
,sshflags
etmax_parallel
sont utilisés lorsqu'un tunnel ssh est nécessaire pour se connecter aux travailleurs depuis le processus maître.userdata
est fourni pour que les gestionnaires de clusters personnalisés puissent stocker leurs propres informations spécifiques aux travailleurs.
manage(manager::FooManager, id::Integer, config::WorkerConfig, op::Symbol)
est appelé à différents moments au cours de la vie du travailleur avec des valeurs op
appropriées :
- avec
:register
/:deregister
lorsque un travailleur est ajouté / retiré du pool de travailleurs Julia. - avec
:interrupt
lorsqueinterrupt(workers)
est appelé. LeClusterManager
doit signaler le travailleur approprié avec un signal d'interruption. - avec
:finalize
à des fins de nettoyage.
Cluster Managers with Custom Transports
Remplacer les connexions de socket TCP/IP par défaut par une couche de transport personnalisée est un peu plus complexe. Chaque processus Julia a autant de tâches de communication que de travailleurs auxquels il est connecté. Par exemple, considérons un cluster Julia de 32 processus dans un réseau maillé tout-à-tous :
- Chaque processus Julia a donc 31 tâches de communication.
- Chaque tâche gère tous les messages entrants d'un seul travailleur à distance dans une boucle de traitement des messages.
- La boucle de traitement des messages attend sur un objet
IO
(par exemple, unTCPSocket
dans l'implémentation par défaut), lit un message entier, le traite et attend le suivant. - L'envoi de messages à un processus se fait directement depuis n'importe quelle tâche Julia – pas seulement les tâches de communication – encore une fois, via l'objet
IO
approprié.
Remplacer le transport par défaut nécessite que la nouvelle implémentation établisse des connexions avec des travailleurs distants et fournisse des objets IO
appropriés sur lesquels les boucles de traitement des messages peuvent attendre. Les rappels spécifiques au gestionnaire à mettre en œuvre sont :
connect(manager::FooManager, pid::Integer, config::WorkerConfig)
kill(manager::FooManager, pid::Int, config::WorkerConfig)
L'implémentation par défaut (qui utilise des sockets TCP/IP) est implémentée comme connect(manager::ClusterManager, pid::Integer, config::WorkerConfig)
.
connect
doit renvoyer une paire d'objets IO
, l'un pour lire les données envoyées par le pid
du travailleur, et l'autre pour écrire les données qui doivent être envoyées au pid
du travailleur. Les gestionnaires de cluster personnalisés peuvent utiliser un BufferStream
en mémoire comme tuyauterie pour faire passer les données entre le transport personnalisé, éventuellement non IO
, et l'infrastructure parallèle intégrée de Julia.
Un BufferStream
est un IOBuffer
en mémoire qui se comporte comme un IO
– c'est un flux qui peut être géré de manière asynchrone.
Le dossier clustermanager/0mq
dans le Examples repository contient un exemple d'utilisation de ZeroMQ pour connecter des travailleurs Julia dans une topologie en étoile avec un courtier 0MQ au milieu. Remarque : Les processus Julia sont toujours tous logiquement connectés les uns aux autres – tout travailleur peut envoyer un message à tout autre travailleur directement sans avoir conscience que 0MQ est utilisé comme couche de transport.
Lors de l'utilisation de transports personnalisés :
- Les travailleurs Julia ne doivent PAS être démarrés avec
--worker
. Démarrer avec--worker
entraînera le fait que les nouveaux travailleurs lancés utiliseront par défaut l'implémentation de transport par socket TCP/IP. - Pour chaque connexion logique entrante avec un travailleur,
Base.process_messages(rd::IO, wr::IO)()
doit être appelé. Cela lance une nouvelle tâche qui gère la lecture et l'écriture de messages depuis/vers le travailleur représenté par les objetsIO
. init_worker(cookie, manager::FooManager)
doit être appelé dans le cadre de l'initialisation du processus de travail.- Le champ
connect_at::Any
dansWorkerConfig
peut être défini par le gestionnaire de cluster lorsquelaunch
est appelé. La valeur de ce champ est transmise dans tous les rappelsconnect
. En général, il contient des informations sur comment se connecter à un travailleur. Par exemple, le transport par socket TCP/IP utilise ce champ pour spécifier le tuple(hôte, port)
auquel se connecter à un travailleur.
kill(manager, pid, config)
est appelé pour retirer un travailleur du cluster. Dans le processus maître, les objets IO
correspondants doivent être fermés par l'implémentation pour garantir un nettoyage approprié. L'implémentation par défaut exécute simplement un appel exit()
sur le travailleur distant spécifié.
Le dossier Exemples clustermanager/simple
est un exemple qui montre une implémentation simple utilisant des sockets de domaine UNIX pour la configuration du cluster.
Network Requirements for LocalManager and SSHManager
Les clusters Julia sont conçus pour être exécutés dans des environnements déjà sécurisés sur des infrastructures telles que des ordinateurs portables locaux, des clusters départementaux ou même le cloud. Cette section couvre les exigences de sécurité réseau pour le LocalManager
et le SSHManager
intégrés :
Le processus maître n'écoute sur aucun port. Il se connecte uniquement aux travailleurs.
Chaque travailleur se lie à l'une des interfaces locales et écoute sur un numéro de port éphémère attribué par le système d'exploitation.
LocalManager
, utilisé paraddprocs(N)
, lie par défaut uniquement à l'interface de boucle locale. Cela signifie que les travailleurs démarrés plus tard sur des hôtes distants (ou par quiconque ayant de mauvaises intentions) ne peuvent pas se connecter au cluster. Unaddprocs(4)
suivi d'unaddprocs(["remote_host"])
échouera. Certains utilisateurs peuvent avoir besoin de créer un cluster comprenant leur système local et quelques systèmes distants. Cela peut être fait en demandant explicitement àLocalManager
de se lier à une interface réseau externe via l'argument clérestrict
:addprocs(4; restrict=false)
.SSHManager
, utilisé paraddprocs(list_of_remote_hosts)
, lance des travailleurs sur des hôtes distants via SSH. Par défaut, SSH est uniquement utilisé pour lancer des travailleurs Julia. Les connexions ultérieures entre le maître et les travailleurs, ainsi qu'entre les travailleurs, utilisent des sockets TCP/IP non chiffrés. Les hôtes distants doivent avoir l'authentification sans mot de passe activée. Des options ou des identifiants SSH supplémentaires peuvent être spécifiés via l'argument clésshflags
.addprocs(list_of_remote_hosts; tunnel=true, sshflags=<ssh keys and other flags>)
est utile lorsque nous souhaitons utiliser des connexions SSH pour le maître-travailleur également. Un scénario typique pour cela est un ordinateur portable local exécutant le REPL Julia (c'est-à-dire, le maître) avec le reste du cluster dans le cloud, par exemple sur Amazon EC2. Dans ce cas, seul le port 22 doit être ouvert sur le cluster distant associé à un client SSH authentifié via une infrastructure à clé publique (PKI). Les informations d'identification d'authentification peuvent être fournies viasshflags
, par exemplesshflags=`-i <keyfile>`
.Dans une topologie tous-à-tous (par défaut), tous les travailleurs se connectent entre eux via des sockets TCP simples. La politique de sécurité sur les nœuds du cluster doit donc garantir une connectivité libre entre les travailleurs pour la plage de ports éphémères (varie selon le système d'exploitation).
Sécuriser et chiffrer tout le trafic entre travailleurs (via SSH) ou chiffrer des messages individuels peut être réalisé via un
ClusterManager
personnalisé.Si vous spécifiez
multiplex=true
comme option àaddprocs
, le multiplexage SSH est utilisé pour créer un tunnel entre le maître et les travailleurs. Si vous avez configuré le multiplexage SSH vous-même et que la connexion a déjà été établie, le multiplexage SSH est utilisé indépendamment de l'optionmultiplex
. Si le multiplexage est activé, le transfert est configuré en utilisant la connexion existante (option-O forward
dans ssh). Cela est bénéfique si vos serveurs nécessitent une authentification par mot de passe ; vous pouvez éviter l'authentification dans Julia en vous connectant au serveur avant4d61726b646f776e2e436f64652822222c202261646470726f63732229_40726566
. Le socket de contrôle sera situé à~/.ssh/julia-%r@%h:%p
pendant la session, sauf si la connexion de multiplexage existante est utilisée. Notez que la bande passante peut être limitée si vous créez plusieurs processus sur un nœud et activez le multiplexage, car dans ce cas, les processus partagent une seule connexion TCP de multiplexage.
Cluster Cookie
Tous les processus dans un cluster partagent le même cookie qui, par défaut, est une chaîne générée aléatoirement sur le processus maître :
cluster_cookie()
renvoie le cookie, tandis quecluster_cookie(cookie)()
le définit et renvoie le nouveau cookie.- Toutes les connexions sont authentifiées des deux côtés pour garantir que seuls les travailleurs démarrés par le maître sont autorisés à se connecter les uns aux autres.
- Le cookie peut être passé aux travailleurs au démarrage via l'argument
--worker=<cookie>
. Si l'argument--worker
est spécifié sans le cookie, le travailleur essaie de lire le cookie à partir de son entrée standard (stdin
). L'stdin
est fermé immédiatement après que le cookie a été récupéré. - Les
ClusterManager
s peuvent récupérer le cookie sur le maître en appelantcluster_cookie()
. Les gestionnaires de clusters ne utilisant pas le transport TCP/IP par défaut (et donc ne spécifiant pas--worker
) doivent appelerinit_worker(cookie, manager)
avec le même cookie que sur le maître.
Notez que les environnements nécessitant des niveaux de sécurité plus élevés peuvent mettre en œuvre cela via un ClusterManager
personnalisé. Par exemple, les cookies peuvent être pré-partagés et donc non spécifiés comme un argument de démarrage.
Specifying Network Topology (Experimental)
L'argument clé topology
passé à addprocs
est utilisé pour spécifier comment les travailleurs doivent être connectés les uns aux autres :
:all_to_all
, la valeur par défaut : tous les travailleurs sont connectés les uns aux autres.:master_worker
: seul le processus pilote, c'est-à-direpid
1, a des connexions avec les travailleurs.:custom
: la méthodelaunch
du gestionnaire de cluster spécifie la topologie de connexion via les champsident
etconnect_idents
dansWorkerConfig
. Un travailleur avec une identitéident
fournie par le gestionnaire de cluster se connectera à tous les travailleurs spécifiés dansconnect_idents
.
L'argument clé lazy=true|false
n'affecte que l'option topology
:all_to_all
. Si true
, le cluster commence avec le maître connecté à tous les travailleurs. Des connexions spécifiques entre travailleurs sont établies lors de la première invocation distante entre deux travailleurs. Cela aide à réduire les ressources initiales allouées pour la communication intra-cluster. Les connexions sont configurées en fonction des exigences d'exécution d'un programme parallèle. La valeur par défaut pour lazy
est true
.
Actuellement, l'envoi d'un message entre des travailleurs non connectés entraîne une erreur. Ce comportement, tout comme la fonctionnalité et l'interface, doit être considéré comme expérimental et peut changer dans les futures versions.
Noteworthy external packages
Outside of Julia parallelism there are plenty of external packages that should be mentioned. For example, MPI.jl
is a Julia wrapper for the MPI
protocol, Dagger.jl
provides functionality similar to Python's Dask, and DistributedArrays.jl
provides array operations distributed across workers, as outlined above.
Une mention doit être faite de l'écosystème de programmation GPU de Julia, qui comprend :
CUDA.jl enveloppe les différentes bibliothèques CUDA et prend en charge la compilation des noyaux Julia pour les GPU Nvidia.
oneAPI.jl enveloppe le modèle de programmation unifié oneAPI et prend en charge l'exécution des noyaux Julia sur les accélérateurs pris en charge. Actuellement, seul Linux est pris en charge.
AMDGPU.jl enveloppe les bibliothèques AMD ROCm et prend en charge la compilation des noyaux Julia pour les GPU AMD. Actuellement, seul Linux est pris en charge.
Des bibliothèques de haut niveau comme KernelAbstractions.jl, Tullio.jl et ArrayFire.jl.
Dans l'exemple suivant, nous utiliserons à la fois DistributedArrays.jl
et CUDA.jl
pour distribuer un tableau sur plusieurs processus en le convertissant d'abord via distribute()
et CuArray()
.
Rappelez-vous qu'en important DistributedArrays.jl
, il faut l'importer sur tous les processus en utilisant @everywhere
$ ./julia -p 4
julia> addprocs()
julia> @everywhere using DistributedArrays
julia> using CUDA
julia> B = ones(10_000) ./ 2;
julia> A = ones(10_000) .* π;
julia> C = 2 .* A ./ B;
julia> all(C .≈ 4*π)
true
julia> typeof(C)
Array{Float64,1}
julia> dB = distribute(B);
julia> dA = distribute(A);
julia> dC = 2 .* dA ./ dB;
julia> all(dC .≈ 4*π)
true
julia> typeof(dC)
DistributedArrays.DArray{Float64,1,Array{Float64,1}}
julia> cuB = CuArray(B);
julia> cuA = CuArray(A);
julia> cuC = 2 .* cuA ./ cuB;
julia> all(cuC .≈ 4*π);
true
julia> typeof(cuC)
CuArray{Float64,1}
Dans l'exemple suivant, nous utiliserons à la fois DistributedArrays.jl
et CUDA.jl
pour distribuer un tableau sur plusieurs processus et appeler une fonction générique dessus.
function power_method(M, v)
for i in 1:100
v = M*v
v /= norm(v)
end
return v, norm(M*v) / norm(v) # or (M*v) ./ v
end
power_method
crée à plusieurs reprises un nouveau vecteur et le normalise. Nous n'avons pas spécifié de signature de type dans la déclaration de fonction, voyons si cela fonctionne avec les types de données mentionnés ci-dessus :
julia> M = [2. 1; 1 1];
julia> v = rand(2)
2-element Array{Float64,1}:
0.40395
0.445877
julia> power_method(M,v)
([0.850651, 0.525731], 2.618033988749895)
julia> cuM = CuArray(M);
julia> cuv = CuArray(v);
julia> curesult = power_method(cuM, cuv);
julia> typeof(curesult)
CuArray{Float64,1}
julia> dM = distribute(M);
julia> dv = distribute(v);
julia> dC = power_method(dM, dv);
julia> typeof(dC)
Tuple{DistributedArrays.DArray{Float64,1,Array{Float64,1}},Float64}
Pour mettre fin à cette brève exposition aux packages externes, nous pouvons considérer MPI.jl
, un wrapper Julia du protocole MPI. Comme il serait trop long de considérer chaque fonction interne, il serait préférable d'apprécier simplement l'approche utilisée pour implémenter le protocole.
Considérez ce script de démonstration qui appelle simplement chaque sous-processus, instancie son rang et lorsque le processus maître est atteint, effectue la somme des rangs.
import MPI
MPI.Init()
comm = MPI.COMM_WORLD
MPI.Barrier(comm)
root = 0
r = MPI.Comm_rank(comm)
sr = MPI.Reduce(r, MPI.SUM, root, comm)
if(MPI.Comm_rank(comm) == root)
@printf("sum of ranks: %s\n", sr)
end
MPI.Finalize()
mpirun -np 4 ./julia example.jl
- 1In this context, MPI refers to the MPI-1 standard. Beginning with MPI-2, the MPI standards committee introduced a new set of communication mechanisms, collectively referred to as Remote Memory Access (RMA). The motivation for adding rma to the MPI standard was to facilitate one-sided communication patterns. For additional information on the latest MPI standard, see https://mpi-forum.org/docs.