Distributed Computing

Distributed.addprocsFunction
addprocs(manager::ClusterManager; kwargs...) -> Lista de identificadores de procesos

Lanza procesos de trabajo a través del administrador de clústeres especificado.

Por ejemplo, los clústeres Beowulf son compatibles a través de un administrador de clústeres personalizado implementado en el paquete ClusterManagers.jl.

El número de segundos que un nuevo trabajador espera para el establecimiento de la conexión desde el maestro se puede especificar a través de la variable JULIA_WORKER_TIMEOUT en el entorno del proceso de trabajo. Relevante solo cuando se utiliza TCP/IP como transporte.

Para lanzar trabajadores sin bloquear el REPL, o la función que contiene si se lanzan trabajadores programáticamente, ejecuta addprocs en su propia tarea.

Ejemplos

# En clústeres ocupados, llama a `addprocs` de forma asíncrona
t = @async addprocs(...)
# Utiliza trabajadores a medida que se conectan
if nprocs() > 1   # Asegúrate de que al menos un nuevo trabajador esté disponible
   ....   # realiza ejecución distribuida
end
# Recupera los IDs de los trabajadores recién lanzados, o cualquier mensaje de error
if istaskdone(t)   # Verifica si `addprocs` ha completado para asegurar que `fetch` no bloquee
    if nworkers() == N
        new_pids = fetch(t)
    else
        fetch(t)
    end
end
source
addprocs(machines; tunnel=false, sshflags=``, max_parallel=10, kwargs...) -> Lista de identificadores de procesos

Agrega procesos de trabajo en máquinas remotas a través de SSH. La configuración se realiza con argumentos de palabra clave (ver más abajo). En particular, se puede usar la palabra clave exename para especificar la ruta al binario julia en la(s) máquina(s) remota(s).

machines es un vector de "especificaciones de máquina" que se dan como cadenas en la forma [user@]host[:port] [bind_addr[:port]]. user se establece de forma predeterminada como el usuario actual y port como el puerto SSH estándar. Si se especifica [bind_addr[:port]], otros trabajadores se conectarán a este trabajador en el bind_addr y port especificados.

Es posible lanzar múltiples procesos en un host remoto utilizando una tupla en el vector machines o la forma (machine_spec, count), donde count es el número de trabajadores que se lanzarán en el host especificado. Pasar :auto como el conteo de trabajadores lanzará tantos trabajadores como el número de hilos de CPU en el host remoto.

Ejemplos:

addprocs([
    "remote1",               # un trabajador en 'remote1' iniciando sesión con el nombre de usuario actual
    "user@remote2",          # un trabajador en 'remote2' iniciando sesión con el nombre de usuario 'user'
    "user@remote3:2222",     # especificando el puerto SSH a '2222' para 'remote3'
    ("user@remote4", 4),     # lanzar 4 trabajadores en 'remote4'
    ("user@remote5", :auto), # lanzar tantos trabajadores como hilos de CPU en 'remote5'
])

