Tasks

Core.TaskType
Task(func)

Erstellen Sie eine Task (d.h. Coroutine), um die gegebene Funktion func auszuführen (die ohne Argumente aufrufbar sein muss). Die Aufgabe endet, wenn diese Funktion zurückkehrt. Die Aufgabe wird im "Weltalter" des Elternteils zur Zeit der Konstruktion ausgeführt, wenn sie scheduled wird.

Warning

Standardmäßig haben Aufgaben das Sticky-Bit auf true t.sticky gesetzt. Dies modelliert das historische Standardverhalten für @async. Sticky-Aufgaben können nur auf dem Worker-Thread ausgeführt werden, auf dem sie zuerst geplant wurden, und wenn sie geplant werden, machen sie die Aufgabe, von der sie geplant wurden, sticky. Um das Verhalten von Threads.@spawn zu erhalten, setzen Sie das Sticky-Bit manuell auf false.

Beispiele

julia> a() = sum(i for i in 1:1000);

julia> b = Task(a);

In diesem Beispiel ist b eine ausführbare Task, die noch nicht gestartet wurde.

source
Base.@taskMacro
@task

Wickeln Sie einen Ausdruck in eine Task ein, ohne ihn auszuführen, und geben Sie die Task zurück. Dies erstellt nur eine Aufgabe und führt sie nicht aus.

Warning

Standardmäßig haben Aufgaben das Sticky-Bit auf true t.sticky gesetzt. Dies modelliert das historische Standardverhalten für @async. Sticky-Aufgaben können nur auf dem Worker-Thread ausgeführt werden, auf dem sie zuerst geplant wurden, und beim Planen wird die Aufgabe, von der sie geplant wurden, sticky. Um das Verhalten von Threads.@spawn zu erhalten, setzen Sie das Sticky-Bit manuell auf false.

Beispiele

julia> a1() = sum(i for i in 1:1000);

julia> b = @task a1();

julia> istaskstarted(b)
false

julia> schedule(b);

julia> yield();

julia> istaskdone(b)
true
source
Base.@asyncMacro
@async

Wickeln Sie einen Ausdruck in eine Task und fügen Sie ihn der Warteschlange des lokalen Maschinenplaners hinzu.

Werte können über $ in @async interpoliert werden, was den Wert direkt in den konstruierten zugrunde liegenden Closure kopiert. Dies ermöglicht es Ihnen, den Wert einer Variablen einzufügen und den asynchronen Code von Änderungen des Wertes der Variablen in der aktuellen Aufgabe zu isolieren.

Warning

Es wird dringend empfohlen, Threads.@spawn immer @async vorzuziehen, auch wenn keine Parallelität erforderlich ist, insbesondere in öffentlich verteilten Bibliotheken. Dies liegt daran, dass die Verwendung von @async die Migration der Eltern-Aufgabe über Arbeits-Threads in der aktuellen Implementierung von Julia deaktiviert. Daher kann die scheinbar harmlose Verwendung von @async in einer Bibliotheksfunktion erhebliche Auswirkungen auf die Leistung sehr unterschiedlicher Teile von Benutzeranwendungen haben.

Julia 1.4

Die Interpolation von Werten über $ ist seit Julia 1.4 verfügbar.

source
Base.asyncmapFunction
asyncmap(f, c...; ntasks=0, batch_size=nothing)

Verwendet mehrere gleichzeitige Aufgaben, um f über eine Sammlung (oder mehrere gleich lange Sammlungen) abzubilden. Bei mehreren Sammlungsargumenten wird f elementweise angewendet.

ntasks gibt die Anzahl der Aufgaben an, die gleichzeitig ausgeführt werden sollen. Abhängig von der Länge der Sammlungen werden, wenn ntasks nicht angegeben ist, bis zu 100 Aufgaben für die gleichzeitige Abbildung verwendet.

ntasks kann auch als Funktion ohne Argumente angegeben werden. In diesem Fall wird die Anzahl der parallel auszuführenden Aufgaben vor der Verarbeitung jedes Elements überprüft, und eine neue Aufgabe wird gestartet, wenn der Wert von ntasks_func größer ist als die aktuelle Anzahl der Aufgaben.

Wenn batch_size angegeben ist, wird die Sammlung im Batch-Modus verarbeitet. f muss dann eine Funktion sein, die ein Vector von Argument-Tupeln akzeptiert und ein Array von Ergebnissen zurückgibt. Der Eingangsvektor hat eine Länge von batch_size oder weniger.

Die folgenden Beispiele heben die Ausführung in verschiedenen Aufgaben hervor, indem sie die objectid der Aufgaben zurückgeben, in denen die Abbildungsfunktion ausgeführt wird.

Zuerst, mit undefiniertem ntasks, wird jedes Element in einer anderen Aufgabe verarbeitet.

