Tasks

Core.TaskType
Task(func)

Crea un Task (es decir, una coroutine) para ejecutar la función dada func (que debe ser invocable sin argumentos). La tarea finaliza cuando esta función retorna. La tarea se ejecutará en la "edad del mundo" del padre en el momento de la construcción cuando se scheduled.

Warning

Por defecto, las tareas tendrán el bit sticky configurado en true t.sticky. Esto modela el comportamiento predeterminado histórico para @async. Las tareas sticky solo pueden ejecutarse en el hilo de trabajo en el que fueron programadas por primera vez, y al ser programadas harán que la tarea desde la cual fueron programadas sea sticky. Para obtener el comportamiento de Threads.@spawn, establece el bit sticky manualmente en false.

Ejemplos

julia> a() = sum(i for i in 1:1000);

julia> b = Task(a);

En este ejemplo, b es un Task ejecutable que aún no ha comenzado.

source
Base.@taskMacro
@task

Envuelve una expresión en una Task sin ejecutarla, y devuelve la Task. Esto solo crea una tarea y no la ejecuta.

Warning

Por defecto, las tareas tendrán el bit sticky configurado en true t.sticky. Esto modela el comportamiento predeterminado histórico para @async. Las tareas sticky solo se pueden ejecutar en el hilo de trabajo en el que se programaron por primera vez, y al programarse harán que la tarea desde la cual se programaron sea sticky. Para obtener el comportamiento de Threads.@spawn, establece el bit sticky manualmente en false.

Ejemplos

julia> a1() = sum(i for i in 1:1000);

julia> b = @task a1();

julia> istaskstarted(b)
false

julia> schedule(b);

julia> yield();

julia> istaskdone(b)
true
source
Base.@asyncMacro
@async

Envuelve una expresión en una Task y añádela a la cola del programador de la máquina local.

Los valores se pueden interpolar en @async a través de $, que copia el valor directamente en el cierre subyacente construido. Esto te permite insertar el valor de una variable, aislando el código asíncrono de los cambios en el valor de la variable en la tarea actual.

Warning

Se recomienda encarecidamente favorecer Threads.@spawn sobre @async siempre incluso cuando no se requiera paralelismo, especialmente en bibliotecas distribuidas públicamente. Esto se debe a que el uso de @async desactiva la migración de la tarea padre a través de hilos de trabajo en la implementación actual de Julia. Por lo tanto, el uso aparentemente inocente de @async en una función de biblioteca puede tener un gran impacto en el rendimiento de partes muy diferentes de las aplicaciones de los usuarios.

Julia 1.4

La interpolación de valores a través de $ está disponible desde Julia 1.4.

source
Base.asyncmapFunction
asyncmap(f, c...; ntasks=0, batch_size=nothing)

Utiliza múltiples tareas concurrentes para mapear f sobre una colección (o múltiples colecciones de igual longitud). Para múltiples argumentos de colección, f se aplica elemento por elemento.

ntasks especifica el número de tareas que se ejecutarán de forma concurrente. Dependiendo de la longitud de las colecciones, si ntasks no está especificado, se utilizarán hasta 100 tareas para el mapeo concurrente.

ntasks también se puede especificar como una función sin argumentos. En este caso, el número de tareas que se ejecutarán en paralelo se verifica antes de procesar cada elemento y se inicia una nueva tarea si el valor de ntasks_func es mayor que el número actual de tareas.

Si se especifica batch_size, la colección se procesa en modo por lotes. f debe ser una función que acepte un Vector de tuplas de argumentos y debe devolver un vector de resultados. El vector de entrada tendrá una longitud de batch_size o menos.

Los siguientes ejemplos destacan la ejecución en diferentes tareas al devolver el objectid de las tareas en las que se ejecuta la función de mapeo.

Primero, con ntasks indefinido, cada elemento se procesa en una tarea diferente.

julia> tskoid() = objectid(current_task());

