Tasks
Core.Task — TypeTask(func)Créez une Task (c'est-à-dire une coroutine) pour exécuter la fonction donnée func (qui doit être appelable sans arguments). La tâche se termine lorsque cette fonction retourne. La tâche s'exécutera dans l'"âge du monde" du parent au moment de la construction lors de la planification schedule.
Par défaut, les tâches auront le bit collant défini sur true t.sticky. Cela modélise le comportement par défaut historique pour @async. Les tâches collantes ne peuvent être exécutées que sur le fil de travail sur lequel elles ont été d'abord planifiées, et lorsqu'elles sont planifiées, elles rendront la tâche à partir de laquelle elles ont été planifiées collante. Pour obtenir le comportement de Threads.@spawn, définissez manuellement le bit collant sur false.
Exemples
julia> a() = sum(i for i in 1:1000);
julia> b = Task(a);Dans cet exemple, b est une Task exécutable qui n'a pas encore commencé.
Base.@task — Macro@taskEnveloppez une expression dans une Task sans l'exécuter, et renvoyez la Task. Cela crée uniquement une tâche, et ne l'exécute pas.
Par défaut, les tâches auront le bit collant défini sur true t.sticky. Cela modélise le comportement par défaut historique pour @async. Les tâches collantes ne peuvent être exécutées que sur le fil de travail sur lequel elles ont été d'abord planifiées, et lorsqu'elles sont planifiées, elles rendront la tâche à partir de laquelle elles ont été planifiées collante. Pour obtenir le comportement de Threads.@spawn, définissez manuellement le bit collant sur false.
Exemples
julia> a1() = sum(i for i in 1:1000);
julia> b = @task a1();
julia> istaskstarted(b)
false
julia> schedule(b);
julia> yield();
julia> istaskdone(b)
trueBase.@async — Macro@asyncEnveloppez une expression dans une Task et ajoutez-la à la file d'attente du planificateur de la machine locale.
Les valeurs peuvent être interpolées dans @async via $, ce qui copie la valeur directement dans la fermeture sous-jacente construite. Cela vous permet d'insérer la valeur d'une variable, isolant le code asynchrone des changements de la valeur de la variable dans la tâche actuelle.
Il est fortement recommandé de privilégier Threads.@spawn plutôt que @async toujours même lorsque le parallélisme n'est pas requis, en particulier dans les bibliothèques distribuées publiquement. Cela est dû au fait qu'une utilisation de @async désactive la migration de la tâche parent à travers les threads de travail dans l'implémentation actuelle de Julia. Ainsi, une utilisation apparemment innocente de @async dans une fonction de bibliothèque peut avoir un impact important sur les performances de parties très différentes des applications des utilisateurs.
L'interpolation des valeurs via $ est disponible depuis Julia 1.4.
Base.asyncmap — Functionasyncmap(f, c...; ntasks=0, batch_size=nothing)Utilise plusieurs tâches concurrentes pour appliquer f sur une collection (ou plusieurs collections de même longueur). Pour plusieurs arguments de collection, f est appliqué élément par élément.
ntasks spécifie le nombre de tâches à exécuter de manière concurrente. En fonction de la longueur des collections, si ntasks n'est pas spécifié, jusqu'à 100 tâches seront utilisées pour le mappage concurrent.
ntasks peut également être spécifié comme une fonction sans argument. Dans ce cas, le nombre de tâches à exécuter en parallèle est vérifié avant le traitement de chaque élément et une nouvelle tâche est démarrée si la valeur de ntasks_func est supérieure au nombre actuel de tâches.
Si batch_size est spécifié, la collection est traitée en mode batch. f doit alors être une fonction qui doit accepter un Vector de tuples d'arguments et doit retourner un vecteur de résultats. Le vecteur d'entrée aura une longueur de batch_size ou moins.
Les exemples suivants mettent en évidence l'exécution dans différentes tâches en retournant l'objectid des tâches dans lesquelles la fonction de mappage est exécutée.
Tout d'abord, avec ntasks indéfini, chaque élément est traité dans une tâche différente.
julia> tskoid() = objectid(current_task());
julia> asyncmap(x->tskoid(), 1:5)
5-element Array{UInt64,1}:
0x6e15e66c75c75853
0x440f8819a1baa682
0x9fb3eeadd0c83985
0xebd3e35fe90d4050
0x29efc93edce2b961
julia> length(unique(asyncmap(x->tskoid(), 1:5)))
5Avec ntasks=2, tous les éléments sont traités dans 2 tâches.
julia> asyncmap(x->tskoid(), 1:5; ntasks=2)
5-element Array{UInt64,1}:
0x027ab1680df7ae94
0xa23d2f80cd7cf157
0x027ab1680df7ae94
0xa23d2f80cd7cf157
0x027ab1680df7ae94
julia> length(unique(asyncmap(x->tskoid(), 1:5; ntasks=2)))
2Avec batch_size défini, la fonction de mappage doit être modifiée pour accepter un tableau de tuples d'arguments et retourner un tableau de résultats. map est utilisé dans la fonction de mappage modifiée pour y parvenir.
julia> batch_func(input) = map(x->string("args_tuple: ", x, ", element_val: ", x[1], ", task: ", tskoid()), input)
batch_func (generic function with 1 method)
julia> asyncmap(batch_func, 1:5; ntasks=2, batch_size=2)
5-element Array{String,1}:
"args_tuple: (1,), element_val: 1, task: 9118321258196414413"
"args_tuple: (2,), element_val: 2, task: 4904288162898683522"
"args_tuple: (3,), element_val: 3, task: 9118321258196414413"
"args_tuple: (4,), element_val: 4, task: 4904288162898683522"
"args_tuple: (5,), element_val: 5, task: 9118321258196414413"Base.asyncmap! — Functionasyncmap!(f, results, c...; ntasks=0, batch_size=nothing)Comme asyncmap, mais stocke la sortie dans results plutôt que de renvoyer une collection.
Le comportement peut être inattendu lorsque tout argument muté partage de la mémoire avec un autre argument.
Base.current_task — Functioncurrent_task()Obtenez la Task en cours d'exécution.
Base.istaskdone — Functionistaskdone(t::Task) -> BoolDéterminez si une tâche a quitté.
Exemples
julia> a2() = sum(i for i in 1:1000);
julia> b = Task(a2);
julia> istaskdone(b)
false
julia> schedule(b);
julia> yield();
julia> istaskdone(b)
trueBase.istaskstarted — Functionistaskstarted(t::Task) -> BoolDéterminez si une tâche a commencé à s'exécuter.
Exemples
julia> a3() = sum(i for i in 1:1000);
julia> b = Task(a3);
julia> istaskstarted(b)
falseBase.istaskfailed — Functionistaskfailed(t::Task) -> BoolDétermine si une tâche a quitté en raison d'une exception lancée.
Exemples
julia> a4() = error("tâche échouée");
julia> b = Task(a4);
julia> istaskfailed(b)
false
julia> schedule(b);
julia> yield();
julia> istaskfailed(b)
trueCette fonction nécessite au moins Julia 1.3.
Base.task_local_storage — Methodtask_local_storage(key)Recherchez la valeur d'une clé dans le stockage local de la tâche actuelle.
Base.task_local_storage — Methodtask_local_storage(key, value)Attribuer une valeur à une clé dans le stockage local de la tâche actuelle.
Base.task_local_storage — Methodtask_local_storage(body, key, value)Appelle la fonction body avec un stockage local de tâche modifié, dans lequel value est assigné à key; la valeur précédente de key, ou son absence, est restaurée par la suite. Utile pour émuler la portée dynamique.
Scheduling
Base.yield — Functionyield()Passez au planificateur pour permettre à une autre tâche planifiée de s'exécuter. Une tâche qui appelle cette fonction est toujours exécutable et sera redémarrée immédiatement s'il n'y a pas d'autres tâches exécutables.
yield(t::Task, arg = nothing)Une version rapide et de planification injuste de schedule(t, arg); yield() qui cède immédiatement à t avant d'appeler le planificateur.
Base.yieldto — Functionyieldto(t::Task, arg = nothing)Bascule vers la tâche donnée. La première fois qu'une tâche est basculée, la fonction de la tâche est appelée sans arguments. Lors des bascules suivantes, arg est renvoyé de l'appel précédent de la tâche à yieldto. C'est un appel de bas niveau qui ne fait que basculer les tâches, sans tenir compte des états ou de la planification de quelque manière que ce soit. Son utilisation est déconseillée.
Base.sleep — Functionsleep(seconds)Bloque la tâche actuelle pendant un nombre spécifié de secondes. Le temps de sommeil minimum est de 1 milliseconde ou une entrée de 0.001.
Base.schedule — Functionschedule(t::Task, [val]; error=false)Ajoute une Task à la file d'attente du planificateur. Cela fait en sorte que la tâche s'exécute constamment lorsque le système est autrement inactif, à moins que la tâche n'effectue une opération bloquante telle que wait.
Si un deuxième argument val est fourni, il sera passé à la tâche (via la valeur de retour de yieldto) lorsqu'elle s'exécute à nouveau. Si error est true, la valeur est levée comme une exception dans la tâche réveillée.
Il est incorrect d'utiliser schedule sur une Task arbitraire qui a déjà été démarrée. Voir la référence API pour plus d'informations.
Par défaut, les tâches auront le bit collant défini sur vrai t.sticky. Cela modélise le comportement par défaut historique pour @async. Les tâches collantes ne peuvent être exécutées que sur le fil de travail sur lequel elles ont été d'abord planifiées, et lorsqu'elles sont planifiées, elles rendront la tâche à partir de laquelle elles ont été planifiées collante. Pour obtenir le comportement de Threads.@spawn, définissez manuellement le bit collant sur false.
Exemples
julia> a5() = sum(i for i in 1:1000);
julia> b = Task(a5);
julia> istaskstarted(b)
false
julia> schedule(b);
julia> yield();
julia> istaskstarted(b)
true
julia> istaskdone(b)
trueSynchronization
Base.errormonitor — Functionerrormonitor(t::Task)Imprime un journal d'erreurs dans stderr si la tâche t échoue.
Exemples
julia> Base._wait(errormonitor(Threads.@spawn error("la tâche a échoué")))
Erreur de tâche non gérée : la tâche a échoué
Trace de la pile :
[...]Base.@sync — Macro@syncAttendez que toutes les utilisations lexicalement enfermées de @async, @spawn, Distributed.@spawnat et Distributed.@distributed soient terminées. Toutes les exceptions levées par les opérations asynchrones enfermées sont collectées et levées sous la forme d'une CompositeException.
Exemples
julia> Threads.nthreads()
4
julia> @sync begin
Threads.@spawn println("Thread-id $(Threads.threadid()), task 1")
Threads.@spawn println("Thread-id $(Threads.threadid()), task 2")
end;
Thread-id 3, task 1
Thread-id 1, task 2Base.wait — FunctionNote spéciale pour Threads.Condition :
L'appelant doit détenir le lock qui possède un Threads.Condition avant d'appeler cette méthode. La tâche appelante sera bloquée jusqu'à ce qu'une autre tâche la réveille, généralement en appelant notify sur le même objet Threads.Condition. Le verrou sera libéré de manière atomique lors du blocage (même s'il était verrouillé de manière récursive) et sera réacquis avant le retour.
wait(r::Future)Attendez qu'une valeur soit disponible pour le Future spécifié.
wait(r::RemoteChannel, args...)Attendez qu'une valeur soit disponible sur le RemoteChannel spécifié.
wait([x])Bloque la tâche actuelle jusqu'à ce qu'un événement se produise, en fonction du type de l'argument :
Channel: Attendre qu'une valeur soit ajoutée au canal.Condition: Attendrenotifysur une condition et retourner le paramètrevalpassé ànotify. Attendre sur une condition permet également de passerfirst=true, ce qui fait que le wait est mis en premier dans la file d'attente pour se réveiller surnotifyau lieu du comportement habituel premier arrivé, premier servi.Process: Attendre qu'un processus ou une chaîne de processus se termine. Le champexitcoded'un processus peut être utilisé pour déterminer le succès ou l'échec.Task: Attendre qu'uneTaskse termine. Si la tâche échoue avec une exception, uneTaskFailedException(qui enveloppe la tâche échouée) est levée.RawFD: Attendre des changements sur un descripteur de fichier (voir le packageFileWatching).
Si aucun argument n'est passé, la tâche se bloque pour une période indéfinie. Une tâche ne peut être redémarrée que par un appel explicite à schedule ou yieldto.
Souvent, wait est appelé dans une boucle while pour s'assurer qu'une condition attendue est remplie avant de continuer.
wait(c::Channel)Bloque jusqu'à ce que le Channel isready.
julia> c = Channel(1);
julia> isready(c)
false
julia> task = Task(() -> wait(c));
julia> schedule(task);
julia> istaskdone(task) # la tâche est bloquée car le canal n'est pas prêt
false
julia> put!(c, 1);
julia> istaskdone(task) # la tâche n'est plus bloquée
trueBase.fetch — Methodfetch(t::Task)Attendez qu'une Task se termine, puis renvoyez sa valeur de résultat. Si la tâche échoue avec une exception, une TaskFailedException (qui enveloppe la tâche échouée) est levée.
Base.fetch — Methodfetch(x::Any)Retourne x.
Base.timedwait — Functiontimedwait(testcb, timeout::Real; pollint::Real=0.1)Attendez jusqu'à ce que testcb() retourne true ou que timeout secondes se soient écoulées, selon ce qui se produit en premier. La fonction de test est interrogée toutes les pollint secondes. La valeur minimale pour pollint est de 0,001 secondes, c'est-à-dire 1 milliseconde.
Retourne :ok ou :timed_out.
Exemples
julia> cb() = (sleep(5); return);
julia> t = @async cb();
julia> timedwait(()->istaskdone(t), 1)
:timed_out
julia> timedwait(()->istaskdone(t), 6.5)
:okBase.Condition — TypeCondition()Créez une source d'événements déclenchée par un bord sur laquelle les tâches peuvent attendre. Les tâches qui appellent wait sur un Condition sont suspendues et mises en file d'attente. Les tâches sont réveillées lorsque notify est appelé plus tard sur le Condition. Attendre sur une condition peut renvoyer une valeur ou lever une erreur si les arguments optionnels de notify sont utilisés. Le déclenchement par bord signifie que seules les tâches en attente au moment où notify est appelé peuvent être réveillées. Pour des notifications déclenchées par niveau, vous devez conserver un état supplémentaire pour suivre si une notification a eu lieu. Les types Channel et Threads.Event le font et peuvent être utilisés pour des événements déclenchés par niveau.
Cet objet n'est PAS sûr pour les threads. Voir Threads.Condition pour une version sûre pour les threads.
Base.Threads.Condition — TypeThreads.Condition([lock])Une version thread-safe de Base.Condition.
Pour appeler wait ou notify sur un Threads.Condition, vous devez d'abord appeler lock dessus. Lorsque wait est appelé, le verrou est libéré de manière atomique pendant le blocage, et sera réacquis avant que wait ne retourne. Par conséquent, l'utilisation idiomatique d'un Threads.Condition c ressemble à ce qui suit :
lock(c)
try
while !thing_we_are_waiting_for
wait(c)
end
finally
unlock(c)
endCette fonctionnalité nécessite au moins Julia 1.2.
Base.Event — TypeEvent([autoreset=false])Créez une source d'événement à déclenchement de niveau. Les tâches qui appellent wait sur un Event sont suspendues et mises en file d'attente jusqu'à ce que notify soit appelé sur l'Event. Après que notify a été appelé, l'Event reste dans un état signalé et les tâches ne bloqueront plus en attendant, jusqu'à ce que reset soit appelé.
Si autoreset est vrai, au maximum une tâche sera libérée de wait pour chaque appel à notify.
Cela fournit un ordre de mémoire d'acquisition et de libération sur notify/wait.
Cette fonctionnalité nécessite au moins Julia 1.1.
La fonctionnalité autoreset et la garantie d'ordre de mémoire nécessitent au moins Julia 1.8.
Base.notify — Functionnotify(condition, val=nothing; all=true, error=false)Réveille les tâches en attente d'une condition, en leur passant val. Si all est true (par défaut), toutes les tâches en attente sont réveillées, sinon une seule l'est. Si error est true, la valeur passée est levée comme une exception dans les tâches réveillées.
Retourne le nombre de tâches réveillées. Retourne 0 si aucune tâche n'attend sur condition.
Base.reset — Methodreset(::Event)Réinitialise un Event dans un état non défini. Ensuite, tout appel futur à wait bloquera jusqu'à ce que notify soit appelé à nouveau.
Base.Semaphore — TypeSemaphore(sem_size)Créez un sémaphore de comptage qui permet au maximum sem_size acquisitions d'être utilisées à tout moment. Chaque acquisition doit être assortie d'une libération.
Cela fournit un ordre de mémoire d'acquisition et de libération sur les appels d'acquisition/libération.
Base.acquire — Functionacquire(s::Semaphore)Attendez qu'un des permis sem_size soit disponible, en bloquant jusqu'à ce qu'un puisse être acquis.
acquire(f, s::Semaphore)Exécute f après avoir acquis le sémaphore s, et release à la fin ou en cas d'erreur.
Par exemple, une forme de bloc do qui garantit que seulement 2 appels de foo seront actifs en même temps :
s = Base.Semaphore(2)
@sync for _ in 1:100
Threads.@spawn begin
Base.acquire(s) do
foo()
end
end
endCette méthode nécessite au moins Julia 1.8.
Base.release — Functionrelease(s::Semaphore)Retourne un permis au pool, permettant éventuellement à une autre tâche de l'acquérir et de reprendre l'exécution.
Base.AbstractLock — TypeAbstractLockSupertype abstrait décrivant les types qui implémentent les primitives de synchronisation : lock, trylock, unlock et islocked.
Base.lock — Functionlock(lock)Acquérir le lock lorsqu'il devient disponible. Si le verrou est déjà verrouillé par une autre tâche/fil, attendez qu'il devienne disponible.
Chaque lock doit être associé à un unlock.
lock(f::Function, lock)Acquérir le lock, exécuter f avec le lock maintenu, et libérer le lock lorsque f retourne. Si le lock est déjà verrouillé par une autre tâche/fil, attendez qu'il devienne disponible.
Lorsque cette fonction retourne, le lock a été libéré, donc l'appelant ne doit pas tenter de le unlock.
Voir aussi : @lock.
Utiliser un Channel comme deuxième argument nécessite Julia 1.7 ou version ultérieure.
lock(f::Function, l::Lockable)
Acquérir le verrou associé à l, exécuter f avec le verrou détenu, et libérer le verrou lorsque f retourne. f recevra un argument positionnel : la valeur encapsulée par l. Si le verrou est déjà verrouillé par une autre tâche/fil, attendez qu'il devienne disponible. Lorsque cette fonction retourne, le lock a été libéré, donc l'appelant ne doit pas tenter de unlock le verrou.
Nécessite au moins Julia 1.11.
Base.unlock — Functionunlock(lock)Libère la propriété du lock.
Si c'est un verrou récursif qui a été acquis auparavant, décrémentez un compteur interne et retournez immédiatement.
Base.trylock — Functiontrylock(lock) -> Succès (Booléen)Acquérir le verrou s'il est disponible et retourner true si réussi. Si le verrou est déjà verrouillé par une autre tâche/fil, retourner false.
Chaque trylock réussi doit être associé à un unlock.
La fonction trylock combinée avec islocked peut être utilisée pour écrire les algorithmes de test-et-test-et-set ou de retour exponentiel si cela est supporté par le typeof(lock) (lisez sa documentation).
Base.islocked — Functionislocked(lock) -> Statut (Booléen)Vérifiez si le lock est détenu par une tâche/fil. Cette fonction seule ne doit pas être utilisée pour la synchronisation. Cependant, islocked combiné avec trylock peut être utilisé pour écrire les algorithmes de test-et-test-et-set ou de retour exponentiel si cela est supporté par le typeof(lock) (lisez sa documentation).
Aide étendue
Par exemple, un retour exponentiel peut être implémenté comme suit si l'implémentation du lock satisfait les propriétés documentées ci-dessous.
nspins = 0
while true
while islocked(lock)
GC.safepoint()
nspins += 1
nspins > LIMIT && error("timeout")
end
trylock(lock) && break
backoff()
endImplémentation
Il est conseillé à une implémentation de lock de définir islocked avec les propriétés suivantes et de le noter dans sa docstring.
islocked(lock)est sans course de données.- Si
islocked(lock)retournefalse, une invocation immédiate detrylock(lock)doit réussir (retournetrue) s'il n'y a pas d'interférence d'autres tâches.
Base.ReentrantLock — TypeReentrantLock()Crée un verrou réentrant pour synchroniser les Tasks. La même tâche peut acquérir le verrou autant de fois que nécessaire (c'est ce que signifie la partie "Réentrant" du nom). Chaque lock doit être associé à un unlock.
Appeler lock inhibera également l'exécution des finaliseurs sur ce thread jusqu'au unlock correspondant. L'utilisation du modèle de verrouillage standard illustré ci-dessous devrait être naturellement supportée, mais attention à inverser l'ordre try/lock ou à manquer complètement le bloc try (par exemple, tenter de retourner avec le verrou toujours détenu) :
Cela fournit un ordre de mémoire d'acquisition/libération sur les appels lock/unlock.
lock(l)
try
<travail atomique>
finally
unlock(l)
endSi !islocked(lck::ReentrantLock) est vrai, trylock(lck) réussit à moins qu'il y ait d'autres tâches tentant de détenir le verrou "en même temps."
Base.@lock — Macro@lock l exprVersion macro de lock(f, l::AbstractLock) mais avec expr au lieu de la fonction f. Se développe en :
lock(l)
try
expr
finally
unlock(l)
endC'est similaire à l'utilisation de lock avec un bloc do, mais évite de créer une fermeture et peut donc améliorer les performances.
@lock a été ajouté dans Julia 1.3 et exporté dans Julia 1.10.
Base.Lockable — TypeLockable(value, lock = ReentrantLock())
Crée un objet Lockable qui enveloppe value et l'associe au lock fourni. Cet objet prend en charge @lock, lock, trylock, unlock. Pour accéder à la valeur, indexez l'objet verrouillable tout en maintenant le verrou.
Nécessite au moins Julia 1.11.
Exemple
julia> locked_list = Base.Lockable(Int[]);
julia> @lock(locked_list, push!(locked_list[], 1)) # doit maintenir le verrou pour accéder à la valeur
1-element Vector{Int64}:
1
julia> lock(summary, locked_list)
"1-element Vector{Int64}"Channels
Base.AbstractChannel — TypeAbstractChannel{T}Représentation d'un canal passant des objets de type T.
Base.Channel — TypeChannel{T=Any}(size::Int=0)Construit un Channel avec un tampon interne pouvant contenir un maximum de size objets de type T. put! appelle sur un canal plein bloque jusqu'à ce qu'un objet soit retiré avec take!.
Channel(0) construit un canal non tamponné. put! bloque jusqu'à ce qu'un take! correspondant soit appelé. Et vice-versa.
Autres constructeurs :
Channel(): constructeur par défaut, équivalent àChannel{Any}(0)Channel(Inf): équivalent àChannel{Any}(typemax(Int))Channel(sz): équivalent àChannel{Any}(sz)
Le constructeur par défaut Channel() et le size=0 par défaut ont été ajoutés dans Julia 1.3.
Base.Channel — MethodChannel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false, threadpool=nothing)Créez une nouvelle tâche à partir de func, liez-la à un nouveau canal de type T et de taille size, et planifiez la tâche, le tout en un seul appel. Le canal est automatiquement fermé lorsque la tâche se termine.
func doit accepter le canal lié comme son seul argument.
Si vous avez besoin d'une référence à la tâche créée, passez un objet Ref{Task} via l'argument clé taskref.
Si spawn=true, la Task créée pour func peut être planifiée sur un autre thread en parallèle, équivalent à la création d'une tâche via Threads.@spawn.
Si spawn=true et que l'argument threadpool n'est pas défini, il par défaut à :default.
Si l'argument threadpool est défini (à :default ou :interactive), cela implique que spawn=true et la nouvelle tâche est lancée dans le threadpool spécifié.
Retourne un Channel.
Exemples
julia> chnl = Channel() do ch
foreach(i -> put!(ch, i), 1:4)
end;
julia> typeof(chnl)
Channel{Any}
julia> for i in chnl
@show i
end;
i = 1
i = 2
i = 3
i = 4Référencer la tâche créée :
julia> taskref = Ref{Task}();
julia> chnl = Channel(taskref=taskref) do ch
println(take!(ch))
end;
julia> istaskdone(taskref[])
false
julia> put!(chnl, "Hello");
Hello
julia> istaskdone(taskref[])
trueLe paramètre spawn= a été ajouté dans Julia 1.3. Ce constructeur a été ajouté dans Julia 1.3. Dans les versions antérieures de Julia, Channel utilisait des arguments clés pour définir size et T, mais ces constructeurs sont obsolètes.
L'argument threadpool= a été ajouté dans Julia 1.9.
julia> chnl = Channel{Char}(1, spawn=true) do ch
for c in "hello world"
put!(ch, c)
end
end
Channel{Char}(1) (2 items available)
julia> String(collect(chnl))
"hello world"Base.put! — Methodput!(c::Channel, v)Ajoute un élément v au canal c. Bloque si le canal est plein.
Pour les canaux non tamponnés, bloque jusqu'à ce qu'un take! soit effectué par une autre tâche.
v est maintenant converti au type du canal avec convert lorsque put! est appelé.
Base.take! — Methodtake!(c::Channel)Supprime et renvoie une valeur d'un Channel dans l'ordre. Bloque jusqu'à ce que des données soient disponibles. Pour les canaux non tamponnés, bloque jusqu'à ce qu'un put! soit effectué par une autre tâche.
Exemples
Canal tamponné :
julia> c = Channel(1);
julia> put!(c, 1);
julia> take!(c)
1Canal non tamponné :
julia> c = Channel(0);
julia> task = Task(() -> put!(c, 1));
julia> schedule(task);
julia> take!(c)
1Base.isready — Methodisready(c::Channel)Détermine si un Channel a une valeur stockée en lui. Retourne immédiatement, ne bloque pas.
Pour les canaux non tamponnés, retourne true s'il y a des tâches en attente sur un put!.
Exemples
Canal tamponné :
julia> c = Channel(1);
julia> isready(c)
false
julia> put!(c, 1);
julia> isready(c)
trueCanal non tamponné :
julia> c = Channel();
julia> isready(c) # aucune tâche en attente pour mettre !
false
julia> task = Task(() -> put!(c, 1));
julia> schedule(task); # planifier une tâche put !
julia> isready(c)
trueBase.fetch — Methodfetch(c::Channel)Attend et retourne (sans supprimer) le premier élément disponible du Channel. Remarque : fetch n'est pas pris en charge sur un Channel non tamponné (taille 0).
Exemples
Channel tamponné :
julia> c = Channel(3) do ch
foreach(i -> put!(ch, i), 1:3)
end;
julia> fetch(c)
1
julia> collect(c) # l'élément n'est pas supprimé
3-element Vector{Any}:
1
2
3Base.close — Methodclose(c::Channel[, excp::Exception])Ferme un canal. Une exception (facultativement donnée par excp) est levée par :
Base.bind — Methodbind(chnl::Channel, task::Task)Associe la durée de vie de chnl à une tâche. Le Channel chnl est automatiquement fermé lorsque la tâche se termine. Toute exception non interceptée dans la tâche est propagée à tous les attendus sur chnl.
L'objet chnl peut être explicitement fermé indépendamment de la terminaison de la tâche. Les tâches terminées n'ont aucun effet sur les objets Channel déjà fermés.
Lorsqu'un canal est lié à plusieurs tâches, la première tâche à se terminer fermera le canal. Lorsque plusieurs canaux sont liés à la même tâche, la terminaison de la tâche fermera tous les canaux liés.
Exemples
julia> c = Channel(0);
julia> task = @async foreach(i->put!(c, i), 1:4);
julia> bind(c,task);
julia> for i in c
@show i
end;
i = 1
i = 2
i = 3
i = 4
julia> isopen(c)
falsejulia> c = Channel(0);
julia> task = @async (put!(c, 1); error("foo"));
julia> bind(c, task);
julia> take!(c)
1
julia> put!(c, 1);
ERROR: TaskFailedException
Stacktrace:
[...]
nested task error: foo
[...]Low-level synchronization using schedule and wait
L'utilisation correcte la plus simple de schedule est sur une Task qui n'est pas encore commencée (planifiée). Cependant, il est possible d'utiliser 4d61726b646f776e2e436f64652822222c20227363686564756c652229_40726566 et wait comme un élément de base très bas niveau pour construire des interfaces de synchronisation. Une condition préalable cruciale pour appeler schedule(task) est que l'appelant doit "posséder" la task ; c'est-à-dire qu'il doit savoir que l'appel à wait dans la task donnée se produit aux emplacements connus du code appelant schedule(task). Une stratégie pour garantir une telle condition préalable est d'utiliser des atomiques, comme démontré dans l'exemple suivant :
@enum OWEState begin
OWE_EMPTY
OWE_WAITING
OWE_NOTIFYING
end
mutable struct OneWayEvent
@atomic state::OWEState
task::Task
OneWayEvent() = new(OWE_EMPTY)
end
function Base.notify(ev::OneWayEvent)
state = @atomic ev.state
while state !== OWE_NOTIFYING
# Spin until we successfully update the state to OWE_NOTIFYING:
state, ok = @atomicreplace(ev.state, state => OWE_NOTIFYING)
if ok
if state == OWE_WAITING
# OWE_WAITING -> OWE_NOTIFYING transition means that the waiter task is
# already waiting or about to call `wait`. The notifier task must wake up
# the waiter task.
schedule(ev.task)
else
@assert state == OWE_EMPTY
# Since we are assuming that there is only one notifier task (for
# simplicity), we know that the other possible case here is OWE_EMPTY.
# We do not need to do anything because we know that the waiter task has
# not called `wait(ev::OneWayEvent)` yet.
end
break
end
end
return
end
function Base.wait(ev::OneWayEvent)
ev.task = current_task()
state, ok = @atomicreplace(ev.state, OWE_EMPTY => OWE_WAITING)
if ok
# OWE_EMPTY -> OWE_WAITING transition means that the notifier task is guaranteed to
# invoke OWE_WAITING -> OWE_NOTIFYING transition. The waiter task must call
# `wait()` immediately. In particular, it MUST NOT invoke any function that may
# yield to the scheduler at this point in code.
wait()
else
@assert state == OWE_NOTIFYING
# Otherwise, the `state` must have already been moved to OWE_NOTIFYING by the
# notifier task.
end
return
end
ev = OneWayEvent()
@sync begin
@async begin
wait(ev)
println("done")
end
println("notifying...")
notify(ev)
end
# output
notifying...
doneOneWayEvent permet à une tâche d'attendre la notification d'une autre tâche. C'est une interface de communication limitée puisque attendre ne peut être utilisé qu'une seule fois par une tâche unique (notez l'assignation non atomique de ev.task)
Dans cet exemple, notify(ev::OneWayEvent) est autorisé à appeler schedule(ev.task) si et seulement si il modifie l'état de OWE_WAITING à OWE_NOTIFYING. Cela nous permet de savoir que la tâche exécutant wait(ev::OneWayEvent) est maintenant dans la branche ok et qu'il ne peut pas y avoir d'autres tâches qui essaient de schedule(ev.task) puisque leur @atomicreplace(ev.state, state => OWE_NOTIFYING) échouera.