Argumentos de palabra clave:

  • tunnel: si true, se utilizará un túnel SSH para conectarse al trabajador desde el proceso maestro. El valor predeterminado es false.

  • multiplex: si true, se utiliza multiplexión SSH para el túnel SSH. El valor predeterminado es false.

  • ssh: el nombre o la ruta del ejecutable del cliente SSH utilizado para iniciar los trabajadores. El valor predeterminado es "ssh".

  • sshflags: especifica opciones ssh adicionales, p. ej. sshflags=`-i /home/foo/bar.pem`

  • max_parallel: especifica el número máximo de trabajadores conectados en paralelo en un host. El valor predeterminado es 10.

  • shell: especifica el tipo de shell al que ssh se conecta en los trabajadores.

    • shell=:posix: un shell Unix/Linux compatible con POSIX (sh, ksh, bash, dash, zsh, etc.). El valor predeterminado.
    • shell=:csh: un shell C de Unix (csh, tcsh).
    • shell=:wincmd: Microsoft Windows cmd.exe.
  • dir: especifica el directorio de trabajo en los trabajadores. El valor predeterminado es el directorio actual del host (como se encuentra con pwd())

  • enable_threaded_blas: si true, entonces BLAS se ejecutará en múltiples hilos en los procesos añadidos. El valor predeterminado es false.

  • exename: nombre del ejecutable julia. El valor predeterminado es "$(Sys.BINDIR)/julia" o "$(Sys.BINDIR)/julia-debug" según sea el caso. Se recomienda que se use una versión común de Julia en todas las máquinas remotas porque la serialización y la distribución de código podrían fallar de lo contrario.

  • exeflags: banderas adicionales pasadas a los procesos de trabajo.

  • topology: Especifica cómo los trabajadores se conectan entre sí. Enviar un mensaje entre trabajadores no conectados resulta en un error.

    • topology=:all_to_all: Todos los procesos están conectados entre sí. El valor predeterminado.
    • topology=:master_worker: Solo el proceso controlador, es decir, pid 1 se conecta a los trabajadores. Los trabajadores no se conectan entre sí.
    • topology=:custom: El método launch del administrador de clústeres especifica la topología de conexión a través de los campos ident y connect_idents en WorkerConfig. Un trabajador con una identidad de administrador de clúster ident se conectará a todos los trabajadores especificados en connect_idents.
  • lazy: Aplicable solo con topology=:all_to_all. Si true, las conexiones trabajador-trabajador se configuran de manera perezosa, es decir, se configuran en la primera instancia de una llamada remota entre trabajadores. El valor predeterminado es true.

  • env: proporciona un array de pares de cadenas como env=["JULIA_DEPOT_PATH"=>"/depot"] para solicitar que se establezcan variables de entorno en la máquina remota. Por defecto, solo la variable de entorno JULIA_WORKER_TIMEOUT se pasa automáticamente del entorno local al remoto.

  • cmdline_cookie: pasa la cookie de autenticación a través de la opción de línea de comandos --worker. El comportamiento predeterminado (más seguro) de pasar la cookie a través de ssh stdio puede colgarse con trabajadores de Windows que utilizan versiones anteriores de Julia o Windows (pre-ConPTY), en cuyo caso cmdline_cookie=true ofrece una solución alternativa.

Julia 1.6

Los argumentos de palabra clave ssh, shell, env y cmdline_cookie se añadieron en Julia 1.6.

Variables de entorno:

Si el proceso maestro no logra establecer una conexión con un trabajador recién lanzado dentro de 60.0 segundos, el trabajador lo considera una situación fatal y termina. Este tiempo de espera se puede controlar a través de la variable de entorno JULIA_WORKER_TIMEOUT. El valor de JULIA_WORKER_TIMEOUT en el proceso maestro especifica el número de segundos que un trabajador recién lanzado espera para establecer la conexión.

source
addprocs(np::Integer=Sys.CPU_THREADS; restrict=true, kwargs...) -> Lista de identificadores de procesos

Lanza np trabajadores en el host local utilizando el LocalManager incorporado.

Los trabajadores locales heredan el entorno de paquetes actual (es decir, proyecto activo, LOAD_PATH y DEPOT_PATH) del proceso principal.

Warning

Tenga en cuenta que los trabajadores no ejecutan un script de inicio ~/.julia/config/startup.jl, ni sincronizan su estado global (como opciones de línea de comandos, variables globales, definiciones de nuevos métodos y módulos cargados) con ninguno de los otros procesos en ejecución.

Argumentos clave:

  • restrict::Bool: si es true (por defecto) la vinculación está restringida a 127.0.0.1.
  • dir, exename, exeflags, env, topology, lazy, enable_threaded_blas: mismo efecto que para SSHManager, consulte la documentación para addprocs(machines::AbstractVector).
Julia 1.9

La herencia del entorno de paquetes y el argumento clave env se añadieron en Julia 1.9.

source
Distributed.nprocsFunction
nprocs()

Obtiene el número de procesos disponibles.

Ejemplos

julia> nprocs()
3

julia> workers()
2-element Array{Int64,1}:
 2
 3
source
Distributed.nworkersFunction
nworkers()

Obtiene el número de procesos de trabajo disponibles. Esto es uno menos que nprocs(). Igual a nprocs() si nprocs() == 1.

Ejemplos

$ julia -p 2

julia> nprocs()
3

julia> nworkers()
2
source
Distributed.procsMethod
procs()

Devuelve una lista de todos los identificadores de proceso, incluyendo el pid 1 (que no está incluido por workers()).

Ejemplos

$ julia -p 2

julia> procs()
3-element Array{Int64,1}:
 1
 2
 3
source
Distributed.procsMethod
procs(pid::Integer)

Devuelve una lista de todos los identificadores de proceso en el mismo nodo físico. Específicamente, se devuelven todos los trabajadores vinculados a la misma dirección IP que pid.

source
Distributed.workersFunction
workers()

Devuelve una lista de todos los identificadores de procesos de trabajo.

Ejemplos

$ julia -p 2

julia> workers()
2-element Array{Int64,1}:
 2
 3
source
Distributed.rmprocsFunction
rmprocs(pids...; waitfor=typemax(Int))