julia> asyncmap(x->tskoid(), 1:5)
5-element Array{UInt64,1}:
 0x6e15e66c75c75853
 0x440f8819a1baa682
 0x9fb3eeadd0c83985
 0xebd3e35fe90d4050
 0x29efc93edce2b961

julia> length(unique(asyncmap(x->tskoid(), 1:5)))
5

Con ntasks=2, todos los elementos se procesan en 2 tareas.

julia> asyncmap(x->tskoid(), 1:5; ntasks=2)
5-element Array{UInt64,1}:
 0x027ab1680df7ae94
 0xa23d2f80cd7cf157
 0x027ab1680df7ae94
 0xa23d2f80cd7cf157
 0x027ab1680df7ae94

julia> length(unique(asyncmap(x->tskoid(), 1:5; ntasks=2)))
2

Con batch_size definido, la función de mapeo necesita ser cambiada para aceptar un arreglo de tuplas de argumentos y devolver un arreglo de resultados. map se utiliza en la función de mapeo modificada para lograr esto.

julia> batch_func(input) = map(x->string("args_tuple: ", x, ", element_val: ", x[1], ", task: ", tskoid()), input)
batch_func (generic function with 1 method)

julia> asyncmap(batch_func, 1:5; ntasks=2, batch_size=2)
5-element Array{String,1}:
 "args_tuple: (1,), element_val: 1, task: 9118321258196414413"
 "args_tuple: (2,), element_val: 2, task: 4904288162898683522"
 "args_tuple: (3,), element_val: 3, task: 9118321258196414413"
 "args_tuple: (4,), element_val: 4, task: 4904288162898683522"
 "args_tuple: (5,), element_val: 5, task: 9118321258196414413"
source
Base.asyncmap!Function
asyncmap!(f, results, c...; ntasks=0, batch_size=nothing)

Como asyncmap, pero almacena la salida en results en lugar de devolver una colección.

Advertencia

El comportamiento puede ser inesperado cuando cualquier argumento mutado comparte memoria con cualquier otro argumento.

source
Base.istaskdoneFunction
istaskdone(t::Task) -> Bool

Determina si una tarea ha salido.

Ejemplos

julia> a2() = sum(i for i in 1:1000);

julia> b = Task(a2);

julia> istaskdone(b)
false

julia> schedule(b);

julia> yield();

julia> istaskdone(b)
true
source
Base.istaskstartedFunction
istaskstarted(t::Task) -> Bool

Determina si una tarea ha comenzado a ejecutarse.

Ejemplos

julia> a3() = sum(i for i in 1:1000);

julia> b = Task(a3);

julia> istaskstarted(b)
false
source
Base.istaskfailedFunction
istaskfailed(t::Task) -> Bool

Determina si una tarea ha salido porque se lanzó una excepción.

Ejemplos

julia> a4() = error("tarea fallida");

julia> b = Task(a4);

julia> istaskfailed(b)
false

julia> schedule(b);

julia> yield();

julia> istaskfailed(b)
true
Julia 1.3

Esta función requiere al menos Julia 1.3.

source
Base.task_local_storageMethod
task_local_storage(key)

Busca el valor de una clave en el almacenamiento local de tareas de la tarea actual.

source
Base.task_local_storageMethod
task_local_storage(key, value)

Asigna un valor a una clave en el almacenamiento local de tareas de la tarea actual.

source
Base.task_local_storageMethod
task_local_storage(body, key, value)

Llama a la función body con un almacenamiento local de tareas modificado, en el cual value se asigna a key; el valor anterior de key, o la falta del mismo, se restaura después. Útil para emular el alcance dinámico.

source

Scheduling

Base.yieldFunction
yield()

Cambia al programador para permitir que otra tarea programada se ejecute. Una tarea que llama a esta función sigue siendo ejecutable y se reiniciará inmediatamente si no hay otras tareas ejecutables.

source
yield(t::Task, arg = nothing)

Una versión rápida y de programación injusta de schedule(t, arg); yield() que cede inmediatamente a t antes de llamar al programador.

source
Base.yieldtoFunction
yieldto(t::Task, arg = nothing)