julia> tskoid() = objectid(current_task());

julia> asyncmap(x->tskoid(), 1:5)
5-element Array{UInt64,1}:
 0x6e15e66c75c75853
 0x440f8819a1baa682
 0x9fb3eeadd0c83985
 0xebd3e35fe90d4050
 0x29efc93edce2b961

julia> length(unique(asyncmap(x->tskoid(), 1:5)))
5

Mit ntasks=2 werden alle Elemente in 2 Aufgaben verarbeitet.

julia> asyncmap(x->tskoid(), 1:5; ntasks=2)
5-element Array{UInt64,1}:
 0x027ab1680df7ae94
 0xa23d2f80cd7cf157
 0x027ab1680df7ae94
 0xa23d2f80cd7cf157
 0x027ab1680df7ae94

julia> length(unique(asyncmap(x->tskoid(), 1:5; ntasks=2)))
2

Mit definierter batch_size muss die Abbildungsfunktion geändert werden, um ein Array von Argument-Tupeln zu akzeptieren und ein Array von Ergebnissen zurückzugeben. map wird in der modifizierten Abbildungsfunktion verwendet, um dies zu erreichen.

julia> batch_func(input) = map(x->string("args_tuple: ", x, ", element_val: ", x[1], ", task: ", tskoid()), input)
batch_func (generic function with 1 method)

julia> asyncmap(batch_func, 1:5; ntasks=2, batch_size=2)
5-element Array{String,1}:
 "args_tuple: (1,), element_val: 1, task: 9118321258196414413"
 "args_tuple: (2,), element_val: 2, task: 4904288162898683522"
 "args_tuple: (3,), element_val: 3, task: 9118321258196414413"
 "args_tuple: (4,), element_val: 4, task: 4904288162898683522"
 "args_tuple: (5,), element_val: 5, task: 9118321258196414413"
source
Base.asyncmap!Function
asyncmap!(f, results, c...; ntasks=0, batch_size=nothing)

Wie asyncmap, aber speichert die Ausgabe in results, anstatt eine Sammlung zurückzugeben.

Warning

Das Verhalten kann unerwartet sein, wenn ein verändertes Argument Speicher mit einem anderen Argument teilt.

source
Base.istaskdoneFunction
istaskdone(t::Task) -> Bool

Bestimmen Sie, ob eine Aufgabe beendet ist.

Beispiele

julia> a2() = sum(i for i in 1:1000);

julia> b = Task(a2);

julia> istaskdone(b)
false

julia> schedule(b);

julia> yield();

julia> istaskdone(b)
true
source
Base.istaskstartedFunction
istaskstarted(t::Task) -> Bool

Bestimmen Sie, ob eine Aufgabe mit der Ausführung begonnen hat.

Beispiele

julia> a3() = sum(i for i in 1:1000);

julia> b = Task(a3);

julia> istaskstarted(b)
false
source
Base.istaskfailedFunction
istaskfailed(t::Task) -> Bool

Bestimmen Sie, ob eine Aufgabe aufgrund einer Ausnahme beendet wurde.

Beispiele

julia> a4() = error("Aufgabe fehlgeschlagen");

julia> b = Task(a4);

julia> istaskfailed(b)
false

julia> schedule(b);

julia> yield();

julia> istaskfailed(b)
true
Julia 1.3

Diese Funktion erfordert mindestens Julia 1.3.

source
Base.task_local_storageMethod
task_local_storage(key)

Suchen Sie den Wert eines Schlüssels im aufgabenlokalen Speicher der aktuellen Aufgabe.

source
Base.task_local_storageMethod
task_local_storage(key, value)

Weisen Sie einen Wert einem Schlüssel im task-lokalen Speicher der aktuellen Aufgabe zu.

source
Base.task_local_storageMethod
task_local_storage(body, key, value)

Rufen Sie die Funktion body mit einem modifizierten aufgabenlokalen Speicher auf, in dem value key zugewiesen wird; der vorherige Wert von key oder dessen Fehlen wird danach wiederhergestellt. Nützlich zum Emulieren von dynamischem Scoping.

source

Scheduling

Base.yieldFunction
yield()

Wechseln Sie zum Scheduler, um einem anderen geplanten Task die Ausführung zu ermöglichen. Ein Task, der diese Funktion aufruft, ist weiterhin ausführbar und wird sofort neu gestartet, wenn keine anderen ausführbaren Tasks vorhanden sind.

source
yield(t::Task, arg = nothing)

Eine schnelle, unfair-scheduling Version von schedule(t, arg); yield(), die sofort an t übergibt, bevor der Scheduler aufgerufen wird.

source
Base.yieldtoFunction
yieldto(t::Task, arg = nothing)