Elimina los trabajadores especificados. Ten en cuenta que solo el proceso 1 puede agregar o eliminar trabajadores.

El argumento waitfor especifica cuánto tiempo esperar para que los trabajadores se apaguen:

  • Si no se especifica, rmprocs esperará hasta que todos los pids solicitados sean eliminados.
  • Se genera una ErrorException si no se pueden terminar todos los trabajadores antes de los segundos solicitados en waitfor.
  • Con un valor de waitfor de 0, la llamada devuelve inmediatamente con los trabajadores programados para ser eliminados en una tarea diferente. Se devuelve el objeto Task programado. El usuario debe llamar a wait en la tarea antes de invocar cualquier otra llamada paralela.

Ejemplos

$ julia -p 5

julia> t = rmprocs(2, 3, waitfor=0)
Task (runnable) @0x0000000107c718d0

julia> wait(t)

julia> workers()
3-element Array{Int64,1}:
 4
 5
 6
source
Distributed.interruptFunction
interrupt(pids::Integer...)

Interrumpe la tarea que se está ejecutando actualmente en los trabajadores especificados. Esto es equivalente a presionar Ctrl-C en la máquina local. Si no se dan argumentos, se interrumpen todos los trabajadores.

source
interrupt(pids::AbstractVector=workers())

Interrumpe la tarea que se está ejecutando actualmente en los trabajadores especificados. Esto es equivalente a presionar Ctrl-C en la máquina local. Si no se proporcionan argumentos, se interrumpen todos los trabajadores.

source
Distributed.myidFunction
myid()

Obtiene el id del proceso actual.

Ejemplos

julia> myid()
1

julia> remotecall_fetch(() -> myid(), 4)
4
source
Distributed.pmapFunction
pmap(f, [::AbstractWorkerPool], c...; distributed=true, batch_size=1, on_error=nothing, retry_delays=[], retry_check=nothing) -> colección

Transforma la colección c aplicando f a cada elemento utilizando los trabajadores y tareas disponibles.

Para múltiples argumentos de colección, aplica f elemento por elemento.

Ten en cuenta que f debe estar disponible para todos los procesos de trabajo; consulta Disponibilidad de Código y Carga de Paquetes para más detalles.

Si no se especifica un grupo de trabajadores, se utilizarán todos los trabajadores disponibles a través de un CachingPool.

Por defecto, pmap distribuye la computación entre todos los trabajadores especificados. Para usar solo el proceso local y distribuir entre tareas, especifica distributed=false. Esto es equivalente a usar asyncmap. Por ejemplo, pmap(f, c; distributed=false) es equivalente a asyncmap(f,c; ntasks=()->nworkers())

pmap también puede usar una mezcla de procesos y tareas a través del argumento batch_size. Para tamaños de lote mayores que 1, la colección se procesa en múltiples lotes, cada uno de longitud batch_size o menos. Un lote se envía como una sola solicitud a un trabajador libre, donde un asyncmap local procesa elementos del lote utilizando múltiples tareas concurrentes.

Cualquier error detiene a pmap de procesar el resto de la colección. Para anular este comportamiento, puedes especificar una función de manejo de errores a través del argumento on_error que toma un solo argumento, es decir, la excepción. La función puede detener el procesamiento volviendo a lanzar el error, o, para continuar, devolver cualquier valor que luego se devuelve en línea con los resultados al llamador.

Considera los siguientes dos ejemplos. El primero devuelve el objeto de excepción en línea, el segundo un 0 en lugar de cualquier excepción:

julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=identity)
4-element Array{Any,1}:
 1
  ErrorException("foo")
 3
  ErrorException("foo")

julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=ex->0)
4-element Array{Int64,1}:
 1
 0
 3
 0

Los errores también se pueden manejar reintentando cálculos fallidos. Los argumentos clave retry_delays y retry_check se pasan a retry como argumentos clave delays y check respectivamente. Si se especifica el agrupamiento, y un lote entero falla, todos los elementos en el lote se reintentan.

Ten en cuenta que si se especifican tanto on_error como retry_delays, el gancho on_error se llama antes de reintentar. Si on_error no lanza (o vuelve a lanzar) una excepción, el elemento no se reintentará.

Ejemplo: En caso de errores, reintenta f en un elemento un máximo de 3 veces sin ningún retraso entre reintentos.

pmap(f, c; retry_delays = zeros(3))

Ejemplo: Reintenta f solo si la excepción no es del tipo InexactError, con retrasos que aumentan exponencialmente hasta 3 veces. Devuelve un NaN en lugar de todas las ocurrencias de InexactError.