Cambia a la tarea dada. La primera vez que se cambia a una tarea, la función de la tarea se llama sin argumentos. En cambios posteriores, arg se devuelve de la última llamada de la tarea a yieldto. Esta es una llamada de bajo nivel que solo cambia tareas, sin considerar estados o programación de ninguna manera. Su uso no se recomienda.

source
Base.sleepFunction
sleep(seconds)

Bloquea la tarea actual durante un número especificado de segundos. El tiempo mínimo de espera es de 1 milisegundo o una entrada de 0.001.

source
Base.scheduleFunction
schedule(t::Task, [val]; error=false)

Agrega un Task a la cola del programador. Esto hace que la tarea se ejecute constantemente cuando el sistema está inactivo, a menos que la tarea realice una operación de bloqueo como wait.

Si se proporciona un segundo argumento val, se pasará a la tarea (a través del valor de retorno de yieldto) cuando se ejecute nuevamente. Si error es true, el valor se levantará como una excepción en la tarea despertada.

Warning

Es incorrecto usar schedule en un Task arbitrario que ya ha sido iniciado. Consulta la referencia de la API para más información.

Warning

Por defecto, las tareas tendrán el bit sticky configurado en true t.sticky. Esto modela el comportamiento predeterminado histórico para @async. Las tareas sticky solo se pueden ejecutar en el hilo de trabajo en el que se programaron por primera vez, y al programarse harán que la tarea desde la que se programaron sea sticky. Para obtener el comportamiento de Threads.@spawn, establece el bit sticky manualmente en false.

Ejemplos

julia> a5() = sum(i for i in 1:1000);

julia> b = Task(a5);

julia> istaskstarted(b)
false

julia> schedule(b);

julia> yield();

julia> istaskstarted(b)
true

julia> istaskdone(b)
true
source

Synchronization

Base.errormonitorFunction
errormonitor(t::Task)

Imprime un registro de errores en stderr si la tarea t falla.

Ejemplos

julia> Base._wait(errormonitor(Threads.@spawn error("la tarea falló")))
Unhandled Task ERROR: la tarea falló
Stacktrace:
[...]
source
Base.@syncMacro
@sync

Espera hasta que se completen todos los usos encerrados léxicamente de @async, @spawn, Distributed.@spawnat y Distributed.@distributed. Todas las excepciones lanzadas por las operaciones asíncronas encerradas se recopilan y se lanzan como una CompositeException.

Ejemplos

julia> Threads.nthreads()
4

julia> @sync begin
           Threads.@spawn println("Thread-id $(Threads.threadid()), task 1")
           Threads.@spawn println("Thread-id $(Threads.threadid()), task 2")
       end;
Thread-id 3, task 1
Thread-id 1, task 2
source
Base.waitFunction

Nota especial para Threads.Condition:

El llamador debe estar sosteniendo el lock que posee un Threads.Condition antes de llamar a este método. La tarea que llama estará bloqueada hasta que otra tarea la despierte, generalmente llamando a notify en el mismo objeto Threads.Condition. El lock se liberará de forma atómica al bloquearse (incluso si estaba bloqueado recursivamente) y se volverá a adquirir antes de regresar.

source
wait(r::Future)

Espera a que un valor esté disponible para el Future especificado.

source
wait(r::RemoteChannel, args...)

Espera a que un valor esté disponible en el RemoteChannel especificado.

source
wait([x])

Bloquea la tarea actual hasta que ocurra algún evento, dependiendo del tipo del argumento:

  • Channel: Espera a que se agregue un valor al canal.
  • Condition: Espera a que se llame a notify en una condición y devuelve el parámetro val pasado a notify. Esperar en una condición permite además pasar first=true, lo que resulta en que el que espera se coloque primero en la fila para despertarse en notify en lugar del comportamiento habitual de primero en entrar, primero en salir.
  • Process: Espera a que un proceso o cadena de procesos termine. El campo exitcode de un proceso se puede usar para determinar el éxito o el fracaso.
  • Task: Espera a que una Task termine. Si la tarea falla con una excepción, se lanza una TaskFailedException (que envuelve la tarea fallida).
  • RawFD: Espera cambios en un descriptor de archivo (ver el paquete FileWatching).