Wechseln Sie zu der angegebenen Aufgabe. Beim ersten Wechsel zu einer Aufgabe wird die Funktion der Aufgabe ohne Argumente aufgerufen. Bei nachfolgenden Wechseln wird arg von dem letzten Aufruf der Aufgabe an yieldto zurückgegeben. Dies ist ein Low-Level-Aufruf, der nur Aufgaben wechselt, ohne Zustände oder Zeitplanung in irgendeiner Weise zu berücksichtigen. Seine Verwendung wird nicht empfohlen.

source
Base.sleepFunction
sleep(seconds)

Blockiere die aktuelle Aufgabe für eine bestimmte Anzahl von Sekunden. Die minimale Schlafzeit beträgt 1 Millisekunde oder eine Eingabe von 0.001.

source
Base.scheduleFunction
schedule(t::Task, [val]; error=false)

Fügt eine Task zur Warteschlange des Planers hinzu. Dies bewirkt, dass die Aufgabe ständig ausgeführt wird, wenn das System ansonsten untätig ist, es sei denn, die Aufgabe führt eine blockierende Operation wie wait aus.

Wenn ein zweites Argument val bereitgestellt wird, wird es der Aufgabe (über den Rückgabewert von yieldto) übergeben, wenn sie erneut ausgeführt wird. Wenn error auf true gesetzt ist, wird der Wert als Ausnahme in der geweckten Aufgabe ausgelöst.

Warning

Es ist falsch, schedule auf einer beliebigen Task zu verwenden, die bereits gestartet wurde. Siehe die API-Referenz für weitere Informationen.

Warning

Standardmäßig haben Aufgaben das Sticky-Bit auf true t.sticky gesetzt. Dies modelliert das historische Standardverhalten für @async. Sticky-Aufgaben können nur auf dem Worker-Thread ausgeführt werden, auf dem sie zuerst geplant wurden, und wenn sie geplant werden, machen sie die Aufgabe, von der sie geplant wurden, sticky. Um das Verhalten von Threads.@spawn zu erhalten, setzen Sie das Sticky-Bit manuell auf false.

Beispiele

julia> a5() = sum(i for i in 1:1000);

julia> b = Task(a5);

julia> istaskstarted(b)
false

julia> schedule(b);

julia> yield();

julia> istaskstarted(b)
true

julia> istaskdone(b)
true
source

Synchronization

Base.errormonitorFunction
errormonitor(t::Task)

Druckt ein Fehlerprotokoll auf stderr, wenn die Aufgabe t fehlschlägt.

Beispiele

julia> Base._wait(errormonitor(Threads.@spawn error("Aufgabe fehlgeschlagen")))
Unhandled Task ERROR: Aufgabe fehlgeschlagen
Stacktrace:
[...]
source
Base.@syncMacro
@sync

Warten Sie, bis alle lexikalisch eingeschlossenen Verwendungen von @async, @spawn, Distributed.@spawnat und Distributed.@distributed abgeschlossen sind. Alle Ausnahmen, die von den eingeschlossenen asynchronen Operationen ausgelöst werden, werden gesammelt und als CompositeException ausgelöst.

Beispiele

julia> Threads.nthreads()
4

julia> @sync begin
           Threads.@spawn println("Thread-id $(Threads.threadid()), task 1")
           Threads.@spawn println("Thread-id $(Threads.threadid()), task 2")
       end;
Thread-id 3, task 1
Thread-id 1, task 2
source
Base.waitFunction

Besondere Anmerkung für Threads.Condition:

Der Aufrufer muss den lock halten, der ein Threads.Condition besitzt, bevor er diese Methode aufruft. Die aufrufende Aufgabe wird blockiert, bis eine andere Aufgabe sie weckt, normalerweise durch den Aufruf von notify auf demselben Threads.Condition-Objekt. Der Lock wird atomar freigegeben, wenn die Blockierung erfolgt (auch wenn er rekursiv gesperrt war), und wird vor der Rückkehr erneut erworben.

source
wait(r::Future)

Warten Sie, bis ein Wert für die angegebene Future verfügbar ist.

source
wait(r::RemoteChannel, args...)

Warten Sie, bis ein Wert auf dem angegebenen RemoteChannel verfügbar ist.

source
wait([x])

Blockiere die aktuelle Aufgabe, bis ein Ereignis eintritt, abhängig von der Art des Arguments:

  • Channel: Warte darauf, dass ein Wert zum Kanal hinzugefügt wird.
  • Condition: Warte auf notify bei einer Bedingung und gib den val-Parameter zurück, der an notify übergeben wurde. Das Warten auf eine Bedingung ermöglicht zusätzlich das Übergeben von first=true, was dazu führt, dass der Wartende zuerst in der Reihe steht, um bei notify aufgeweckt zu werden, anstatt dem üblichen First-in-First-out-Verhalten zu folgen.
  • Process: Warte darauf, dass ein Prozess oder eine Prozesskette beendet wird. Das Feld exitcode eines Prozesses kann verwendet werden, um Erfolg oder Misserfolg zu bestimmen.
  • Task: Warte darauf, dass eine Task abgeschlossen wird. Wenn die Aufgabe mit einer Ausnahme fehlschlägt, wird eine TaskFailedException (die die fehlgeschlagene Aufgabe umschließt) ausgelöst.
  • RawFD: Warte auf Änderungen an einem Dateideskriptor (siehe das FileWatching-Paket).

