Tasks
Core.Task — TypeTask(func)Crea un Task (es decir, una coroutine) para ejecutar la función dada func (que debe ser invocable sin argumentos). La tarea finaliza cuando esta función retorna. La tarea se ejecutará en la "edad del mundo" del padre en el momento de la construcción cuando se scheduled.
Por defecto, las tareas tendrán el bit sticky configurado en true t.sticky. Esto modela el comportamiento predeterminado histórico para @async. Las tareas sticky solo pueden ejecutarse en el hilo de trabajo en el que fueron programadas por primera vez, y al ser programadas harán que la tarea desde la cual fueron programadas sea sticky. Para obtener el comportamiento de Threads.@spawn, establece el bit sticky manualmente en false.
Ejemplos
julia> a() = sum(i for i in 1:1000);
julia> b = Task(a);En este ejemplo, b es un Task ejecutable que aún no ha comenzado.
Base.@task — Macro@taskEnvuelve una expresión en una Task sin ejecutarla, y devuelve la Task. Esto solo crea una tarea y no la ejecuta.
Por defecto, las tareas tendrán el bit sticky configurado en true t.sticky. Esto modela el comportamiento predeterminado histórico para @async. Las tareas sticky solo se pueden ejecutar en el hilo de trabajo en el que se programaron por primera vez, y al programarse harán que la tarea desde la cual se programaron sea sticky. Para obtener el comportamiento de Threads.@spawn, establece el bit sticky manualmente en false.
Ejemplos
julia> a1() = sum(i for i in 1:1000);
julia> b = @task a1();
julia> istaskstarted(b)
false
julia> schedule(b);
julia> yield();
julia> istaskdone(b)
trueBase.@async — Macro@asyncEnvuelve una expresión en una Task y añádela a la cola del programador de la máquina local.
Los valores se pueden interpolar en @async a través de $, que copia el valor directamente en el cierre subyacente construido. Esto te permite insertar el valor de una variable, aislando el código asíncrono de los cambios en el valor de la variable en la tarea actual.
Se recomienda encarecidamente favorecer Threads.@spawn sobre @async siempre incluso cuando no se requiera paralelismo, especialmente en bibliotecas distribuidas públicamente. Esto se debe a que el uso de @async desactiva la migración de la tarea padre a través de hilos de trabajo en la implementación actual de Julia. Por lo tanto, el uso aparentemente inocente de @async en una función de biblioteca puede tener un gran impacto en el rendimiento de partes muy diferentes de las aplicaciones de los usuarios.
La interpolación de valores a través de $ está disponible desde Julia 1.4.
Base.asyncmap — Functionasyncmap(f, c...; ntasks=0, batch_size=nothing)Utiliza múltiples tareas concurrentes para mapear f sobre una colección (o múltiples colecciones de igual longitud). Para múltiples argumentos de colección, f se aplica elemento por elemento.
ntasks especifica el número de tareas que se ejecutarán de forma concurrente. Dependiendo de la longitud de las colecciones, si ntasks no está especificado, se utilizarán hasta 100 tareas para el mapeo concurrente.
ntasks también se puede especificar como una función sin argumentos. En este caso, el número de tareas que se ejecutarán en paralelo se verifica antes de procesar cada elemento y se inicia una nueva tarea si el valor de ntasks_func es mayor que el número actual de tareas.
Si se especifica batch_size, la colección se procesa en modo por lotes. f debe ser una función que acepte un Vector de tuplas de argumentos y debe devolver un vector de resultados. El vector de entrada tendrá una longitud de batch_size o menos.
Los siguientes ejemplos destacan la ejecución en diferentes tareas al devolver el objectid de las tareas en las que se ejecuta la función de mapeo.
Primero, con ntasks indefinido, cada elemento se procesa en una tarea diferente.
julia> tskoid() = objectid(current_task());
julia> asyncmap(x->tskoid(), 1:5)
5-element Array{UInt64,1}:
0x6e15e66c75c75853
0x440f8819a1baa682
0x9fb3eeadd0c83985
0xebd3e35fe90d4050
0x29efc93edce2b961
julia> length(unique(asyncmap(x->tskoid(), 1:5)))
5Con ntasks=2, todos los elementos se procesan en 2 tareas.
julia> asyncmap(x->tskoid(), 1:5; ntasks=2)
5-element Array{UInt64,1}:
0x027ab1680df7ae94
0xa23d2f80cd7cf157
0x027ab1680df7ae94
0xa23d2f80cd7cf157
0x027ab1680df7ae94
julia> length(unique(asyncmap(x->tskoid(), 1:5; ntasks=2)))
2Con batch_size definido, la función de mapeo necesita ser cambiada para aceptar un arreglo de tuplas de argumentos y devolver un arreglo de resultados. map se utiliza en la función de mapeo modificada para lograr esto.
julia> batch_func(input) = map(x->string("args_tuple: ", x, ", element_val: ", x[1], ", task: ", tskoid()), input)
batch_func (generic function with 1 method)
julia> asyncmap(batch_func, 1:5; ntasks=2, batch_size=2)
5-element Array{String,1}:
"args_tuple: (1,), element_val: 1, task: 9118321258196414413"
"args_tuple: (2,), element_val: 2, task: 4904288162898683522"
"args_tuple: (3,), element_val: 3, task: 9118321258196414413"
"args_tuple: (4,), element_val: 4, task: 4904288162898683522"
"args_tuple: (5,), element_val: 5, task: 9118321258196414413"Base.asyncmap! — Functionasyncmap!(f, results, c...; ntasks=0, batch_size=nothing)Como asyncmap, pero almacena la salida en results en lugar de devolver una colección.
El comportamiento puede ser inesperado cuando cualquier argumento mutado comparte memoria con cualquier otro argumento.
Base.current_task — Functioncurrent_task()Obtén la Task que se está ejecutando actualmente.
Base.istaskdone — Functionistaskdone(t::Task) -> BoolDetermina si una tarea ha salido.
Ejemplos
julia> a2() = sum(i for i in 1:1000);
julia> b = Task(a2);
julia> istaskdone(b)
false
julia> schedule(b);
julia> yield();
julia> istaskdone(b)
trueBase.istaskstarted — Functionistaskstarted(t::Task) -> BoolDetermina si una tarea ha comenzado a ejecutarse.
Ejemplos
julia> a3() = sum(i for i in 1:1000);
julia> b = Task(a3);
julia> istaskstarted(b)
falseBase.istaskfailed — Functionistaskfailed(t::Task) -> BoolDetermina si una tarea ha salido porque se lanzó una excepción.
Ejemplos
julia> a4() = error("tarea fallida");
julia> b = Task(a4);
julia> istaskfailed(b)
false
julia> schedule(b);
julia> yield();
julia> istaskfailed(b)
trueEsta función requiere al menos Julia 1.3.
Base.task_local_storage — Methodtask_local_storage(key)Busca el valor de una clave en el almacenamiento local de tareas de la tarea actual.
Base.task_local_storage — Methodtask_local_storage(key, value)Asigna un valor a una clave en el almacenamiento local de tareas de la tarea actual.
Base.task_local_storage — Methodtask_local_storage(body, key, value)Llama a la función body con un almacenamiento local de tareas modificado, en el cual value se asigna a key; el valor anterior de key, o la falta del mismo, se restaura después. Útil para emular el alcance dinámico.
Scheduling
Base.yield — Functionyield()Cambia al programador para permitir que otra tarea programada se ejecute. Una tarea que llama a esta función sigue siendo ejecutable y se reiniciará inmediatamente si no hay otras tareas ejecutables.
yield(t::Task, arg = nothing)Una versión rápida y de programación injusta de schedule(t, arg); yield() que cede inmediatamente a t antes de llamar al programador.
Base.yieldto — Functionyieldto(t::Task, arg = nothing)Cambia a la tarea dada. La primera vez que se cambia a una tarea, la función de la tarea se llama sin argumentos. En cambios posteriores, arg se devuelve de la última llamada de la tarea a yieldto. Esta es una llamada de bajo nivel que solo cambia tareas, sin considerar estados o programación de ninguna manera. Su uso no se recomienda.
Base.sleep — Functionsleep(seconds)Bloquea la tarea actual durante un número especificado de segundos. El tiempo mínimo de espera es de 1 milisegundo o una entrada de 0.001.
Base.schedule — Functionschedule(t::Task, [val]; error=false)Agrega un Task a la cola del programador. Esto hace que la tarea se ejecute constantemente cuando el sistema está inactivo, a menos que la tarea realice una operación de bloqueo como wait.
Si se proporciona un segundo argumento val, se pasará a la tarea (a través del valor de retorno de yieldto) cuando se ejecute nuevamente. Si error es true, el valor se levantará como una excepción en la tarea despertada.
Es incorrecto usar schedule en un Task arbitrario que ya ha sido iniciado. Consulta la referencia de la API para más información.
Por defecto, las tareas tendrán el bit sticky configurado en true t.sticky. Esto modela el comportamiento predeterminado histórico para @async. Las tareas sticky solo se pueden ejecutar en el hilo de trabajo en el que se programaron por primera vez, y al programarse harán que la tarea desde la que se programaron sea sticky. Para obtener el comportamiento de Threads.@spawn, establece el bit sticky manualmente en false.
Ejemplos
julia> a5() = sum(i for i in 1:1000);
julia> b = Task(a5);
julia> istaskstarted(b)
false
julia> schedule(b);
julia> yield();
julia> istaskstarted(b)
true
julia> istaskdone(b)
trueSynchronization
Base.errormonitor — Functionerrormonitor(t::Task)Imprime un registro de errores en stderr si la tarea t falla.
Ejemplos
julia> Base._wait(errormonitor(Threads.@spawn error("la tarea falló")))
Unhandled Task ERROR: la tarea falló
Stacktrace:
[...]Base.@sync — Macro@syncEspera hasta que se completen todos los usos encerrados léxicamente de @async, @spawn, Distributed.@spawnat y Distributed.@distributed. Todas las excepciones lanzadas por las operaciones asíncronas encerradas se recopilan y se lanzan como una CompositeException.
Ejemplos
julia> Threads.nthreads()
4
julia> @sync begin
Threads.@spawn println("Thread-id $(Threads.threadid()), task 1")
Threads.@spawn println("Thread-id $(Threads.threadid()), task 2")
end;
Thread-id 3, task 1
Thread-id 1, task 2Base.wait — FunctionNota especial para Threads.Condition:
El llamador debe estar sosteniendo el lock que posee un Threads.Condition antes de llamar a este método. La tarea que llama estará bloqueada hasta que otra tarea la despierte, generalmente llamando a notify en el mismo objeto Threads.Condition. El lock se liberará de forma atómica al bloquearse (incluso si estaba bloqueado recursivamente) y se volverá a adquirir antes de regresar.
wait(r::Future)Espera a que un valor esté disponible para el Future especificado.
wait(r::RemoteChannel, args...)Espera a que un valor esté disponible en el RemoteChannel especificado.
wait([x])Bloquea la tarea actual hasta que ocurra algún evento, dependiendo del tipo del argumento:
Channel: Espera a que se agregue un valor al canal.Condition: Espera a que se llame anotifyen una condición y devuelve el parámetrovalpasado anotify. Esperar en una condición permite además pasarfirst=true, lo que resulta en que el que espera se coloque primero en la fila para despertarse ennotifyen lugar del comportamiento habitual de primero en entrar, primero en salir.Process: Espera a que un proceso o cadena de procesos termine. El campoexitcodede un proceso se puede usar para determinar el éxito o el fracaso.Task: Espera a que unaTasktermine. Si la tarea falla con una excepción, se lanza unaTaskFailedException(que envuelve la tarea fallida).RawFD: Espera cambios en un descriptor de archivo (ver el paqueteFileWatching).
Si no se pasa ningún argumento, la tarea se bloquea por un período indefinido. Una tarea solo se puede reiniciar mediante una llamada explícita a schedule o yieldto.
A menudo, wait se llama dentro de un bucle while para asegurar que se cumpla una condición esperada antes de continuar.
wait(c::Channel)Bloquea hasta que el Channel isready.
julia> c = Channel(1);
julia> isready(c)
false
julia> task = Task(() -> wait(c));
julia> schedule(task);
julia> istaskdone(task) # la tarea está bloqueada porque el canal no está listo
false
julia> put!(c, 1);
julia> istaskdone(task) # la tarea ahora está desbloqueada
trueBase.fetch — Methodfetch(t::Task)Espera a que una Task termine, luego devuelve su valor de resultado. Si la tarea falla con una excepción, se lanza una TaskFailedException (que envuelve la tarea fallida).
Base.fetch — Methodfetch(x::Any)Devuelve x.
Base.timedwait — Functiontimedwait(testcb, timeout::Real; pollint::Real=0.1)Espera hasta que testcb() devuelva true o hayan pasado timeout segundos, lo que ocurra primero. La función de prueba se consulta cada pollint segundos. El valor mínimo para pollint es 0.001 segundos, es decir, 1 milisegundo.
Devuelve :ok o :timed_out.
Ejemplos
julia> cb() = (sleep(5); return);
julia> t = @async cb();
julia> timedwait(()->istaskdone(t), 1)
:timed_out
julia> timedwait(()->istaskdone(t), 6.5)
:okBase.Condition — TypeCondition()Crea una fuente de eventos activada por bordes que las tareas pueden esperar. Las tareas que llaman a wait en un Condition son suspendidas y encoladas. Las tareas se despiertan cuando se llama a notify más tarde en el Condition. Esperar en una condición puede devolver un valor o generar un error si se utilizan los argumentos opcionales de notify. La activación por bordes significa que solo las tareas que están esperando en el momento en que se llama a notify pueden ser despertadas. Para notificaciones activadas por niveles, debes mantener un estado adicional para hacer un seguimiento de si ha ocurrido una notificación. Los tipos Channel y Threads.Event hacen esto y se pueden usar para eventos activados por niveles.
Este objeto NO es seguro para hilos. Consulta Threads.Condition para una versión segura para hilos.
Base.Threads.Condition — TypeThreads.Condition([lock])Una versión segura para hilos de Base.Condition.
Para llamar a wait o notify en un Threads.Condition, primero debes llamar a lock en él. Cuando se llama a wait, el bloqueo se libera atómicamente durante el bloqueo y se volverá a adquirir antes de que wait regrese. Por lo tanto, el uso idiomático de un Threads.Condition c se ve como sigue:
lock(c)
try
while !thing_we_are_waiting_for
wait(c)
end
finally
unlock(c)
endEsta funcionalidad requiere al menos Julia 1.2.
Base.Event — TypeEvent([autoreset=false])Crea una fuente de eventos activada por nivel. Las tareas que llaman a wait en un Event son suspendidas y encoladas hasta que se llame a notify en el Event. Después de que se llama a notify, el Event permanece en un estado señalizado y las tareas ya no se bloquearán al esperar por él, hasta que se llame a reset.
Si autoreset es verdadero, como máximo una tarea será liberada de wait por cada llamada a notify.
Esto proporciona un orden de memoria de adquisición y liberación en notify/wait.
Esta funcionalidad requiere al menos Julia 1.1.
La funcionalidad de autoreset y la garantía de orden de memoria requieren al menos Julia 1.8.
Base.notify — Functionnotify(condition, val=nothing; all=true, error=false)Despierta las tareas que están esperando una condición, pasándoles val. Si all es true (el valor predeterminado), se despiertan todas las tareas en espera; de lo contrario, solo se despierta una. Si error es true, el valor pasado se eleva como una excepción en las tareas despertadas.
Devuelve el conteo de tareas despertadas. Devuelve 0 si no hay tareas esperando en condition.
Base.reset — Methodreset(::Event)Restablece un Event a un estado no establecido. Luego, cualquier llamada futura a wait bloqueará hasta que se llame a notify nuevamente.
Base.Semaphore — TypeSemaphore(sem_size)Crea un semáforo de conteo que permite como máximo sem_size adquisiciones en uso en cualquier momento. Cada adquisición debe ser emparejada con una liberación.
Esto proporciona un ordenamiento de memoria de adquisición y liberación en las llamadas de adquisición/liberación.
Base.acquire — Functionacquire(s::Semaphore)Espera a que uno de los permisos sem_size esté disponible, bloqueando hasta que se pueda adquirir uno.
acquire(f, s::Semaphore)Ejecuta f después de adquirir del Semáforo s, y release al completar o en caso de error.
Por ejemplo, una forma de bloque do que asegura que solo 2 llamadas a foo estarán activas al mismo tiempo:
s = Base.Semaphore(2)
@sync for _ in 1:100
Threads.@spawn begin
Base.acquire(s) do
foo()
end
end
endEste método requiere al menos Julia 1.8.
Base.release — Functionrelease(s::Semaphore)Devuelve un permiso al grupo, permitiendo posiblemente que otra tarea lo adquiera y reanude la ejecución.
Base.AbstractLock — TypeAbstractLockSupertipo abstracto que describe tipos que implementan los primitivos de sincronización: lock, trylock, unlock y islocked.
Base.lock — Functionlock(lock)Adquiere el lock cuando esté disponible. Si el lock ya está bloqueado por otra tarea/hilo, espera a que esté disponible.
Cada lock debe ser emparejado con un unlock.
lock(f::Function, lock)Adquiere el lock, ejecuta f con el lock mantenido, y libera el lock cuando f retorna. Si el lock ya está bloqueado por otra tarea/hilo, espera a que esté disponible.
Cuando esta función retorna, el lock ha sido liberado, por lo que el llamador no debe intentar unlockearlo.
Ver también: @lock.
Usar un Channel como segundo argumento requiere Julia 1.7 o posterior.
lock(f::Function, l::Lockable)
Adquiere el bloqueo asociado con l, ejecuta f con el bloqueo mantenido y libera el bloqueo cuando f regresa. f recibirá un argumento posicional: el valor envuelto por l. Si el bloqueo ya está bloqueado por otra tarea/hilo, espera a que esté disponible. Cuando esta función regresa, el lock ha sido liberado, por lo que el llamador no debe intentar unlockarlo.
Requiere al menos Julia 1.11.
Base.unlock — Functionunlock(lock)Libera la propiedad del lock.
Si este es un bloqueo recursivo que se ha adquirido antes, decrementa un contador interno y retorna inmediatamente.
Base.trylock — Functiontrylock(lock) -> Éxito (Booleano)Adquiere el bloqueo si está disponible y devuelve true si tiene éxito. Si el bloqueo ya está bloqueado por otra tarea/hilo, devuelve false.
Cada trylock exitoso debe ser emparejado con un unlock.
La función trylock combinada con islocked se puede usar para escribir los algoritmos de prueba-y-prueba-y-establecer o retroceso exponencial si es compatible con el typeof(lock) (lee su documentación).
Base.islocked — Functionislocked(lock) -> Estado (Booleano)Verifica si el lock está sostenido por alguna tarea/hilo. Esta función por sí sola no debe ser utilizada para sincronización. Sin embargo, islocked combinado con trylock puede ser utilizado para escribir los algoritmos de prueba-y-prueba-y-establecer o retroceso exponencial si es soportado por el typeof(lock) (lee su documentación).
Ayuda extendida
Por ejemplo, un retroceso exponencial puede ser implementado de la siguiente manera si la implementación del lock satisface las propiedades documentadas a continuación.
nspins = 0
while true
while islocked(lock)
GC.safepoint()
nspins += 1
nspins > LIMIT && error("timeout")
end
trylock(lock) && break
backoff()
endImplementación
Se aconseja a una implementación de lock definir islocked con las siguientes propiedades y anotarlo en su docstring.
islocked(lock)es libre de condiciones de carrera.- Si
islocked(lock)devuelvefalse, una invocación inmediata detrylock(lock)debe tener éxito (devolvertrue) si no hay interferencia de otras tareas.
Base.ReentrantLock — TypeReentrantLock()Crea un candado reentrante para sincronizar Tasks. La misma tarea puede adquirir el candado tantas veces como sea necesario (esto es lo que significa la parte "Reentrante" del nombre). Cada lock debe coincidir con un unlock.
Llamar a lock también inhibirá la ejecución de finalizadores en ese hilo hasta el correspondiente unlock. El uso del patrón de bloqueo estándar ilustrado a continuación debería ser naturalmente compatible, pero ten cuidado de invertir el orden de try/lock o de omitir completamente el bloque try (por ejemplo, intentar devolver con el candado aún sostenido):
Esto proporciona un orden de memoria de adquisición/liberación en las llamadas de lock/unlock.
lock(l)
try
<trabajo atómico>
finally
unlock(l)
endSi !islocked(lck::ReentrantLock) se cumple, trylock(lck) tendrá éxito a menos que haya otras tareas intentando mantener el candado "al mismo tiempo."
Base.@lock — Macro@lock l exprVersión macro de lock(f, l::AbstractLock) pero con expr en lugar de la función f. Se expande a:
lock(l)
try
expr
finally
unlock(l)
endEsto es similar a usar lock con un bloque do, pero evita crear un cierre y, por lo tanto, puede mejorar el rendimiento.
@lock se agregó en Julia 1.3 y se exportó en Julia 1.10.
Base.Lockable — TypeLockable(value, lock = ReentrantLock())
Crea un objeto Lockable que envuelve value y lo asocia con el lock proporcionado. Este objeto soporta @lock, lock, trylock, unlock. Para acceder al valor, indexa el objeto bloqueable mientras sostienes el bloqueo.
Requiere al menos Julia 1.11.
Ejemplo
julia> locked_list = Base.Lockable(Int[]);
julia> @lock(locked_list, push!(locked_list[], 1)) # debes sostener el bloqueo para acceder al valor
1-element Vector{Int64}:
1
julia> lock(summary, locked_list)
"1-element Vector{Int64}"Channels
Base.AbstractChannel — TypeAbstractChannel{T}Representación de un canal que pasa objetos del tipo T.
Base.Channel — TypeChannel{T=Any}(size::Int=0)Construye un Channel con un búfer interno que puede contener un máximo de size objetos del tipo T. Las llamadas a put! en un canal lleno bloquean hasta que se elimine un objeto con take!.
Channel(0) construye un canal sin búfer. put! bloquea hasta que se llama a un take! correspondiente. Y viceversa.
Otros constructores:
Channel(): constructor por defecto, equivalente aChannel{Any}(0)Channel(Inf): equivalente aChannel{Any}(typemax(Int))Channel(sz): equivalente aChannel{Any}(sz)
El constructor por defecto Channel() y el tamaño por defecto size=0 se añadieron en Julia 1.3.
Base.Channel — MethodChannel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false, threadpool=nothing)Crea una nueva tarea a partir de func, la vincula a un nuevo canal de tipo T y tamaño size, y programa la tarea, todo en una sola llamada. El canal se cierra automáticamente cuando la tarea termina.
func debe aceptar el canal vinculado como su único argumento.
Si necesitas una referencia a la tarea creada, pasa un objeto Ref{Task} a través del argumento de palabra clave taskref.
Si spawn=true, la Task creada para func puede ser programada en otro hilo en paralelo, equivalente a crear una tarea a través de Threads.@spawn.
Si spawn=true y el argumento threadpool no está establecido, por defecto es :default.
Si el argumento threadpool está establecido (a :default o :interactive), esto implica que spawn=true y la nueva Task se genera en el threadpool especificado.
Devuelve un Channel.
Ejemplos
julia> chnl = Channel() do ch
foreach(i -> put!(ch, i), 1:4)
end;
julia> typeof(chnl)
Channel{Any}
julia> for i in chnl
@show i
end;
i = 1
i = 2
i = 3
i = 4Referenciando la tarea creada:
julia> taskref = Ref{Task}();
julia> chnl = Channel(taskref=taskref) do ch
println(take!(ch))
end;
julia> istaskdone(taskref[])
false
julia> put!(chnl, "Hello");
Hello
julia> istaskdone(taskref[])
trueEl parámetro spawn= se agregó en Julia 1.3. Este constructor se agregó en Julia 1.3. En versiones anteriores de Julia, Channel utilizaba argumentos de palabra clave para establecer size y T, pero esos constructores están en desuso.
El argumento threadpool= se agregó en Julia 1.9.
julia> chnl = Channel{Char}(1, spawn=true) do ch
for c in "hello world"
put!(ch, c)
end
end
Channel{Char}(1) (2 items available)
julia> String(collect(chnl))
"hello world"Base.put! — Methodput!(c::Channel, v)Agrega un elemento v al canal c. Bloquea si el canal está lleno.
Para canales no almacenados, bloquea hasta que se realice un take! por otra tarea.
v ahora se convierte al tipo del canal con convert cuando se llama a put!.
Base.take! — Methodtake!(c::Channel)Elimina y devuelve un valor de un Channel en orden. Bloquea hasta que los datos estén disponibles. Para canales sin búfer, bloquea hasta que se realice un put! por otra tarea.
Ejemplos
Canal con búfer:
julia> c = Channel(1);
julia> put!(c, 1);
julia> take!(c)
1Canal sin búfer:
julia> c = Channel(0);
julia> task = Task(() -> put!(c, 1));
julia> schedule(task);
julia> take!(c)
1Base.isready — Methodisready(c::Channel)Determina si un Channel tiene un valor almacenado en él. Devuelve inmediatamente, no bloquea.
Para canales no bufferizados, devuelve true si hay tareas esperando en un put!.
Ejemplos
Canal bufferizado:
julia> c = Channel(1);
julia> isready(c)
false
julia> put!(c, 1);
julia> isready(c)
trueCanal no bufferizado:
julia> c = Channel();
julia> isready(c) # ¡sin tareas esperando para poner!
false
julia> task = Task(() -> put!(c, 1));
julia> schedule(task); # programa una tarea de put!
julia> isready(c)
trueBase.fetch — Methodfetch(c::Channel)Espera y devuelve (sin eliminar) el primer elemento disponible del Channel. Nota: fetch no es compatible con un Channel sin búfer (tamaño 0).
Ejemplos
Canal con búfer:
julia> c = Channel(3) do ch
foreach(i -> put!(ch, i), 1:3)
end;
julia> fetch(c)
1
julia> collect(c) # el elemento no se elimina
3-element Vector{Any}:
1
2
3Base.close — Methodclose(c::Channel[, excp::Exception])Cierra un canal. Se lanza una excepción (opcionalmente dada por excp) por:
Base.bind — Methodbind(chnl::Channel, task::Task)Asocia la duración de chnl con una tarea. Channel chnl se cierra automáticamente cuando la tarea termina. Cualquier excepción no capturada en la tarea se propaga a todos los que están a la espera en chnl.
El objeto chnl se puede cerrar explícitamente independientemente de la terminación de la tarea. Las tareas que terminan no tienen efecto en los objetos Channel que ya están cerrados.
Cuando un canal está vinculado a múltiples tareas, la primera tarea que termine cerrará el canal. Cuando múltiples canales están vinculados a la misma tarea, la terminación de la tarea cerrará todos los canales vinculados.
Ejemplos
julia> c = Channel(0);
julia> task = @async foreach(i->put!(c, i), 1:4);
julia> bind(c,task);
julia> for i in c
@show i
end;
i = 1
i = 2
i = 3
i = 4
julia> isopen(c)
falsejulia> c = Channel(0);
julia> task = @async (put!(c, 1); error("foo"));
julia> bind(c, task);
julia> take!(c)
1
julia> put!(c, 1);
ERROR: TaskFailedException
Stacktrace:
[...]
nested task error: foo
[...]Low-level synchronization using schedule and wait
El uso correcto más fácil de schedule es en una Tarea que aún no ha comenzado (programada). Sin embargo, es posible usar 4d61726b646f776e2e436f64652822222c20227363686564756c652229_40726566 y wait como un bloque de construcción de muy bajo nivel para construir interfaces de sincronización. Una condición previa crucial para llamar a schedule(task) es que el llamador debe "poseer" la tarea; es decir, debe saber que la llamada a wait en la tarea dada está ocurriendo en las ubicaciones conocidas por el código que llama a schedule(task). Una estrategia para asegurar tal condición previa es usar atómicos, como se demuestra en el siguiente ejemplo:
@enum OWEState begin
OWE_EMPTY
OWE_WAITING
OWE_NOTIFYING
end
mutable struct OneWayEvent
@atomic state::OWEState
task::Task
OneWayEvent() = new(OWE_EMPTY)
end
function Base.notify(ev::OneWayEvent)
state = @atomic ev.state
while state !== OWE_NOTIFYING
# Spin until we successfully update the state to OWE_NOTIFYING:
state, ok = @atomicreplace(ev.state, state => OWE_NOTIFYING)
if ok
if state == OWE_WAITING
# OWE_WAITING -> OWE_NOTIFYING transition means that the waiter task is
# already waiting or about to call `wait`. The notifier task must wake up
# the waiter task.
schedule(ev.task)
else
@assert state == OWE_EMPTY
# Since we are assuming that there is only one notifier task (for
# simplicity), we know that the other possible case here is OWE_EMPTY.
# We do not need to do anything because we know that the waiter task has
# not called `wait(ev::OneWayEvent)` yet.
end
break
end
end
return
end
function Base.wait(ev::OneWayEvent)
ev.task = current_task()
state, ok = @atomicreplace(ev.state, OWE_EMPTY => OWE_WAITING)
if ok
# OWE_EMPTY -> OWE_WAITING transition means that the notifier task is guaranteed to
# invoke OWE_WAITING -> OWE_NOTIFYING transition. The waiter task must call
# `wait()` immediately. In particular, it MUST NOT invoke any function that may
# yield to the scheduler at this point in code.
wait()
else
@assert state == OWE_NOTIFYING
# Otherwise, the `state` must have already been moved to OWE_NOTIFYING by the
# notifier task.
end
return
end
ev = OneWayEvent()
@sync begin
@async begin
wait(ev)
println("done")
end
println("notifying...")
notify(ev)
end
# output
notifying...
doneOneWayEvent permite que una tarea espere la notificación de otra tarea. Es una interfaz de comunicación limitada ya que wait solo puede ser utilizado una vez desde una única tarea (nota la asignación no atómica de ev.task)
En este ejemplo, notify(ev::OneWayEvent) puede llamar a schedule(ev.task) si y solo si modifica el estado de OWE_WAITING a OWE_NOTIFYING. Esto nos permite saber que la tarea que ejecuta wait(ev::OneWayEvent) está ahora en la rama ok y que no puede haber otras tareas que intenten schedule(ev.task) ya que su @atomicreplace(ev.state, state => OWE_NOTIFYING) fallará.