Si no se pasa ningún argumento, la tarea se bloquea por un período indefinido. Una tarea solo se puede reiniciar mediante una llamada explícita a schedule o yieldto.

A menudo, wait se llama dentro de un bucle while para asegurar que se cumpla una condición esperada antes de continuar.

source
wait(c::Channel)

Bloquea hasta que el Channel isready.

julia> c = Channel(1);

julia> isready(c)
false

julia> task = Task(() -> wait(c));

julia> schedule(task);

julia> istaskdone(task)  # la tarea está bloqueada porque el canal no está listo
false

julia> put!(c, 1);

julia> istaskdone(task)  # la tarea ahora está desbloqueada
true
source
Base.fetchMethod
fetch(t::Task)

Espera a que una Task termine, luego devuelve su valor de resultado. Si la tarea falla con una excepción, se lanza una TaskFailedException (que envuelve la tarea fallida).

source
Base.timedwaitFunction
timedwait(testcb, timeout::Real; pollint::Real=0.1)

Espera hasta que testcb() devuelva true o hayan pasado timeout segundos, lo que ocurra primero. La función de prueba se consulta cada pollint segundos. El valor mínimo para pollint es 0.001 segundos, es decir, 1 milisegundo.

Devuelve :ok o :timed_out.

Ejemplos

julia> cb() = (sleep(5); return);

julia> t = @async cb();

julia> timedwait(()->istaskdone(t), 1)
:timed_out

julia> timedwait(()->istaskdone(t), 6.5)
:ok
source
Base.ConditionType
Condition()

Crea una fuente de eventos activada por bordes que las tareas pueden esperar. Las tareas que llaman a wait en un Condition son suspendidas y encoladas. Las tareas se despiertan cuando se llama a notify más tarde en el Condition. Esperar en una condición puede devolver un valor o generar un error si se utilizan los argumentos opcionales de notify. La activación por bordes significa que solo las tareas que están esperando en el momento en que se llama a notify pueden ser despertadas. Para notificaciones activadas por niveles, debes mantener un estado adicional para hacer un seguimiento de si ha ocurrido una notificación. Los tipos Channel y Threads.Event hacen esto y se pueden usar para eventos activados por niveles.

Este objeto NO es seguro para hilos. Consulta Threads.Condition para una versión segura para hilos.

source
Base.Threads.ConditionType
Threads.Condition([lock])

Una versión segura para hilos de Base.Condition.

Para llamar a wait o notify en un Threads.Condition, primero debes llamar a lock en él. Cuando se llama a wait, el bloqueo se libera atómicamente durante el bloqueo y se volverá a adquirir antes de que wait regrese. Por lo tanto, el uso idiomático de un Threads.Condition c se ve como sigue:

lock(c)
try
    while !thing_we_are_waiting_for
        wait(c)
    end
finally
    unlock(c)
end
Julia 1.2

Esta funcionalidad requiere al menos Julia 1.2.

source
Base.EventType
Event([autoreset=false])

Crea una fuente de eventos activada por nivel. Las tareas que llaman a wait en un Event son suspendidas y encoladas hasta que se llame a notify en el Event. Después de que se llama a notify, el Event permanece en un estado señalizado y las tareas ya no se bloquearán al esperar por él, hasta que se llame a reset.

Si autoreset es verdadero, como máximo una tarea será liberada de wait por cada llamada a notify.

Esto proporciona un orden de memoria de adquisición y liberación en notify/wait.

Julia 1.1

Esta funcionalidad requiere al menos Julia 1.1.

Julia 1.8

La funcionalidad de autoreset y la garantía de orden de memoria requieren al menos Julia 1.8.

source
Base.notifyFunction
notify(condition, val=nothing; all=true, error=false)