Wenn kein Argument übergeben wird, blockiert die Aufgabe für einen undefinierten Zeitraum. Eine Aufgabe kann nur durch einen expliziten Aufruf von schedule oder yieldto neu gestartet werden.

Oft wird wait innerhalb einer while-Schleife aufgerufen, um sicherzustellen, dass eine wartende Bedingung erfüllt ist, bevor fortgefahren wird.

source
wait(c::Channel)

Blockiert, bis der Channel isready ist.

julia> c = Channel(1);

julia> isready(c)
false

julia> task = Task(() -> wait(c));

julia> schedule(task);

julia> istaskdone(task)  # Aufgabe ist blockiert, weil der Kanal nicht bereit ist
false

julia> put!(c, 1);

julia> istaskdone(task)  # Aufgabe ist jetzt nicht mehr blockiert
true
source
Base.fetchMethod
fetch(t::Task)

Warten Sie, bis eine Task abgeschlossen ist, und geben Sie dann ihren Ergebniswert zurück. Wenn die Aufgabe mit einer Ausnahme fehlschlägt, wird eine TaskFailedException (die die fehlgeschlagene Aufgabe umschließt) ausgelöst.

source
Base.timedwaitFunction
timedwait(testcb, timeout::Real; pollint::Real=0.1)

Warten, bis testcb() true zurückgibt oder timeout Sekunden vergangen sind, je nachdem, was zuerst eintritt. Die Testfunktion wird alle pollint Sekunden abgefragt. Der Mindestwert für pollint beträgt 0,001 Sekunden, das sind 1 Millisekunde.

Gibt :ok oder :timed_out zurück.

Beispiele

julia> cb() = (sleep(5); return);

julia> t = @async cb();

julia> timedwait(()->istaskdone(t), 1)
:timed_out

julia> timedwait(()->istaskdone(t), 6.5)
:ok
source
Base.ConditionType
Condition()

Erstellen Sie eine kantengetriggerte Ereignisquelle, auf die Aufgaben warten können. Aufgaben, die wait auf einer Condition aufrufen, werden ausgesetzt und in eine Warteschlange gestellt. Aufgaben werden geweckt, wenn später notify auf der Condition aufgerufen wird. Das Warten auf eine Bedingung kann einen Wert zurückgeben oder einen Fehler auslösen, wenn die optionalen Argumente von notify verwendet werden. Kantentrigger bedeutet, dass nur Aufgaben, die zum Zeitpunkt des Aufrufs von notify warten, geweckt werden können. Für levelgetriggerte Benachrichtigungen müssen Sie zusätzlichen Zustand beibehalten, um zu verfolgen, ob eine Benachrichtigung stattgefunden hat. Die Typen Channel und Threads.Event tun dies und können für levelgetriggerte Ereignisse verwendet werden.

Dieses Objekt ist NICHT threadsicher. Siehe Threads.Condition für eine threadsichere Version.

source
Base.Threads.ConditionType
Threads.Condition([lock])

Eine threadsichere Version von Base.Condition.

Um wait oder notify auf einem Threads.Condition aufzurufen, müssen Sie zuerst lock darauf aufrufen. Wenn wait aufgerufen wird, wird das Lock während des Blockierens atomar freigegeben und vor der Rückkehr von wait wieder erworben. Daher sieht die idiomatische Verwendung eines Threads.Condition c wie folgt aus:

lock(c)
try
    while !thing_we_are_waiting_for
        wait(c)
    end
finally
    unlock(c)
end
Julia 1.2

Diese Funktionalität erfordert mindestens Julia 1.2.

source
Base.EventType
Event([autoreset=false])

Erstellen Sie eine level-triggered Ereignisquelle. Aufgaben, die wait auf einem Event aufrufen, werden ausgesetzt und in eine Warteschlange gestellt, bis notify auf dem Event aufgerufen wird. Nachdem notify aufgerufen wurde, bleibt das Event im signalisierenden Zustand, und Aufgaben werden nicht mehr blockiert, wenn sie darauf warten, bis reset aufgerufen wird.

Wenn autoreset wahr ist, wird höchstens eine Aufgabe aus wait für jeden Aufruf von notify freigegeben.

Dies bietet eine Erwerbs- und Freigabereihenfolge im Speicher für notify/wait.

Julia 1.1