pmap(f, c; on_error = e->(isa(e, InexactError) ? NaN : rethrow()), retry_delays = ExponentialBackOff(n = 3))
source
Distributed.RemoteExceptionType
RemoteException(captured)

Las excepciones en cálculos remotos se capturan y se vuelven a lanzar localmente. Un RemoteException envuelve el pid del trabajador y una excepción capturada. Una CapturedException captura la excepción remota y una forma serializable de la pila de llamadas cuando se produjo la excepción.

source
Distributed.ProcessExitedExceptionType
ProcessExitedException(worker_id::Int)

Después de que un proceso cliente de Julia ha salido, los intentos posteriores de referenciar al hijo muerto lanzarán esta excepción.

source
Distributed.FutureType
Future(w::Int, rrid::RRID, v::Union{Some, Nothing}=nothing)

Un Future es un marcador de posición para un solo cálculo de estado de terminación y tiempo desconocidos. Para múltiples cálculos potenciales, consulte RemoteChannel. Consulte remoteref_id para identificar un AbstractRemoteRef.

source
Distributed.RemoteChannelType
RemoteChannel(pid::Integer=myid())

Crea una referencia a un Channel{Any}(1) en el proceso pid. El pid por defecto es el proceso actual.

RemoteChannel(f::Function, pid::Integer=myid())

Crea referencias a canales remotos de un tamaño y tipo específicos. f es una función que, cuando se ejecuta en pid, debe devolver una implementación de un AbstractChannel.

Por ejemplo, RemoteChannel(()->Channel{Int}(10), pid), devolverá una referencia a un canal de tipo Int y tamaño 10 en pid.

El pid por defecto es el proceso actual.

source
Base.fetchMethod
fetch(x::Future)

Espera y obtiene el valor de un Future. El valor obtenido se almacena en caché localmente. Las llamadas posteriores a fetch en la misma referencia devuelven el valor en caché. Si el valor remoto es una excepción, lanza una RemoteException que captura la excepción remota y la traza de la pila.

source
Base.fetchMethod
fetch(c::RemoteChannel)

Espera y obtiene un valor de un RemoteChannel. Las excepciones que se generan son las mismas que para un Future. No elimina el elemento obtenido.

source
fetch(x::Any)

Devuelve x.

source
Distributed.remotecallMethod
remotecall(f, id::Integer, args...; kwargs...) -> Future

Llama a una función f de manera asíncrona con los argumentos dados en el proceso especificado. Devuelve un Future. Los argumentos de palabra clave, si los hay, se pasan a f.

source
Distributed.remotecall_waitMethod
remotecall_wait(f, id::Integer, args...; kwargs...)

Realiza un wait(remotecall(...)) más rápido en un solo mensaje en el Worker especificado por el id de trabajador id. Los argumentos de palabra clave, si los hay, se pasan a f.

Consulta también wait y remotecall.

source
Distributed.remotecall_fetchMethod
remotecall_fetch(f, id::Integer, args...; kwargs...)

Realiza fetch(remotecall(...)) en un solo mensaje. Los argumentos de palabra clave, si los hay, se pasan a f. Cualquier excepción remota se captura en una RemoteException y se lanza.

Véase también fetch y remotecall.

Ejemplos

$ julia -p 2

julia> remotecall_fetch(sqrt, 2, 4)
2.0

julia> remotecall_fetch(sqrt, 2, -4)
ERROR: En el trabajador 2:
DomainError con -4.0:
sqrt fue llamado con un argumento real negativo pero solo devolverá un resultado complejo si se llama con un argumento complejo. Intenta sqrt(Complex(x)).
...
source
Distributed.remote_doMethod
remote_do(f, id::Integer, args...; kwargs...) -> nothing

Ejecuta f en el trabajador id de forma asíncrona. A diferencia de remotecall, no almacena el resultado de la computación, ni hay forma de esperar su finalización.

Una invocación exitosa indica que la solicitud ha sido aceptada para su ejecución en el nodo remoto.

Mientras que las llamadas consecutivas a remotecall al mismo trabajador se serializan en el orden en que se invocan, el orden de las ejecuciones en el trabajador remoto es indeterminado. Por ejemplo, remote_do(f1, 2); remotecall(f2, 2); remote_do(f3, 2) serializará la llamada a f1, seguida de f2 y f3 en ese orden. Sin embargo, no se garantiza que f1 se ejecute antes que f3 en el trabajador 2.

Cualquier excepción lanzada por f se imprime en stderr en el trabajador remoto.