Despierta las tareas que están esperando una condición, pasándoles val. Si all es true (el valor predeterminado), se despiertan todas las tareas en espera; de lo contrario, solo se despierta una. Si error es true, el valor pasado se eleva como una excepción en las tareas despertadas.

Devuelve el conteo de tareas despertadas. Devuelve 0 si no hay tareas esperando en condition.

source
Base.resetMethod
reset(::Event)

Restablece un Event a un estado no establecido. Luego, cualquier llamada futura a wait bloqueará hasta que se llame a notify nuevamente.

source
Base.SemaphoreType
Semaphore(sem_size)

Crea un semáforo de conteo que permite como máximo sem_size adquisiciones en uso en cualquier momento. Cada adquisición debe ser emparejada con una liberación.

Esto proporciona un ordenamiento de memoria de adquisición y liberación en las llamadas de adquisición/liberación.

source
Base.acquireFunction
acquire(s::Semaphore)

Espera a que uno de los permisos sem_size esté disponible, bloqueando hasta que se pueda adquirir uno.

source
acquire(f, s::Semaphore)

Ejecuta f después de adquirir del Semáforo s, y release al completar o en caso de error.

Por ejemplo, una forma de bloque do que asegura que solo 2 llamadas a foo estarán activas al mismo tiempo:

s = Base.Semaphore(2)
@sync for _ in 1:100
    Threads.@spawn begin
        Base.acquire(s) do
            foo()
        end
    end
end
Julia 1.8

Este método requiere al menos Julia 1.8.

source
Base.releaseFunction
release(s::Semaphore)

Devuelve un permiso al grupo, permitiendo posiblemente que otra tarea lo adquiera y reanude la ejecución.

source
Base.lockFunction
lock(lock)

Adquiere el lock cuando esté disponible. Si el lock ya está bloqueado por otra tarea/hilo, espera a que esté disponible.

Cada lock debe ser emparejado con un unlock.

source
lock(f::Function, lock)

Adquiere el lock, ejecuta f con el lock mantenido, y libera el lock cuando f retorna. Si el lock ya está bloqueado por otra tarea/hilo, espera a que esté disponible.

Cuando esta función retorna, el lock ha sido liberado, por lo que el llamador no debe intentar unlockearlo.

Ver también: @lock.

Julia 1.7

Usar un Channel como segundo argumento requiere Julia 1.7 o posterior.

source

lock(f::Function, l::Lockable)

Adquiere el bloqueo asociado con l, ejecuta f con el bloqueo mantenido y libera el bloqueo cuando f regresa. f recibirá un argumento posicional: el valor envuelto por l. Si el bloqueo ya está bloqueado por otra tarea/hilo, espera a que esté disponible. Cuando esta función regresa, el lock ha sido liberado, por lo que el llamador no debe intentar unlockarlo.

Julia 1.11

Requiere al menos Julia 1.11.

source
Base.unlockFunction
unlock(lock)

Libera la propiedad del lock.

Si este es un bloqueo recursivo que se ha adquirido antes, decrementa un contador interno y retorna inmediatamente.

source
Base.trylockFunction
trylock(lock) -> Éxito (Booleano)

Adquiere el bloqueo si está disponible y devuelve true si tiene éxito. Si el bloqueo ya está bloqueado por otra tarea/hilo, devuelve false.

Cada trylock exitoso debe ser emparejado con un unlock.

La función trylock combinada con islocked se puede usar para escribir los algoritmos de prueba-y-prueba-y-establecer o retroceso exponencial si es compatible con el typeof(lock) (lee su documentación).

source
Base.islockedFunction
islocked(lock) -> Estado (Booleano)

Verifica si el lock está sostenido por alguna tarea/hilo. Esta función por sí sola no debe ser utilizada para sincronización. Sin embargo, islocked combinado con trylock puede ser utilizado para escribir los algoritmos de prueba-y-prueba-y-establecer o retroceso exponencial si es soportado por el typeof(lock) (lee su documentación).

Ayuda extendida