Diese Funktionalität erfordert mindestens Julia 1.1.

Julia 1.8

Die autoreset-Funktionalität und die Garantie der Speicherreihenfolge erfordern mindestens Julia 1.8.

source
Base.notifyFunction
notify(condition, val=nothing; all=true, error=false)

Wecke Aufgaben, die auf eine Bedingung warten, und übergebe ihnen val. Wenn all true ist (der Standardwert), werden alle wartenden Aufgaben geweckt, andernfalls nur eine. Wenn error true ist, wird der übergebene Wert als Ausnahme in den geweckten Aufgaben ausgelöst.

Gib die Anzahl der geweckten Aufgaben zurück. Gib 0 zurück, wenn keine Aufgaben auf condition warten.

source
Base.resetMethod
reset(::Event)

Setzt ein Event zurück in einen nicht gesetzten Zustand. Zukünftige Aufrufe von wait blockieren dann, bis notify erneut aufgerufen wird.

source
Base.SemaphoreType
Semaphore(sem_size)

Erstellen Sie ein Zählsemaphore, das höchstens sem_size Erfassungen gleichzeitig zulässt. Jede Erfassung muss mit einer Freigabe übereinstimmen.

Dies bietet eine Erfassungs- und Freigabereihenfolge bei Erfassungs-/Freigabeverrufen.

source
Base.acquireFunction
acquire(s::Semaphore)

Warten Sie, bis einer der sem_size Erlaubnisse verfügbar ist, und blockieren Sie, bis eine erworben werden kann.

source
acquire(f, s::Semaphore)

Führen Sie f aus, nachdem Sie von Semaphore s erworben haben, und release nach Abschluss oder Fehler.

Zum Beispiel eine do-Block-Form, die sicherstellt, dass nur 2 Aufrufe von foo gleichzeitig aktiv sind:

s = Base.Semaphore(2)
@sync for _ in 1:100
    Threads.@spawn begin
        Base.acquire(s) do
            foo()
        end
    end
end
Julia 1.8

Diese Methode erfordert mindestens Julia 1.8.

source
Base.releaseFunction
release(s::Semaphore)

Gibt ein Erlaubnis an den Pool zurück, wodurch möglicherweise eine andere Aufgabe sie erwerben und die Ausführung fortsetzen kann.

source
Base.lockFunction
lock(lock)

Erwerben Sie das lock, wenn es verfügbar wird. Wenn das Lock bereits von einer anderen Aufgabe/Thread gesperrt ist, warten Sie, bis es verfügbar wird.

Jedes lock muss durch ein unlock ausgeglichen werden.

source
lock(f::Function, lock)

Erwirbt das lock, führt f mit dem gehaltenen lock aus und gibt das lock frei, wenn f zurückkehrt. Wenn das lock bereits von einer anderen Aufgabe/Thread gesperrt ist, wird gewartet, bis es verfügbar wird.

Wenn diese Funktion zurückkehrt, wurde das lock freigegeben, sodass der Aufrufer nicht versuchen sollte, es zu unlock.

Siehe auch: @lock.

Julia 1.7

Die Verwendung eines Channel als zweiten Parameter erfordert Julia 1.7 oder höher.

source

lock(f::Function, l::Lockable)

Erwirbt das mit l verbundene Schloss, führt f mit dem gehaltenen Schloss aus und gibt das Schloss zurück, wenn f zurückkehrt. f erhält ein positionsbasiertes Argument: den von l umschlossenen Wert. Wenn das Schloss bereits von einer anderen Aufgabe/Thread gesperrt ist, wird gewartet, bis es verfügbar wird. Wenn diese Funktion zurückkehrt, wurde das lock freigegeben, sodass der Aufrufer nicht versuchen sollte, es zu unlocken.

Julia 1.11

Erfordert mindestens Julia 1.11.

source
Base.unlockFunction
unlock(lock)

Gibt das Eigentum des lock frei.

Wenn dies ein rekursiver Lock ist, der zuvor erworben wurde, wird ein interner Zähler verringert und sofort zurückgegeben.

source
Base.trylockFunction
trylock(lock) -> Erfolg (Boolean)

Erwirbt das Schloss, wenn es verfügbar ist, und gibt true zurück, wenn es erfolgreich war. Wenn das Schloss bereits von einer anderen Aufgabe/Thread gesperrt ist, gibt es false zurück.

Jedes erfolgreiche trylock muss mit einem unlock übereinstimmen.

Die Funktion trylock in Kombination mit islocked kann verwendet werden, um die Test-und-Test-und-Set- oder exponentielle Backoff-Algorithmen zu schreiben wenn es von typeof(lock) unterstützt wird (lesen Sie die Dokumentation dazu).

source
Base.islockedFunction
islocked(lock) -> Status (Boolean)