Los argumentos de palabra clave, si los hay, se pasan a f.

source
Base.put!Method
put!(rr::RemoteChannel, args...)

Almacena un conjunto de valores en el RemoteChannel. Si el canal está lleno, bloquea hasta que haya espacio disponible. Devuelve el primer argumento.

source
Base.put!Method
put!(rr::Future, v)

Almacena un valor en un Future rr. Los Futures son referencias remotas de escritura única. Un put! en un Future ya establecido lanza una Excepción. Todas las llamadas remotas asíncronas devuelven Futures y establecen el valor al valor de retorno de la llamada al completarse.

source
Base.isreadyMethod
isready(rr::RemoteChannel, args...)

Determina si un RemoteChannel tiene un valor almacenado. Ten en cuenta que esta función puede causar condiciones de carrera, ya que para cuando recibas su resultado, puede que ya no sea cierto. Sin embargo, se puede usar de manera segura en un Future ya que solo se asignan una vez.

source
Base.isreadyMethod
isready(rr::Future)

Determina si un Future tiene un valor almacenado.

Si el argumento Future es propiedad de un nodo diferente, esta llamada se bloqueará para esperar la respuesta. Se recomienda esperar por rr en una tarea separada o usar un Channel local como proxy:

p = 1
f = Future(p)
errormonitor(@async put!(f, remotecall_fetch(long_computation, p)))
isready(f)  # no bloqueará
source
Distributed.AbstractWorkerPoolType
AbstractWorkerPool

Supertipo para grupos de trabajadores como WorkerPool y CachingPool. Un AbstractWorkerPool debe implementar:

  • push! - agregar un nuevo trabajador al grupo general (disponible + ocupado)
  • put! - devolver un trabajador al grupo disponible
  • take! - tomar un trabajador del grupo disponible (para ser utilizado en la ejecución de funciones remotas)
  • length - número de trabajadores disponibles en el grupo general
  • isready - devolver falso si un take! en el grupo bloquearía, de lo contrario, verdadero

Las implementaciones predeterminadas de lo anterior (en un AbstractWorkerPool) requieren campos

  • channel::Channel{Int}
  • workers::Set{Int}

donde channel contiene pids de trabajadores libres y workers es el conjunto de todos los trabajadores asociados con este grupo.

source
Distributed.WorkerPoolType
WorkerPool(workers::Union{Vector{Int},AbstractRange{Int}})

Crea un WorkerPool a partir de un vector o rango de identificadores de trabajadores.

Ejemplos

$ julia -p 3

julia> WorkerPool([2, 3])
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([2, 3]), RemoteChannel{Channel{Any}}(1, 1, 6))

julia> WorkerPool(2:4)
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 7))
source
Distributed.CachingPoolType
CachingPool(workers::Vector{Int})

Una implementación de un AbstractWorkerPool. remote, remotecall_fetch, pmap (y otras llamadas remotas que ejecutan funciones de forma remota) se benefician de almacenar en caché las funciones serializadas/deserializadas en los nodos de trabajo, especialmente los cierres (que pueden capturar grandes cantidades de datos).

La caché remota se mantiene durante la vida útil del objeto CachingPool devuelto. Para limpiar la caché antes, usa clear!(pool).

Para las variables globales, solo se capturan los enlaces en un cierre, no los datos. Se pueden usar bloques let para capturar datos globales.

Ejemplos

const foo = rand(10^8);
wp = CachingPool(workers())
let foo = foo
    pmap(i -> sum(foo) + i, wp, 1:100);
end

Lo anterior transferiría foo solo una vez a cada trabajador.

source
Distributed.default_worker_poolFunction
default_worker_pool()

AbstractWorkerPool que contiene workers inactivos - utilizado por remote(f) y pmap (por defecto). A menos que se establezca explícitamente a través de default_worker_pool!(pool), el grupo de trabajadores predeterminado se inicializa a un WorkerPool.

Ejemplos

$ julia -p 3

julia> default_worker_pool()
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:3), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 4))
source
Distributed.clear!Function
clear!(syms, pids=workers(); mod=Main)

Limpia las vinculaciones globales en los módulos inicializándolas a nothing. syms debe ser de tipo Symbol o una colección de Symbols. pids y mod identifican los procesos y el módulo en el que se deben reinicializar las variables globales. Solo se limpian aquellos nombres que se encuentran definidos bajo mod.

Se genera una excepción si se solicita limpiar una constante global.

source
clear!(pool::CachingPool) -> pool

Elimina todas las funciones en caché de todos los trabajadores participantes.

