Tasks

Core.TaskType
Task(func)

Créez une Task (c'est-à-dire une coroutine) pour exécuter la fonction donnée func (qui doit être appelable sans arguments). La tâche se termine lorsque cette fonction retourne. La tâche s'exécutera dans l'"âge du monde" du parent au moment de la construction lors de la planification schedule.

Avertissement

Par défaut, les tâches auront le bit collant défini sur true t.sticky. Cela modélise le comportement par défaut historique pour @async. Les tâches collantes ne peuvent être exécutées que sur le fil de travail sur lequel elles ont été d'abord planifiées, et lorsqu'elles sont planifiées, elles rendront la tâche à partir de laquelle elles ont été planifiées collante. Pour obtenir le comportement de Threads.@spawn, définissez manuellement le bit collant sur false.

Exemples

julia> a() = sum(i for i in 1:1000);

julia> b = Task(a);

Dans cet exemple, b est une Task exécutable qui n'a pas encore commencé.

source
Base.@taskMacro
@task

Enveloppez une expression dans une Task sans l'exécuter, et renvoyez la Task. Cela crée uniquement une tâche, et ne l'exécute pas.

Avertissement

Par défaut, les tâches auront le bit collant défini sur true t.sticky. Cela modélise le comportement par défaut historique pour @async. Les tâches collantes ne peuvent être exécutées que sur le fil de travail sur lequel elles ont été d'abord planifiées, et lorsqu'elles sont planifiées, elles rendront la tâche à partir de laquelle elles ont été planifiées collante. Pour obtenir le comportement de Threads.@spawn, définissez manuellement le bit collant sur false.

Exemples

julia> a1() = sum(i for i in 1:1000);

julia> b = @task a1();

julia> istaskstarted(b)
false

julia> schedule(b);

julia> yield();

julia> istaskdone(b)
true
source
Base.@asyncMacro
@async

Enveloppez une expression dans une Task et ajoutez-la à la file d'attente du planificateur de la machine locale.

Les valeurs peuvent être interpolées dans @async via $, ce qui copie la valeur directement dans la fermeture sous-jacente construite. Cela vous permet d'insérer la valeur d'une variable, isolant le code asynchrone des changements de la valeur de la variable dans la tâche actuelle.

Warning

Il est fortement recommandé de privilégier Threads.@spawn plutôt que @async toujours même lorsque le parallélisme n'est pas requis, en particulier dans les bibliothèques distribuées publiquement. Cela est dû au fait qu'une utilisation de @async désactive la migration de la tâche parent à travers les threads de travail dans l'implémentation actuelle de Julia. Ainsi, une utilisation apparemment innocente de @async dans une fonction de bibliothèque peut avoir un impact important sur les performances de parties très différentes des applications des utilisateurs.

Julia 1.4

L'interpolation des valeurs via $ est disponible depuis Julia 1.4.

source
Base.asyncmapFunction
asyncmap(f, c...; ntasks=0, batch_size=nothing)

Utilise plusieurs tâches concurrentes pour appliquer f sur une collection (ou plusieurs collections de même longueur). Pour plusieurs arguments de collection, f est appliqué élément par élément.

ntasks spécifie le nombre de tâches à exécuter de manière concurrente. En fonction de la longueur des collections, si ntasks n'est pas spécifié, jusqu'à 100 tâches seront utilisées pour le mappage concurrent.

ntasks peut également être spécifié comme une fonction sans argument. Dans ce cas, le nombre de tâches à exécuter en parallèle est vérifié avant le traitement de chaque élément et une nouvelle tâche est démarrée si la valeur de ntasks_func est supérieure au nombre actuel de tâches.

Si batch_size est spécifié, la collection est traitée en mode batch. f doit alors être une fonction qui doit accepter un Vector de tuples d'arguments et doit retourner un vecteur de résultats. Le vecteur d'entrée aura une longueur de batch_size ou moins.

Les exemples suivants mettent en évidence l'exécution dans différentes tâches en retournant l'objectid des tâches dans lesquelles la fonction de mappage est exécutée.

Tout d'abord, avec ntasks indéfini, chaque élément est traité dans une tâche différente.

julia> tskoid() = objectid(current_task());

julia> asyncmap(x->tskoid(), 1:5)
5-element Array{UInt64,1}:
 0x6e15e66c75c75853
 0x440f8819a1baa682
 0x9fb3eeadd0c83985
 0xebd3e35fe90d4050
 0x29efc93edce2b961

julia> length(unique(asyncmap(x->tskoid(), 1:5)))
5

Avec ntasks=2, tous les éléments sont traités dans 2 tâches.

julia> asyncmap(x->tskoid(), 1:5; ntasks=2)
5-element Array{UInt64,1}:
 0x027ab1680df7ae94
 0xa23d2f80cd7cf157
 0x027ab1680df7ae94
 0xa23d2f80cd7cf157
 0x027ab1680df7ae94

julia> length(unique(asyncmap(x->tskoid(), 1:5; ntasks=2)))
2

Avec batch_size défini, la fonction de mappage doit être modifiée pour accepter un tableau de tuples d'arguments et retourner un tableau de résultats. map est utilisé dans la fonction de mappage modifiée pour y parvenir.

julia> batch_func(input) = map(x->string("args_tuple: ", x, ", element_val: ", x[1], ", task: ", tskoid()), input)
batch_func (generic function with 1 method)

julia> asyncmap(batch_func, 1:5; ntasks=2, batch_size=2)
5-element Array{String,1}:
 "args_tuple: (1,), element_val: 1, task: 9118321258196414413"
 "args_tuple: (2,), element_val: 2, task: 4904288162898683522"
 "args_tuple: (3,), element_val: 3, task: 9118321258196414413"
 "args_tuple: (4,), element_val: 4, task: 4904288162898683522"
 "args_tuple: (5,), element_val: 5, task: 9118321258196414413"
source
Base.asyncmap!Function
asyncmap!(f, results, c...; ntasks=0, batch_size=nothing)

Comme asyncmap, mais stocke la sortie dans results plutôt que de renvoyer une collection.

Avertissement

Le comportement peut être inattendu lorsque tout argument muté partage de la mémoire avec un autre argument.

source
Base.istaskdoneFunction
istaskdone(t::Task) -> Bool

Déterminez si une tâche a quitté.

Exemples

julia> a2() = sum(i for i in 1:1000);

julia> b = Task(a2);

julia> istaskdone(b)
false

julia> schedule(b);

julia> yield();

julia> istaskdone(b)
true
source
Base.istaskstartedFunction
istaskstarted(t::Task) -> Bool

Déterminez si une tâche a commencé à s'exécuter.

Exemples

julia> a3() = sum(i for i in 1:1000);

julia> b = Task(a3);

julia> istaskstarted(b)
false
source
Base.istaskfailedFunction
istaskfailed(t::Task) -> Bool

Détermine si une tâche a quitté en raison d'une exception lancée.

Exemples

julia> a4() = error("tâche échouée");

julia> b = Task(a4);

julia> istaskfailed(b)
false

julia> schedule(b);

julia> yield();

julia> istaskfailed(b)
true
Julia 1.3

Cette fonction nécessite au moins Julia 1.3.

source
Base.task_local_storageMethod
task_local_storage(key, value)

Attribuer une valeur à une clé dans le stockage local de la tâche actuelle.

source
Base.task_local_storageMethod
task_local_storage(body, key, value)

Appelle la fonction body avec un stockage local de tâche modifié, dans lequel value est assigné à key; la valeur précédente de key, ou son absence, est restaurée par la suite. Utile pour émuler la portée dynamique.

source

Scheduling

Base.yieldFunction
yield()

Passez au planificateur pour permettre à une autre tâche planifiée de s'exécuter. Une tâche qui appelle cette fonction est toujours exécutable et sera redémarrée immédiatement s'il n'y a pas d'autres tâches exécutables.

source
yield(t::Task, arg = nothing)

Une version rapide et de planification injuste de schedule(t, arg); yield() qui cède immédiatement à t avant d'appeler le planificateur.

source
Base.yieldtoFunction
yieldto(t::Task, arg = nothing)

Bascule vers la tâche donnée. La première fois qu'une tâche est basculée, la fonction de la tâche est appelée sans arguments. Lors des bascules suivantes, arg est renvoyé de l'appel précédent de la tâche à yieldto. C'est un appel de bas niveau qui ne fait que basculer les tâches, sans tenir compte des états ou de la planification de quelque manière que ce soit. Son utilisation est déconseillée.

source
Base.sleepFunction
sleep(seconds)

Bloque la tâche actuelle pendant un nombre spécifié de secondes. Le temps de sommeil minimum est de 1 milliseconde ou une entrée de 0.001.

source
Base.scheduleFunction
schedule(t::Task, [val]; error=false)

Ajoute une Task à la file d'attente du planificateur. Cela fait en sorte que la tâche s'exécute constamment lorsque le système est autrement inactif, à moins que la tâche n'effectue une opération bloquante telle que wait.

Si un deuxième argument val est fourni, il sera passé à la tâche (via la valeur de retour de yieldto) lorsqu'elle s'exécute à nouveau. Si error est true, la valeur est levée comme une exception dans la tâche réveillée.

Warning

Il est incorrect d'utiliser schedule sur une Task arbitraire qui a déjà été démarrée. Voir la référence API pour plus d'informations.

Warning

Par défaut, les tâches auront le bit collant défini sur vrai t.sticky. Cela modélise le comportement par défaut historique pour @async. Les tâches collantes ne peuvent être exécutées que sur le fil de travail sur lequel elles ont été d'abord planifiées, et lorsqu'elles sont planifiées, elles rendront la tâche à partir de laquelle elles ont été planifiées collante. Pour obtenir le comportement de Threads.@spawn, définissez manuellement le bit collant sur false.

Exemples

julia> a5() = sum(i for i in 1:1000);

julia> b = Task(a5);

julia> istaskstarted(b)
false

julia> schedule(b);

julia> yield();

julia> istaskstarted(b)
true

julia> istaskdone(b)
true
source

Synchronization

Base.errormonitorFunction
errormonitor(t::Task)

Imprime un journal d'erreurs dans stderr si la tâche t échoue.

Exemples

julia> Base._wait(errormonitor(Threads.@spawn error("la tâche a échoué")))
Erreur de tâche non gérée : la tâche a échoué
Trace de la pile :
[...]
source
Base.@syncMacro
@sync

Attendez que toutes les utilisations lexicalement enfermées de @async, @spawn, Distributed.@spawnat et Distributed.@distributed soient terminées. Toutes les exceptions levées par les opérations asynchrones enfermées sont collectées et levées sous la forme d'une CompositeException.

Exemples

julia> Threads.nthreads()
4

julia> @sync begin
           Threads.@spawn println("Thread-id $(Threads.threadid()), task 1")
           Threads.@spawn println("Thread-id $(Threads.threadid()), task 2")
       end;
Thread-id 3, task 1
Thread-id 1, task 2
source
Base.waitFunction

Note spéciale pour Threads.Condition :

L'appelant doit détenir le lock qui possède un Threads.Condition avant d'appeler cette méthode. La tâche appelante sera bloquée jusqu'à ce qu'une autre tâche la réveille, généralement en appelant notify sur le même objet Threads.Condition. Le verrou sera libéré de manière atomique lors du blocage (même s'il était verrouillé de manière récursive) et sera réacquis avant le retour.

source
wait(r::Future)

Attendez qu'une valeur soit disponible pour le Future spécifié.

source
wait(r::RemoteChannel, args...)

Attendez qu'une valeur soit disponible sur le RemoteChannel spécifié.

source
wait([x])

Bloque la tâche actuelle jusqu'à ce qu'un événement se produise, en fonction du type de l'argument :

  • Channel : Attendre qu'une valeur soit ajoutée au canal.
  • Condition : Attendre notify sur une condition et retourner le paramètre val passé à notify. Attendre sur une condition permet également de passer first=true, ce qui fait que le wait est mis en premier dans la file d'attente pour se réveiller sur notify au lieu du comportement habituel premier arrivé, premier servi.
  • Process : Attendre qu'un processus ou une chaîne de processus se termine. Le champ exitcode d'un processus peut être utilisé pour déterminer le succès ou l'échec.
  • Task : Attendre qu'une Task se termine. Si la tâche échoue avec une exception, une TaskFailedException (qui enveloppe la tâche échouée) est levée.
  • RawFD : Attendre des changements sur un descripteur de fichier (voir le package FileWatching).

Si aucun argument n'est passé, la tâche se bloque pour une période indéfinie. Une tâche ne peut être redémarrée que par un appel explicite à schedule ou yieldto.

Souvent, wait est appelé dans une boucle while pour s'assurer qu'une condition attendue est remplie avant de continuer.

source
wait(c::Channel)

Bloque jusqu'à ce que le Channel isready.

julia> c = Channel(1);

julia> isready(c)
false

julia> task = Task(() -> wait(c));

julia> schedule(task);

julia> istaskdone(task)  # la tâche est bloquée car le canal n'est pas prêt
false

julia> put!(c, 1);

julia> istaskdone(task)  # la tâche n'est plus bloquée
true
source
Base.fetchMethod
fetch(t::Task)

Attendez qu'une Task se termine, puis renvoyez sa valeur de résultat. Si la tâche échoue avec une exception, une TaskFailedException (qui enveloppe la tâche échouée) est levée.

source
Base.timedwaitFunction
timedwait(testcb, timeout::Real; pollint::Real=0.1)

Attendez jusqu'à ce que testcb() retourne true ou que timeout secondes se soient écoulées, selon ce qui se produit en premier. La fonction de test est interrogée toutes les pollint secondes. La valeur minimale pour pollint est de 0,001 secondes, c'est-à-dire 1 milliseconde.

Retourne :ok ou :timed_out.

Exemples

julia> cb() = (sleep(5); return);

julia> t = @async cb();

julia> timedwait(()->istaskdone(t), 1)
:timed_out

julia> timedwait(()->istaskdone(t), 6.5)
:ok
source
Base.ConditionType
Condition()

Créez une source d'événements déclenchée par un bord sur laquelle les tâches peuvent attendre. Les tâches qui appellent wait sur un Condition sont suspendues et mises en file d'attente. Les tâches sont réveillées lorsque notify est appelé plus tard sur le Condition. Attendre sur une condition peut renvoyer une valeur ou lever une erreur si les arguments optionnels de notify sont utilisés. Le déclenchement par bord signifie que seules les tâches en attente au moment où notify est appelé peuvent être réveillées. Pour des notifications déclenchées par niveau, vous devez conserver un état supplémentaire pour suivre si une notification a eu lieu. Les types Channel et Threads.Event le font et peuvent être utilisés pour des événements déclenchés par niveau.

Cet objet n'est PAS sûr pour les threads. Voir Threads.Condition pour une version sûre pour les threads.

source
Base.Threads.ConditionType
Threads.Condition([lock])

Une version thread-safe de Base.Condition.

Pour appeler wait ou notify sur un Threads.Condition, vous devez d'abord appeler lock dessus. Lorsque wait est appelé, le verrou est libéré de manière atomique pendant le blocage, et sera réacquis avant que wait ne retourne. Par conséquent, l'utilisation idiomatique d'un Threads.Condition c ressemble à ce qui suit :

lock(c)
try
    while !thing_we_are_waiting_for
        wait(c)
    end
finally
    unlock(c)
end
Julia 1.2

Cette fonctionnalité nécessite au moins Julia 1.2.

source
Base.EventType
Event([autoreset=false])

Créez une source d'événement à déclenchement de niveau. Les tâches qui appellent wait sur un Event sont suspendues et mises en file d'attente jusqu'à ce que notify soit appelé sur l'Event. Après que notify a été appelé, l'Event reste dans un état signalé et les tâches ne bloqueront plus en attendant, jusqu'à ce que reset soit appelé.

Si autoreset est vrai, au maximum une tâche sera libérée de wait pour chaque appel à notify.

Cela fournit un ordre de mémoire d'acquisition et de libération sur notify/wait.

Julia 1.1

Cette fonctionnalité nécessite au moins Julia 1.1.

Julia 1.8

La fonctionnalité autoreset et la garantie d'ordre de mémoire nécessitent au moins Julia 1.8.

source
Base.notifyFunction
notify(condition, val=nothing; all=true, error=false)

Réveille les tâches en attente d'une condition, en leur passant val. Si all est true (par défaut), toutes les tâches en attente sont réveillées, sinon une seule l'est. Si error est true, la valeur passée est levée comme une exception dans les tâches réveillées.

Retourne le nombre de tâches réveillées. Retourne 0 si aucune tâche n'attend sur condition.

source
Base.resetMethod
reset(::Event)

Réinitialise un Event dans un état non défini. Ensuite, tout appel futur à wait bloquera jusqu'à ce que notify soit appelé à nouveau.

source
Base.SemaphoreType
Semaphore(sem_size)

Créez un sémaphore de comptage qui permet au maximum sem_size acquisitions d'être utilisées à tout moment. Chaque acquisition doit être assortie d'une libération.

Cela fournit un ordre de mémoire d'acquisition et de libération sur les appels d'acquisition/libération.

source
Base.acquireFunction
acquire(s::Semaphore)

Attendez qu'un des permis sem_size soit disponible, en bloquant jusqu'à ce qu'un puisse être acquis.

source
acquire(f, s::Semaphore)

Exécute f après avoir acquis le sémaphore s, et release à la fin ou en cas d'erreur.

Par exemple, une forme de bloc do qui garantit que seulement 2 appels de foo seront actifs en même temps :

s = Base.Semaphore(2)
@sync for _ in 1:100
    Threads.@spawn begin
        Base.acquire(s) do
            foo()
        end
    end
end
Julia 1.8

Cette méthode nécessite au moins Julia 1.8.

source
Base.releaseFunction
release(s::Semaphore)

Retourne un permis au pool, permettant éventuellement à une autre tâche de l'acquérir et de reprendre l'exécution.

source
Base.lockFunction
lock(lock)

Acquérir le lock lorsqu'il devient disponible. Si le verrou est déjà verrouillé par une autre tâche/fil, attendez qu'il devienne disponible.

Chaque lock doit être associé à un unlock.

source
lock(f::Function, lock)

Acquérir le lock, exécuter f avec le lock maintenu, et libérer le lock lorsque f retourne. Si le lock est déjà verrouillé par une autre tâche/fil, attendez qu'il devienne disponible.

Lorsque cette fonction retourne, le lock a été libéré, donc l'appelant ne doit pas tenter de le unlock.

Voir aussi : @lock.

Julia 1.7

Utiliser un Channel comme deuxième argument nécessite Julia 1.7 ou version ultérieure.

source

lock(f::Function, l::Lockable)

Acquérir le verrou associé à l, exécuter f avec le verrou détenu, et libérer le verrou lorsque f retourne. f recevra un argument positionnel : la valeur encapsulée par l. Si le verrou est déjà verrouillé par une autre tâche/fil, attendez qu'il devienne disponible. Lorsque cette fonction retourne, le lock a été libéré, donc l'appelant ne doit pas tenter de unlock le verrou.

Julia 1.11

Nécessite au moins Julia 1.11.

source
Base.unlockFunction
unlock(lock)

Libère la propriété du lock.

Si c'est un verrou récursif qui a été acquis auparavant, décrémentez un compteur interne et retournez immédiatement.

source
Base.trylockFunction
trylock(lock) -> Succès (Booléen)

Acquérir le verrou s'il est disponible et retourner true si réussi. Si le verrou est déjà verrouillé par une autre tâche/fil, retourner false.

Chaque trylock réussi doit être associé à un unlock.

La fonction trylock combinée avec islocked peut être utilisée pour écrire les algorithmes de test-et-test-et-set ou de retour exponentiel si cela est supporté par le typeof(lock) (lisez sa documentation).

source
Base.islockedFunction
islocked(lock) -> Statut (Booléen)

Vérifiez si le lock est détenu par une tâche/fil. Cette fonction seule ne doit pas être utilisée pour la synchronisation. Cependant, islocked combiné avec trylock peut être utilisé pour écrire les algorithmes de test-et-test-et-set ou de retour exponentiel si cela est supporté par le typeof(lock) (lisez sa documentation).

Aide étendue

Par exemple, un retour exponentiel peut être implémenté comme suit si l'implémentation du lock satisfait les propriétés documentées ci-dessous.

nspins = 0
while true
    while islocked(lock)
        GC.safepoint()
        nspins += 1
        nspins > LIMIT && error("timeout")
    end
    trylock(lock) && break
    backoff()
end

Implémentation

Il est conseillé à une implémentation de lock de définir islocked avec les propriétés suivantes et de le noter dans sa docstring.

  • islocked(lock) est sans course de données.
  • Si islocked(lock) retourne false, une invocation immédiate de trylock(lock) doit réussir (retourne true) s'il n'y a pas d'interférence d'autres tâches.
source
Base.ReentrantLockType
ReentrantLock()

Crée un verrou réentrant pour synchroniser les Tasks. La même tâche peut acquérir le verrou autant de fois que nécessaire (c'est ce que signifie la partie "Réentrant" du nom). Chaque lock doit être associé à un unlock.

Appeler lock inhibera également l'exécution des finaliseurs sur ce thread jusqu'au unlock correspondant. L'utilisation du modèle de verrouillage standard illustré ci-dessous devrait être naturellement supportée, mais attention à inverser l'ordre try/lock ou à manquer complètement le bloc try (par exemple, tenter de retourner avec le verrou toujours détenu) :

Cela fournit un ordre de mémoire d'acquisition/libération sur les appels lock/unlock.

lock(l)
try
    <travail atomique>
finally
    unlock(l)
end

Si !islocked(lck::ReentrantLock) est vrai, trylock(lck) réussit à moins qu'il y ait d'autres tâches tentant de détenir le verrou "en même temps."

source
Base.@lockMacro
@lock l expr

Version macro de lock(f, l::AbstractLock) mais avec expr au lieu de la fonction f. Se développe en :

lock(l)
try
    expr
finally
    unlock(l)
end

C'est similaire à l'utilisation de lock avec un bloc do, mais évite de créer une fermeture et peut donc améliorer les performances.

Compat

@lock a été ajouté dans Julia 1.3 et exporté dans Julia 1.10.

source
Base.LockableType

Lockable(value, lock = ReentrantLock())

Crée un objet Lockable qui enveloppe value et l'associe au lock fourni. Cet objet prend en charge @lock, lock, trylock, unlock. Pour accéder à la valeur, indexez l'objet verrouillable tout en maintenant le verrou.

Julia 1.11

Nécessite au moins Julia 1.11.

Exemple

julia> locked_list = Base.Lockable(Int[]);

julia> @lock(locked_list, push!(locked_list[], 1)) # doit maintenir le verrou pour accéder à la valeur
1-element Vector{Int64}:
 1

julia> lock(summary, locked_list)
"1-element Vector{Int64}"
source

Channels

Base.ChannelType
Channel{T=Any}(size::Int=0)

Construit un Channel avec un tampon interne pouvant contenir un maximum de size objets de type T. put! appelle sur un canal plein bloque jusqu'à ce qu'un objet soit retiré avec take!.

Channel(0) construit un canal non tamponné. put! bloque jusqu'à ce qu'un take! correspondant soit appelé. Et vice-versa.

Autres constructeurs :

  • Channel(): constructeur par défaut, équivalent à Channel{Any}(0)
  • Channel(Inf): équivalent à Channel{Any}(typemax(Int))
  • Channel(sz): équivalent à Channel{Any}(sz)
Julia 1.3

Le constructeur par défaut Channel() et le size=0 par défaut ont été ajoutés dans Julia 1.3.

source
Base.ChannelMethod
Channel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false, threadpool=nothing)

Créez une nouvelle tâche à partir de func, liez-la à un nouveau canal de type T et de taille size, et planifiez la tâche, le tout en un seul appel. Le canal est automatiquement fermé lorsque la tâche se termine.

func doit accepter le canal lié comme son seul argument.

Si vous avez besoin d'une référence à la tâche créée, passez un objet Ref{Task} via l'argument clé taskref.

Si spawn=true, la Task créée pour func peut être planifiée sur un autre thread en parallèle, équivalent à la création d'une tâche via Threads.@spawn.

Si spawn=true et que l'argument threadpool n'est pas défini, il par défaut à :default.

Si l'argument threadpool est défini (à :default ou :interactive), cela implique que spawn=true et la nouvelle tâche est lancée dans le threadpool spécifié.

Retourne un Channel.

Exemples

julia> chnl = Channel() do ch
           foreach(i -> put!(ch, i), 1:4)
       end;

julia> typeof(chnl)
Channel{Any}

julia> for i in chnl
           @show i
       end;
i = 1
i = 2
i = 3
i = 4

Référencer la tâche créée :

julia> taskref = Ref{Task}();

julia> chnl = Channel(taskref=taskref) do ch
           println(take!(ch))
       end;

julia> istaskdone(taskref[])
false

julia> put!(chnl, "Hello");
Hello

julia> istaskdone(taskref[])
true
Julia 1.3

Le paramètre spawn= a été ajouté dans Julia 1.3. Ce constructeur a été ajouté dans Julia 1.3. Dans les versions antérieures de Julia, Channel utilisait des arguments clés pour définir size et T, mais ces constructeurs sont obsolètes.

Julia 1.9

L'argument threadpool= a été ajouté dans Julia 1.9.

julia> chnl = Channel{Char}(1, spawn=true) do ch
           for c in "hello world"
               put!(ch, c)
           end
       end
Channel{Char}(1) (2 items available)

julia> String(collect(chnl))
"hello world"
source
Base.put!Method
put!(c::Channel, v)

Ajoute un élément v au canal c. Bloque si le canal est plein.

Pour les canaux non tamponnés, bloque jusqu'à ce qu'un take! soit effectué par une autre tâche.

Julia 1.1

v est maintenant converti au type du canal avec convert lorsque put! est appelé.

source
Base.take!Method
take!(c::Channel)

Supprime et renvoie une valeur d'un Channel dans l'ordre. Bloque jusqu'à ce que des données soient disponibles. Pour les canaux non tamponnés, bloque jusqu'à ce qu'un put! soit effectué par une autre tâche.

Exemples

Canal tamponné :

julia> c = Channel(1);

julia> put!(c, 1);

julia> take!(c)
1

Canal non tamponné :

julia> c = Channel(0);

julia> task = Task(() -> put!(c, 1));

julia> schedule(task);

julia> take!(c)
1
source
Base.isreadyMethod
isready(c::Channel)

Détermine si un Channel a une valeur stockée en lui. Retourne immédiatement, ne bloque pas.

Pour les canaux non tamponnés, retourne true s'il y a des tâches en attente sur un put!.

Exemples

Canal tamponné :

julia> c = Channel(1);

julia> isready(c)
false

julia> put!(c, 1);

julia> isready(c)
true

Canal non tamponné :

julia> c = Channel();

julia> isready(c)  # aucune tâche en attente pour mettre !
false

julia> task = Task(() -> put!(c, 1));

julia> schedule(task);  # planifier une tâche put !

julia> isready(c)
true
source
Base.fetchMethod
fetch(c::Channel)

Attend et retourne (sans supprimer) le premier élément disponible du Channel. Remarque : fetch n'est pas pris en charge sur un Channel non tamponné (taille 0).

Exemples

Channel tamponné :

julia> c = Channel(3) do ch
           foreach(i -> put!(ch, i), 1:3)
       end;

julia> fetch(c)
1

julia> collect(c)  # l'élément n'est pas supprimé
3-element Vector{Any}:
 1
 2
 3
source
Base.closeMethod
close(c::Channel[, excp::Exception])

Ferme un canal. Une exception (facultativement donnée par excp) est levée par :

source
Base.bindMethod
bind(chnl::Channel, task::Task)

Associe la durée de vie de chnl à une tâche. Le Channel chnl est automatiquement fermé lorsque la tâche se termine. Toute exception non interceptée dans la tâche est propagée à tous les attendus sur chnl.

L'objet chnl peut être explicitement fermé indépendamment de la terminaison de la tâche. Les tâches terminées n'ont aucun effet sur les objets Channel déjà fermés.

Lorsqu'un canal est lié à plusieurs tâches, la première tâche à se terminer fermera le canal. Lorsque plusieurs canaux sont liés à la même tâche, la terminaison de la tâche fermera tous les canaux liés.

Exemples

julia> c = Channel(0);

julia> task = @async foreach(i->put!(c, i), 1:4);

julia> bind(c,task);

julia> for i in c
           @show i
       end;
i = 1
i = 2
i = 3
i = 4

julia> isopen(c)
false
julia> c = Channel(0);

julia> task = @async (put!(c, 1); error("foo"));

julia> bind(c, task);

julia> take!(c)
1

julia> put!(c, 1);
ERROR: TaskFailedException
Stacktrace:
[...]
    nested task error: foo
[...]
source

Low-level synchronization using schedule and wait

L'utilisation correcte la plus simple de schedule est sur une Task qui n'est pas encore commencée (planifiée). Cependant, il est possible d'utiliser 4d61726b646f776e2e436f64652822222c20227363686564756c652229_40726566 et wait comme un élément de base très bas niveau pour construire des interfaces de synchronisation. Une condition préalable cruciale pour appeler schedule(task) est que l'appelant doit "posséder" la task ; c'est-à-dire qu'il doit savoir que l'appel à wait dans la task donnée se produit aux emplacements connus du code appelant schedule(task). Une stratégie pour garantir une telle condition préalable est d'utiliser des atomiques, comme démontré dans l'exemple suivant :

@enum OWEState begin
    OWE_EMPTY
    OWE_WAITING
    OWE_NOTIFYING
end

mutable struct OneWayEvent
    @atomic state::OWEState
    task::Task
    OneWayEvent() = new(OWE_EMPTY)
end

function Base.notify(ev::OneWayEvent)
    state = @atomic ev.state
    while state !== OWE_NOTIFYING
        # Spin until we successfully update the state to OWE_NOTIFYING:
        state, ok = @atomicreplace(ev.state, state => OWE_NOTIFYING)
        if ok
            if state == OWE_WAITING
                # OWE_WAITING -> OWE_NOTIFYING transition means that the waiter task is
                # already waiting or about to call `wait`. The notifier task must wake up
                # the waiter task.
                schedule(ev.task)
            else
                @assert state == OWE_EMPTY
                # Since we are assuming that there is only one notifier task (for
                # simplicity), we know that the other possible case here is OWE_EMPTY.
                # We do not need to do anything because we know that the waiter task has
                # not called `wait(ev::OneWayEvent)` yet.
            end
            break
        end
    end
    return
end

function Base.wait(ev::OneWayEvent)
    ev.task = current_task()
    state, ok = @atomicreplace(ev.state, OWE_EMPTY => OWE_WAITING)
    if ok
        # OWE_EMPTY -> OWE_WAITING transition means that the notifier task is guaranteed to
        # invoke OWE_WAITING -> OWE_NOTIFYING transition.  The waiter task must call
        # `wait()` immediately.  In particular, it MUST NOT invoke any function that may
        # yield to the scheduler at this point in code.
        wait()
    else
        @assert state == OWE_NOTIFYING
        # Otherwise, the `state` must have already been moved to OWE_NOTIFYING by the
        # notifier task.
    end
    return
end

ev = OneWayEvent()
@sync begin
    @async begin
        wait(ev)
        println("done")
    end
    println("notifying...")
    notify(ev)
end

# output
notifying...
done

OneWayEvent permet à une tâche d'attendre la notification d'une autre tâche. C'est une interface de communication limitée puisque attendre ne peut être utilisé qu'une seule fois par une tâche unique (notez l'assignation non atomique de ev.task)

Dans cet exemple, notify(ev::OneWayEvent) est autorisé à appeler schedule(ev.task) si et seulement si il modifie l'état de OWE_WAITING à OWE_NOTIFYING. Cela nous permet de savoir que la tâche exécutant wait(ev::OneWayEvent) est maintenant dans la branche ok et qu'il ne peut pas y avoir d'autres tâches qui essaient de schedule(ev.task) puisque leur @atomicreplace(ev.state, state => OWE_NOTIFYING) échouera.