Tasks
Core.Task
— TypeTask(func)
Créez une Task
(c'est-à-dire une coroutine) pour exécuter la fonction donnée func
(qui doit être appelable sans arguments). La tâche se termine lorsque cette fonction retourne. La tâche s'exécutera dans l'"âge du monde" du parent au moment de la construction lors de la planification schedule
.
Par défaut, les tâches auront le bit collant défini sur true t.sticky
. Cela modélise le comportement par défaut historique pour @async
. Les tâches collantes ne peuvent être exécutées que sur le fil de travail sur lequel elles ont été d'abord planifiées, et lorsqu'elles sont planifiées, elles rendront la tâche à partir de laquelle elles ont été planifiées collante. Pour obtenir le comportement de Threads.@spawn
, définissez manuellement le bit collant sur false
.
Exemples
julia> a() = sum(i for i in 1:1000);
julia> b = Task(a);
Dans cet exemple, b
est une Task
exécutable qui n'a pas encore commencé.
Base.@task
— Macro@task
Enveloppez une expression dans une Task
sans l'exécuter, et renvoyez la Task
. Cela crée uniquement une tâche, et ne l'exécute pas.
Par défaut, les tâches auront le bit collant défini sur true t.sticky
. Cela modélise le comportement par défaut historique pour @async
. Les tâches collantes ne peuvent être exécutées que sur le fil de travail sur lequel elles ont été d'abord planifiées, et lorsqu'elles sont planifiées, elles rendront la tâche à partir de laquelle elles ont été planifiées collante. Pour obtenir le comportement de Threads.@spawn
, définissez manuellement le bit collant sur false
.
Exemples
julia> a1() = sum(i for i in 1:1000);
julia> b = @task a1();
julia> istaskstarted(b)
false
julia> schedule(b);
julia> yield();
julia> istaskdone(b)
true
Base.@async
— Macro@async
Enveloppez une expression dans une Task
et ajoutez-la à la file d'attente du planificateur de la machine locale.
Les valeurs peuvent être interpolées dans @async
via $
, ce qui copie la valeur directement dans la fermeture sous-jacente construite. Cela vous permet d'insérer la valeur d'une variable, isolant le code asynchrone des changements de la valeur de la variable dans la tâche actuelle.
Il est fortement recommandé de privilégier Threads.@spawn
plutôt que @async
toujours même lorsque le parallélisme n'est pas requis, en particulier dans les bibliothèques distribuées publiquement. Cela est dû au fait qu'une utilisation de @async
désactive la migration de la tâche parent à travers les threads de travail dans l'implémentation actuelle de Julia. Ainsi, une utilisation apparemment innocente de @async
dans une fonction de bibliothèque peut avoir un impact important sur les performances de parties très différentes des applications des utilisateurs.
L'interpolation des valeurs via $
est disponible depuis Julia 1.4.
Base.asyncmap
— Functionasyncmap(f, c...; ntasks=0, batch_size=nothing)
Utilise plusieurs tâches concurrentes pour appliquer f
sur une collection (ou plusieurs collections de même longueur). Pour plusieurs arguments de collection, f
est appliqué élément par élément.
ntasks
spécifie le nombre de tâches à exécuter de manière concurrente. En fonction de la longueur des collections, si ntasks
n'est pas spécifié, jusqu'à 100 tâches seront utilisées pour le mappage concurrent.
ntasks
peut également être spécifié comme une fonction sans argument. Dans ce cas, le nombre de tâches à exécuter en parallèle est vérifié avant le traitement de chaque élément et une nouvelle tâche est démarrée si la valeur de ntasks_func
est supérieure au nombre actuel de tâches.
Si batch_size
est spécifié, la collection est traitée en mode batch. f
doit alors être une fonction qui doit accepter un Vector
de tuples d'arguments et doit retourner un vecteur de résultats. Le vecteur d'entrée aura une longueur de batch_size
ou moins.
Les exemples suivants mettent en évidence l'exécution dans différentes tâches en retournant l'objectid
des tâches dans lesquelles la fonction de mappage est exécutée.
Tout d'abord, avec ntasks
indéfini, chaque élément est traité dans une tâche différente.
julia> tskoid() = objectid(current_task());
julia> asyncmap(x->tskoid(), 1:5)
5-element Array{UInt64,1}:
0x6e15e66c75c75853
0x440f8819a1baa682
0x9fb3eeadd0c83985
0xebd3e35fe90d4050
0x29efc93edce2b961
julia> length(unique(asyncmap(x->tskoid(), 1:5)))
5
Avec ntasks=2
, tous les éléments sont traités dans 2 tâches.
julia> asyncmap(x->tskoid(), 1:5; ntasks=2)
5-element Array{UInt64,1}:
0x027ab1680df7ae94
0xa23d2f80cd7cf157
0x027ab1680df7ae94
0xa23d2f80cd7cf157
0x027ab1680df7ae94
julia> length(unique(asyncmap(x->tskoid(), 1:5; ntasks=2)))
2
Avec batch_size
défini, la fonction de mappage doit être modifiée pour accepter un tableau de tuples d'arguments et retourner un tableau de résultats. map
est utilisé dans la fonction de mappage modifiée pour y parvenir.
julia> batch_func(input) = map(x->string("args_tuple: ", x, ", element_val: ", x[1], ", task: ", tskoid()), input)
batch_func (generic function with 1 method)
julia> asyncmap(batch_func, 1:5; ntasks=2, batch_size=2)
5-element Array{String,1}:
"args_tuple: (1,), element_val: 1, task: 9118321258196414413"
"args_tuple: (2,), element_val: 2, task: 4904288162898683522"
"args_tuple: (3,), element_val: 3, task: 9118321258196414413"
"args_tuple: (4,), element_val: 4, task: 4904288162898683522"
"args_tuple: (5,), element_val: 5, task: 9118321258196414413"
Base.asyncmap!
— Functionasyncmap!(f, results, c...; ntasks=0, batch_size=nothing)
Comme asyncmap
, mais stocke la sortie dans results
plutôt que de renvoyer une collection.
Le comportement peut être inattendu lorsque tout argument muté partage de la mémoire avec un autre argument.
Base.current_task
— Functioncurrent_task()
Obtenez la Task
en cours d'exécution.
Base.istaskdone
— Functionistaskdone(t::Task) -> Bool
Déterminez si une tâche a quitté.
Exemples
julia> a2() = sum(i for i in 1:1000);
julia> b = Task(a2);
julia> istaskdone(b)
false
julia> schedule(b);
julia> yield();
julia> istaskdone(b)
true
Base.istaskstarted
— Functionistaskstarted(t::Task) -> Bool
Déterminez si une tâche a commencé à s'exécuter.
Exemples
julia> a3() = sum(i for i in 1:1000);
julia> b = Task(a3);
julia> istaskstarted(b)
false
Base.istaskfailed
— Functionistaskfailed(t::Task) -> Bool
Détermine si une tâche a quitté en raison d'une exception lancée.
Exemples
julia> a4() = error("tâche échouée");
julia> b = Task(a4);
julia> istaskfailed(b)
false
julia> schedule(b);
julia> yield();
julia> istaskfailed(b)
true
Cette fonction nécessite au moins Julia 1.3.
Base.task_local_storage
— Methodtask_local_storage(key)
Recherchez la valeur d'une clé dans le stockage local de la tâche actuelle.
Base.task_local_storage
— Methodtask_local_storage(key, value)
Attribuer une valeur à une clé dans le stockage local de la tâche actuelle.
Base.task_local_storage
— Methodtask_local_storage(body, key, value)
Appelle la fonction body
avec un stockage local de tâche modifié, dans lequel value
est assigné à key
; la valeur précédente de key
, ou son absence, est restaurée par la suite. Utile pour émuler la portée dynamique.
Scheduling
Base.yield
— Functionyield()
Passez au planificateur pour permettre à une autre tâche planifiée de s'exécuter. Une tâche qui appelle cette fonction est toujours exécutable et sera redémarrée immédiatement s'il n'y a pas d'autres tâches exécutables.
yield(t::Task, arg = nothing)
Une version rapide et de planification injuste de schedule(t, arg); yield()
qui cède immédiatement à t
avant d'appeler le planificateur.
Base.yieldto
— Functionyieldto(t::Task, arg = nothing)
Bascule vers la tâche donnée. La première fois qu'une tâche est basculée, la fonction de la tâche est appelée sans arguments. Lors des bascules suivantes, arg
est renvoyé de l'appel précédent de la tâche à yieldto
. C'est un appel de bas niveau qui ne fait que basculer les tâches, sans tenir compte des états ou de la planification de quelque manière que ce soit. Son utilisation est déconseillée.
Base.sleep
— Functionsleep(seconds)
Bloque la tâche actuelle pendant un nombre spécifié de secondes. Le temps de sommeil minimum est de 1 milliseconde ou une entrée de 0.001
.
Base.schedule
— Functionschedule(t::Task, [val]; error=false)
Ajoute une Task
à la file d'attente du planificateur. Cela fait en sorte que la tâche s'exécute constamment lorsque le système est autrement inactif, à moins que la tâche n'effectue une opération bloquante telle que wait
.
Si un deuxième argument val
est fourni, il sera passé à la tâche (via la valeur de retour de yieldto
) lorsqu'elle s'exécute à nouveau. Si error
est true
, la valeur est levée comme une exception dans la tâche réveillée.
Il est incorrect d'utiliser schedule
sur une Task
arbitraire qui a déjà été démarrée. Voir la référence API pour plus d'informations.
Par défaut, les tâches auront le bit collant défini sur vrai t.sticky
. Cela modélise le comportement par défaut historique pour @async
. Les tâches collantes ne peuvent être exécutées que sur le fil de travail sur lequel elles ont été d'abord planifiées, et lorsqu'elles sont planifiées, elles rendront la tâche à partir de laquelle elles ont été planifiées collante. Pour obtenir le comportement de Threads.@spawn
, définissez manuellement le bit collant sur false
.
Exemples
julia> a5() = sum(i for i in 1:1000);
julia> b = Task(a5);
julia> istaskstarted(b)
false
julia> schedule(b);
julia> yield();
julia> istaskstarted(b)
true
julia> istaskdone(b)
true
Synchronization
Base.errormonitor
— Functionerrormonitor(t::Task)
Imprime un journal d'erreurs dans stderr
si la tâche t
échoue.
Exemples
julia> Base._wait(errormonitor(Threads.@spawn error("la tâche a échoué")))
Erreur de tâche non gérée : la tâche a échoué
Trace de la pile :
[...]
Base.@sync
— Macro@sync
Attendez que toutes les utilisations lexicalement enfermées de @async
, @spawn
, Distributed.@spawnat
et Distributed.@distributed
soient terminées. Toutes les exceptions levées par les opérations asynchrones enfermées sont collectées et levées sous la forme d'une CompositeException
.
Exemples
julia> Threads.nthreads()
4
julia> @sync begin
Threads.@spawn println("Thread-id $(Threads.threadid()), task 1")
Threads.@spawn println("Thread-id $(Threads.threadid()), task 2")
end;
Thread-id 3, task 1
Thread-id 1, task 2
Base.wait
— FunctionNote spéciale pour Threads.Condition
:
L'appelant doit détenir le lock
qui possède un Threads.Condition
avant d'appeler cette méthode. La tâche appelante sera bloquée jusqu'à ce qu'une autre tâche la réveille, généralement en appelant notify
sur le même objet Threads.Condition
. Le verrou sera libéré de manière atomique lors du blocage (même s'il était verrouillé de manière récursive) et sera réacquis avant le retour.
wait(r::Future)
Attendez qu'une valeur soit disponible pour le Future
spécifié.
wait(r::RemoteChannel, args...)
Attendez qu'une valeur soit disponible sur le RemoteChannel
spécifié.
wait([x])
Bloque la tâche actuelle jusqu'à ce qu'un événement se produise, en fonction du type de l'argument :
Channel
: Attendre qu'une valeur soit ajoutée au canal.Condition
: Attendrenotify
sur une condition et retourner le paramètreval
passé ànotify
. Attendre sur une condition permet également de passerfirst=true
, ce qui fait que le wait est mis en premier dans la file d'attente pour se réveiller surnotify
au lieu du comportement habituel premier arrivé, premier servi.Process
: Attendre qu'un processus ou une chaîne de processus se termine. Le champexitcode
d'un processus peut être utilisé pour déterminer le succès ou l'échec.Task
: Attendre qu'uneTask
se termine. Si la tâche échoue avec une exception, uneTaskFailedException
(qui enveloppe la tâche échouée) est levée.RawFD
: Attendre des changements sur un descripteur de fichier (voir le packageFileWatching
).
Si aucun argument n'est passé, la tâche se bloque pour une période indéfinie. Une tâche ne peut être redémarrée que par un appel explicite à schedule
ou yieldto
.
Souvent, wait
est appelé dans une boucle while
pour s'assurer qu'une condition attendue est remplie avant de continuer.
wait(c::Channel)
Bloque jusqu'à ce que le Channel
isready
.
julia> c = Channel(1);
julia> isready(c)
false
julia> task = Task(() -> wait(c));
julia> schedule(task);
julia> istaskdone(task) # la tâche est bloquée car le canal n'est pas prêt
false
julia> put!(c, 1);
julia> istaskdone(task) # la tâche n'est plus bloquée
true
Base.fetch
— Methodfetch(t::Task)
Attendez qu'une Task
se termine, puis renvoyez sa valeur de résultat. Si la tâche échoue avec une exception, une TaskFailedException
(qui enveloppe la tâche échouée) est levée.
Base.fetch
— Methodfetch(x::Any)
Retourne x
.
Base.timedwait
— Functiontimedwait(testcb, timeout::Real; pollint::Real=0.1)
Attendez jusqu'à ce que testcb()
retourne true
ou que timeout
secondes se soient écoulées, selon ce qui se produit en premier. La fonction de test est interrogée toutes les pollint
secondes. La valeur minimale pour pollint
est de 0,001 secondes, c'est-à-dire 1 milliseconde.
Retourne :ok
ou :timed_out
.
Exemples
julia> cb() = (sleep(5); return);
julia> t = @async cb();
julia> timedwait(()->istaskdone(t), 1)
:timed_out
julia> timedwait(()->istaskdone(t), 6.5)
:ok
Base.Condition
— TypeCondition()
Créez une source d'événements déclenchée par un bord sur laquelle les tâches peuvent attendre. Les tâches qui appellent wait
sur un Condition
sont suspendues et mises en file d'attente. Les tâches sont réveillées lorsque notify
est appelé plus tard sur le Condition
. Attendre sur une condition peut renvoyer une valeur ou lever une erreur si les arguments optionnels de notify
sont utilisés. Le déclenchement par bord signifie que seules les tâches en attente au moment où notify
est appelé peuvent être réveillées. Pour des notifications déclenchées par niveau, vous devez conserver un état supplémentaire pour suivre si une notification a eu lieu. Les types Channel
et Threads.Event
le font et peuvent être utilisés pour des événements déclenchés par niveau.
Cet objet n'est PAS sûr pour les threads. Voir Threads.Condition
pour une version sûre pour les threads.
Base.Threads.Condition
— TypeThreads.Condition([lock])
Une version thread-safe de Base.Condition
.
Pour appeler wait
ou notify
sur un Threads.Condition
, vous devez d'abord appeler lock
dessus. Lorsque wait
est appelé, le verrou est libéré de manière atomique pendant le blocage, et sera réacquis avant que wait
ne retourne. Par conséquent, l'utilisation idiomatique d'un Threads.Condition
c
ressemble à ce qui suit :
lock(c)
try
while !thing_we_are_waiting_for
wait(c)
end
finally
unlock(c)
end
Cette fonctionnalité nécessite au moins Julia 1.2.
Base.Event
— TypeEvent([autoreset=false])
Créez une source d'événement à déclenchement de niveau. Les tâches qui appellent wait
sur un Event
sont suspendues et mises en file d'attente jusqu'à ce que notify
soit appelé sur l'Event
. Après que notify
a été appelé, l'Event
reste dans un état signalé et les tâches ne bloqueront plus en attendant, jusqu'à ce que reset
soit appelé.
Si autoreset
est vrai, au maximum une tâche sera libérée de wait
pour chaque appel à notify
.
Cela fournit un ordre de mémoire d'acquisition et de libération sur notify/wait.
Cette fonctionnalité nécessite au moins Julia 1.1.
La fonctionnalité autoreset
et la garantie d'ordre de mémoire nécessitent au moins Julia 1.8.
Base.notify
— Functionnotify(condition, val=nothing; all=true, error=false)
Réveille les tâches en attente d'une condition, en leur passant val
. Si all
est true
(par défaut), toutes les tâches en attente sont réveillées, sinon une seule l'est. Si error
est true
, la valeur passée est levée comme une exception dans les tâches réveillées.
Retourne le nombre de tâches réveillées. Retourne 0 si aucune tâche n'attend sur condition
.
Base.reset
— Methodreset(::Event)
Réinitialise un Event
dans un état non défini. Ensuite, tout appel futur à wait
bloquera jusqu'à ce que notify
soit appelé à nouveau.
Base.Semaphore
— TypeSemaphore(sem_size)
Créez un sémaphore de comptage qui permet au maximum sem_size
acquisitions d'être utilisées à tout moment. Chaque acquisition doit être assortie d'une libération.
Cela fournit un ordre de mémoire d'acquisition et de libération sur les appels d'acquisition/libération.
Base.acquire
— Functionacquire(s::Semaphore)
Attendez qu'un des permis sem_size
soit disponible, en bloquant jusqu'à ce qu'un puisse être acquis.
acquire(f, s::Semaphore)
Exécute f
après avoir acquis le sémaphore s
, et release
à la fin ou en cas d'erreur.
Par exemple, une forme de bloc do qui garantit que seulement 2 appels de foo
seront actifs en même temps :
s = Base.Semaphore(2)
@sync for _ in 1:100
Threads.@spawn begin
Base.acquire(s) do
foo()
end
end
end
Cette méthode nécessite au moins Julia 1.8.
Base.release
— Functionrelease(s::Semaphore)
Retourne un permis au pool, permettant éventuellement à une autre tâche de l'acquérir et de reprendre l'exécution.
Base.AbstractLock
— TypeAbstractLock
Supertype abstrait décrivant les types qui implémentent les primitives de synchronisation : lock
, trylock
, unlock
et islocked
.
Base.lock
— Functionlock(lock)
Acquérir le lock
lorsqu'il devient disponible. Si le verrou est déjà verrouillé par une autre tâche/fil, attendez qu'il devienne disponible.
Chaque lock
doit être associé à un unlock
.
lock(f::Function, lock)
Acquérir le lock
, exécuter f
avec le lock
maintenu, et libérer le lock
lorsque f
retourne. Si le lock est déjà verrouillé par une autre tâche/fil, attendez qu'il devienne disponible.
Lorsque cette fonction retourne, le lock
a été libéré, donc l'appelant ne doit pas tenter de le unlock
.
Voir aussi : @lock
.
Utiliser un Channel
comme deuxième argument nécessite Julia 1.7 ou version ultérieure.
lock(f::Function, l::Lockable)
Acquérir le verrou associé à l
, exécuter f
avec le verrou détenu, et libérer le verrou lorsque f
retourne. f
recevra un argument positionnel : la valeur encapsulée par l
. Si le verrou est déjà verrouillé par une autre tâche/fil, attendez qu'il devienne disponible. Lorsque cette fonction retourne, le lock
a été libéré, donc l'appelant ne doit pas tenter de unlock
le verrou.
Nécessite au moins Julia 1.11.
Base.unlock
— Functionunlock(lock)
Libère la propriété du lock
.
Si c'est un verrou récursif qui a été acquis auparavant, décrémentez un compteur interne et retournez immédiatement.
Base.trylock
— Functiontrylock(lock) -> Succès (Booléen)
Acquérir le verrou s'il est disponible et retourner true
si réussi. Si le verrou est déjà verrouillé par une autre tâche/fil, retourner false
.
Chaque trylock
réussi doit être associé à un unlock
.
La fonction trylock
combinée avec islocked
peut être utilisée pour écrire les algorithmes de test-et-test-et-set ou de retour exponentiel si cela est supporté par le typeof(lock)
(lisez sa documentation).
Base.islocked
— Functionislocked(lock) -> Statut (Booléen)
Vérifiez si le lock
est détenu par une tâche/fil. Cette fonction seule ne doit pas être utilisée pour la synchronisation. Cependant, islocked
combiné avec trylock
peut être utilisé pour écrire les algorithmes de test-et-test-et-set ou de retour exponentiel si cela est supporté par le typeof(lock)
(lisez sa documentation).
Aide étendue
Par exemple, un retour exponentiel peut être implémenté comme suit si l'implémentation du lock
satisfait les propriétés documentées ci-dessous.
nspins = 0
while true
while islocked(lock)
GC.safepoint()
nspins += 1
nspins > LIMIT && error("timeout")
end
trylock(lock) && break
backoff()
end
Implémentation
Il est conseillé à une implémentation de lock de définir islocked
avec les propriétés suivantes et de le noter dans sa docstring.
islocked(lock)
est sans course de données.- Si
islocked(lock)
retournefalse
, une invocation immédiate detrylock(lock)
doit réussir (retournetrue
) s'il n'y a pas d'interférence d'autres tâches.
Base.ReentrantLock
— TypeReentrantLock()
Crée un verrou réentrant pour synchroniser les Task
s. La même tâche peut acquérir le verrou autant de fois que nécessaire (c'est ce que signifie la partie "Réentrant" du nom). Chaque lock
doit être associé à un unlock
.
Appeler lock
inhibera également l'exécution des finaliseurs sur ce thread jusqu'au unlock
correspondant. L'utilisation du modèle de verrouillage standard illustré ci-dessous devrait être naturellement supportée, mais attention à inverser l'ordre try/lock ou à manquer complètement le bloc try (par exemple, tenter de retourner avec le verrou toujours détenu) :
Cela fournit un ordre de mémoire d'acquisition/libération sur les appels lock/unlock.
lock(l)
try
<travail atomique>
finally
unlock(l)
end
Si !islocked(lck::ReentrantLock)
est vrai, trylock(lck)
réussit à moins qu'il y ait d'autres tâches tentant de détenir le verrou "en même temps."
Base.@lock
— Macro@lock l expr
Version macro de lock(f, l::AbstractLock)
mais avec expr
au lieu de la fonction f
. Se développe en :
lock(l)
try
expr
finally
unlock(l)
end
C'est similaire à l'utilisation de lock
avec un bloc do
, mais évite de créer une fermeture et peut donc améliorer les performances.
@lock
a été ajouté dans Julia 1.3 et exporté dans Julia 1.10.
Base.Lockable
— TypeLockable(value, lock = ReentrantLock())
Crée un objet Lockable
qui enveloppe value
et l'associe au lock
fourni. Cet objet prend en charge @lock
, lock
, trylock
, unlock
. Pour accéder à la valeur, indexez l'objet verrouillable tout en maintenant le verrou.
Nécessite au moins Julia 1.11.
Exemple
julia> locked_list = Base.Lockable(Int[]);
julia> @lock(locked_list, push!(locked_list[], 1)) # doit maintenir le verrou pour accéder à la valeur
1-element Vector{Int64}:
1
julia> lock(summary, locked_list)
"1-element Vector{Int64}"
Channels
Base.AbstractChannel
— TypeAbstractChannel{T}
Représentation d'un canal passant des objets de type T
.
Base.Channel
— TypeChannel{T=Any}(size::Int=0)
Construit un Channel
avec un tampon interne pouvant contenir un maximum de size
objets de type T
. put!
appelle sur un canal plein bloque jusqu'à ce qu'un objet soit retiré avec take!
.
Channel(0)
construit un canal non tamponné. put!
bloque jusqu'à ce qu'un take!
correspondant soit appelé. Et vice-versa.
Autres constructeurs :
Channel()
: constructeur par défaut, équivalent àChannel{Any}(0)
Channel(Inf)
: équivalent àChannel{Any}(typemax(Int))
Channel(sz)
: équivalent àChannel{Any}(sz)
Le constructeur par défaut Channel()
et le size=0
par défaut ont été ajoutés dans Julia 1.3.
Base.Channel
— MethodChannel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false, threadpool=nothing)
Créez une nouvelle tâche à partir de func
, liez-la à un nouveau canal de type T
et de taille size
, et planifiez la tâche, le tout en un seul appel. Le canal est automatiquement fermé lorsque la tâche se termine.
func
doit accepter le canal lié comme son seul argument.
Si vous avez besoin d'une référence à la tâche créée, passez un objet Ref{Task}
via l'argument clé taskref
.
Si spawn=true
, la Task
créée pour func
peut être planifiée sur un autre thread en parallèle, équivalent à la création d'une tâche via Threads.@spawn
.
Si spawn=true
et que l'argument threadpool
n'est pas défini, il par défaut à :default
.
Si l'argument threadpool
est défini (à :default
ou :interactive
), cela implique que spawn=true
et la nouvelle tâche est lancée dans le threadpool spécifié.
Retourne un Channel
.
Exemples
julia> chnl = Channel() do ch
foreach(i -> put!(ch, i), 1:4)
end;
julia> typeof(chnl)
Channel{Any}
julia> for i in chnl
@show i
end;
i = 1
i = 2
i = 3
i = 4
Référencer la tâche créée :
julia> taskref = Ref{Task}();
julia> chnl = Channel(taskref=taskref) do ch
println(take!(ch))
end;
julia> istaskdone(taskref[])
false
julia> put!(chnl, "Hello");
Hello
julia> istaskdone(taskref[])
true
Le paramètre spawn=
a été ajouté dans Julia 1.3. Ce constructeur a été ajouté dans Julia 1.3. Dans les versions antérieures de Julia, Channel utilisait des arguments clés pour définir size
et T
, mais ces constructeurs sont obsolètes.
L'argument threadpool=
a été ajouté dans Julia 1.9.
julia> chnl = Channel{Char}(1, spawn=true) do ch
for c in "hello world"
put!(ch, c)
end
end
Channel{Char}(1) (2 items available)
julia> String(collect(chnl))
"hello world"
Base.put!
— Methodput!(c::Channel, v)
Ajoute un élément v
au canal c
. Bloque si le canal est plein.
Pour les canaux non tamponnés, bloque jusqu'à ce qu'un take!
soit effectué par une autre tâche.
v
est maintenant converti au type du canal avec convert
lorsque put!
est appelé.
Base.take!
— Methodtake!(c::Channel)
Supprime et renvoie une valeur d'un Channel
dans l'ordre. Bloque jusqu'à ce que des données soient disponibles. Pour les canaux non tamponnés, bloque jusqu'à ce qu'un put!
soit effectué par une autre tâche.
Exemples
Canal tamponné :
julia> c = Channel(1);
julia> put!(c, 1);
julia> take!(c)
1
Canal non tamponné :
julia> c = Channel(0);
julia> task = Task(() -> put!(c, 1));
julia> schedule(task);
julia> take!(c)
1
Base.isready
— Methodisready(c::Channel)
Détermine si un Channel
a une valeur stockée en lui. Retourne immédiatement, ne bloque pas.
Pour les canaux non tamponnés, retourne true
s'il y a des tâches en attente sur un put!
.
Exemples
Canal tamponné :
julia> c = Channel(1);
julia> isready(c)
false
julia> put!(c, 1);
julia> isready(c)
true
Canal non tamponné :
julia> c = Channel();
julia> isready(c) # aucune tâche en attente pour mettre !
false
julia> task = Task(() -> put!(c, 1));
julia> schedule(task); # planifier une tâche put !
julia> isready(c)
true
Base.fetch
— Methodfetch(c::Channel)
Attend et retourne (sans supprimer) le premier élément disponible du Channel
. Remarque : fetch
n'est pas pris en charge sur un Channel
non tamponné (taille 0).
Exemples
Channel tamponné :
julia> c = Channel(3) do ch
foreach(i -> put!(ch, i), 1:3)
end;
julia> fetch(c)
1
julia> collect(c) # l'élément n'est pas supprimé
3-element Vector{Any}:
1
2
3
Base.close
— Methodclose(c::Channel[, excp::Exception])
Ferme un canal. Une exception (facultativement donnée par excp
) est levée par :
Base.bind
— Methodbind(chnl::Channel, task::Task)
Associe la durée de vie de chnl
à une tâche. Le Channel
chnl
est automatiquement fermé lorsque la tâche se termine. Toute exception non interceptée dans la tâche est propagée à tous les attendus sur chnl
.
L'objet chnl
peut être explicitement fermé indépendamment de la terminaison de la tâche. Les tâches terminées n'ont aucun effet sur les objets Channel
déjà fermés.
Lorsqu'un canal est lié à plusieurs tâches, la première tâche à se terminer fermera le canal. Lorsque plusieurs canaux sont liés à la même tâche, la terminaison de la tâche fermera tous les canaux liés.
Exemples
julia> c = Channel(0);
julia> task = @async foreach(i->put!(c, i), 1:4);
julia> bind(c,task);
julia> for i in c
@show i
end;
i = 1
i = 2
i = 3
i = 4
julia> isopen(c)
false
julia> c = Channel(0);
julia> task = @async (put!(c, 1); error("foo"));
julia> bind(c, task);
julia> take!(c)
1
julia> put!(c, 1);
ERROR: TaskFailedException
Stacktrace:
[...]
nested task error: foo
[...]
Low-level synchronization using schedule
and wait
L'utilisation correcte la plus simple de schedule
est sur une Task
qui n'est pas encore commencée (planifiée). Cependant, il est possible d'utiliser 4d61726b646f776e2e436f64652822222c20227363686564756c652229_40726566
et wait
comme un élément de base très bas niveau pour construire des interfaces de synchronisation. Une condition préalable cruciale pour appeler schedule(task)
est que l'appelant doit "posséder" la task
; c'est-à-dire qu'il doit savoir que l'appel à wait
dans la task
donnée se produit aux emplacements connus du code appelant schedule(task)
. Une stratégie pour garantir une telle condition préalable est d'utiliser des atomiques, comme démontré dans l'exemple suivant :
@enum OWEState begin
OWE_EMPTY
OWE_WAITING
OWE_NOTIFYING
end
mutable struct OneWayEvent
@atomic state::OWEState
task::Task
OneWayEvent() = new(OWE_EMPTY)
end
function Base.notify(ev::OneWayEvent)
state = @atomic ev.state
while state !== OWE_NOTIFYING
# Spin until we successfully update the state to OWE_NOTIFYING:
state, ok = @atomicreplace(ev.state, state => OWE_NOTIFYING)
if ok
if state == OWE_WAITING
# OWE_WAITING -> OWE_NOTIFYING transition means that the waiter task is
# already waiting or about to call `wait`. The notifier task must wake up
# the waiter task.
schedule(ev.task)
else
@assert state == OWE_EMPTY
# Since we are assuming that there is only one notifier task (for
# simplicity), we know that the other possible case here is OWE_EMPTY.
# We do not need to do anything because we know that the waiter task has
# not called `wait(ev::OneWayEvent)` yet.
end
break
end
end
return
end
function Base.wait(ev::OneWayEvent)
ev.task = current_task()
state, ok = @atomicreplace(ev.state, OWE_EMPTY => OWE_WAITING)
if ok
# OWE_EMPTY -> OWE_WAITING transition means that the notifier task is guaranteed to
# invoke OWE_WAITING -> OWE_NOTIFYING transition. The waiter task must call
# `wait()` immediately. In particular, it MUST NOT invoke any function that may
# yield to the scheduler at this point in code.
wait()
else
@assert state == OWE_NOTIFYING
# Otherwise, the `state` must have already been moved to OWE_NOTIFYING by the
# notifier task.
end
return
end
ev = OneWayEvent()
@sync begin
@async begin
wait(ev)
println("done")
end
println("notifying...")
notify(ev)
end
# output
notifying...
done
OneWayEvent
permet à une tâche d'attendre
la notification
d'une autre tâche. C'est une interface de communication limitée puisque attendre
ne peut être utilisé qu'une seule fois par une tâche unique (notez l'assignation non atomique de ev.task
)
Dans cet exemple, notify(ev::OneWayEvent)
est autorisé à appeler schedule(ev.task)
si et seulement si il modifie l'état de OWE_WAITING
à OWE_NOTIFYING
. Cela nous permet de savoir que la tâche exécutant wait(ev::OneWayEvent)
est maintenant dans la branche ok
et qu'il ne peut pas y avoir d'autres tâches qui essaient de schedule(ev.task)
puisque leur @atomicreplace(ev.state, state => OWE_NOTIFYING)
échouera.