Por ejemplo, un retroceso exponencial puede ser implementado de la siguiente manera si la implementación del lock satisface las propiedades documentadas a continuación.

nspins = 0
while true
    while islocked(lock)
        GC.safepoint()
        nspins += 1
        nspins > LIMIT && error("timeout")
    end
    trylock(lock) && break
    backoff()
end

Implementación

Se aconseja a una implementación de lock definir islocked con las siguientes propiedades y anotarlo en su docstring.

  • islocked(lock) es libre de condiciones de carrera.
  • Si islocked(lock) devuelve false, una invocación inmediata de trylock(lock) debe tener éxito (devolver true) si no hay interferencia de otras tareas.
source
Base.ReentrantLockType
ReentrantLock()

Crea un candado reentrante para sincronizar Tasks. La misma tarea puede adquirir el candado tantas veces como sea necesario (esto es lo que significa la parte "Reentrante" del nombre). Cada lock debe coincidir con un unlock.

Llamar a lock también inhibirá la ejecución de finalizadores en ese hilo hasta el correspondiente unlock. El uso del patrón de bloqueo estándar ilustrado a continuación debería ser naturalmente compatible, pero ten cuidado de invertir el orden de try/lock o de omitir completamente el bloque try (por ejemplo, intentar devolver con el candado aún sostenido):

Esto proporciona un orden de memoria de adquisición/liberación en las llamadas de lock/unlock.

lock(l)
try
    <trabajo atómico>
finally
    unlock(l)
end

Si !islocked(lck::ReentrantLock) se cumple, trylock(lck) tendrá éxito a menos que haya otras tareas intentando mantener el candado "al mismo tiempo."

source
Base.@lockMacro
@lock l expr

Versión macro de lock(f, l::AbstractLock) pero con expr en lugar de la función f. Se expande a:

lock(l)
try
    expr
finally
    unlock(l)
end

Esto es similar a usar lock con un bloque do, pero evita crear un cierre y, por lo tanto, puede mejorar el rendimiento.

Compat

@lock se agregó en Julia 1.3 y se exportó en Julia 1.10.

source
Base.LockableType

Lockable(value, lock = ReentrantLock())

Crea un objeto Lockable que envuelve value y lo asocia con el lock proporcionado. Este objeto soporta @lock, lock, trylock, unlock. Para acceder al valor, indexa el objeto bloqueable mientras sostienes el bloqueo.

Julia 1.11

Requiere al menos Julia 1.11.

Ejemplo

julia> locked_list = Base.Lockable(Int[]);

julia> @lock(locked_list, push!(locked_list[], 1)) # debes sostener el bloqueo para acceder al valor
1-element Vector{Int64}:
 1

julia> lock(summary, locked_list)
"1-element Vector{Int64}"
source

Channels

Base.ChannelType
Channel{T=Any}(size::Int=0)

Construye un Channel con un búfer interno que puede contener un máximo de size objetos del tipo T. Las llamadas a put! en un canal lleno bloquean hasta que se elimine un objeto con take!.

Channel(0) construye un canal sin búfer. put! bloquea hasta que se llama a un take! correspondiente. Y viceversa.

Otros constructores:

  • Channel(): constructor por defecto, equivalente a Channel{Any}(0)
  • Channel(Inf): equivalente a Channel{Any}(typemax(Int))
  • Channel(sz): equivalente a Channel{Any}(sz)
Julia 1.3

El constructor por defecto Channel() y el tamaño por defecto size=0 se añadieron en Julia 1.3.

source
Base.ChannelMethod
Channel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false, threadpool=nothing)

Crea una nueva tarea a partir de func, la vincula a un nuevo canal de tipo T y tamaño size, y programa la tarea, todo en una sola llamada. El canal se cierra automáticamente cuando la tarea termina.

func debe aceptar el canal vinculado como su único argumento.

Si necesitas una referencia a la tarea creada, pasa un objeto Ref{Task} a través del argumento de palabra clave taskref.

Si spawn=true, la Task creada para func puede ser programada en otro hilo en paralelo, equivalente a crear una tarea a través de Threads.@spawn.

