Tasks
Core.Task
— TypeTask(func)
Erstellen Sie eine Task
(d.h. Coroutine), um die gegebene Funktion func
auszuführen (die ohne Argumente aufrufbar sein muss). Die Aufgabe endet, wenn diese Funktion zurückkehrt. Die Aufgabe wird im "Weltalter" des Elternteils zur Zeit der Konstruktion ausgeführt, wenn sie schedule
d wird.
Standardmäßig haben Aufgaben das Sticky-Bit auf true t.sticky
gesetzt. Dies modelliert das historische Standardverhalten für @async
. Sticky-Aufgaben können nur auf dem Worker-Thread ausgeführt werden, auf dem sie zuerst geplant wurden, und wenn sie geplant werden, machen sie die Aufgabe, von der sie geplant wurden, sticky. Um das Verhalten von Threads.@spawn
zu erhalten, setzen Sie das Sticky-Bit manuell auf false
.
Beispiele
julia> a() = sum(i for i in 1:1000);
julia> b = Task(a);
In diesem Beispiel ist b
eine ausführbare Task
, die noch nicht gestartet wurde.
Base.@task
— Macro@task
Wickeln Sie einen Ausdruck in eine Task
ein, ohne ihn auszuführen, und geben Sie die Task
zurück. Dies erstellt nur eine Aufgabe und führt sie nicht aus.
Standardmäßig haben Aufgaben das Sticky-Bit auf true t.sticky
gesetzt. Dies modelliert das historische Standardverhalten für @async
. Sticky-Aufgaben können nur auf dem Worker-Thread ausgeführt werden, auf dem sie zuerst geplant wurden, und beim Planen wird die Aufgabe, von der sie geplant wurden, sticky. Um das Verhalten von Threads.@spawn
zu erhalten, setzen Sie das Sticky-Bit manuell auf false
.
Beispiele
julia> a1() = sum(i for i in 1:1000);
julia> b = @task a1();
julia> istaskstarted(b)
false
julia> schedule(b);
julia> yield();
julia> istaskdone(b)
true
Base.@async
— Macro@async
Wickeln Sie einen Ausdruck in eine Task
und fügen Sie ihn der Warteschlange des lokalen Maschinenplaners hinzu.
Werte können über $
in @async
interpoliert werden, was den Wert direkt in den konstruierten zugrunde liegenden Closure kopiert. Dies ermöglicht es Ihnen, den Wert einer Variablen einzufügen und den asynchronen Code von Änderungen des Wertes der Variablen in der aktuellen Aufgabe zu isolieren.
Es wird dringend empfohlen, Threads.@spawn
immer @async
vorzuziehen, auch wenn keine Parallelität erforderlich ist, insbesondere in öffentlich verteilten Bibliotheken. Dies liegt daran, dass die Verwendung von @async
die Migration der Eltern-Aufgabe über Arbeits-Threads in der aktuellen Implementierung von Julia deaktiviert. Daher kann die scheinbar harmlose Verwendung von @async
in einer Bibliotheksfunktion erhebliche Auswirkungen auf die Leistung sehr unterschiedlicher Teile von Benutzeranwendungen haben.
Die Interpolation von Werten über $
ist seit Julia 1.4 verfügbar.
Base.asyncmap
— Functionasyncmap(f, c...; ntasks=0, batch_size=nothing)
Verwendet mehrere gleichzeitige Aufgaben, um f
über eine Sammlung (oder mehrere gleich lange Sammlungen) abzubilden. Bei mehreren Sammlungsargumenten wird f
elementweise angewendet.
ntasks
gibt die Anzahl der Aufgaben an, die gleichzeitig ausgeführt werden sollen. Abhängig von der Länge der Sammlungen werden, wenn ntasks
nicht angegeben ist, bis zu 100 Aufgaben für die gleichzeitige Abbildung verwendet.
ntasks
kann auch als Funktion ohne Argumente angegeben werden. In diesem Fall wird die Anzahl der parallel auszuführenden Aufgaben vor der Verarbeitung jedes Elements überprüft, und eine neue Aufgabe wird gestartet, wenn der Wert von ntasks_func
größer ist als die aktuelle Anzahl der Aufgaben.
Wenn batch_size
angegeben ist, wird die Sammlung im Batch-Modus verarbeitet. f
muss dann eine Funktion sein, die ein Vector
von Argument-Tupeln akzeptiert und ein Array von Ergebnissen zurückgibt. Der Eingangsvektor hat eine Länge von batch_size
oder weniger.
Die folgenden Beispiele heben die Ausführung in verschiedenen Aufgaben hervor, indem sie die objectid
der Aufgaben zurückgeben, in denen die Abbildungsfunktion ausgeführt wird.
Zuerst, mit undefiniertem ntasks
, wird jedes Element in einer anderen Aufgabe verarbeitet.
julia> tskoid() = objectid(current_task());
julia> asyncmap(x->tskoid(), 1:5)
5-element Array{UInt64,1}:
0x6e15e66c75c75853
0x440f8819a1baa682
0x9fb3eeadd0c83985
0xebd3e35fe90d4050
0x29efc93edce2b961
julia> length(unique(asyncmap(x->tskoid(), 1:5)))
5
Mit ntasks=2
werden alle Elemente in 2 Aufgaben verarbeitet.
julia> asyncmap(x->tskoid(), 1:5; ntasks=2)
5-element Array{UInt64,1}:
0x027ab1680df7ae94
0xa23d2f80cd7cf157
0x027ab1680df7ae94
0xa23d2f80cd7cf157
0x027ab1680df7ae94
julia> length(unique(asyncmap(x->tskoid(), 1:5; ntasks=2)))
2
Mit definierter batch_size
muss die Abbildungsfunktion geändert werden, um ein Array von Argument-Tupeln zu akzeptieren und ein Array von Ergebnissen zurückzugeben. map
wird in der modifizierten Abbildungsfunktion verwendet, um dies zu erreichen.
julia> batch_func(input) = map(x->string("args_tuple: ", x, ", element_val: ", x[1], ", task: ", tskoid()), input)
batch_func (generic function with 1 method)
julia> asyncmap(batch_func, 1:5; ntasks=2, batch_size=2)
5-element Array{String,1}:
"args_tuple: (1,), element_val: 1, task: 9118321258196414413"
"args_tuple: (2,), element_val: 2, task: 4904288162898683522"
"args_tuple: (3,), element_val: 3, task: 9118321258196414413"
"args_tuple: (4,), element_val: 4, task: 4904288162898683522"
"args_tuple: (5,), element_val: 5, task: 9118321258196414413"
Base.asyncmap!
— Functionasyncmap!(f, results, c...; ntasks=0, batch_size=nothing)
Wie asyncmap
, aber speichert die Ausgabe in results
, anstatt eine Sammlung zurückzugeben.
Das Verhalten kann unerwartet sein, wenn ein verändertes Argument Speicher mit einem anderen Argument teilt.
Base.current_task
— Functioncurrent_task()
Holen Sie sich die aktuell laufende Task
.
Base.istaskdone
— Functionistaskdone(t::Task) -> Bool
Bestimmen Sie, ob eine Aufgabe beendet ist.
Beispiele
julia> a2() = sum(i for i in 1:1000);
julia> b = Task(a2);
julia> istaskdone(b)
false
julia> schedule(b);
julia> yield();
julia> istaskdone(b)
true
Base.istaskstarted
— Functionistaskstarted(t::Task) -> Bool
Bestimmen Sie, ob eine Aufgabe mit der Ausführung begonnen hat.
Beispiele
julia> a3() = sum(i for i in 1:1000);
julia> b = Task(a3);
julia> istaskstarted(b)
false
Base.istaskfailed
— Functionistaskfailed(t::Task) -> Bool
Bestimmen Sie, ob eine Aufgabe aufgrund einer Ausnahme beendet wurde.
Beispiele
julia> a4() = error("Aufgabe fehlgeschlagen");
julia> b = Task(a4);
julia> istaskfailed(b)
false
julia> schedule(b);
julia> yield();
julia> istaskfailed(b)
true
Diese Funktion erfordert mindestens Julia 1.3.
Base.task_local_storage
— Methodtask_local_storage(key)
Suchen Sie den Wert eines Schlüssels im aufgabenlokalen Speicher der aktuellen Aufgabe.
Base.task_local_storage
— Methodtask_local_storage(key, value)
Weisen Sie einen Wert einem Schlüssel im task-lokalen Speicher der aktuellen Aufgabe zu.
Base.task_local_storage
— Methodtask_local_storage(body, key, value)
Rufen Sie die Funktion body
mit einem modifizierten aufgabenlokalen Speicher auf, in dem value
key
zugewiesen wird; der vorherige Wert von key
oder dessen Fehlen wird danach wiederhergestellt. Nützlich zum Emulieren von dynamischem Scoping.
Scheduling
Base.yield
— Functionyield()
Wechseln Sie zum Scheduler, um einem anderen geplanten Task die Ausführung zu ermöglichen. Ein Task, der diese Funktion aufruft, ist weiterhin ausführbar und wird sofort neu gestartet, wenn keine anderen ausführbaren Tasks vorhanden sind.
yield(t::Task, arg = nothing)
Eine schnelle, unfair-scheduling Version von schedule(t, arg); yield()
, die sofort an t
übergibt, bevor der Scheduler aufgerufen wird.
Base.yieldto
— Functionyieldto(t::Task, arg = nothing)
Wechseln Sie zu der angegebenen Aufgabe. Beim ersten Wechsel zu einer Aufgabe wird die Funktion der Aufgabe ohne Argumente aufgerufen. Bei nachfolgenden Wechseln wird arg
von dem letzten Aufruf der Aufgabe an yieldto
zurückgegeben. Dies ist ein Low-Level-Aufruf, der nur Aufgaben wechselt, ohne Zustände oder Zeitplanung in irgendeiner Weise zu berücksichtigen. Seine Verwendung wird nicht empfohlen.
Base.sleep
— Functionsleep(seconds)
Blockiere die aktuelle Aufgabe für eine bestimmte Anzahl von Sekunden. Die minimale Schlafzeit beträgt 1 Millisekunde oder eine Eingabe von 0.001
.
Base.schedule
— Functionschedule(t::Task, [val]; error=false)
Fügt eine Task
zur Warteschlange des Planers hinzu. Dies bewirkt, dass die Aufgabe ständig ausgeführt wird, wenn das System ansonsten untätig ist, es sei denn, die Aufgabe führt eine blockierende Operation wie wait
aus.
Wenn ein zweites Argument val
bereitgestellt wird, wird es der Aufgabe (über den Rückgabewert von yieldto
) übergeben, wenn sie erneut ausgeführt wird. Wenn error
auf true
gesetzt ist, wird der Wert als Ausnahme in der geweckten Aufgabe ausgelöst.
Es ist falsch, schedule
auf einer beliebigen Task
zu verwenden, die bereits gestartet wurde. Siehe die API-Referenz für weitere Informationen.
Standardmäßig haben Aufgaben das Sticky-Bit auf true t.sticky
gesetzt. Dies modelliert das historische Standardverhalten für @async
. Sticky-Aufgaben können nur auf dem Worker-Thread ausgeführt werden, auf dem sie zuerst geplant wurden, und wenn sie geplant werden, machen sie die Aufgabe, von der sie geplant wurden, sticky. Um das Verhalten von Threads.@spawn
zu erhalten, setzen Sie das Sticky-Bit manuell auf false
.
Beispiele
julia> a5() = sum(i for i in 1:1000);
julia> b = Task(a5);
julia> istaskstarted(b)
false
julia> schedule(b);
julia> yield();
julia> istaskstarted(b)
true
julia> istaskdone(b)
true
Synchronization
Base.errormonitor
— Functionerrormonitor(t::Task)
Druckt ein Fehlerprotokoll auf stderr
, wenn die Aufgabe t
fehlschlägt.
Beispiele
julia> Base._wait(errormonitor(Threads.@spawn error("Aufgabe fehlgeschlagen")))
Unhandled Task ERROR: Aufgabe fehlgeschlagen
Stacktrace:
[...]
Base.@sync
— Macro@sync
Warten Sie, bis alle lexikalisch eingeschlossenen Verwendungen von @async
, @spawn
, Distributed.@spawnat
und Distributed.@distributed
abgeschlossen sind. Alle Ausnahmen, die von den eingeschlossenen asynchronen Operationen ausgelöst werden, werden gesammelt und als CompositeException
ausgelöst.
Beispiele
julia> Threads.nthreads()
4
julia> @sync begin
Threads.@spawn println("Thread-id $(Threads.threadid()), task 1")
Threads.@spawn println("Thread-id $(Threads.threadid()), task 2")
end;
Thread-id 3, task 1
Thread-id 1, task 2
Base.wait
— FunctionBesondere Anmerkung für Threads.Condition
:
Der Aufrufer muss den lock
halten, der ein Threads.Condition
besitzt, bevor er diese Methode aufruft. Die aufrufende Aufgabe wird blockiert, bis eine andere Aufgabe sie weckt, normalerweise durch den Aufruf von notify
auf demselben Threads.Condition
-Objekt. Der Lock wird atomar freigegeben, wenn die Blockierung erfolgt (auch wenn er rekursiv gesperrt war), und wird vor der Rückkehr erneut erworben.
wait(r::Future)
Warten Sie, bis ein Wert für die angegebene Future
verfügbar ist.
wait(r::RemoteChannel, args...)
Warten Sie, bis ein Wert auf dem angegebenen RemoteChannel
verfügbar ist.
wait([x])
Blockiere die aktuelle Aufgabe, bis ein Ereignis eintritt, abhängig von der Art des Arguments:
Channel
: Warte darauf, dass ein Wert zum Kanal hinzugefügt wird.Condition
: Warte aufnotify
bei einer Bedingung und gib denval
-Parameter zurück, der annotify
übergeben wurde. Das Warten auf eine Bedingung ermöglicht zusätzlich das Übergeben vonfirst=true
, was dazu führt, dass der Wartende zuerst in der Reihe steht, um beinotify
aufgeweckt zu werden, anstatt dem üblichen First-in-First-out-Verhalten zu folgen.Process
: Warte darauf, dass ein Prozess oder eine Prozesskette beendet wird. Das Feldexitcode
eines Prozesses kann verwendet werden, um Erfolg oder Misserfolg zu bestimmen.Task
: Warte darauf, dass eineTask
abgeschlossen wird. Wenn die Aufgabe mit einer Ausnahme fehlschlägt, wird eineTaskFailedException
(die die fehlgeschlagene Aufgabe umschließt) ausgelöst.RawFD
: Warte auf Änderungen an einem Dateideskriptor (siehe dasFileWatching
-Paket).
Wenn kein Argument übergeben wird, blockiert die Aufgabe für einen undefinierten Zeitraum. Eine Aufgabe kann nur durch einen expliziten Aufruf von schedule
oder yieldto
neu gestartet werden.
Oft wird wait
innerhalb einer while
-Schleife aufgerufen, um sicherzustellen, dass eine wartende Bedingung erfüllt ist, bevor fortgefahren wird.
wait(c::Channel)
Blockiert, bis der Channel
isready
ist.
julia> c = Channel(1);
julia> isready(c)
false
julia> task = Task(() -> wait(c));
julia> schedule(task);
julia> istaskdone(task) # Aufgabe ist blockiert, weil der Kanal nicht bereit ist
false
julia> put!(c, 1);
julia> istaskdone(task) # Aufgabe ist jetzt nicht mehr blockiert
true
Base.fetch
— Methodfetch(t::Task)
Warten Sie, bis eine Task
abgeschlossen ist, und geben Sie dann ihren Ergebniswert zurück. Wenn die Aufgabe mit einer Ausnahme fehlschlägt, wird eine TaskFailedException
(die die fehlgeschlagene Aufgabe umschließt) ausgelöst.
Base.fetch
— Methodfetch(x::Any)
Gibt x
zurück.
Base.timedwait
— Functiontimedwait(testcb, timeout::Real; pollint::Real=0.1)
Warten, bis testcb()
true
zurückgibt oder timeout
Sekunden vergangen sind, je nachdem, was zuerst eintritt. Die Testfunktion wird alle pollint
Sekunden abgefragt. Der Mindestwert für pollint
beträgt 0,001 Sekunden, das sind 1 Millisekunde.
Gibt :ok
oder :timed_out
zurück.
Beispiele
julia> cb() = (sleep(5); return);
julia> t = @async cb();
julia> timedwait(()->istaskdone(t), 1)
:timed_out
julia> timedwait(()->istaskdone(t), 6.5)
:ok
Base.Condition
— TypeCondition()
Erstellen Sie eine kantengetriggerte Ereignisquelle, auf die Aufgaben warten können. Aufgaben, die wait
auf einer Condition
aufrufen, werden ausgesetzt und in eine Warteschlange gestellt. Aufgaben werden geweckt, wenn später notify
auf der Condition
aufgerufen wird. Das Warten auf eine Bedingung kann einen Wert zurückgeben oder einen Fehler auslösen, wenn die optionalen Argumente von notify
verwendet werden. Kantentrigger bedeutet, dass nur Aufgaben, die zum Zeitpunkt des Aufrufs von notify
warten, geweckt werden können. Für levelgetriggerte Benachrichtigungen müssen Sie zusätzlichen Zustand beibehalten, um zu verfolgen, ob eine Benachrichtigung stattgefunden hat. Die Typen Channel
und Threads.Event
tun dies und können für levelgetriggerte Ereignisse verwendet werden.
Dieses Objekt ist NICHT threadsicher. Siehe Threads.Condition
für eine threadsichere Version.
Base.Threads.Condition
— TypeThreads.Condition([lock])
Eine threadsichere Version von Base.Condition
.
Um wait
oder notify
auf einem Threads.Condition
aufzurufen, müssen Sie zuerst lock
darauf aufrufen. Wenn wait
aufgerufen wird, wird das Lock während des Blockierens atomar freigegeben und vor der Rückkehr von wait
wieder erworben. Daher sieht die idiomatische Verwendung eines Threads.Condition
c
wie folgt aus:
lock(c)
try
while !thing_we_are_waiting_for
wait(c)
end
finally
unlock(c)
end
Diese Funktionalität erfordert mindestens Julia 1.2.
Base.Event
— TypeEvent([autoreset=false])
Erstellen Sie eine level-triggered Ereignisquelle. Aufgaben, die wait
auf einem Event
aufrufen, werden ausgesetzt und in eine Warteschlange gestellt, bis notify
auf dem Event
aufgerufen wird. Nachdem notify
aufgerufen wurde, bleibt das Event
im signalisierenden Zustand, und Aufgaben werden nicht mehr blockiert, wenn sie darauf warten, bis reset
aufgerufen wird.
Wenn autoreset
wahr ist, wird höchstens eine Aufgabe aus wait
für jeden Aufruf von notify
freigegeben.
Dies bietet eine Erwerbs- und Freigabereihenfolge im Speicher für notify/wait.
Diese Funktionalität erfordert mindestens Julia 1.1.
Die autoreset
-Funktionalität und die Garantie der Speicherreihenfolge erfordern mindestens Julia 1.8.
Base.notify
— Functionnotify(condition, val=nothing; all=true, error=false)
Wecke Aufgaben, die auf eine Bedingung warten, und übergebe ihnen val
. Wenn all
true
ist (der Standardwert), werden alle wartenden Aufgaben geweckt, andernfalls nur eine. Wenn error
true
ist, wird der übergebene Wert als Ausnahme in den geweckten Aufgaben ausgelöst.
Gib die Anzahl der geweckten Aufgaben zurück. Gib 0 zurück, wenn keine Aufgaben auf condition
warten.
Base.reset
— Methodreset(::Event)
Setzt ein Event
zurück in einen nicht gesetzten Zustand. Zukünftige Aufrufe von wait
blockieren dann, bis notify
erneut aufgerufen wird.
Base.Semaphore
— TypeSemaphore(sem_size)
Erstellen Sie ein Zählsemaphore, das höchstens sem_size
Erfassungen gleichzeitig zulässt. Jede Erfassung muss mit einer Freigabe übereinstimmen.
Dies bietet eine Erfassungs- und Freigabereihenfolge bei Erfassungs-/Freigabeverrufen.
Base.acquire
— Functionacquire(s::Semaphore)
Warten Sie, bis einer der sem_size
Erlaubnisse verfügbar ist, und blockieren Sie, bis eine erworben werden kann.
acquire(f, s::Semaphore)
Führen Sie f
aus, nachdem Sie von Semaphore s
erworben haben, und release
nach Abschluss oder Fehler.
Zum Beispiel eine do-Block-Form, die sicherstellt, dass nur 2 Aufrufe von foo
gleichzeitig aktiv sind:
s = Base.Semaphore(2)
@sync for _ in 1:100
Threads.@spawn begin
Base.acquire(s) do
foo()
end
end
end
Diese Methode erfordert mindestens Julia 1.8.
Base.release
— Functionrelease(s::Semaphore)
Gibt ein Erlaubnis an den Pool zurück, wodurch möglicherweise eine andere Aufgabe sie erwerben und die Ausführung fortsetzen kann.
Base.AbstractLock
— TypeAbstractLock
Abstrakte Supertype, die Typen beschreibt, die die Synchronisationsprimitive implementieren: lock
, trylock
, unlock
und islocked
.
Base.lock
— Functionlock(lock)
Erwerben Sie das lock
, wenn es verfügbar wird. Wenn das Lock bereits von einer anderen Aufgabe/Thread gesperrt ist, warten Sie, bis es verfügbar wird.
Jedes lock
muss durch ein unlock
ausgeglichen werden.
lock(f::Function, lock)
Erwirbt das lock
, führt f
mit dem gehaltenen lock
aus und gibt das lock
frei, wenn f
zurückkehrt. Wenn das lock
bereits von einer anderen Aufgabe/Thread gesperrt ist, wird gewartet, bis es verfügbar wird.
Wenn diese Funktion zurückkehrt, wurde das lock
freigegeben, sodass der Aufrufer nicht versuchen sollte, es zu unlock
.
Siehe auch: @lock
.
Die Verwendung eines Channel
als zweiten Parameter erfordert Julia 1.7 oder höher.
lock(f::Function, l::Lockable)
Erwirbt das mit l
verbundene Schloss, führt f
mit dem gehaltenen Schloss aus und gibt das Schloss zurück, wenn f
zurückkehrt. f
erhält ein positionsbasiertes Argument: den von l
umschlossenen Wert. Wenn das Schloss bereits von einer anderen Aufgabe/Thread gesperrt ist, wird gewartet, bis es verfügbar wird. Wenn diese Funktion zurückkehrt, wurde das lock
freigegeben, sodass der Aufrufer nicht versuchen sollte, es zu unlock
en.
Erfordert mindestens Julia 1.11.
Base.unlock
— Functionunlock(lock)
Gibt das Eigentum des lock
frei.
Wenn dies ein rekursiver Lock ist, der zuvor erworben wurde, wird ein interner Zähler verringert und sofort zurückgegeben.
Base.trylock
— Functiontrylock(lock) -> Erfolg (Boolean)
Erwirbt das Schloss, wenn es verfügbar ist, und gibt true
zurück, wenn es erfolgreich war. Wenn das Schloss bereits von einer anderen Aufgabe/Thread gesperrt ist, gibt es false
zurück.
Jedes erfolgreiche trylock
muss mit einem unlock
übereinstimmen.
Die Funktion trylock
in Kombination mit islocked
kann verwendet werden, um die Test-und-Test-und-Set- oder exponentielle Backoff-Algorithmen zu schreiben wenn es von typeof(lock)
unterstützt wird (lesen Sie die Dokumentation dazu).
Base.islocked
— Functionislocked(lock) -> Status (Boolean)
Überprüfen, ob das lock
von einer Aufgabe/Thread gehalten wird. Diese Funktion allein sollte nicht zur Synchronisation verwendet werden. Allerdings kann islocked
in Kombination mit trylock
verwendet werden, um die Test-and-Test-and-Set- oder Exponential-Backoff-Algorithmen zu schreiben, wenn dies von typeof(lock)
unterstützt wird (lesen Sie die Dokumentation).
Erweiterte Hilfe
Ein exponentieller Backoff kann beispielsweise wie folgt implementiert werden, wenn die lock
-Implementierung die unten dokumentierten Eigenschaften erfüllt.
nspins = 0
while true
while islocked(lock)
GC.safepoint()
nspins += 1
nspins > LIMIT && error("timeout")
end
trylock(lock) && break
backoff()
end
Implementierung
Es wird empfohlen, dass eine Lock-Implementierung islocked
mit den folgenden Eigenschaften definiert und in ihrer Dokumentation vermerkt:
islocked(lock)
ist datarace-frei.- Wenn
islocked(lock)
false
zurückgibt, muss ein sofortiger Aufruf vontrylock(lock)
erfolgreich sein (gibttrue
zurück), wenn es keine Störungen durch andere Aufgaben gibt.
Base.ReentrantLock
— TypeReentrantLock()
Erstellt ein re-entrantes Schloss zur Synchronisierung von Task
s. Derselbe Task kann das Schloss so oft erwerben, wie erforderlich (das ist es, was der "Reentrant"-Teil des Namens bedeutet). Jedes lock
muss mit einem unlock
übereinstimmen.
Der Aufruf von lock
wird auch die Ausführung von Finalisierern in diesem Thread bis zum entsprechenden unlock
verhindern. Die Verwendung des standardmäßigen Schlossmusters, das unten veranschaulicht wird, sollte natürlich unterstützt werden, aber seien Sie vorsichtig, die Reihenfolge von try/lock umzukehren oder den try-Block ganz zu verpassen (z. B. zu versuchen, zurückzukehren, während das Schloss noch gehalten wird):
Dies bietet eine Erwerb/Freigabe-Speicherreihenfolge bei lock/unlock-Aufrufen.
lock(l)
try
<atomare Arbeit>
finally
unlock(l)
end
Wenn !islocked(lck::ReentrantLock)
gilt, gelingt trylock(lck)
, es sei denn, es gibt andere Tasks, die versuchen, das Schloss "zur gleichen Zeit" zu halten.
Base.@lock
— Macro@lock l expr
Makroversion von lock(f, l::AbstractLock)
, aber mit expr
anstelle der f
-Funktion. Erweitert sich zu:
lock(l)
try
expr
finally
unlock(l)
end
Dies ist ähnlich wie die Verwendung von lock
mit einem do
-Block, vermeidet jedoch die Erstellung einer Closure und kann somit die Leistung verbessern.
@lock
wurde in Julia 1.3 hinzugefügt und in Julia 1.10 exportiert.
Base.Lockable
— TypeLockable(value, lock = ReentrantLock())
Erstellt ein Lockable
-Objekt, das value
umschließt und es mit dem bereitgestellten lock
verknüpft. Dieses Objekt unterstützt @lock
, lock
, trylock
, unlock
. Um auf den Wert zuzugreifen, indizieren Sie das lockable Objekt, während Sie den Lock halten.
Erfordert mindestens Julia 1.11.
Beispiel
julia> locked_list = Base.Lockable(Int[]);
julia> @lock(locked_list, push!(locked_list[], 1)) # muss den Lock halten, um auf den Wert zuzugreifen
1-element Vector{Int64}:
1
julia> lock(summary, locked_list)
"1-element Vector{Int64}"
Channels
Base.AbstractChannel
— TypeAbstractChannel{T}
Darstellung eines Kanals, der Objekte vom Typ T
überträgt.
Base.Channel
— TypeChannel{T=Any}(size::Int=0)
Konstruiert einen Channel
mit einem internen Puffer, der maximal size
Objekte vom Typ T
halten kann. put!
Aufrufe auf einem vollen Kanal blockieren, bis ein Objekt mit take!
entfernt wird.
Channel(0)
konstruiert einen unbuffered Kanal. put!
blockiert, bis ein passendes take!
aufgerufen wird. Und umgekehrt.
Weitere Konstruktoren:
Channel()
: Standardkonstruktor, äquivalent zuChannel{Any}(0)
Channel(Inf)
: äquivalent zuChannel{Any}(typemax(Int))
Channel(sz)
: äquivalent zuChannel{Any}(sz)
Der Standardkonstruktor Channel()
und die Standard size=0
wurden in Julia 1.3 hinzugefügt.
Base.Channel
— MethodChannel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false, threadpool=nothing)
Erstellt eine neue Aufgabe aus func
, bindet sie an einen neuen Kanal vom Typ T
und der Größe size
und plant die Aufgabe, alles in einem einzigen Aufruf. Der Kanal wird automatisch geschlossen, wenn die Aufgabe endet.
func
muss den gebundenen Kanal als einziges Argument akzeptieren.
Wenn Sie eine Referenz auf die erstellte Aufgabe benötigen, übergeben Sie ein Ref{Task}
-Objekt über das Schlüsselwortargument taskref
.
Wenn spawn=true
, kann die für func
erstellte Task
parallel auf einem anderen Thread geplant werden, was dem Erstellen einer Aufgabe über Threads.@spawn
entspricht.
Wenn spawn=true
und das Argument threadpool
nicht gesetzt ist, wird es standardmäßig auf :default
gesetzt.
Wenn das Argument threadpool
gesetzt ist (auf :default
oder :interactive
), impliziert dies, dass spawn=true
und die neue Aufgabe im angegebenen Threadpool erstellt wird.
Gibt einen Channel
zurück.
Beispiele
julia> chnl = Channel() do ch
foreach(i -> put!(ch, i), 1:4)
end;
julia> typeof(chnl)
Channel{Any}
julia> for i in chnl
@show i
end;
i = 1
i = 2
i = 3
i = 4
Referenzierung der erstellten Aufgabe:
julia> taskref = Ref{Task}();
julia> chnl = Channel(taskref=taskref) do ch
println(take!(ch))
end;
julia> istaskdone(taskref[])
false
julia> put!(chnl, "Hello");
Hello
julia> istaskdone(taskref[])
true
Der Parameter spawn=
wurde in Julia 1.3 hinzugefügt. Dieser Konstruktor wurde in Julia 1.3 hinzugefügt. In früheren Versionen von Julia verwendete Channel Schlüsselwortargumente, um size
und T
festzulegen, aber diese Konstruktoren sind veraltet.
Das Argument threadpool=
wurde in Julia 1.9 hinzugefügt.
julia> chnl = Channel{Char}(1, spawn=true) do ch
for c in "hello world"
put!(ch, c)
end
end
Channel{Char}(1) (2 items available)
julia> String(collect(chnl))
"hello world"
Base.put!
— Methodput!(c::Channel, v)
Fügt ein Element v
zum Kanal c
hinzu. Blockiert, wenn der Kanal voll ist.
Für unpufferte Kanäle blockiert es, bis ein take!
von einer anderen Aufgabe ausgeführt wird.
v
wird jetzt in den Typ des Kanals mit convert
umgewandelt, wenn put!
aufgerufen wird.
Base.take!
— Methodtake!(c::Channel)
Entfernt und gibt einen Wert aus einem Channel
in der richtigen Reihenfolge zurück. Blockiert, bis Daten verfügbar sind. Bei unpufferten Kanälen blockiert es, bis ein put!
von einer anderen Aufgabe ausgeführt wird.
Beispiele
Pufferkanal:
julia> c = Channel(1);
julia> put!(c, 1);
julia> take!(c)
1
Unpufferter Kanal:
julia> c = Channel(0);
julia> task = Task(() -> put!(c, 1));
julia> schedule(task);
julia> take!(c)
1
Base.isready
— Methodisready(c::Channel)
Bestimmt, ob ein Channel
einen Wert gespeichert hat. Gibt sofort zurück, blockiert nicht.
Für unbuffered Channels gibt es true
zurück, wenn Aufgaben warten, um einen put!
auszuführen.
Beispiele
Buffered Channel:
julia> c = Channel(1);
julia> isready(c)
false
julia> put!(c, 1);
julia> isready(c)
true
Unbuffered Channel:
julia> c = Channel();
julia> isready(c) # keine Aufgaben warten, um zu put!
false
julia> task = Task(() -> put!(c, 1));
julia> schedule(task); # eine put! Aufgabe planen
julia> isready(c)
true
Base.fetch
— Methodfetch(c::Channel)
Wartet auf und gibt (ohne zu entfernen) das erste verfügbare Element aus dem Channel
zurück. Hinweis: fetch
wird bei einem unbuffered (0-Größe) Channel
nicht unterstützt.
Beispiele
Pufferkanal:
julia> c = Channel(3) do ch
foreach(i -> put!(ch, i), 1:3)
end;
julia> fetch(c)
1
julia> collect(c) # Element wird nicht entfernt
3-element Vector{Any}:
1
2
3
Base.close
— Methodclose(c::Channel[, excp::Exception])
Schließt einen Kanal. Eine Ausnahme (optional angegeben durch excp
) wird ausgelöst durch:
Base.bind
— Methodbind(chnl::Channel, task::Task)
Assoziiere die Lebensdauer von chnl
mit einer Aufgabe. Channel
chnl
wird automatisch geschlossen, wenn die Aufgabe endet. Jede nicht abgefangene Ausnahme in der Aufgabe wird an alle Wartenden auf chnl
weitergegeben.
Das chnl
-Objekt kann unabhängig vom Abschluss der Aufgabe explizit geschlossen werden. Beendete Aufgaben haben keinen Einfluss auf bereits geschlossene Channel
-Objekte.
Wenn ein Kanal an mehrere Aufgaben gebunden ist, wird der erste, der beendet wird, den Kanal schließen. Wenn mehrere Kanäle an dieselbe Aufgabe gebunden sind, wird das Beenden der Aufgabe alle gebundenen Kanäle schließen.
Beispiele
julia> c = Channel(0);
julia> task = @async foreach(i->put!(c, i), 1:4);
julia> bind(c,task);
julia> for i in c
@show i
end;
i = 1
i = 2
i = 3
i = 4
julia> isopen(c)
false
julia> c = Channel(0);
julia> task = @async (put!(c, 1); error("foo"));
julia> bind(c, task);
julia> take!(c)
1
julia> put!(c, 1);
ERROR: TaskFailedException
Stacktrace:
[...]
nested task error: foo
[...]
Low-level synchronization using schedule
and wait
Die einfachste korrekte Verwendung von schedule
ist bei einer Task
, die noch nicht gestartet (geplant) ist. Es ist jedoch möglich, 4d61726b646f776e2e436f64652822222c20227363686564756c652229_40726566
und wait
als sehr grundlegenden Baustein zum Konstruieren von Synchronisationsschnittstellen zu verwenden. Eine entscheidende Vorbedingung für den Aufruf von schedule(task)
ist, dass der Aufrufer die task
"besitzen" muss; d.h., er muss wissen, dass der Aufruf von wait
in der gegebenen task
an den Stellen erfolgt, die dem Code bekannt sind, der schedule(task)
aufruft. Eine Strategie zur Sicherstellung dieser Vorbedingung ist die Verwendung von Atomics, wie im folgenden Beispiel demonstriert:
@enum OWEState begin
OWE_EMPTY
OWE_WAITING
OWE_NOTIFYING
end
mutable struct OneWayEvent
@atomic state::OWEState
task::Task
OneWayEvent() = new(OWE_EMPTY)
end
function Base.notify(ev::OneWayEvent)
state = @atomic ev.state
while state !== OWE_NOTIFYING
# Spin until we successfully update the state to OWE_NOTIFYING:
state, ok = @atomicreplace(ev.state, state => OWE_NOTIFYING)
if ok
if state == OWE_WAITING
# OWE_WAITING -> OWE_NOTIFYING transition means that the waiter task is
# already waiting or about to call `wait`. The notifier task must wake up
# the waiter task.
schedule(ev.task)
else
@assert state == OWE_EMPTY
# Since we are assuming that there is only one notifier task (for
# simplicity), we know that the other possible case here is OWE_EMPTY.
# We do not need to do anything because we know that the waiter task has
# not called `wait(ev::OneWayEvent)` yet.
end
break
end
end
return
end
function Base.wait(ev::OneWayEvent)
ev.task = current_task()
state, ok = @atomicreplace(ev.state, OWE_EMPTY => OWE_WAITING)
if ok
# OWE_EMPTY -> OWE_WAITING transition means that the notifier task is guaranteed to
# invoke OWE_WAITING -> OWE_NOTIFYING transition. The waiter task must call
# `wait()` immediately. In particular, it MUST NOT invoke any function that may
# yield to the scheduler at this point in code.
wait()
else
@assert state == OWE_NOTIFYING
# Otherwise, the `state` must have already been moved to OWE_NOTIFYING by the
# notifier task.
end
return
end
ev = OneWayEvent()
@sync begin
@async begin
wait(ev)
println("done")
end
println("notifying...")
notify(ev)
end
# output
notifying...
done
OneWayEvent
ermöglicht es einer Aufgabe, auf die notify
einer anderen Aufgabe zu warten
. Es handelt sich um eine eingeschränkte Kommunikationsschnittstelle, da wait
nur einmal von einer einzelnen Aufgabe verwendet werden kann (beachten Sie die nicht-atomare Zuweisung von ev.task
).
In diesem Beispiel darf notify(ev::OneWayEvent)
schedule(ev.task)
nur dann aufrufen, wenn es den Zustand von OWE_WAITING
auf OWE_NOTIFYING
ändert. Dies lässt uns wissen, dass die Aufgabe, die wait(ev::OneWayEvent)
ausführt, sich jetzt im ok
-Zweig befindet und dass es keine anderen Aufgaben geben kann, die versuchen, schedule(ev.task)
aufzurufen, da ihr @atomicreplace(ev.state, state => OWE_NOTIFYING)
fehlschlagen wird.