Überprüfen, ob das lock von einer Aufgabe/Thread gehalten wird. Diese Funktion allein sollte nicht zur Synchronisation verwendet werden. Allerdings kann islocked in Kombination mit trylock verwendet werden, um die Test-and-Test-and-Set- oder Exponential-Backoff-Algorithmen zu schreiben, wenn dies von typeof(lock) unterstützt wird (lesen Sie die Dokumentation).

Erweiterte Hilfe

Ein exponentieller Backoff kann beispielsweise wie folgt implementiert werden, wenn die lock-Implementierung die unten dokumentierten Eigenschaften erfüllt.

nspins = 0
while true
    while islocked(lock)
        GC.safepoint()
        nspins += 1
        nspins > LIMIT && error("timeout")
    end
    trylock(lock) && break
    backoff()
end

Implementierung

Es wird empfohlen, dass eine Lock-Implementierung islocked mit den folgenden Eigenschaften definiert und in ihrer Dokumentation vermerkt:

  • islocked(lock) ist datarace-frei.
  • Wenn islocked(lock) false zurückgibt, muss ein sofortiger Aufruf von trylock(lock) erfolgreich sein (gibt true zurück), wenn es keine Störungen durch andere Aufgaben gibt.
source
Base.ReentrantLockType
ReentrantLock()

Erstellt ein re-entrantes Schloss zur Synchronisierung von Tasks. Derselbe Task kann das Schloss so oft erwerben, wie erforderlich (das ist es, was der "Reentrant"-Teil des Namens bedeutet). Jedes lock muss mit einem unlock übereinstimmen.

Der Aufruf von lock wird auch die Ausführung von Finalisierern in diesem Thread bis zum entsprechenden unlock verhindern. Die Verwendung des standardmäßigen Schlossmusters, das unten veranschaulicht wird, sollte natürlich unterstützt werden, aber seien Sie vorsichtig, die Reihenfolge von try/lock umzukehren oder den try-Block ganz zu verpassen (z. B. zu versuchen, zurückzukehren, während das Schloss noch gehalten wird):

Dies bietet eine Erwerb/Freigabe-Speicherreihenfolge bei lock/unlock-Aufrufen.

lock(l)
try
    <atomare Arbeit>
finally
    unlock(l)
end

Wenn !islocked(lck::ReentrantLock) gilt, gelingt trylock(lck), es sei denn, es gibt andere Tasks, die versuchen, das Schloss "zur gleichen Zeit" zu halten.

source
Base.@lockMacro
@lock l expr

Makroversion von lock(f, l::AbstractLock), aber mit expr anstelle der f-Funktion. Erweitert sich zu:

lock(l)
try
    expr
finally
    unlock(l)
end

Dies ist ähnlich wie die Verwendung von lock mit einem do-Block, vermeidet jedoch die Erstellung einer Closure und kann somit die Leistung verbessern.

Kompatibel

@lock wurde in Julia 1.3 hinzugefügt und in Julia 1.10 exportiert.

source
Base.LockableType

Lockable(value, lock = ReentrantLock())

Erstellt ein Lockable-Objekt, das value umschließt und es mit dem bereitgestellten lock verknüpft. Dieses Objekt unterstützt @lock, lock, trylock, unlock. Um auf den Wert zuzugreifen, indizieren Sie das lockable Objekt, während Sie den Lock halten.

Julia 1.11

Erfordert mindestens Julia 1.11.

Beispiel

julia> locked_list = Base.Lockable(Int[]);

julia> @lock(locked_list, push!(locked_list[], 1)) # muss den Lock halten, um auf den Wert zuzugreifen
1-element Vector{Int64}:
 1

julia> lock(summary, locked_list)
"1-element Vector{Int64}"
source

Channels

Base.ChannelType
Channel{T=Any}(size::Int=0)

Konstruiert einen Channel mit einem internen Puffer, der maximal size Objekte vom Typ T halten kann. put! Aufrufe auf einem vollen Kanal blockieren, bis ein Objekt mit take! entfernt wird.

Channel(0) konstruiert einen unbuffered Kanal. put! blockiert, bis ein passendes take! aufgerufen wird. Und umgekehrt.

Weitere Konstruktoren:

  • Channel(): Standardkonstruktor, äquivalent zu Channel{Any}(0)
  • Channel(Inf): äquivalent zu Channel{Any}(typemax(Int))
  • Channel(sz): äquivalent zu Channel{Any}(sz)
Julia 1.3

Der Standardkonstruktor Channel() und die Standard size=0 wurden in Julia 1.3 hinzugefügt.

source
Base.ChannelMethod
Channel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false, threadpool=nothing)

Erstellt eine neue Aufgabe aus func, bindet sie an einen neuen Kanal vom Typ T und der Größe size und plant die Aufgabe, alles in einem einzigen Aufruf. Der Kanal wird automatisch geschlossen, wenn die Aufgabe endet.