Si spawn=true y el argumento threadpool no está establecido, por defecto es :default.

Si el argumento threadpool está establecido (a :default o :interactive), esto implica que spawn=true y la nueva Task se genera en el threadpool especificado.

Devuelve un Channel.

Ejemplos

julia> chnl = Channel() do ch
           foreach(i -> put!(ch, i), 1:4)
       end;

julia> typeof(chnl)
Channel{Any}

julia> for i in chnl
           @show i
       end;
i = 1
i = 2
i = 3
i = 4

Referenciando la tarea creada:

julia> taskref = Ref{Task}();

julia> chnl = Channel(taskref=taskref) do ch
           println(take!(ch))
       end;

julia> istaskdone(taskref[])
false

julia> put!(chnl, "Hello");
Hello

julia> istaskdone(taskref[])
true
Julia 1.3

El parámetro spawn= se agregó en Julia 1.3. Este constructor se agregó en Julia 1.3. En versiones anteriores de Julia, Channel utilizaba argumentos de palabra clave para establecer size y T, pero esos constructores están en desuso.

Julia 1.9

El argumento threadpool= se agregó en Julia 1.9.

julia> chnl = Channel{Char}(1, spawn=true) do ch
           for c in "hello world"
               put!(ch, c)
           end
       end
Channel{Char}(1) (2 items available)

julia> String(collect(chnl))
"hello world"
source
Base.put!Method
put!(c::Channel, v)

Agrega un elemento v al canal c. Bloquea si el canal está lleno.

Para canales no almacenados, bloquea hasta que se realice un take! por otra tarea.

Julia 1.1

v ahora se convierte al tipo del canal con convert cuando se llama a put!.

source
Base.take!Method
take!(c::Channel)

Elimina y devuelve un valor de un Channel en orden. Bloquea hasta que los datos estén disponibles. Para canales sin búfer, bloquea hasta que se realice un put! por otra tarea.

Ejemplos

Canal con búfer:

julia> c = Channel(1);

julia> put!(c, 1);

julia> take!(c)
1

Canal sin búfer:

julia> c = Channel(0);

julia> task = Task(() -> put!(c, 1));

julia> schedule(task);

julia> take!(c)
1
source
Base.isreadyMethod
isready(c::Channel)

Determina si un Channel tiene un valor almacenado en él. Devuelve inmediatamente, no bloquea.

Para canales no bufferizados, devuelve true si hay tareas esperando en un put!.

Ejemplos

Canal bufferizado:

julia> c = Channel(1);

julia> isready(c)
false

julia> put!(c, 1);

julia> isready(c)
true

Canal no bufferizado:

julia> c = Channel();

julia> isready(c)  # ¡sin tareas esperando para poner!
false

julia> task = Task(() -> put!(c, 1));

julia> schedule(task);  # programa una tarea de put!

julia> isready(c)
true
source
Base.fetchMethod
fetch(c::Channel)

Espera y devuelve (sin eliminar) el primer elemento disponible del Channel. Nota: fetch no es compatible con un Channel sin búfer (tamaño 0).

Ejemplos

Canal con búfer:

julia> c = Channel(3) do ch
           foreach(i -> put!(ch, i), 1:3)
       end;

julia> fetch(c)
1

julia> collect(c)  # el elemento no se elimina
3-element Vector{Any}:
 1
 2
 3
source
Base.closeMethod
close(c::Channel[, excp::Exception])

Cierra un canal. Se lanza una excepción (opcionalmente dada por excp) por:

source
Base.bindMethod
bind(chnl::Channel, task::Task)

Asocia la duración de chnl con una tarea. Channel chnl se cierra automáticamente cuando la tarea termina. Cualquier excepción no capturada en la tarea se propaga a todos los que están a la espera en chnl.

El objeto chnl se puede cerrar explícitamente independientemente de la terminación de la tarea. Las tareas que terminan no tienen efecto en los objetos Channel que ya están cerrados.

