Distributed Computing
Distributed
— ModuleHerramientas para el procesamiento paralelo distribuido.
Distributed.addprocs
— Functionaddprocs(manager::ClusterManager; kwargs...) -> Lista de identificadores de procesos
Lanza procesos de trabajo a través del administrador de clústeres especificado.
Por ejemplo, los clústeres Beowulf son compatibles a través de un administrador de clústeres personalizado implementado en el paquete ClusterManagers.jl
.
El número de segundos que un nuevo trabajador espera para el establecimiento de la conexión desde el maestro se puede especificar a través de la variable JULIA_WORKER_TIMEOUT
en el entorno del proceso de trabajo. Relevante solo cuando se utiliza TCP/IP como transporte.
Para lanzar trabajadores sin bloquear el REPL, o la función que contiene si se lanzan trabajadores programáticamente, ejecuta addprocs
en su propia tarea.
Ejemplos
# En clústeres ocupados, llama a `addprocs` de forma asíncrona
t = @async addprocs(...)
# Utiliza trabajadores a medida que se conectan
if nprocs() > 1 # Asegúrate de que al menos un nuevo trabajador esté disponible
.... # realiza ejecución distribuida
end
# Recupera los IDs de los trabajadores recién lanzados, o cualquier mensaje de error
if istaskdone(t) # Verifica si `addprocs` ha completado para asegurar que `fetch` no bloquee
if nworkers() == N
new_pids = fetch(t)
else
fetch(t)
end
end
addprocs(machines; tunnel=false, sshflags=``, max_parallel=10, kwargs...) -> Lista de identificadores de procesos
Agrega procesos de trabajo en máquinas remotas a través de SSH. La configuración se realiza con argumentos de palabra clave (ver más abajo). En particular, se puede usar la palabra clave exename
para especificar la ruta al binario julia
en la(s) máquina(s) remota(s).
machines
es un vector de "especificaciones de máquina" que se dan como cadenas en la forma [user@]host[:port] [bind_addr[:port]]
. user
se establece de forma predeterminada como el usuario actual y port
como el puerto SSH estándar. Si se especifica [bind_addr[:port]]
, otros trabajadores se conectarán a este trabajador en el bind_addr
y port
especificados.
Es posible lanzar múltiples procesos en un host remoto utilizando una tupla en el vector machines
o la forma (machine_spec, count)
, donde count
es el número de trabajadores que se lanzarán en el host especificado. Pasar :auto
como el conteo de trabajadores lanzará tantos trabajadores como el número de hilos de CPU en el host remoto.
Ejemplos:
addprocs([
"remote1", # un trabajador en 'remote1' iniciando sesión con el nombre de usuario actual
"user@remote2", # un trabajador en 'remote2' iniciando sesión con el nombre de usuario 'user'
"user@remote3:2222", # especificando el puerto SSH a '2222' para 'remote3'
("user@remote4", 4), # lanzar 4 trabajadores en 'remote4'
("user@remote5", :auto), # lanzar tantos trabajadores como hilos de CPU en 'remote5'
])
Argumentos de palabra clave:
tunnel
: sitrue
, se utilizará un túnel SSH para conectarse al trabajador desde el proceso maestro. El valor predeterminado esfalse
.multiplex
: sitrue
, se utiliza multiplexión SSH para el túnel SSH. El valor predeterminado esfalse
.ssh
: el nombre o la ruta del ejecutable del cliente SSH utilizado para iniciar los trabajadores. El valor predeterminado es"ssh"
.sshflags
: especifica opciones ssh adicionales, p. ej.sshflags=`-i /home/foo/bar.pem`
max_parallel
: especifica el número máximo de trabajadores conectados en paralelo en un host. El valor predeterminado es 10.shell
: especifica el tipo de shell al que ssh se conecta en los trabajadores.shell=:posix
: un shell Unix/Linux compatible con POSIX (sh, ksh, bash, dash, zsh, etc.). El valor predeterminado.shell=:csh
: un shell C de Unix (csh, tcsh).shell=:wincmd
: Microsoft Windowscmd.exe
.
dir
: especifica el directorio de trabajo en los trabajadores. El valor predeterminado es el directorio actual del host (como se encuentra conpwd()
)enable_threaded_blas
: sitrue
, entonces BLAS se ejecutará en múltiples hilos en los procesos añadidos. El valor predeterminado esfalse
.exename
: nombre del ejecutablejulia
. El valor predeterminado es"$(Sys.BINDIR)/julia"
o"$(Sys.BINDIR)/julia-debug"
según sea el caso. Se recomienda que se use una versión común de Julia en todas las máquinas remotas porque la serialización y la distribución de código podrían fallar de lo contrario.exeflags
: banderas adicionales pasadas a los procesos de trabajo.topology
: Especifica cómo los trabajadores se conectan entre sí. Enviar un mensaje entre trabajadores no conectados resulta en un error.topology=:all_to_all
: Todos los procesos están conectados entre sí. El valor predeterminado.topology=:master_worker
: Solo el proceso controlador, es decir,pid
1 se conecta a los trabajadores. Los trabajadores no se conectan entre sí.topology=:custom
: El métodolaunch
del administrador de clústeres especifica la topología de conexión a través de los camposident
yconnect_idents
enWorkerConfig
. Un trabajador con una identidad de administrador de clústerident
se conectará a todos los trabajadores especificados enconnect_idents
.
lazy
: Aplicable solo contopology=:all_to_all
. Sitrue
, las conexiones trabajador-trabajador se configuran de manera perezosa, es decir, se configuran en la primera instancia de una llamada remota entre trabajadores. El valor predeterminado es true.env
: proporciona un array de pares de cadenas comoenv=["JULIA_DEPOT_PATH"=>"/depot"]
para solicitar que se establezcan variables de entorno en la máquina remota. Por defecto, solo la variable de entornoJULIA_WORKER_TIMEOUT
se pasa automáticamente del entorno local al remoto.cmdline_cookie
: pasa la cookie de autenticación a través de la opción de línea de comandos--worker
. El comportamiento predeterminado (más seguro) de pasar la cookie a través de ssh stdio puede colgarse con trabajadores de Windows que utilizan versiones anteriores de Julia o Windows (pre-ConPTY), en cuyo casocmdline_cookie=true
ofrece una solución alternativa.
Los argumentos de palabra clave ssh
, shell
, env
y cmdline_cookie
se añadieron en Julia 1.6.
Variables de entorno:
Si el proceso maestro no logra establecer una conexión con un trabajador recién lanzado dentro de 60.0 segundos, el trabajador lo considera una situación fatal y termina. Este tiempo de espera se puede controlar a través de la variable de entorno JULIA_WORKER_TIMEOUT
. El valor de JULIA_WORKER_TIMEOUT
en el proceso maestro especifica el número de segundos que un trabajador recién lanzado espera para establecer la conexión.
addprocs(np::Integer=Sys.CPU_THREADS; restrict=true, kwargs...) -> Lista de identificadores de procesos
Lanza np
trabajadores en el host local utilizando el LocalManager
incorporado.
Los trabajadores locales heredan el entorno de paquetes actual (es decir, proyecto activo, LOAD_PATH
y DEPOT_PATH
) del proceso principal.
Tenga en cuenta que los trabajadores no ejecutan un script de inicio ~/.julia/config/startup.jl
, ni sincronizan su estado global (como opciones de línea de comandos, variables globales, definiciones de nuevos métodos y módulos cargados) con ninguno de los otros procesos en ejecución.
Argumentos clave:
restrict::Bool
: si estrue
(por defecto) la vinculación está restringida a127.0.0.1
.dir
,exename
,exeflags
,env
,topology
,lazy
,enable_threaded_blas
: mismo efecto que paraSSHManager
, consulte la documentación paraaddprocs(machines::AbstractVector)
.
La herencia del entorno de paquetes y el argumento clave env
se añadieron en Julia 1.9.
Distributed.nprocs
— Functionnprocs()
Obtiene el número de procesos disponibles.
Ejemplos
julia> nprocs()
3
julia> workers()
2-element Array{Int64,1}:
2
3
Distributed.nworkers
— Functionnworkers()
Obtiene el número de procesos de trabajo disponibles. Esto es uno menos que nprocs()
. Igual a nprocs()
si nprocs() == 1
.
Ejemplos
$ julia -p 2
julia> nprocs()
3
julia> nworkers()
2
Distributed.procs
— Methodprocs()
Devuelve una lista de todos los identificadores de proceso, incluyendo el pid 1 (que no está incluido por workers()
).
Ejemplos
$ julia -p 2
julia> procs()
3-element Array{Int64,1}:
1
2
3
Distributed.procs
— Methodprocs(pid::Integer)
Devuelve una lista de todos los identificadores de proceso en el mismo nodo físico. Específicamente, se devuelven todos los trabajadores vinculados a la misma dirección IP que pid
.
Distributed.workers
— Functionworkers()
Devuelve una lista de todos los identificadores de procesos de trabajo.
Ejemplos
$ julia -p 2
julia> workers()
2-element Array{Int64,1}:
2
3
Distributed.rmprocs
— Functionrmprocs(pids...; waitfor=typemax(Int))
Elimina los trabajadores especificados. Ten en cuenta que solo el proceso 1 puede agregar o eliminar trabajadores.
El argumento waitfor
especifica cuánto tiempo esperar para que los trabajadores se apaguen:
- Si no se especifica,
rmprocs
esperará hasta que todos lospids
solicitados sean eliminados. - Se genera una
ErrorException
si no se pueden terminar todos los trabajadores antes de los segundos solicitados enwaitfor
. - Con un valor de
waitfor
de 0, la llamada devuelve inmediatamente con los trabajadores programados para ser eliminados en una tarea diferente. Se devuelve el objetoTask
programado. El usuario debe llamar await
en la tarea antes de invocar cualquier otra llamada paralela.
Ejemplos
$ julia -p 5
julia> t = rmprocs(2, 3, waitfor=0)
Task (runnable) @0x0000000107c718d0
julia> wait(t)
julia> workers()
3-element Array{Int64,1}:
4
5
6
Distributed.interrupt
— Functioninterrupt(pids::Integer...)
Interrumpe la tarea que se está ejecutando actualmente en los trabajadores especificados. Esto es equivalente a presionar Ctrl-C en la máquina local. Si no se dan argumentos, se interrumpen todos los trabajadores.
interrupt(pids::AbstractVector=workers())
Interrumpe la tarea que se está ejecutando actualmente en los trabajadores especificados. Esto es equivalente a presionar Ctrl-C en la máquina local. Si no se proporcionan argumentos, se interrumpen todos los trabajadores.
Distributed.myid
— Functionmyid()
Obtiene el id del proceso actual.
Ejemplos
julia> myid()
1
julia> remotecall_fetch(() -> myid(), 4)
4
Distributed.pmap
— Functionpmap(f, [::AbstractWorkerPool], c...; distributed=true, batch_size=1, on_error=nothing, retry_delays=[], retry_check=nothing) -> colección
Transforma la colección c
aplicando f
a cada elemento utilizando los trabajadores y tareas disponibles.
Para múltiples argumentos de colección, aplica f
elemento por elemento.
Ten en cuenta que f
debe estar disponible para todos los procesos de trabajo; consulta Disponibilidad de Código y Carga de Paquetes para más detalles.
Si no se especifica un grupo de trabajadores, se utilizarán todos los trabajadores disponibles a través de un CachingPool
.
Por defecto, pmap
distribuye la computación entre todos los trabajadores especificados. Para usar solo el proceso local y distribuir entre tareas, especifica distributed=false
. Esto es equivalente a usar asyncmap
. Por ejemplo, pmap(f, c; distributed=false)
es equivalente a asyncmap(f,c; ntasks=()->nworkers())
pmap
también puede usar una mezcla de procesos y tareas a través del argumento batch_size
. Para tamaños de lote mayores que 1, la colección se procesa en múltiples lotes, cada uno de longitud batch_size
o menos. Un lote se envía como una sola solicitud a un trabajador libre, donde un asyncmap
local procesa elementos del lote utilizando múltiples tareas concurrentes.
Cualquier error detiene a pmap
de procesar el resto de la colección. Para anular este comportamiento, puedes especificar una función de manejo de errores a través del argumento on_error
que toma un solo argumento, es decir, la excepción. La función puede detener el procesamiento volviendo a lanzar el error, o, para continuar, devolver cualquier valor que luego se devuelve en línea con los resultados al llamador.
Considera los siguientes dos ejemplos. El primero devuelve el objeto de excepción en línea, el segundo un 0 en lugar de cualquier excepción:
julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=identity)
4-element Array{Any,1}:
1
ErrorException("foo")
3
ErrorException("foo")
julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=ex->0)
4-element Array{Int64,1}:
1
0
3
0
Los errores también se pueden manejar reintentando cálculos fallidos. Los argumentos clave retry_delays
y retry_check
se pasan a retry
como argumentos clave delays
y check
respectivamente. Si se especifica el agrupamiento, y un lote entero falla, todos los elementos en el lote se reintentan.
Ten en cuenta que si se especifican tanto on_error
como retry_delays
, el gancho on_error
se llama antes de reintentar. Si on_error
no lanza (o vuelve a lanzar) una excepción, el elemento no se reintentará.
Ejemplo: En caso de errores, reintenta f
en un elemento un máximo de 3 veces sin ningún retraso entre reintentos.
pmap(f, c; retry_delays = zeros(3))
Ejemplo: Reintenta f
solo si la excepción no es del tipo InexactError
, con retrasos que aumentan exponencialmente hasta 3 veces. Devuelve un NaN
en lugar de todas las ocurrencias de InexactError
.
pmap(f, c; on_error = e->(isa(e, InexactError) ? NaN : rethrow()), retry_delays = ExponentialBackOff(n = 3))
Distributed.RemoteException
— TypeRemoteException(captured)
Las excepciones en cálculos remotos se capturan y se vuelven a lanzar localmente. Un RemoteException
envuelve el pid
del trabajador y una excepción capturada. Una CapturedException
captura la excepción remota y una forma serializable de la pila de llamadas cuando se produjo la excepción.
Distributed.ProcessExitedException
— TypeProcessExitedException(worker_id::Int)
Después de que un proceso cliente de Julia ha salido, los intentos posteriores de referenciar al hijo muerto lanzarán esta excepción.
Distributed.Future
— TypeFuture(w::Int, rrid::RRID, v::Union{Some, Nothing}=nothing)
Un Future
es un marcador de posición para un solo cálculo de estado de terminación y tiempo desconocidos. Para múltiples cálculos potenciales, consulte RemoteChannel
. Consulte remoteref_id
para identificar un AbstractRemoteRef
.
Distributed.RemoteChannel
— TypeRemoteChannel(pid::Integer=myid())
Crea una referencia a un Channel{Any}(1)
en el proceso pid
. El pid
por defecto es el proceso actual.
RemoteChannel(f::Function, pid::Integer=myid())
Crea referencias a canales remotos de un tamaño y tipo específicos. f
es una función que, cuando se ejecuta en pid
, debe devolver una implementación de un AbstractChannel
.
Por ejemplo, RemoteChannel(()->Channel{Int}(10), pid)
, devolverá una referencia a un canal de tipo Int
y tamaño 10 en pid
.
El pid
por defecto es el proceso actual.
Base.fetch
— Methodfetch(x::Future)
Espera y obtiene el valor de un Future
. El valor obtenido se almacena en caché localmente. Las llamadas posteriores a fetch
en la misma referencia devuelven el valor en caché. Si el valor remoto es una excepción, lanza una RemoteException
que captura la excepción remota y la traza de la pila.
Base.fetch
— Methodfetch(c::RemoteChannel)
Espera y obtiene un valor de un RemoteChannel
. Las excepciones que se generan son las mismas que para un Future
. No elimina el elemento obtenido.
fetch(x::Any)
Devuelve x
.
Distributed.remotecall
— Methodremotecall(f, id::Integer, args...; kwargs...) -> Future
Llama a una función f
de manera asíncrona con los argumentos dados en el proceso especificado. Devuelve un Future
. Los argumentos de palabra clave, si los hay, se pasan a f
.
Distributed.remotecall_wait
— Methodremotecall_wait(f, id::Integer, args...; kwargs...)
Realiza un wait(remotecall(...))
más rápido en un solo mensaje en el Worker
especificado por el id de trabajador id
. Los argumentos de palabra clave, si los hay, se pasan a f
.
Consulta también wait
y remotecall
.
Distributed.remotecall_fetch
— Methodremotecall_fetch(f, id::Integer, args...; kwargs...)
Realiza fetch(remotecall(...))
en un solo mensaje. Los argumentos de palabra clave, si los hay, se pasan a f
. Cualquier excepción remota se captura en una RemoteException
y se lanza.
Véase también fetch
y remotecall
.
Ejemplos
$ julia -p 2
julia> remotecall_fetch(sqrt, 2, 4)
2.0
julia> remotecall_fetch(sqrt, 2, -4)
ERROR: En el trabajador 2:
DomainError con -4.0:
sqrt fue llamado con un argumento real negativo pero solo devolverá un resultado complejo si se llama con un argumento complejo. Intenta sqrt(Complex(x)).
...
Distributed.remote_do
— Methodremote_do(f, id::Integer, args...; kwargs...) -> nothing
Ejecuta f
en el trabajador id
de forma asíncrona. A diferencia de remotecall
, no almacena el resultado de la computación, ni hay forma de esperar su finalización.
Una invocación exitosa indica que la solicitud ha sido aceptada para su ejecución en el nodo remoto.
Mientras que las llamadas consecutivas a remotecall
al mismo trabajador se serializan en el orden en que se invocan, el orden de las ejecuciones en el trabajador remoto es indeterminado. Por ejemplo, remote_do(f1, 2); remotecall(f2, 2); remote_do(f3, 2)
serializará la llamada a f1
, seguida de f2
y f3
en ese orden. Sin embargo, no se garantiza que f1
se ejecute antes que f3
en el trabajador 2.
Cualquier excepción lanzada por f
se imprime en stderr
en el trabajador remoto.
Los argumentos de palabra clave, si los hay, se pasan a f
.
Base.put!
— Methodput!(rr::RemoteChannel, args...)
Almacena un conjunto de valores en el RemoteChannel
. Si el canal está lleno, bloquea hasta que haya espacio disponible. Devuelve el primer argumento.
Base.put!
— Methodput!(rr::Future, v)
Almacena un valor en un Future
rr
. Los Future
s son referencias remotas de escritura única. Un put!
en un Future
ya establecido lanza una Excepción
. Todas las llamadas remotas asíncronas devuelven Future
s y establecen el valor al valor de retorno de la llamada al completarse.
Base.take!
— Methodtake!(rr::RemoteChannel, args...)
Obtiene valor(es) de un RemoteChannel
rr
, eliminando el/los valor(es) en el proceso.
Base.isready
— Methodisready(rr::RemoteChannel, args...)
Determina si un RemoteChannel
tiene un valor almacenado. Ten en cuenta que esta función puede causar condiciones de carrera, ya que para cuando recibas su resultado, puede que ya no sea cierto. Sin embargo, se puede usar de manera segura en un Future
ya que solo se asignan una vez.
Base.isready
— Methodisready(rr::Future)
Determina si un Future
tiene un valor almacenado.
Si el argumento Future
es propiedad de un nodo diferente, esta llamada se bloqueará para esperar la respuesta. Se recomienda esperar por rr
en una tarea separada o usar un Channel
local como proxy:
p = 1
f = Future(p)
errormonitor(@async put!(f, remotecall_fetch(long_computation, p)))
isready(f) # no bloqueará
Distributed.AbstractWorkerPool
— TypeAbstractWorkerPool
Supertipo para grupos de trabajadores como WorkerPool
y CachingPool
. Un AbstractWorkerPool
debe implementar:
push!
- agregar un nuevo trabajador al grupo general (disponible + ocupado)put!
- devolver un trabajador al grupo disponibletake!
- tomar un trabajador del grupo disponible (para ser utilizado en la ejecución de funciones remotas)length
- número de trabajadores disponibles en el grupo generalisready
- devolver falso si untake!
en el grupo bloquearía, de lo contrario, verdadero
Las implementaciones predeterminadas de lo anterior (en un AbstractWorkerPool
) requieren campos
channel::Channel{Int}
workers::Set{Int}
donde channel
contiene pids de trabajadores libres y workers
es el conjunto de todos los trabajadores asociados con este grupo.
Distributed.WorkerPool
— TypeWorkerPool(workers::Union{Vector{Int},AbstractRange{Int}})
Crea un WorkerPool
a partir de un vector o rango de identificadores de trabajadores.
Ejemplos
$ julia -p 3
julia> WorkerPool([2, 3])
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([2, 3]), RemoteChannel{Channel{Any}}(1, 1, 6))
julia> WorkerPool(2:4)
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 7))
Distributed.CachingPool
— TypeCachingPool(workers::Vector{Int})
Una implementación de un AbstractWorkerPool
. remote
, remotecall_fetch
, pmap
(y otras llamadas remotas que ejecutan funciones de forma remota) se benefician de almacenar en caché las funciones serializadas/deserializadas en los nodos de trabajo, especialmente los cierres (que pueden capturar grandes cantidades de datos).
La caché remota se mantiene durante la vida útil del objeto CachingPool
devuelto. Para limpiar la caché antes, usa clear!(pool)
.
Para las variables globales, solo se capturan los enlaces en un cierre, no los datos. Se pueden usar bloques let
para capturar datos globales.
Ejemplos
const foo = rand(10^8);
wp = CachingPool(workers())
let foo = foo
pmap(i -> sum(foo) + i, wp, 1:100);
end
Lo anterior transferiría foo
solo una vez a cada trabajador.
Distributed.default_worker_pool
— Functiondefault_worker_pool()
AbstractWorkerPool
que contiene workers
inactivos - utilizado por remote(f)
y pmap
(por defecto). A menos que se establezca explícitamente a través de default_worker_pool!(pool)
, el grupo de trabajadores predeterminado se inicializa a un WorkerPool
.
Ejemplos
$ julia -p 3
julia> default_worker_pool()
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:3), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 4))
Distributed.clear!
— Functionclear!(syms, pids=workers(); mod=Main)
Limpia las vinculaciones globales en los módulos inicializándolas a nothing
. syms
debe ser de tipo Symbol
o una colección de Symbol
s. pids
y mod
identifican los procesos y el módulo en el que se deben reinicializar las variables globales. Solo se limpian aquellos nombres que se encuentran definidos bajo mod
.
Se genera una excepción si se solicita limpiar una constante global.
clear!(pool::CachingPool) -> pool
Elimina todas las funciones en caché de todos los trabajadores participantes.
Distributed.remote
— Functionremote([p::AbstractWorkerPool], f) -> Function
Devuelve una función anónima que ejecuta la función f
en un trabajador disponible (extraído de WorkerPool
p
si se proporciona) utilizando remotecall_fetch
.
Distributed.remotecall
— Methodremotecall(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future
WorkerPool
variante de remotecall(f, pid, ....)
. Espera y toma un trabajador libre de pool
y realiza un remotecall
en él.
Ejemplos
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> f = remotecall(maximum, wp, A)
Future(2, 1, 6, nothing)
En este ejemplo, la tarea se ejecutó en pid 2, llamada desde pid 1.
Distributed.remotecall_wait
— Methodremotecall_wait(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future
Variante de WorkerPool
de remotecall_wait(f, pid, ....)
. Espera y toma un trabajador libre de pool
y realiza un remotecall_wait
en él.
Ejemplos
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> f = remotecall_wait(maximum, wp, A)
Future(3, 1, 9, nothing)
julia> fetch(f)
0.9995177101692958
Distributed.remotecall_fetch
— Methodremotecall_fetch(f, pool::AbstractWorkerPool, args...; kwargs...) -> result
WorkerPool
variante de remotecall_fetch(f, pid, ....)
. Espera y toma un trabajador libre de pool
y realiza un remotecall_fetch
en él.
Ejemplos
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> remotecall_fetch(maximum, wp, A)
0.9995177101692958
Distributed.remote_do
— Methodremote_do(f, pool::AbstractWorkerPool, args...; kwargs...) -> nothing
Variante de WorkerPool
de remote_do(f, pid, ....)
. Espera y toma un trabajador libre de pool
y realiza un remote_do
en él.
Distributed.@spawn
— Macro@spawn expr
Crea un cierre alrededor de una expresión y la ejecuta en un proceso elegido automáticamente, devolviendo un Future
al resultado. Este macro está en desuso; se debe usar @spawnat :any expr
en su lugar.
Ejemplos
julia> addprocs(3);
julia> f = @spawn myid()
Future(2, 1, 5, nothing)
julia> fetch(f)
2
julia> f = @spawn myid()
Future(3, 1, 7, nothing)
julia> fetch(f)
3
A partir de Julia 1.3, este macro está en desuso. Usa @spawnat :any
en su lugar.
Distributed.@spawnat
— Macro@spawnat p expr
Crea un cierre alrededor de una expresión y ejecuta el cierre de manera asíncrona en el proceso p
. Devuelve un Future
al resultado. Si p
es el símbolo literal citado :any
, entonces el sistema elegirá automáticamente un procesador para usar.
Ejemplos
julia> addprocs(3);
julia> f = @spawnat 2 myid()
Future(2, 1, 3, nothing)
julia> fetch(f)
2
julia> f = @spawnat :any myid()
Future(3, 1, 7, nothing)
julia> fetch(f)
3
El argumento :any
está disponible a partir de Julia 1.3.
Distributed.@fetch
— Macro@fetch expr
Equivalente a fetch(@spawnat :any expr)
. Ver fetch
y @spawnat
.
Ejemplos
julia> addprocs(3);
julia> @fetch myid()
2
julia> @fetch myid()
3
julia> @fetch myid()
4
julia> @fetch myid()
2
Distributed.@fetchfrom
— Macro@fetchfrom
Equivalente a fetch(@spawnat p expr)
. Ver fetch
y @spawnat
.
Ejemplos
julia> addprocs(3);
julia> @fetchfrom 2 myid()
2
julia> @fetchfrom 4 myid()
4
Distributed.@distributed
— Macro@distributed
Un bucle for paralelo de memoria distribuida de la forma:
@distributed [reducer] for var = range
body
end
El rango especificado se particiona y se ejecuta localmente en todos los trabajadores. En caso de que se especifique una función reductor opcional, @distributed
realiza reducciones locales en cada trabajador con una reducción final en el proceso que llama.
Tenga en cuenta que sin una función reductor, @distributed
se ejecuta de manera asíncrona, es decir, genera tareas independientes en todos los trabajadores disponibles y devuelve inmediatamente sin esperar a que se complete. Para esperar a que se complete, prefije la llamada con @sync
, como:
@sync @distributed for var = range
body
end
Distributed.@everywhere
— Macro@everywhere [procs()] expr
Ejecuta una expresión bajo Main
en todos los procs
. Los errores en cualquiera de los procesos se recopilan en una CompositeException
y se lanzan. Por ejemplo:
@everywhere bar = 1
definirá Main.bar
en todos los procesos actuales. Cualquier proceso agregado más tarde (digamos con addprocs()
) no tendrá la expresión definida.
A diferencia de @spawnat
, @everywhere
no captura ninguna variable local. En su lugar, las variables locales se pueden transmitir utilizando interpolación:
foo = 1
@everywhere bar = $foo
El argumento opcional procs
permite especificar un subconjunto de todos los procesos para que ejecuten la expresión.
Similar a llamar a remotecall_eval(Main, procs, expr)
, pero con dos características adicionales:
- Las declaraciones `using` e `import` se ejecutan primero en el proceso que llama, para asegurar
que los paquetes estén precompilados.
- La ruta del archivo fuente actual utilizada por `include` se propaga a otros procesos.
Distributed.remoteref_id
— Functionremoteref_id(r::AbstractRemoteRef) -> RRID
Future
s y RemoteChannel
s se identifican por campos:
where
- se refiere al nodo donde el objeto/almacenamiento subyacente al que se refiere la referencia realmente existe.whence
- se refiere al nodo desde el cual se creó la referencia remota. Tenga en cuenta que esto es diferente del nodo donde el objeto subyacente al que se refiere realmente existe. Por ejemplo, llamar aRemoteChannel(2)
desde el proceso maestro resultaría en un valor dewhere
de 2 y un valor dewhence
de 1.id
es único entre todas las referencias creadas desde el trabajador especificado porwhence
.
Tomados en conjunto, whence
e id
identifican de manera única una referencia entre todos los trabajadores.
remoteref_id
es una API de bajo nivel que devuelve un objeto RRID
que envuelve los valores de whence
e id
de una referencia remota.
Distributed.channel_from_id
— Functionchannel_from_id(id) -> c
Una API de bajo nivel que devuelve el AbstractChannel
de respaldo para un id
devuelto por remoteref_id
. La llamada es válida solo en el nodo donde existe el canal de respaldo.
Distributed.worker_id_from_socket
— Functionworker_id_from_socket(s) -> pid
Una API de bajo nivel que, dado un IO
conexión o un Worker
, devuelve el pid
del trabajador al que está conectado. Esto es útil al escribir métodos personalizados serialize
para un tipo, que optimizan los datos escritos dependiendo del id del proceso receptor.
Distributed.cluster_cookie
— Methodcluster_cookie() -> cookie
Devuelve la cookie del clúster.
Distributed.cluster_cookie
— Methodcluster_cookie(cookie) -> cookie
Establece la cookie pasada como la cookie del clúster y luego la devuelve.
Cluster Manager Interface
Esta interfaz proporciona un mecanismo para lanzar y gestionar trabajadores de Julia en diferentes entornos de clúster. Hay dos tipos de gestores presentes en Base: LocalManager
, para lanzar trabajadores adicionales en el mismo host, y SSHManager
, para lanzar en hosts remotos a través de ssh
. Se utilizan sockets TCP/IP para conectar y transportar mensajes entre procesos. Es posible que los Gestores de Clúster proporcionen un transporte diferente.
Distributed.ClusterManager
— TypeClusterManager
Supertipo para administradores de clúster, que controlan procesos de trabajadores como un clúster. Los administradores de clúster implementan cómo se pueden agregar, eliminar y comunicar los trabajadores. SSHManager
y LocalManager
son subtipos de esto.
Distributed.WorkerConfig
— TypeWorkerConfig
Tipo utilizado por ClusterManager
s para controlar los trabajadores añadidos a sus clústeres. Algunos campos son utilizados por todos los administradores de clúster para acceder a un host:
io
– la conexión utilizada para acceder al trabajador (un subtipo deIO
oNothing
)host
– la dirección del host (ya sea unString
oNothing
)port
– el puerto en el host utilizado para conectarse al trabajador (ya sea unInt
oNothing
)
Algunos son utilizados por el administrador de clúster para añadir trabajadores a un host ya inicializado:
count
– el número de trabajadores que se lanzarán en el hostexename
– la ruta al ejecutable de Julia en el host, por defecto es"$(Sys.BINDIR)/julia"
o"$(Sys.BINDIR)/julia-debug"
exeflags
– banderas a utilizar al lanzar Julia de forma remota
El campo userdata
se utiliza para almacenar información para cada trabajador por administradores externos.
Algunos campos son utilizados por SSHManager
y administradores similares:
tunnel
–true
(usar túneles),false
(no usar túneles), onothing
(usar el valor por defecto para el administrador)multiplex
–true
(usar multiplexión SSH para túneles) ofalse
forward
– la opción de reenvío utilizada para la opción-L
de sshbind_addr
– la dirección en el host remoto a la que enlazarsshflags
– banderas a utilizar al establecer la conexión SSHmax_parallel
– el número máximo de trabajadores a los que conectarse en paralelo en el host
Algunos campos son utilizados tanto por LocalManager
s como por SSHManager
s:
connect_at
– determina si esta es una llamada de configuración de trabajador a trabajador o de controlador a trabajadorprocess
– el proceso que se conectará (generalmente el administrador asignará esto duranteaddprocs
)ospid
– el ID del proceso según el sistema operativo del host, utilizado para interrumpir procesos de trabajadoresenviron
– diccionario privado utilizado para almacenar información temporal por administradores Local/SSHident
– trabajador tal como es identificado por elClusterManager
connect_idents
– lista de IDs de trabajadores a los que el trabajador debe conectarse si se utiliza una topología personalizadaenable_threaded_blas
–true
,false
, onothing
, si se debe usar BLAS en hilos o no en los trabajadores
Distributed.launch
— Functionlaunch(manager::ClusterManager, params::Dict, launched::Array, launch_ntfy::Condition)
Implementado por administradores de clúster. Para cada trabajador de Julia lanzado por esta función, debe agregar una entrada WorkerConfig
a launched
y notificar a launch_ntfy
. La función DEBE salir una vez que todos los trabajadores, solicitados por manager
, hayan sido lanzados. params
es un diccionario de todos los argumentos de palabra clave con los que se llamó a addprocs
.
Distributed.manage
— Functionmanage(manager::ClusterManager, id::Integer, config::WorkerConfig. op::Symbol)
Implementado por los administradores de clúster. Se llama en el proceso maestro, durante la vida útil de un trabajador, con los valores op
apropiados:
- con
:register
/:deregister
cuando un trabajador es agregado / eliminado del grupo de trabajadores de Julia. - con
:interrupt
cuando se llama ainterrupt(workers)
. ElClusterManager
debe señalizar al trabajador apropiado con una señal de interrupción. - con
:finalize
para propósitos de limpieza.
Base.kill
— Methodkill(manager::ClusterManager, pid::Int, config::WorkerConfig)
Implementado por los administradores de clúster. Se llama en el proceso maestro, por rmprocs
. Debería hacer que el trabajador remoto especificado por pid
salga. kill(manager::ClusterManager.....)
ejecuta un exit()
remoto en pid
.
Sockets.connect
— Methodconnect(manager::ClusterManager, pid::Int, config::WorkerConfig) -> (instrm::IO, outstrm::IO)
Implementado por administradores de clúster utilizando transportes personalizados. Debe establecer una conexión lógica con el trabajador con id pid
, especificado por config
y devolver un par de objetos IO
. Los mensajes de pid
al proceso actual se leerán de instrm
, mientras que los mensajes que se enviarán a pid
se escribirán en outstrm
. La implementación del transporte personalizado debe garantizar que los mensajes se entreguen y reciban completamente y en orden. connect(manager::ClusterManager.....)
establece conexiones de socket TCP/IP entre los trabajadores.
Distributed.init_worker
— Functioninit_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager())
Llamado por administradores de clúster que implementan transportes personalizados. Inicializa un proceso recién lanzado como un trabajador. El argumento de línea de comandos --worker[=<cookie>]
tiene el efecto de inicializar un proceso como un trabajador utilizando sockets TCP/IP para el transporte. cookie
es un cluster_cookie
.
Distributed.start_worker
— Functionstart_worker([out::IO=stdout], cookie::AbstractString=readline(stdin); close_stdin::Bool=true, stderr_to_stdout::Bool=true)
start_worker
es una función interna que es el punto de entrada predeterminado para los procesos de trabajo que se conectan a través de TCP/IP. Configura el proceso como un trabajador del clúster de Julia.
La información de host:puerto se escribe en el flujo out
(por defecto en stdout).
La función lee la cookie de stdin si es necesario, y escucha en un puerto libre (o si se especifica, el puerto en la opción de línea de comandos --bind-to
) y programa tareas para procesar conexiones y solicitudes TCP entrantes. También (opcionalmente) cierra stdin y redirige stderr a stdout.
No devuelve.
Distributed.process_messages
— Functionprocess_messages(r_stream::IO, w_stream::IO, incoming::Bool=true)
Llamado por los administradores de clúster utilizando transportes personalizados. Debe ser llamado cuando la implementación del transporte personalizado recibe el primer mensaje de un trabajador remoto. El transporte personalizado debe gestionar una conexión lógica con el trabajador remoto y proporcionar dos objetos IO
, uno para los mensajes entrantes y el otro para los mensajes dirigidos al trabajador remoto. Si incoming
es true
, el par remoto inició la conexión. Cualquiera de los dos que inicie la conexión envía la cookie del clúster y su número de versión de Julia para realizar el apretón de manos de autenticación.
Véase también cluster_cookie
.
Distributed.default_addprocs_params
— Functiondefault_addprocs_params(mgr::ClusterManager) -> Dict{Symbol, Any}
Implementado por administradores de clúster. Los parámetros de palabra clave predeterminados que se pasan al llamar a addprocs(mgr)
. El conjunto mínimo de opciones está disponible al llamar a default_addprocs_params()
.