func muss den gebundenen Kanal als einziges Argument akzeptieren.

Wenn Sie eine Referenz auf die erstellte Aufgabe benötigen, übergeben Sie ein Ref{Task}-Objekt über das Schlüsselwortargument taskref.

Wenn spawn=true, kann die für func erstellte Task parallel auf einem anderen Thread geplant werden, was dem Erstellen einer Aufgabe über Threads.@spawn entspricht.

Wenn spawn=true und das Argument threadpool nicht gesetzt ist, wird es standardmäßig auf :default gesetzt.

Wenn das Argument threadpool gesetzt ist (auf :default oder :interactive), impliziert dies, dass spawn=true und die neue Aufgabe im angegebenen Threadpool erstellt wird.

Gibt einen Channel zurück.

Beispiele

julia> chnl = Channel() do ch
           foreach(i -> put!(ch, i), 1:4)
       end;

julia> typeof(chnl)
Channel{Any}

julia> for i in chnl
           @show i
       end;
i = 1
i = 2
i = 3
i = 4

Referenzierung der erstellten Aufgabe:

julia> taskref = Ref{Task}();

julia> chnl = Channel(taskref=taskref) do ch
           println(take!(ch))
       end;

julia> istaskdone(taskref[])
false

julia> put!(chnl, "Hello");
Hello

julia> istaskdone(taskref[])
true
Julia 1.3

Der Parameter spawn= wurde in Julia 1.3 hinzugefügt. Dieser Konstruktor wurde in Julia 1.3 hinzugefügt. In früheren Versionen von Julia verwendete Channel Schlüsselwortargumente, um size und T festzulegen, aber diese Konstruktoren sind veraltet.

Julia 1.9

Das Argument threadpool= wurde in Julia 1.9 hinzugefügt.

julia> chnl = Channel{Char}(1, spawn=true) do ch
           for c in "hello world"
               put!(ch, c)
           end
       end
Channel{Char}(1) (2 items available)

julia> String(collect(chnl))
"hello world"
source
Base.put!Method
put!(c::Channel, v)

Fügt ein Element v zum Kanal c hinzu. Blockiert, wenn der Kanal voll ist.

Für unpufferte Kanäle blockiert es, bis ein take! von einer anderen Aufgabe ausgeführt wird.

Julia 1.1

v wird jetzt in den Typ des Kanals mit convert umgewandelt, wenn put! aufgerufen wird.

source
Base.take!Method
take!(c::Channel)

Entfernt und gibt einen Wert aus einem Channel in der richtigen Reihenfolge zurück. Blockiert, bis Daten verfügbar sind. Bei unpufferten Kanälen blockiert es, bis ein put! von einer anderen Aufgabe ausgeführt wird.

Beispiele

Pufferkanal:

julia> c = Channel(1);

julia> put!(c, 1);

julia> take!(c)
1

Unpufferter Kanal:

julia> c = Channel(0);

julia> task = Task(() -> put!(c, 1));

julia> schedule(task);

julia> take!(c)
1
source
Base.isreadyMethod
isready(c::Channel)

Bestimmt, ob ein Channel einen Wert gespeichert hat. Gibt sofort zurück, blockiert nicht.

Für unbuffered Channels gibt es true zurück, wenn Aufgaben warten, um einen put! auszuführen.

Beispiele

Buffered Channel:

julia> c = Channel(1);

julia> isready(c)
false

julia> put!(c, 1);

julia> isready(c)
true

Unbuffered Channel:

julia> c = Channel();

julia> isready(c)  # keine Aufgaben warten, um zu put!
false

julia> task = Task(() -> put!(c, 1));

julia> schedule(task);  # eine put! Aufgabe planen

julia> isready(c)
true
source
Base.fetchMethod
fetch(c::Channel)

Wartet auf und gibt (ohne zu entfernen) das erste verfügbare Element aus dem Channel zurück. Hinweis: fetch wird bei einem unbuffered (0-Größe) Channel nicht unterstützt.

Beispiele

Pufferkanal:

julia> c = Channel(3) do ch
           foreach(i -> put!(ch, i), 1:3)
       end;

julia> fetch(c)
1

julia> collect(c)  # Element wird nicht entfernt
3-element Vector{Any}:
 1
 2
 3
source
Base.closeMethod
close(c::Channel[, excp::Exception])

Schließt einen Kanal. Eine Ausnahme (optional angegeben durch excp) wird ausgelöst durch:

  • put! auf einem geschlossenen Kanal.
  • take! und fetch auf einem leeren, geschlossenen Kanal.
source
Base.bindMethod
bind(chnl::Channel, task::Task)

Assoziiere die Lebensdauer von chnl mit einer Aufgabe. Channel chnl wird automatisch geschlossen, wenn die Aufgabe endet. Jede nicht abgefangene Ausnahme in der Aufgabe wird an alle Wartenden auf chnl weitergegeben.