Cuando un canal está vinculado a múltiples tareas, la primera tarea que termine cerrará el canal. Cuando múltiples canales están vinculados a la misma tarea, la terminación de la tarea cerrará todos los canales vinculados.

Ejemplos

julia> c = Channel(0);

julia> task = @async foreach(i->put!(c, i), 1:4);

julia> bind(c,task);

julia> for i in c
           @show i
       end;
i = 1
i = 2
i = 3
i = 4

julia> isopen(c)
false
julia> c = Channel(0);

julia> task = @async (put!(c, 1); error("foo"));

julia> bind(c, task);

julia> take!(c)
1

julia> put!(c, 1);
ERROR: TaskFailedException
Stacktrace:
[...]
    nested task error: foo
[...]
source

Low-level synchronization using schedule and wait

El uso correcto más fácil de schedule es en una Tarea que aún no ha comenzado (programada). Sin embargo, es posible usar 4d61726b646f776e2e436f64652822222c20227363686564756c652229_40726566 y wait como un bloque de construcción de muy bajo nivel para construir interfaces de sincronización. Una condición previa crucial para llamar a schedule(task) es que el llamador debe "poseer" la tarea; es decir, debe saber que la llamada a wait en la tarea dada está ocurriendo en las ubicaciones conocidas por el código que llama a schedule(task). Una estrategia para asegurar tal condición previa es usar atómicos, como se demuestra en el siguiente ejemplo:

@enum OWEState begin
    OWE_EMPTY
    OWE_WAITING
    OWE_NOTIFYING
end

mutable struct OneWayEvent
    @atomic state::OWEState
    task::Task
    OneWayEvent() = new(OWE_EMPTY)
end

function Base.notify(ev::OneWayEvent)
    state = @atomic ev.state
    while state !== OWE_NOTIFYING
        # Spin until we successfully update the state to OWE_NOTIFYING:
        state, ok = @atomicreplace(ev.state, state => OWE_NOTIFYING)
        if ok
            if state == OWE_WAITING
                # OWE_WAITING -> OWE_NOTIFYING transition means that the waiter task is
                # already waiting or about to call `wait`. The notifier task must wake up
                # the waiter task.
                schedule(ev.task)
            else
                @assert state == OWE_EMPTY
                # Since we are assuming that there is only one notifier task (for
                # simplicity), we know that the other possible case here is OWE_EMPTY.
                # We do not need to do anything because we know that the waiter task has
                # not called `wait(ev::OneWayEvent)` yet.
            end
            break
        end
    end
    return
end

function Base.wait(ev::OneWayEvent)
    ev.task = current_task()
    state, ok = @atomicreplace(ev.state, OWE_EMPTY => OWE_WAITING)
    if ok
        # OWE_EMPTY -> OWE_WAITING transition means that the notifier task is guaranteed to
        # invoke OWE_WAITING -> OWE_NOTIFYING transition.  The waiter task must call
        # `wait()` immediately.  In particular, it MUST NOT invoke any function that may
        # yield to the scheduler at this point in code.
        wait()
    else
        @assert state == OWE_NOTIFYING
        # Otherwise, the `state` must have already been moved to OWE_NOTIFYING by the
        # notifier task.
    end
    return
end

ev = OneWayEvent()
@sync begin
    @async begin
        wait(ev)
        println("done")
    end
    println("notifying...")
    notify(ev)
end

# output
notifying...
done

OneWayEvent permite que una tarea espere la notificación de otra tarea. Es una interfaz de comunicación limitada ya que wait solo puede ser utilizado una vez desde una única tarea (nota la asignación no atómica de ev.task)

En este ejemplo, notify(ev::OneWayEvent) puede llamar a schedule(ev.task) si y solo si modifica el estado de OWE_WAITING a OWE_NOTIFYING. Esto nos permite saber que la tarea que ejecuta wait(ev::OneWayEvent) está ahora en la rama ok y que no puede haber otras tareas que intenten schedule(ev.task) ya que su @atomicreplace(ev.state, state => OWE_NOTIFYING) fallará.