source
Distributed.remotecallMethod
remotecall(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future

WorkerPool variante de remotecall(f, pid, ....). Espera y toma un trabajador libre de pool y realiza un remotecall en él.

Ejemplos

$ julia -p 3

julia> wp = WorkerPool([2, 3]);

julia> A = rand(3000);

julia> f = remotecall(maximum, wp, A)
Future(2, 1, 6, nothing)

En este ejemplo, la tarea se ejecutó en pid 2, llamada desde pid 1.

source
Distributed.remotecall_waitMethod
remotecall_wait(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future

Variante de WorkerPool de remotecall_wait(f, pid, ....). Espera y toma un trabajador libre de pool y realiza un remotecall_wait en él.

Ejemplos

$ julia -p 3

julia> wp = WorkerPool([2, 3]);

julia> A = rand(3000);

julia> f = remotecall_wait(maximum, wp, A)
Future(3, 1, 9, nothing)

julia> fetch(f)
0.9995177101692958
source
Distributed.remotecall_fetchMethod
remotecall_fetch(f, pool::AbstractWorkerPool, args...; kwargs...) -> result

WorkerPool variante de remotecall_fetch(f, pid, ....). Espera y toma un trabajador libre de pool y realiza un remotecall_fetch en él.

Ejemplos

$ julia -p 3

julia> wp = WorkerPool([2, 3]);

julia> A = rand(3000);

julia> remotecall_fetch(maximum, wp, A)
0.9995177101692958
source
Distributed.remote_doMethod
remote_do(f, pool::AbstractWorkerPool, args...; kwargs...) -> nothing

Variante de WorkerPool de remote_do(f, pid, ....). Espera y toma un trabajador libre de pool y realiza un remote_do en él.

source
Distributed.@spawnMacro
@spawn expr

Crea un cierre alrededor de una expresión y la ejecuta en un proceso elegido automáticamente, devolviendo un Future al resultado. Este macro está en desuso; se debe usar @spawnat :any expr en su lugar.

Ejemplos

julia> addprocs(3);

julia> f = @spawn myid()
Future(2, 1, 5, nothing)

julia> fetch(f)
2

julia> f = @spawn myid()
Future(3, 1, 7, nothing)

julia> fetch(f)
3
Julia 1.3

A partir de Julia 1.3, este macro está en desuso. Usa @spawnat :any en su lugar.

source
Distributed.@spawnatMacro
@spawnat p expr

Crea un cierre alrededor de una expresión y ejecuta el cierre de manera asíncrona en el proceso p. Devuelve un Future al resultado. Si p es el símbolo literal citado :any, entonces el sistema elegirá automáticamente un procesador para usar.

Ejemplos

julia> addprocs(3);

julia> f = @spawnat 2 myid()
Future(2, 1, 3, nothing)

julia> fetch(f)
2

julia> f = @spawnat :any myid()
Future(3, 1, 7, nothing)

julia> fetch(f)
3
Julia 1.3

El argumento :any está disponible a partir de Julia 1.3.

source
Distributed.@fetchMacro
@fetch expr

Equivalente a fetch(@spawnat :any expr). Ver fetch y @spawnat.

Ejemplos

julia> addprocs(3);

julia> @fetch myid()
2

julia> @fetch myid()
3

julia> @fetch myid()
4

julia> @fetch myid()
2
source
Distributed.@distributedMacro
@distributed

Un bucle for paralelo de memoria distribuida de la forma:

@distributed [reducer] for var = range
    body
end

El rango especificado se particiona y se ejecuta localmente en todos los trabajadores. En caso de que se especifique una función reductor opcional, @distributed realiza reducciones locales en cada trabajador con una reducción final en el proceso que llama.

Tenga en cuenta que sin una función reductor, @distributed se ejecuta de manera asíncrona, es decir, genera tareas independientes en todos los trabajadores disponibles y devuelve inmediatamente sin esperar a que se complete. Para esperar a que se complete, prefije la llamada con @sync, como:

@sync @distributed for var = range
    body
end
source
Distributed.@everywhereMacro
@everywhere [procs()] expr

Ejecuta una expresión bajo Main en todos los procs. Los errores en cualquiera de los procesos se recopilan en una CompositeException y se lanzan. Por ejemplo:

@everywhere bar = 1

definirá Main.bar en todos los procesos actuales. Cualquier proceso agregado más tarde (digamos con addprocs()) no tendrá la expresión definida.

A diferencia de @spawnat, @everywhere no captura ninguna variable local. En su lugar, las variables locales se pueden transmitir utilizando interpolación:

foo = 1
@everywhere bar = $foo

El argumento opcional procs permite especificar un subconjunto de todos los procesos para que ejecuten la expresión.

Similar a llamar a remotecall_eval(Main, procs, expr), pero con dos características adicionales:

- Las declaraciones `using` e `import` se ejecutan primero en el proceso que llama, para asegurar
  que los paquetes estén precompilados.
- La ruta del archivo fuente actual utilizada por `include` se propaga a otros procesos.
source
Distributed.remoteref_idFunction
remoteref_id(r::AbstractRemoteRef) -> RRID

Futures y RemoteChannels se identifican por campos:

  • where - se refiere al nodo donde el objeto/almacenamiento subyacente al que se refiere la referencia realmente existe.
  • whence - se refiere al nodo desde el cual se creó la referencia remota. Tenga en cuenta que esto es diferente del nodo donde el objeto subyacente al que se refiere realmente existe. Por ejemplo, llamar a RemoteChannel(2) desde el proceso maestro resultaría en un valor de where de 2 y un valor de whence de 1.
  • id es único entre todas las referencias creadas desde el trabajador especificado por whence.

Tomados en conjunto, whence e id identifican de manera única una referencia entre todos los trabajadores.

remoteref_id es una API de bajo nivel que devuelve un objeto RRID que envuelve los valores de whence e id de una referencia remota.

source
Distributed.channel_from_idFunction
channel_from_id(id) -> c

Una API de bajo nivel que devuelve el AbstractChannel de respaldo para un id devuelto por remoteref_id. La llamada es válida solo en el nodo donde existe el canal de respaldo.

source
Distributed.worker_id_from_socketFunction
worker_id_from_socket(s) -> pid

Una API de bajo nivel que, dado un IO conexión o un Worker, devuelve el pid del trabajador al que está conectado. Esto es útil al escribir métodos personalizados serialize para un tipo, que optimizan los datos escritos dependiendo del id del proceso receptor.

source

Cluster Manager Interface

Esta interfaz proporciona un mecanismo para lanzar y gestionar trabajadores de Julia en diferentes entornos de clúster. Hay dos tipos de gestores presentes en Base: LocalManager, para lanzar trabajadores adicionales en el mismo host, y SSHManager, para lanzar en hosts remotos a través de ssh. Se utilizan sockets TCP/IP para conectar y transportar mensajes entre procesos. Es posible que los Gestores de Clúster proporcionen un transporte diferente.

Distributed.ClusterManagerType
ClusterManager

Supertipo para administradores de clúster, que controlan procesos de trabajadores como un clúster. Los administradores de clúster implementan cómo se pueden agregar, eliminar y comunicar los trabajadores. SSHManager y LocalManager son subtipos de esto.

source
Distributed.WorkerConfigType
WorkerConfig

Tipo utilizado por ClusterManagers para controlar los trabajadores añadidos a sus clústeres. Algunos campos son utilizados por todos los administradores de clúster para acceder a un host:

  • io – la conexión utilizada para acceder al trabajador (un subtipo de IO o Nothing)
  • host – la dirección del host (ya sea un String o Nothing)
  • port – el puerto en el host utilizado para conectarse al trabajador (ya sea un Int o Nothing)

Algunos son utilizados por el administrador de clúster para añadir trabajadores a un host ya inicializado:

  • count – el número de trabajadores que se lanzarán en el host
  • exename – la ruta al ejecutable de Julia en el host, por defecto es "$(Sys.BINDIR)/julia" o "$(Sys.BINDIR)/julia-debug"
  • exeflags – banderas a utilizar al lanzar Julia de forma remota

El campo userdata se utiliza para almacenar información para cada trabajador por administradores externos.

Algunos campos son utilizados por SSHManager y administradores similares:

  • tunneltrue (usar túneles), false (no usar túneles), o nothing (usar el valor por defecto para el administrador)
  • multiplextrue (usar multiplexión SSH para túneles) o false
  • forward – la opción de reenvío utilizada para la opción -L de ssh
  • bind_addr – la dirección en el host remoto a la que enlazar
  • sshflags – banderas a utilizar al establecer la conexión SSH
  • max_parallel – el número máximo de trabajadores a los que conectarse en paralelo en el host

Algunos campos son utilizados tanto por LocalManagers como por SSHManagers:

  • connect_at – determina si esta es una llamada de configuración de trabajador a trabajador o de controlador a trabajador
  • process – el proceso que se conectará (generalmente el administrador asignará esto durante addprocs)
  • ospid – el ID del proceso según el sistema operativo del host, utilizado para interrumpir procesos de trabajadores
  • environ – diccionario privado utilizado para almacenar información temporal por administradores Local/SSH
  • ident – trabajador tal como es identificado por el ClusterManager
  • connect_idents – lista de IDs de trabajadores a los que el trabajador debe conectarse si se utiliza una topología personalizada
  • enable_threaded_blastrue, false, o nothing, si se debe usar BLAS en hilos o no en los trabajadores
source
Distributed.launchFunction
launch(manager::ClusterManager, params::Dict, launched::Array, launch_ntfy::Condition)

Implementado por administradores de clúster. Para cada trabajador de Julia lanzado por esta función, debe agregar una entrada WorkerConfig a launched y notificar a launch_ntfy. La función DEBE salir una vez que todos los trabajadores, solicitados por manager, hayan sido lanzados. params es un diccionario de todos los argumentos de palabra clave con los que se llamó a addprocs.

source
Distributed.manageFunction
manage(manager::ClusterManager, id::Integer, config::WorkerConfig. op::Symbol)

Implementado por los administradores de clúster. Se llama en el proceso maestro, durante la vida útil de un trabajador, con los valores op apropiados:

  • con :register/:deregister cuando un trabajador es agregado / eliminado del grupo de trabajadores de Julia.
  • con :interrupt cuando se llama a interrupt(workers). El ClusterManager debe señalizar al trabajador apropiado con una señal de interrupción.
  • con :finalize para propósitos de limpieza.
source
Base.killMethod
kill(manager::ClusterManager, pid::Int, config::WorkerConfig)

Implementado por los administradores de clúster. Se llama en el proceso maestro, por rmprocs. Debería hacer que el trabajador remoto especificado por pid salga. kill(manager::ClusterManager.....) ejecuta un exit() remoto en pid.

source
Sockets.connectMethod
connect(manager::ClusterManager, pid::Int, config::WorkerConfig) -> (instrm::IO, outstrm::IO)

Implementado por administradores de clúster utilizando transportes personalizados. Debe establecer una conexión lógica con el trabajador con id pid, especificado por config y devolver un par de objetos IO. Los mensajes de pid al proceso actual se leerán de instrm, mientras que los mensajes que se enviarán a pid se escribirán en outstrm. La implementación del transporte personalizado debe garantizar que los mensajes se entreguen y reciban completamente y en orden. connect(manager::ClusterManager.....) establece conexiones de socket TCP/IP entre los trabajadores.

source
Distributed.init_workerFunction
init_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager())

Llamado por administradores de clúster que implementan transportes personalizados. Inicializa un proceso recién lanzado como un trabajador. El argumento de línea de comandos --worker[=<cookie>] tiene el efecto de inicializar un proceso como un trabajador utilizando sockets TCP/IP para el transporte. cookie es un cluster_cookie.

source
Distributed.start_workerFunction
start_worker([out::IO=stdout], cookie::AbstractString=readline(stdin); close_stdin::Bool=true, stderr_to_stdout::Bool=true)

start_worker es una función interna que es el punto de entrada predeterminado para los procesos de trabajo que se conectan a través de TCP/IP. Configura el proceso como un trabajador del clúster de Julia.

La información de host:puerto se escribe en el flujo out (por defecto en stdout).

La función lee la cookie de stdin si es necesario, y escucha en un puerto libre (o si se especifica, el puerto en la opción de línea de comandos --bind-to) y programa tareas para procesar conexiones y solicitudes TCP entrantes. También (opcionalmente) cierra stdin y redirige stderr a stdout.

No devuelve.

source
Distributed.process_messagesFunction
process_messages(r_stream::IO, w_stream::IO, incoming::Bool=true)

Llamado por los administradores de clúster utilizando transportes personalizados. Debe ser llamado cuando la implementación del transporte personalizado recibe el primer mensaje de un trabajador remoto. El transporte personalizado debe gestionar una conexión lógica con el trabajador remoto y proporcionar dos objetos IO, uno para los mensajes entrantes y el otro para los mensajes dirigidos al trabajador remoto. Si incoming es true, el par remoto inició la conexión. Cualquiera de los dos que inicie la conexión envía la cookie del clúster y su número de versión de Julia para realizar el apretón de manos de autenticación.

Véase también cluster_cookie.

source
Distributed.default_addprocs_paramsFunction
default_addprocs_params(mgr::ClusterManager) -> Dict{Symbol, Any}

Implementado por administradores de clúster. Los parámetros de palabra clave predeterminados que se pasan al llamar a addprocs(mgr). El conjunto mínimo de opciones está disponible al llamar a default_addprocs_params().

source