Tasks
Core.Task
— TypeTask(func)
Crea un Task
(es decir, una coroutine) para ejecutar la función dada func
(que debe ser invocable sin argumentos). La tarea finaliza cuando esta función retorna. La tarea se ejecutará en la "edad del mundo" del padre en el momento de la construcción cuando se schedule
d.
Por defecto, las tareas tendrán el bit sticky configurado en true t.sticky
. Esto modela el comportamiento predeterminado histórico para @async
. Las tareas sticky solo pueden ejecutarse en el hilo de trabajo en el que fueron programadas por primera vez, y al ser programadas harán que la tarea desde la cual fueron programadas sea sticky. Para obtener el comportamiento de Threads.@spawn
, establece el bit sticky manualmente en false
.
Ejemplos
julia> a() = sum(i for i in 1:1000);
julia> b = Task(a);
En este ejemplo, b
es un Task
ejecutable que aún no ha comenzado.
Base.@task
— Macro@task
Envuelve una expresión en una Task
sin ejecutarla, y devuelve la Task
. Esto solo crea una tarea y no la ejecuta.
Por defecto, las tareas tendrán el bit sticky configurado en true t.sticky
. Esto modela el comportamiento predeterminado histórico para @async
. Las tareas sticky solo se pueden ejecutar en el hilo de trabajo en el que se programaron por primera vez, y al programarse harán que la tarea desde la cual se programaron sea sticky. Para obtener el comportamiento de Threads.@spawn
, establece el bit sticky manualmente en false
.
Ejemplos
julia> a1() = sum(i for i in 1:1000);
julia> b = @task a1();
julia> istaskstarted(b)
false
julia> schedule(b);
julia> yield();
julia> istaskdone(b)
true
Base.@async
— Macro@async
Envuelve una expresión en una Task
y añádela a la cola del programador de la máquina local.
Los valores se pueden interpolar en @async
a través de $
, que copia el valor directamente en el cierre subyacente construido. Esto te permite insertar el valor de una variable, aislando el código asíncrono de los cambios en el valor de la variable en la tarea actual.
Se recomienda encarecidamente favorecer Threads.@spawn
sobre @async
siempre incluso cuando no se requiera paralelismo, especialmente en bibliotecas distribuidas públicamente. Esto se debe a que el uso de @async
desactiva la migración de la tarea padre a través de hilos de trabajo en la implementación actual de Julia. Por lo tanto, el uso aparentemente inocente de @async
en una función de biblioteca puede tener un gran impacto en el rendimiento de partes muy diferentes de las aplicaciones de los usuarios.
La interpolación de valores a través de $
está disponible desde Julia 1.4.
Base.asyncmap
— Functionasyncmap(f, c...; ntasks=0, batch_size=nothing)
Utiliza múltiples tareas concurrentes para mapear f
sobre una colección (o múltiples colecciones de igual longitud). Para múltiples argumentos de colección, f
se aplica elemento por elemento.
ntasks
especifica el número de tareas que se ejecutarán de forma concurrente. Dependiendo de la longitud de las colecciones, si ntasks
no está especificado, se utilizarán hasta 100 tareas para el mapeo concurrente.
ntasks
también se puede especificar como una función sin argumentos. En este caso, el número de tareas que se ejecutarán en paralelo se verifica antes de procesar cada elemento y se inicia una nueva tarea si el valor de ntasks_func
es mayor que el número actual de tareas.
Si se especifica batch_size
, la colección se procesa en modo por lotes. f
debe ser una función que acepte un Vector
de tuplas de argumentos y debe devolver un vector de resultados. El vector de entrada tendrá una longitud de batch_size
o menos.
Los siguientes ejemplos destacan la ejecución en diferentes tareas al devolver el objectid
de las tareas en las que se ejecuta la función de mapeo.
Primero, con ntasks
indefinido, cada elemento se procesa en una tarea diferente.
julia> tskoid() = objectid(current_task());
julia> asyncmap(x->tskoid(), 1:5)
5-element Array{UInt64,1}:
0x6e15e66c75c75853
0x440f8819a1baa682
0x9fb3eeadd0c83985
0xebd3e35fe90d4050
0x29efc93edce2b961
julia> length(unique(asyncmap(x->tskoid(), 1:5)))
5
Con ntasks=2
, todos los elementos se procesan en 2 tareas.
julia> asyncmap(x->tskoid(), 1:5; ntasks=2)
5-element Array{UInt64,1}:
0x027ab1680df7ae94
0xa23d2f80cd7cf157
0x027ab1680df7ae94
0xa23d2f80cd7cf157
0x027ab1680df7ae94
julia> length(unique(asyncmap(x->tskoid(), 1:5; ntasks=2)))
2
Con batch_size
definido, la función de mapeo necesita ser cambiada para aceptar un arreglo de tuplas de argumentos y devolver un arreglo de resultados. map
se utiliza en la función de mapeo modificada para lograr esto.
julia> batch_func(input) = map(x->string("args_tuple: ", x, ", element_val: ", x[1], ", task: ", tskoid()), input)
batch_func (generic function with 1 method)
julia> asyncmap(batch_func, 1:5; ntasks=2, batch_size=2)
5-element Array{String,1}:
"args_tuple: (1,), element_val: 1, task: 9118321258196414413"
"args_tuple: (2,), element_val: 2, task: 4904288162898683522"
"args_tuple: (3,), element_val: 3, task: 9118321258196414413"
"args_tuple: (4,), element_val: 4, task: 4904288162898683522"
"args_tuple: (5,), element_val: 5, task: 9118321258196414413"
Base.asyncmap!
— Functionasyncmap!(f, results, c...; ntasks=0, batch_size=nothing)
Como asyncmap
, pero almacena la salida en results
en lugar de devolver una colección.
El comportamiento puede ser inesperado cuando cualquier argumento mutado comparte memoria con cualquier otro argumento.
Base.current_task
— Functioncurrent_task()
Obtén la Task
que se está ejecutando actualmente.
Base.istaskdone
— Functionistaskdone(t::Task) -> Bool
Determina si una tarea ha salido.
Ejemplos
julia> a2() = sum(i for i in 1:1000);
julia> b = Task(a2);
julia> istaskdone(b)
false
julia> schedule(b);
julia> yield();
julia> istaskdone(b)
true
Base.istaskstarted
— Functionistaskstarted(t::Task) -> Bool
Determina si una tarea ha comenzado a ejecutarse.
Ejemplos
julia> a3() = sum(i for i in 1:1000);
julia> b = Task(a3);
julia> istaskstarted(b)
false
Base.istaskfailed
— Functionistaskfailed(t::Task) -> Bool
Determina si una tarea ha salido porque se lanzó una excepción.
Ejemplos
julia> a4() = error("tarea fallida");
julia> b = Task(a4);
julia> istaskfailed(b)
false
julia> schedule(b);
julia> yield();
julia> istaskfailed(b)
true
Esta función requiere al menos Julia 1.3.
Base.task_local_storage
— Methodtask_local_storage(key)
Busca el valor de una clave en el almacenamiento local de tareas de la tarea actual.
Base.task_local_storage
— Methodtask_local_storage(key, value)
Asigna un valor a una clave en el almacenamiento local de tareas de la tarea actual.
Base.task_local_storage
— Methodtask_local_storage(body, key, value)
Llama a la función body
con un almacenamiento local de tareas modificado, en el cual value
se asigna a key
; el valor anterior de key
, o la falta del mismo, se restaura después. Útil para emular el alcance dinámico.
Scheduling
Base.yield
— Functionyield()
Cambia al programador para permitir que otra tarea programada se ejecute. Una tarea que llama a esta función sigue siendo ejecutable y se reiniciará inmediatamente si no hay otras tareas ejecutables.
yield(t::Task, arg = nothing)
Una versión rápida y de programación injusta de schedule(t, arg); yield()
que cede inmediatamente a t
antes de llamar al programador.
Base.yieldto
— Functionyieldto(t::Task, arg = nothing)
Cambia a la tarea dada. La primera vez que se cambia a una tarea, la función de la tarea se llama sin argumentos. En cambios posteriores, arg
se devuelve de la última llamada de la tarea a yieldto
. Esta es una llamada de bajo nivel que solo cambia tareas, sin considerar estados o programación de ninguna manera. Su uso no se recomienda.
Base.sleep
— Functionsleep(seconds)
Bloquea la tarea actual durante un número especificado de segundos. El tiempo mínimo de espera es de 1 milisegundo o una entrada de 0.001
.
Base.schedule
— Functionschedule(t::Task, [val]; error=false)
Agrega un Task
a la cola del programador. Esto hace que la tarea se ejecute constantemente cuando el sistema está inactivo, a menos que la tarea realice una operación de bloqueo como wait
.
Si se proporciona un segundo argumento val
, se pasará a la tarea (a través del valor de retorno de yieldto
) cuando se ejecute nuevamente. Si error
es true
, el valor se levantará como una excepción en la tarea despertada.
Es incorrecto usar schedule
en un Task
arbitrario que ya ha sido iniciado. Consulta la referencia de la API para más información.
Por defecto, las tareas tendrán el bit sticky configurado en true t.sticky
. Esto modela el comportamiento predeterminado histórico para @async
. Las tareas sticky solo se pueden ejecutar en el hilo de trabajo en el que se programaron por primera vez, y al programarse harán que la tarea desde la que se programaron sea sticky. Para obtener el comportamiento de Threads.@spawn
, establece el bit sticky manualmente en false
.
Ejemplos
julia> a5() = sum(i for i in 1:1000);
julia> b = Task(a5);
julia> istaskstarted(b)
false
julia> schedule(b);
julia> yield();
julia> istaskstarted(b)
true
julia> istaskdone(b)
true
Synchronization
Base.errormonitor
— Functionerrormonitor(t::Task)
Imprime un registro de errores en stderr
si la tarea t
falla.
Ejemplos
julia> Base._wait(errormonitor(Threads.@spawn error("la tarea falló")))
Unhandled Task ERROR: la tarea falló
Stacktrace:
[...]
Base.@sync
— Macro@sync
Espera hasta que se completen todos los usos encerrados léxicamente de @async
, @spawn
, Distributed.@spawnat
y Distributed.@distributed
. Todas las excepciones lanzadas por las operaciones asíncronas encerradas se recopilan y se lanzan como una CompositeException
.
Ejemplos
julia> Threads.nthreads()
4
julia> @sync begin
Threads.@spawn println("Thread-id $(Threads.threadid()), task 1")
Threads.@spawn println("Thread-id $(Threads.threadid()), task 2")
end;
Thread-id 3, task 1
Thread-id 1, task 2
Base.wait
— FunctionNota especial para Threads.Condition
:
El llamador debe estar sosteniendo el lock
que posee un Threads.Condition
antes de llamar a este método. La tarea que llama estará bloqueada hasta que otra tarea la despierte, generalmente llamando a notify
en el mismo objeto Threads.Condition
. El lock se liberará de forma atómica al bloquearse (incluso si estaba bloqueado recursivamente) y se volverá a adquirir antes de regresar.
wait(r::Future)
Espera a que un valor esté disponible para el Future
especificado.
wait(r::RemoteChannel, args...)
Espera a que un valor esté disponible en el RemoteChannel
especificado.
wait([x])
Bloquea la tarea actual hasta que ocurra algún evento, dependiendo del tipo del argumento:
Channel
: Espera a que se agregue un valor al canal.Condition
: Espera a que se llame anotify
en una condición y devuelve el parámetroval
pasado anotify
. Esperar en una condición permite además pasarfirst=true
, lo que resulta en que el que espera se coloque primero en la fila para despertarse ennotify
en lugar del comportamiento habitual de primero en entrar, primero en salir.Process
: Espera a que un proceso o cadena de procesos termine. El campoexitcode
de un proceso se puede usar para determinar el éxito o el fracaso.Task
: Espera a que unaTask
termine. Si la tarea falla con una excepción, se lanza unaTaskFailedException
(que envuelve la tarea fallida).RawFD
: Espera cambios en un descriptor de archivo (ver el paqueteFileWatching
).
Si no se pasa ningún argumento, la tarea se bloquea por un período indefinido. Una tarea solo se puede reiniciar mediante una llamada explícita a schedule
o yieldto
.
A menudo, wait
se llama dentro de un bucle while
para asegurar que se cumpla una condición esperada antes de continuar.
wait(c::Channel)
Bloquea hasta que el Channel
isready
.
julia> c = Channel(1);
julia> isready(c)
false
julia> task = Task(() -> wait(c));
julia> schedule(task);
julia> istaskdone(task) # la tarea está bloqueada porque el canal no está listo
false
julia> put!(c, 1);
julia> istaskdone(task) # la tarea ahora está desbloqueada
true
Base.fetch
— Methodfetch(t::Task)
Espera a que una Task
termine, luego devuelve su valor de resultado. Si la tarea falla con una excepción, se lanza una TaskFailedException
(que envuelve la tarea fallida).
Base.fetch
— Methodfetch(x::Any)
Devuelve x
.
Base.timedwait
— Functiontimedwait(testcb, timeout::Real; pollint::Real=0.1)
Espera hasta que testcb()
devuelva true
o hayan pasado timeout
segundos, lo que ocurra primero. La función de prueba se consulta cada pollint
segundos. El valor mínimo para pollint
es 0.001 segundos, es decir, 1 milisegundo.
Devuelve :ok
o :timed_out
.
Ejemplos
julia> cb() = (sleep(5); return);
julia> t = @async cb();
julia> timedwait(()->istaskdone(t), 1)
:timed_out
julia> timedwait(()->istaskdone(t), 6.5)
:ok
Base.Condition
— TypeCondition()
Crea una fuente de eventos activada por bordes que las tareas pueden esperar. Las tareas que llaman a wait
en un Condition
son suspendidas y encoladas. Las tareas se despiertan cuando se llama a notify
más tarde en el Condition
. Esperar en una condición puede devolver un valor o generar un error si se utilizan los argumentos opcionales de notify
. La activación por bordes significa que solo las tareas que están esperando en el momento en que se llama a notify
pueden ser despertadas. Para notificaciones activadas por niveles, debes mantener un estado adicional para hacer un seguimiento de si ha ocurrido una notificación. Los tipos Channel
y Threads.Event
hacen esto y se pueden usar para eventos activados por niveles.
Este objeto NO es seguro para hilos. Consulta Threads.Condition
para una versión segura para hilos.
Base.Threads.Condition
— TypeThreads.Condition([lock])
Una versión segura para hilos de Base.Condition
.
Para llamar a wait
o notify
en un Threads.Condition
, primero debes llamar a lock
en él. Cuando se llama a wait
, el bloqueo se libera atómicamente durante el bloqueo y se volverá a adquirir antes de que wait
regrese. Por lo tanto, el uso idiomático de un Threads.Condition
c
se ve como sigue:
lock(c)
try
while !thing_we_are_waiting_for
wait(c)
end
finally
unlock(c)
end
Esta funcionalidad requiere al menos Julia 1.2.
Base.Event
— TypeEvent([autoreset=false])
Crea una fuente de eventos activada por nivel. Las tareas que llaman a wait
en un Event
son suspendidas y encoladas hasta que se llame a notify
en el Event
. Después de que se llama a notify
, el Event
permanece en un estado señalizado y las tareas ya no se bloquearán al esperar por él, hasta que se llame a reset
.
Si autoreset
es verdadero, como máximo una tarea será liberada de wait
por cada llamada a notify
.
Esto proporciona un orden de memoria de adquisición y liberación en notify/wait.
Esta funcionalidad requiere al menos Julia 1.1.
La funcionalidad de autoreset
y la garantía de orden de memoria requieren al menos Julia 1.8.
Base.notify
— Functionnotify(condition, val=nothing; all=true, error=false)
Despierta las tareas que están esperando una condición, pasándoles val
. Si all
es true
(el valor predeterminado), se despiertan todas las tareas en espera; de lo contrario, solo se despierta una. Si error
es true
, el valor pasado se eleva como una excepción en las tareas despertadas.
Devuelve el conteo de tareas despertadas. Devuelve 0 si no hay tareas esperando en condition
.
Base.reset
— Methodreset(::Event)
Restablece un Event
a un estado no establecido. Luego, cualquier llamada futura a wait
bloqueará hasta que se llame a notify
nuevamente.
Base.Semaphore
— TypeSemaphore(sem_size)
Crea un semáforo de conteo que permite como máximo sem_size
adquisiciones en uso en cualquier momento. Cada adquisición debe ser emparejada con una liberación.
Esto proporciona un ordenamiento de memoria de adquisición y liberación en las llamadas de adquisición/liberación.
Base.acquire
— Functionacquire(s::Semaphore)
Espera a que uno de los permisos sem_size
esté disponible, bloqueando hasta que se pueda adquirir uno.
acquire(f, s::Semaphore)
Ejecuta f
después de adquirir del Semáforo s
, y release
al completar o en caso de error.
Por ejemplo, una forma de bloque do que asegura que solo 2 llamadas a foo
estarán activas al mismo tiempo:
s = Base.Semaphore(2)
@sync for _ in 1:100
Threads.@spawn begin
Base.acquire(s) do
foo()
end
end
end
Este método requiere al menos Julia 1.8.
Base.release
— Functionrelease(s::Semaphore)
Devuelve un permiso al grupo, permitiendo posiblemente que otra tarea lo adquiera y reanude la ejecución.
Base.AbstractLock
— TypeAbstractLock
Supertipo abstracto que describe tipos que implementan los primitivos de sincronización: lock
, trylock
, unlock
y islocked
.
Base.lock
— Functionlock(lock)
Adquiere el lock
cuando esté disponible. Si el lock ya está bloqueado por otra tarea/hilo, espera a que esté disponible.
Cada lock
debe ser emparejado con un unlock
.
lock(f::Function, lock)
Adquiere el lock
, ejecuta f
con el lock
mantenido, y libera el lock
cuando f
retorna. Si el lock ya está bloqueado por otra tarea/hilo, espera a que esté disponible.
Cuando esta función retorna, el lock
ha sido liberado, por lo que el llamador no debe intentar unlock
earlo.
Ver también: @lock
.
Usar un Channel
como segundo argumento requiere Julia 1.7 o posterior.
lock(f::Function, l::Lockable)
Adquiere el bloqueo asociado con l
, ejecuta f
con el bloqueo mantenido y libera el bloqueo cuando f
regresa. f
recibirá un argumento posicional: el valor envuelto por l
. Si el bloqueo ya está bloqueado por otra tarea/hilo, espera a que esté disponible. Cuando esta función regresa, el lock
ha sido liberado, por lo que el llamador no debe intentar unlock
arlo.
Requiere al menos Julia 1.11.
Base.unlock
— Functionunlock(lock)
Libera la propiedad del lock
.
Si este es un bloqueo recursivo que se ha adquirido antes, decrementa un contador interno y retorna inmediatamente.
Base.trylock
— Functiontrylock(lock) -> Éxito (Booleano)
Adquiere el bloqueo si está disponible y devuelve true
si tiene éxito. Si el bloqueo ya está bloqueado por otra tarea/hilo, devuelve false
.
Cada trylock
exitoso debe ser emparejado con un unlock
.
La función trylock
combinada con islocked
se puede usar para escribir los algoritmos de prueba-y-prueba-y-establecer o retroceso exponencial si es compatible con el typeof(lock)
(lee su documentación).
Base.islocked
— Functionislocked(lock) -> Estado (Booleano)
Verifica si el lock
está sostenido por alguna tarea/hilo. Esta función por sí sola no debe ser utilizada para sincronización. Sin embargo, islocked
combinado con trylock
puede ser utilizado para escribir los algoritmos de prueba-y-prueba-y-establecer o retroceso exponencial si es soportado por el typeof(lock)
(lee su documentación).
Ayuda extendida
Por ejemplo, un retroceso exponencial puede ser implementado de la siguiente manera si la implementación del lock
satisface las propiedades documentadas a continuación.
nspins = 0
while true
while islocked(lock)
GC.safepoint()
nspins += 1
nspins > LIMIT && error("timeout")
end
trylock(lock) && break
backoff()
end
Implementación
Se aconseja a una implementación de lock definir islocked
con las siguientes propiedades y anotarlo en su docstring.
islocked(lock)
es libre de condiciones de carrera.- Si
islocked(lock)
devuelvefalse
, una invocación inmediata detrylock(lock)
debe tener éxito (devolvertrue
) si no hay interferencia de otras tareas.
Base.ReentrantLock
— TypeReentrantLock()
Crea un candado reentrante para sincronizar Task
s. La misma tarea puede adquirir el candado tantas veces como sea necesario (esto es lo que significa la parte "Reentrante" del nombre). Cada lock
debe coincidir con un unlock
.
Llamar a lock
también inhibirá la ejecución de finalizadores en ese hilo hasta el correspondiente unlock
. El uso del patrón de bloqueo estándar ilustrado a continuación debería ser naturalmente compatible, pero ten cuidado de invertir el orden de try/lock o de omitir completamente el bloque try (por ejemplo, intentar devolver con el candado aún sostenido):
Esto proporciona un orden de memoria de adquisición/liberación en las llamadas de lock/unlock.
lock(l)
try
<trabajo atómico>
finally
unlock(l)
end
Si !islocked(lck::ReentrantLock)
se cumple, trylock(lck)
tendrá éxito a menos que haya otras tareas intentando mantener el candado "al mismo tiempo."
Base.@lock
— Macro@lock l expr
Versión macro de lock(f, l::AbstractLock)
pero con expr
en lugar de la función f
. Se expande a:
lock(l)
try
expr
finally
unlock(l)
end
Esto es similar a usar lock
con un bloque do
, pero evita crear un cierre y, por lo tanto, puede mejorar el rendimiento.
@lock
se agregó en Julia 1.3 y se exportó en Julia 1.10.
Base.Lockable
— TypeLockable(value, lock = ReentrantLock())
Crea un objeto Lockable
que envuelve value
y lo asocia con el lock
proporcionado. Este objeto soporta @lock
, lock
, trylock
, unlock
. Para acceder al valor, indexa el objeto bloqueable mientras sostienes el bloqueo.
Requiere al menos Julia 1.11.
Ejemplo
julia> locked_list = Base.Lockable(Int[]);
julia> @lock(locked_list, push!(locked_list[], 1)) # debes sostener el bloqueo para acceder al valor
1-element Vector{Int64}:
1
julia> lock(summary, locked_list)
"1-element Vector{Int64}"
Channels
Base.AbstractChannel
— TypeAbstractChannel{T}
Representación de un canal que pasa objetos del tipo T
.
Base.Channel
— TypeChannel{T=Any}(size::Int=0)
Construye un Channel
con un búfer interno que puede contener un máximo de size
objetos del tipo T
. Las llamadas a put!
en un canal lleno bloquean hasta que se elimine un objeto con take!
.
Channel(0)
construye un canal sin búfer. put!
bloquea hasta que se llama a un take!
correspondiente. Y viceversa.
Otros constructores:
Channel()
: constructor por defecto, equivalente aChannel{Any}(0)
Channel(Inf)
: equivalente aChannel{Any}(typemax(Int))
Channel(sz)
: equivalente aChannel{Any}(sz)
El constructor por defecto Channel()
y el tamaño por defecto size=0
se añadieron en Julia 1.3.
Base.Channel
— MethodChannel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false, threadpool=nothing)
Crea una nueva tarea a partir de func
, la vincula a un nuevo canal de tipo T
y tamaño size
, y programa la tarea, todo en una sola llamada. El canal se cierra automáticamente cuando la tarea termina.
func
debe aceptar el canal vinculado como su único argumento.
Si necesitas una referencia a la tarea creada, pasa un objeto Ref{Task}
a través del argumento de palabra clave taskref
.
Si spawn=true
, la Task
creada para func
puede ser programada en otro hilo en paralelo, equivalente a crear una tarea a través de Threads.@spawn
.
Si spawn=true
y el argumento threadpool
no está establecido, por defecto es :default
.
Si el argumento threadpool
está establecido (a :default
o :interactive
), esto implica que spawn=true
y la nueva Task se genera en el threadpool especificado.
Devuelve un Channel
.
Ejemplos
julia> chnl = Channel() do ch
foreach(i -> put!(ch, i), 1:4)
end;
julia> typeof(chnl)
Channel{Any}
julia> for i in chnl
@show i
end;
i = 1
i = 2
i = 3
i = 4
Referenciando la tarea creada:
julia> taskref = Ref{Task}();
julia> chnl = Channel(taskref=taskref) do ch
println(take!(ch))
end;
julia> istaskdone(taskref[])
false
julia> put!(chnl, "Hello");
Hello
julia> istaskdone(taskref[])
true
El parámetro spawn=
se agregó en Julia 1.3. Este constructor se agregó en Julia 1.3. En versiones anteriores de Julia, Channel utilizaba argumentos de palabra clave para establecer size
y T
, pero esos constructores están en desuso.
El argumento threadpool=
se agregó en Julia 1.9.
julia> chnl = Channel{Char}(1, spawn=true) do ch
for c in "hello world"
put!(ch, c)
end
end
Channel{Char}(1) (2 items available)
julia> String(collect(chnl))
"hello world"
Base.put!
— Methodput!(c::Channel, v)
Agrega un elemento v
al canal c
. Bloquea si el canal está lleno.
Para canales no almacenados, bloquea hasta que se realice un take!
por otra tarea.
v
ahora se convierte al tipo del canal con convert
cuando se llama a put!
.
Base.take!
— Methodtake!(c::Channel)
Elimina y devuelve un valor de un Channel
en orden. Bloquea hasta que los datos estén disponibles. Para canales sin búfer, bloquea hasta que se realice un put!
por otra tarea.
Ejemplos
Canal con búfer:
julia> c = Channel(1);
julia> put!(c, 1);
julia> take!(c)
1
Canal sin búfer:
julia> c = Channel(0);
julia> task = Task(() -> put!(c, 1));
julia> schedule(task);
julia> take!(c)
1
Base.isready
— Methodisready(c::Channel)
Determina si un Channel
tiene un valor almacenado en él. Devuelve inmediatamente, no bloquea.
Para canales no bufferizados, devuelve true
si hay tareas esperando en un put!
.
Ejemplos
Canal bufferizado:
julia> c = Channel(1);
julia> isready(c)
false
julia> put!(c, 1);
julia> isready(c)
true
Canal no bufferizado:
julia> c = Channel();
julia> isready(c) # ¡sin tareas esperando para poner!
false
julia> task = Task(() -> put!(c, 1));
julia> schedule(task); # programa una tarea de put!
julia> isready(c)
true
Base.fetch
— Methodfetch(c::Channel)
Espera y devuelve (sin eliminar) el primer elemento disponible del Channel
. Nota: fetch
no es compatible con un Channel
sin búfer (tamaño 0).
Ejemplos
Canal con búfer:
julia> c = Channel(3) do ch
foreach(i -> put!(ch, i), 1:3)
end;
julia> fetch(c)
1
julia> collect(c) # el elemento no se elimina
3-element Vector{Any}:
1
2
3
Base.close
— Methodclose(c::Channel[, excp::Exception])
Cierra un canal. Se lanza una excepción (opcionalmente dada por excp
) por:
Base.bind
— Methodbind(chnl::Channel, task::Task)
Asocia la duración de chnl
con una tarea. Channel
chnl
se cierra automáticamente cuando la tarea termina. Cualquier excepción no capturada en la tarea se propaga a todos los que están a la espera en chnl
.
El objeto chnl
se puede cerrar explícitamente independientemente de la terminación de la tarea. Las tareas que terminan no tienen efecto en los objetos Channel
que ya están cerrados.
Cuando un canal está vinculado a múltiples tareas, la primera tarea que termine cerrará el canal. Cuando múltiples canales están vinculados a la misma tarea, la terminación de la tarea cerrará todos los canales vinculados.
Ejemplos
julia> c = Channel(0);
julia> task = @async foreach(i->put!(c, i), 1:4);
julia> bind(c,task);
julia> for i in c
@show i
end;
i = 1
i = 2
i = 3
i = 4
julia> isopen(c)
false
julia> c = Channel(0);
julia> task = @async (put!(c, 1); error("foo"));
julia> bind(c, task);
julia> take!(c)
1
julia> put!(c, 1);
ERROR: TaskFailedException
Stacktrace:
[...]
nested task error: foo
[...]
Low-level synchronization using schedule
and wait
El uso correcto más fácil de schedule
es en una Tarea
que aún no ha comenzado (programada). Sin embargo, es posible usar 4d61726b646f776e2e436f64652822222c20227363686564756c652229_40726566
y wait
como un bloque de construcción de muy bajo nivel para construir interfaces de sincronización. Una condición previa crucial para llamar a schedule(task)
es que el llamador debe "poseer" la tarea
; es decir, debe saber que la llamada a wait
en la tarea
dada está ocurriendo en las ubicaciones conocidas por el código que llama a schedule(task)
. Una estrategia para asegurar tal condición previa es usar atómicos, como se demuestra en el siguiente ejemplo:
@enum OWEState begin
OWE_EMPTY
OWE_WAITING
OWE_NOTIFYING
end
mutable struct OneWayEvent
@atomic state::OWEState
task::Task
OneWayEvent() = new(OWE_EMPTY)
end
function Base.notify(ev::OneWayEvent)
state = @atomic ev.state
while state !== OWE_NOTIFYING
# Spin until we successfully update the state to OWE_NOTIFYING:
state, ok = @atomicreplace(ev.state, state => OWE_NOTIFYING)
if ok
if state == OWE_WAITING
# OWE_WAITING -> OWE_NOTIFYING transition means that the waiter task is
# already waiting or about to call `wait`. The notifier task must wake up
# the waiter task.
schedule(ev.task)
else
@assert state == OWE_EMPTY
# Since we are assuming that there is only one notifier task (for
# simplicity), we know that the other possible case here is OWE_EMPTY.
# We do not need to do anything because we know that the waiter task has
# not called `wait(ev::OneWayEvent)` yet.
end
break
end
end
return
end
function Base.wait(ev::OneWayEvent)
ev.task = current_task()
state, ok = @atomicreplace(ev.state, OWE_EMPTY => OWE_WAITING)
if ok
# OWE_EMPTY -> OWE_WAITING transition means that the notifier task is guaranteed to
# invoke OWE_WAITING -> OWE_NOTIFYING transition. The waiter task must call
# `wait()` immediately. In particular, it MUST NOT invoke any function that may
# yield to the scheduler at this point in code.
wait()
else
@assert state == OWE_NOTIFYING
# Otherwise, the `state` must have already been moved to OWE_NOTIFYING by the
# notifier task.
end
return
end
ev = OneWayEvent()
@sync begin
@async begin
wait(ev)
println("done")
end
println("notifying...")
notify(ev)
end
# output
notifying...
done
OneWayEvent
permite que una tarea espere
la notificación
de otra tarea. Es una interfaz de comunicación limitada ya que wait
solo puede ser utilizado una vez desde una única tarea (nota la asignación no atómica de ev.task
)
En este ejemplo, notify(ev::OneWayEvent)
puede llamar a schedule(ev.task)
si y solo si modifica el estado de OWE_WAITING
a OWE_NOTIFYING
. Esto nos permite saber que la tarea que ejecuta wait(ev::OneWayEvent)
está ahora en la rama ok
y que no puede haber otras tareas que intenten schedule(ev.task)
ya que su @atomicreplace(ev.state, state => OWE_NOTIFYING)
fallará.