Das chnl-Objekt kann unabhängig vom Abschluss der Aufgabe explizit geschlossen werden. Beendete Aufgaben haben keinen Einfluss auf bereits geschlossene Channel-Objekte.

Wenn ein Kanal an mehrere Aufgaben gebunden ist, wird der erste, der beendet wird, den Kanal schließen. Wenn mehrere Kanäle an dieselbe Aufgabe gebunden sind, wird das Beenden der Aufgabe alle gebundenen Kanäle schließen.

Beispiele

julia> c = Channel(0);

julia> task = @async foreach(i->put!(c, i), 1:4);

julia> bind(c,task);

julia> for i in c
           @show i
       end;
i = 1
i = 2
i = 3
i = 4

julia> isopen(c)
false
julia> c = Channel(0);

julia> task = @async (put!(c, 1); error("foo"));

julia> bind(c, task);

julia> take!(c)
1

julia> put!(c, 1);
ERROR: TaskFailedException
Stacktrace:
[...]
    nested task error: foo
[...]
source

Low-level synchronization using schedule and wait

Die einfachste korrekte Verwendung von schedule ist bei einer Task, die noch nicht gestartet (geplant) ist. Es ist jedoch möglich, 4d61726b646f776e2e436f64652822222c20227363686564756c652229_40726566 und wait als sehr grundlegenden Baustein zum Konstruieren von Synchronisationsschnittstellen zu verwenden. Eine entscheidende Vorbedingung für den Aufruf von schedule(task) ist, dass der Aufrufer die task "besitzen" muss; d.h., er muss wissen, dass der Aufruf von wait in der gegebenen task an den Stellen erfolgt, die dem Code bekannt sind, der schedule(task) aufruft. Eine Strategie zur Sicherstellung dieser Vorbedingung ist die Verwendung von Atomics, wie im folgenden Beispiel demonstriert:

@enum OWEState begin
    OWE_EMPTY
    OWE_WAITING
    OWE_NOTIFYING
end

mutable struct OneWayEvent
    @atomic state::OWEState
    task::Task
    OneWayEvent() = new(OWE_EMPTY)
end

function Base.notify(ev::OneWayEvent)
    state = @atomic ev.state
    while state !== OWE_NOTIFYING
        # Spin until we successfully update the state to OWE_NOTIFYING:
        state, ok = @atomicreplace(ev.state, state => OWE_NOTIFYING)
        if ok
            if state == OWE_WAITING
                # OWE_WAITING -> OWE_NOTIFYING transition means that the waiter task is
                # already waiting or about to call `wait`. The notifier task must wake up
                # the waiter task.
                schedule(ev.task)
            else
                @assert state == OWE_EMPTY
                # Since we are assuming that there is only one notifier task (for
                # simplicity), we know that the other possible case here is OWE_EMPTY.
                # We do not need to do anything because we know that the waiter task has
                # not called `wait(ev::OneWayEvent)` yet.
            end
            break
        end
    end
    return
end

function Base.wait(ev::OneWayEvent)
    ev.task = current_task()
    state, ok = @atomicreplace(ev.state, OWE_EMPTY => OWE_WAITING)
    if ok
        # OWE_EMPTY -> OWE_WAITING transition means that the notifier task is guaranteed to
        # invoke OWE_WAITING -> OWE_NOTIFYING transition.  The waiter task must call
        # `wait()` immediately.  In particular, it MUST NOT invoke any function that may
        # yield to the scheduler at this point in code.
        wait()
    else
        @assert state == OWE_NOTIFYING
        # Otherwise, the `state` must have already been moved to OWE_NOTIFYING by the
        # notifier task.
    end
    return
end

ev = OneWayEvent()
@sync begin
    @async begin
        wait(ev)
        println("done")
    end
    println("notifying...")
    notify(ev)
end

# output
notifying...
done

OneWayEvent ermöglicht es einer Aufgabe, auf die notify einer anderen Aufgabe zu warten. Es handelt sich um eine eingeschränkte Kommunikationsschnittstelle, da wait nur einmal von einer einzelnen Aufgabe verwendet werden kann (beachten Sie die nicht-atomare Zuweisung von ev.task).

In diesem Beispiel darf notify(ev::OneWayEvent) schedule(ev.task) nur dann aufrufen, wenn es den Zustand von OWE_WAITING auf OWE_NOTIFYING ändert. Dies lässt uns wissen, dass die Aufgabe, die wait(ev::OneWayEvent) ausführt, sich jetzt im ok-Zweig befindet und dass es keine anderen Aufgaben geben kann, die versuchen, schedule(ev.task) aufzurufen, da ihr @atomicreplace(ev.state, state => OWE_NOTIFYING) fehlschlagen wird.