Tasks

Core.TaskType
Task(func)

指定された関数 func(引数なしで呼び出せる必要があります)を実行する Task(すなわちコルーチン)を作成します。このタスクは、この関数が戻ると終了します。タスクは、scheduled で構築時の親の「ワールドエイジ」で実行されます。

Warning

デフォルトでは、タスクにはスティッキービットが true t.sticky に設定されます。これは、@async の歴史的なデフォルトをモデル化しています。スティッキーなタスクは、最初にスケジュールされたワーカースレッドでのみ実行でき、スケジュールされると、スケジュール元のタスクもスティッキーになります。Threads.@spawn の動作を得るには、スティッキービットを手動で false に設定してください。

julia> a() = sum(i for i in 1:1000);

julia> b = Task(a);

この例では、b はまだ開始されていない実行可能な Task です。

source
Base.@taskMacro
@task

式をTaskでラップし、実行せずにTaskを返します。これはタスクを作成するだけで、実行はしません。

Warning

デフォルトでは、タスクにはスティッキービットがtrue t.stickyに設定されます。これは@asyncの歴史的なデフォルトをモデル化しています。スティッキーなタスクは、最初にスケジュールされたワーカースレッドでのみ実行でき、スケジュールされると、スケジュール元のタスクもスティッキーになります。Threads.@spawnの動作を得るには、スティッキービットを手動でfalseに設定してください。

julia> a1() = sum(i for i in 1:1000);

julia> b = @task a1();

julia> istaskstarted(b)
false

julia> schedule(b);

julia> yield();

julia> istaskdone(b)
true
source
Base.@asyncMacro
@async

式を Task でラップし、ローカルマシンのスケジューラーキューに追加します。

値は $ を介して @async に補間でき、これは構築された基盤のクロージャに値を直接コピーします。これにより、変数のを挿入でき、非同期コードを現在のタスクにおける変数の値の変更から隔離できます。

Warning

公に配布されるライブラリでは、並列性が必要ない場合でも常に @async よりも Threads.@spawn を優先することを強く推奨します。これは、@async の使用が現在のJuliaの実装においてタスクのワーカースレッド間の移行を無効にするためです。したがって、ライブラリ関数内での @async の見かけ上無害な使用が、ユーザーアプリケーションの非常に異なる部分のパフォーマンスに大きな影響を与える可能性があります。

Julia 1.4

$ を介した値の補間は、Julia 1.4以降で利用可能です。

source
Base.asyncmapFunction
asyncmap(f, c...; ntasks=0, batch_size=nothing)

コレクション(または複数の同じ長さのコレクション)に対して f をマッピングするために、複数の同時タスクを使用します。複数のコレクション引数がある場合、f は要素ごとに適用されます。

ntasks は同時に実行するタスクの数を指定します。コレクションの長さに応じて、ntasks が指定されていない場合、最大100のタスクが同時マッピングに使用されます。

ntasks はゼロ引数関数としても指定できます。この場合、各要素を処理する前に並行して実行するタスクの数がチェックされ、ntasks_func の値が現在のタスク数を超えている場合に新しいタスクが開始されます。

batch_size が指定されている場合、コレクションはバッチモードで処理されます。この場合、f は引数タプルの Vector を受け入れ、結果のベクターを返す関数でなければなりません。入力ベクターの長さは batch_size 以下になります。

以下の例は、マッピング関数が実行されるタスクの objectid を返すことによって、異なるタスクでの実行を強調しています。

まず、ntasks が未定義の場合、各要素は異なるタスクで処理されます。

julia> tskoid() = objectid(current_task());

julia> asyncmap(x->tskoid(), 1:5)
5-element Array{UInt64,1}:
 0x6e15e66c75c75853
 0x440f8819a1baa682
 0x9fb3eeadd0c83985
 0xebd3e35fe90d4050
 0x29efc93edce2b961

julia> length(unique(asyncmap(x->tskoid(), 1:5)))
5

ntasks=2 の場合、すべての要素は2つのタスクで処理されます。

julia> asyncmap(x->tskoid(), 1:5; ntasks=2)
5-element Array{UInt64,1}:
 0x027ab1680df7ae94
 0xa23d2f80cd7cf157
 0x027ab1680df7ae94
 0xa23d2f80cd7cf157
 0x027ab1680df7ae94

julia> length(unique(asyncmap(x->tskoid(), 1:5; ntasks=2)))
2

batch_size が定義されている場合、マッピング関数は引数タプルの配列を受け入れ、結果の配列を返すように変更する必要があります。これを達成するために、修正されたマッピング関数では map が使用されます。

julia> batch_func(input) = map(x->string("args_tuple: ", x, ", element_val: ", x[1], ", task: ", tskoid()), input)
batch_func (generic function with 1 method)

julia> asyncmap(batch_func, 1:5; ntasks=2, batch_size=2)
5-element Array{String,1}:
 "args_tuple: (1,), element_val: 1, task: 9118321258196414413"
 "args_tuple: (2,), element_val: 2, task: 4904288162898683522"
 "args_tuple: (3,), element_val: 3, task: 9118321258196414413"
 "args_tuple: (4,), element_val: 4, task: 4904288162898683522"
 "args_tuple: (5,), element_val: 5, task: 9118321258196414413"
source
Base.asyncmap!Function
asyncmap!(f, results, c...; ntasks=0, batch_size=nothing)

asyncmapと同様ですが、コレクションを返すのではなく、resultsに出力を格納します。

Warning

変更された引数が他の引数とメモリを共有している場合、動作が予期しないものになることがあります。

source
Base.istaskdoneFunction
istaskdone(t::Task) -> Bool

タスクが終了したかどうかを判断します。

julia> a2() = sum(i for i in 1:1000);

julia> b = Task(a2);

julia> istaskdone(b)
false

julia> schedule(b);

julia> yield();

julia> istaskdone(b)
true
source
Base.istaskstartedFunction
istaskstarted(t::Task) -> Bool

タスクが実行を開始したかどうかを判断します。

julia> a3() = sum(i for i in 1:1000);

julia> b = Task(a3);

julia> istaskstarted(b)
false
source
Base.istaskfailedFunction
istaskfailed(t::Task) -> Bool

タスクが例外によって終了したかどうかを判断します。

julia> a4() = error("タスクが失敗しました");

julia> b = Task(a4);

julia> istaskfailed(b)
false

julia> schedule(b);

julia> yield();

julia> istaskfailed(b)
true
Julia 1.3

この関数は少なくともJulia 1.3が必要です。

source
Base.task_local_storageMethod
task_local_storage(key)

現在のタスクのタスクローカルストレージ内のキーの値を取得します。

source
Base.task_local_storageMethod
task_local_storage(key, value)

現在のタスクのタスクローカルストレージにキーに値を割り当てます。

source
Base.task_local_storageMethod
task_local_storage(body, key, value)

body 関数を修正されたタスクローカルストレージで呼び出します。このストレージでは valuekey に割り当てられ、key の以前の値、またはそれが存在しない場合はその状態がその後に復元されます。動的スコープをエミュレートするのに便利です。

source

Scheduling

Base.yieldFunction
yield()

スケジューラに切り替えて、他のスケジュールされたタスクが実行できるようにします。この関数を呼び出すタスクはまだ実行可能であり、他に実行可能なタスクがない場合はすぐに再起動されます。

source
yield(t::Task, arg = nothing)

tに即座に制御を渡す、スケジューラを呼び出す前にschedule(t, arg); yield()を実行する、高速で不公平なスケジューリングバージョンです。

source
Base.yieldtoFunction
yieldto(t::Task, arg = nothing)

指定されたタスクに切り替えます。タスクに初めて切り替えると、タスクの関数が引数なしで呼び出されます。以降の切り替えでは、argがタスクの最後のyieldto呼び出しから返されます。これは低レベルの呼び出しであり、タスクの切り替えのみを行い、状態やスケジューリングを考慮しません。その使用は推奨されません。

source
Base.sleepFunction
sleep(seconds)

現在のタスクを指定された秒数だけブロックします。最小スリープ時間は1ミリ秒または0.001の入力です。

source
Base.scheduleFunction
schedule(t::Task, [val]; error=false)

Taskをスケジューラのキューに追加します。これにより、タスクはシステムがアイドル状態のときに常に実行されますが、タスクがwaitのようなブロッキング操作を行う場合は除きます。

第二引数valが提供されると、タスクが再度実行されるときに(yieldtoの戻り値を介して)タスクに渡されます。errortrueの場合、値は起こされたタスク内で例外として発生します。

Warning

開始された任意のTaskに対してscheduleを使用するのは不正です。詳細についてはAPIリファレンスを参照してください。

Warning

デフォルトでは、タスクにはスティッキービットがtrueに設定されますt.sticky。これは@asyncの歴史的なデフォルトをモデル化しています。スティッキーなタスクは、最初にスケジュールされたワーカースレッドでのみ実行でき、スケジュールされると、スケジュール元のタスクもスティッキーになります。Threads.@spawnの動作を得るには、スティッキービットを手動でfalseに設定してください。

julia> a5() = sum(i for i in 1:1000);

julia> b = Task(a5);

julia> istaskstarted(b)
false

julia> schedule(b);

julia> yield();

julia> istaskstarted(b)
true

julia> istaskdone(b)
true
source

Synchronization

Base.errormonitorFunction
errormonitor(t::Task)

タスクtが失敗した場合、stderrにエラーログを出力します。

julia> Base._wait(errormonitor(Threads.@spawn error("タスクが失敗しました")))
Unhandled Task ERROR: タスクが失敗しました
Stacktrace:
[...]
source
Base.@syncMacro
@sync

すべての字句的に囲まれた @async@spawnDistributed.@spawnat および Distributed.@distributed の使用が完了するまで待機します。囲まれた非同期操作によってスローされたすべての例外は収集され、CompositeException としてスローされます。

julia> Threads.nthreads()
4

julia> @sync begin
           Threads.@spawn println("Thread-id $(Threads.threadid()), task 1")
           Threads.@spawn println("Thread-id $(Threads.threadid()), task 2")
       end;
Thread-id 3, task 1
Thread-id 1, task 2
source
Base.waitFunction

Threads.Conditionに関する特別な注意:

呼び出し元は、このメソッドを呼び出す前にThreads.Conditionを所有するlockを保持している必要があります。呼び出しタスクは、通常、同じThreads.Conditionオブジェクトでnotifyを呼び出すことによって他のタスクがそれを起こすまでブロックされます。ブロック時にロックは原子的に解放され(再帰的にロックされていた場合でも)、戻る前に再取得されます。

source
wait(r::Future)

指定された Future の値が利用可能になるまで待機します。

source
wait(r::RemoteChannel, args...)

指定された RemoteChannel で値が利用可能になるまで待機します。

source
wait([x])

現在のタスクを、引数のタイプに応じて何らかのイベントが発生するまでブロックします:

  • Channel: チャンネルに値が追加されるのを待ちます。
  • Condition: 条件でnotifyが呼ばれるのを待ち、notifyに渡されたvalパラメータを返します。条件で待機する場合、first=trueを渡すことができ、これにより待機者が通常の先入れ先出しの動作の代わりにnotifyで起こすための最初の位置に置かれます。
  • Process: プロセスまたはプロセスチェーンが終了するのを待ちます。プロセスのexitcodeフィールドを使用して成功または失敗を判断できます。
  • Task: Taskが終了するのを待ちます。タスクが例外で失敗した場合、TaskFailedException(失敗したタスクをラップします)がスローされます。
  • RawFD: ファイルディスクリプタの変更を待ちます(FileWatchingパッケージを参照)。

引数が渡されない場合、タスクは未定義の期間ブロックされます。タスクは、scheduleまたはyieldtoへの明示的な呼び出しによってのみ再起動できます。

しばしばwaitwhileループ内で呼び出され、待機している条件が満たされるまで進行しないようにします。

source
wait(c::Channel)

Channel isready になるまでブロックします。

julia> c = Channel(1);

julia> isready(c)
false

julia> task = Task(() -> wait(c));

julia> schedule(task);

julia> istaskdone(task)  # タスクはチャネルが準備できていないためブロックされています
false

julia> put!(c, 1);

julia> istaskdone(task)  # タスクは今やブロック解除されました
true
source
Base.fetchMethod
fetch(t::Task)

Task が完了するのを待ってから、その結果の値を返します。タスクが例外で失敗した場合、失敗したタスクをラップした TaskFailedException がスローされます。

source
Base.timedwaitFunction
timedwait(testcb, timeout::Real; pollint::Real=0.1)

testcb()true を返すか、timeout 秒が経過するまで待機します。どちらか早い方が優先されます。テスト関数は毎 pollint 秒ごとにポーリングされます。pollint の最小値は 0.001 秒、すなわち 1 ミリ秒です。

:ok または :timed_out を返します。

julia> cb() = (sleep(5); return);

julia> t = @async cb();

julia> timedwait(()->istaskdone(t), 1)
:timed_out

julia> timedwait(()->istaskdone(t), 6.5)
:ok
source
Base.ConditionType
Condition()

タスクが待機できるエッジトリガーイベントソースを作成します。Condition上でwaitを呼び出すタスクは、一時停止され、キューに入れられます。後でCondition上でnotifyが呼び出されると、タスクが起こされます。条件で待機することは、値を返すか、notifyのオプション引数が使用されている場合にエラーを発生させることがあります。エッジトリガーは、notifyが呼び出された時点で待機しているタスクのみが起こされることを意味します。レベルトリガー通知の場合、通知が発生したかどうかを追跡するために追加の状態を保持する必要があります。ChannelおよびThreads.Eventタイプはこれを行い、レベルトリガーイベントに使用できます。

このオブジェクトはスレッドセーフではありません。スレッドセーフなバージョンについては、Threads.Conditionを参照してください。

source
Base.Threads.ConditionType
Threads.Condition([lock])

Base.Condition のスレッドセーフなバージョンです。

Threads.Conditionwait または notify を呼び出すには、まずそれに対して lock を呼び出す必要があります。wait が呼び出されると、ブロッキング中にロックが原子的に解放され、wait が戻る前に再取得されます。したがって、Threads.Condition c の慣用的な使用法は次のようになります。

lock(c)
try
    while !thing_we_are_waiting_for
        wait(c)
    end
finally
    unlock(c)
end
Julia 1.2

この機能は少なくとも Julia 1.2 が必要です。

source
Base.EventType
Event([autoreset=false])

レベルトリガーイベントソースを作成します。Event上でwaitを呼び出すタスクは、notifyEvent上で呼び出されるまで、一時停止されてキューに入れられます。notifyが呼び出されると、Eventはシグナル状態のままとなり、resetが呼び出されるまで、タスクはそれを待っている間にブロックされなくなります。

autoresetがtrueの場合、notifyの呼び出しごとに、waitから解放されるタスクは最大で1つになります。

これは、notify/waitに対して取得と解放のメモリ順序を提供します。

Julia 1.1

この機能は、少なくともJulia 1.1が必要です。

Julia 1.8

autoreset機能とメモリ順序の保証には、少なくともJulia 1.8が必要です。

source
Base.notifyFunction
notify(condition, val=nothing; all=true, error=false)

条件を待っているタスクを起こし、valを渡します。alltrue(デフォルト)であれば、すべての待機中のタスクが起こされ、そうでなければ1つだけが起こされます。errortrueであれば、渡された値が起こされたタスクで例外として発生します。

起こされたタスクの数を返します。conditionで待機中のタスクがない場合は0を返します。

source
Base.resetMethod
reset(::Event)

Eventを未設定の状態にリセットします。その後、waitへの将来の呼び出しは、再度notifyが呼び出されるまでブロックされます。

source
Base.SemaphoreType
Semaphore(sem_size)

sem_size の同時取得数を最大とするカウントセマフォを作成します。各取得はリリースと対になっている必要があります。

これにより、取得/リリース呼び出しに対して取得とリリースのメモリ順序が提供されます。

source
Base.acquireFunction
acquire(s::Semaphore)

sem_size の許可のいずれかが利用可能になるまで待機し、取得できるまでブロックします。

source
acquire(f, s::Semaphore)

Semaphore s から取得した後に f を実行し、完了またはエラー時に release します。

例えば、同時に foo の呼び出しが2回だけアクティブであることを保証する do-block 形式は次の通りです:

s = Base.Semaphore(2)
@sync for _ in 1:100
    Threads.@spawn begin
        Base.acquire(s) do
            foo()
        end
    end
end
Julia 1.8

このメソッドは少なくとも Julia 1.8 を必要とします。

source
Base.releaseFunction
release(s::Semaphore)

プールに1つの許可を返し、別のタスクがそれを取得して実行を再開できるようにする可能性があります。

source
Base.lockFunction
lock(lock)

lockが利用可能になったときに取得します。ロックが別のタスク/スレッドによってすでにロックされている場合は、利用可能になるまで待機します。

lockunlockで対応する必要があります。

source
lock(f::Function, lock)

lockを取得し、lockを保持した状態でfを実行し、fが返るときにlockを解放します。もしlockが別のタスク/スレッドによってすでにロックされている場合は、それが利用可能になるまで待機します。

この関数が返るとき、lockは解放されているため、呼び出し元はそれをunlockしようとしないでください。

参照: @lock.

Julia 1.7

第二引数としてChannelを使用するには、Julia 1.7以降が必要です。

source

lock(f::Function, l::Lockable)

lに関連付けられたロックを取得し、ロックを保持した状態でfを実行し、fが返るとロックを解放します。fは1つの位置引数を受け取ります:lによってラップされた値です。ロックが別のタスク/スレッドによってすでにロックされている場合は、それが利用可能になるまで待機します。この関数が返ると、lockは解放されるため、呼び出し元はそれをunlockしようとしないでください。

Julia 1.11

Julia 1.11以上が必要です。

source
Base.unlockFunction
unlock(lock)

lockの所有権を解放します。

これは以前に取得された再帰的ロックである場合、内部カウンタを減少させてすぐに戻ります。

source
Base.trylockFunction
trylock(lock) -> 成功 (Boolean)

ロックが利用可能な場合はロックを取得し、成功した場合は true を返します。ロックがすでに別のタスク/スレッドによってロックされている場合は false を返します。

各成功した trylockunlock によって対応する必要があります。

関数 trylockislocked と組み合わせて、テスト・アンド・テスト・アンド・セットまたは指数バックオフアルゴリズムを書くために使用できます typeof(lock) によってサポートされている場合)(そのドキュメントを参照してください)。

source
Base.islockedFunction
islocked(lock) -> ステータス (ブール値)

lockが任意のタスク/スレッドによって保持されているかどうかを確認します。この関数単独では同期に使用すべきではありません。しかし、islockedtrylockと組み合わせることで、typeof(lock)がサポートしている場合に限り、テスト・アンド・テスト・アンド・セットや指数バックオフアルゴリズムを書くために使用できます(そのドキュメントを参照してください)。

拡張ヘルプ

例えば、以下のようにlockの実装が下記のプロパティを満たす場合、指数バックオフを実装できます。

nspins = 0
while true
    while islocked(lock)
        GC.safepoint()
        nspins += 1
        nspins > LIMIT && error("タイムアウト")
    end
    trylock(lock) && break
    backoff()
end

実装

ロックの実装は、以下のプロパティを持つislockedを定義し、そのことをドキュメントに記載することを推奨します。

  • islocked(lock)はデータ競合がありません。
  • islocked(lock)falseを返す場合、他のタスクからの干渉がなければ、trylock(lock)の即時呼び出しは成功しなければなりません(trueを返す)。
source
Base.ReentrantLockType
ReentrantLock()

再入可能なロックを作成し、Tasksの同期を行います。同じタスクは必要に応じてロックを何度でも取得できます(これが「再入可能」という名前の意味です)。各lockunlockと対になっている必要があります。

lockを呼び出すと、そのスレッド上でのファイナライザの実行が対応するunlockまで抑制されます。以下に示す標準的なロックパターンの使用は自然にサポートされるべきですが、try/lockの順序を逆にしたり、tryブロックを完全に欠落させたりすることには注意が必要です(例:ロックを保持したまま戻ろうとすること)。

これは、ロック/アンロック呼び出しに対して取得/解放のメモリ順序を提供します。

lock(l)
try
    <atomic work>
finally
    unlock(l)
end

!islocked(lck::ReentrantLock)が成立する場合、trylock(lck)は、他のタスクが「同時に」ロックを保持しようとしていない限り成功します。

source
Base.@lockMacro
@lock l expr

lock(f, l::AbstractLock)のマクロ版ですが、f関数の代わりにexprを使用します。次のように展開されます:

lock(l)
try
    expr
finally
    unlock(l)
end

これは、lockdoブロックで使用するのと似ていますが、クロージャを作成せず、したがってパフォーマンスを向上させることができます。

Compat

@lockはJulia 1.3で追加され、Julia 1.10でエクスポートされました。

source
Base.LockableType

Lockable(value, lock = ReentrantLock())

valueをラップし、提供されたlockに関連付けるLockableオブジェクトを作成します。このオブジェクトは@locklocktrylockunlockをサポートします。値にアクセスするには、ロックを保持しながらロック可能オブジェクトのインデックスを指定します。

Julia 1.11

Julia 1.11以上が必要です。

julia> locked_list = Base.Lockable(Int[]);

julia> @lock(locked_list, push!(locked_list[], 1)) # 値にアクセスするにはロックを保持する必要があります
1-element Vector{Int64}:
 1

julia> lock(summary, locked_list)
"1-element Vector{Int64}"
source

Channels

Base.ChannelType
Channel{T=Any}(size::Int=0)

T型のオブジェクトを最大size個保持できる内部バッファを持つChannelを構築します。 put!が満杯のチャネルで呼び出されると、take!でオブジェクトが削除されるまでブロックします。

Channel(0)はバッファなしのチャネルを構築します。put!は対応するtake!が呼ばれるまでブロックします。そしてその逆も同様です。

他のコンストラクタ:

  • Channel(): デフォルトコンストラクタで、Channel{Any}(0)と同等です。
  • Channel(Inf): Channel{Any}(typemax(Int))と同等です。
  • Channel(sz): Channel{Any}(sz)と同等です。
Julia 1.3

デフォルトコンストラクタChannel()とデフォルトsize=0はJulia 1.3で追加されました。

source
Base.ChannelMethod
Channel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false, threadpool=nothing)

funcから新しいタスクを作成し、それを型Tおよびサイズsizeの新しいチャネルにバインドし、タスクをスケジュールします。すべてが1回の呼び出しで行われます。タスクが終了すると、チャネルは自動的に閉じられます。

funcは、バインドされたチャネルを唯一の引数として受け入れる必要があります。

作成されたタスクへの参照が必要な場合は、キーワード引数taskrefを介してRef{Task}オブジェクトを渡してください。

spawn=trueの場合、funcのために作成されたTaskは、別のスレッドで並行してスケジュールされる可能性があり、これはThreads.@spawnを介してタスクを作成することに相当します。

spawn=trueで、threadpool引数が設定されていない場合、デフォルトは:defaultです。

threadpool引数が設定されている場合(:defaultまたは:interactiveに)、これはspawn=trueを意味し、新しいタスクは指定されたスレッドプールにスパーンされます。

Channelを返します。

julia> chnl = Channel() do ch
           foreach(i -> put!(ch, i), 1:4)
       end;

julia> typeof(chnl)
Channel{Any}

julia> for i in chnl
           @show i
       end;
i = 1
i = 2
i = 3
i = 4

作成されたタスクを参照する:

julia> taskref = Ref{Task}();

julia> chnl = Channel(taskref=taskref) do ch
           println(take!(ch))
       end;

julia> istaskdone(taskref[])
false

julia> put!(chnl, "Hello");
Hello

julia> istaskdone(taskref[])
true
Julia 1.3

spawn=パラメータはJulia 1.3で追加されました。このコンストラクタはJulia 1.3で追加されました。以前のバージョンのJuliaでは、Channelはキーワード引数を使用してsizeTを設定していましたが、それらのコンストラクタは非推奨です。

Julia 1.9

threadpool=引数はJulia 1.9で追加されました。

julia> chnl = Channel{Char}(1, spawn=true) do ch
           for c in "hello world"
               put!(ch, c)
           end
       end
Channel{Char}(1) (2 items available)

julia> String(collect(chnl))
"hello world"
source
Base.put!Method
put!(c::Channel, v)

アイテム v をチャネル c に追加します。チャネルが満杯の場合はブロックします。

バッファなしチャネルの場合、別のタスクによって take! が実行されるまでブロックします。

Julia 1.1

v は、put! が呼び出されるときに convert を使ってチャネルの型に変換されます。

source
Base.take!Method
take!(c::Channel)

Channel から値を順番に削除して返します。データが利用可能になるまでブロックします。バッファなしのチャネルの場合、別のタスクによって put! が実行されるまでブロックします。

バッファ付きチャネル:

julia> c = Channel(1);

julia> put!(c, 1);

julia> take!(c)
1

バッファなしチャネル:

julia> c = Channel(0);

julia> task = Task(() -> put!(c, 1));

julia> schedule(task);

julia> take!(c)
1
source
Base.isreadyMethod
isready(c::Channel)

Channel に値が格納されているかどうかを判断します。即座に返され、ブロックしません。

バッファなしのチャネルの場合、put! を待機しているタスクがある場合は true を返します。

バッファ付きチャネル:

julia> c = Channel(1);

julia> isready(c)
false

julia> put!(c, 1);

julia> isready(c)
true

バッファなしチャネル:

julia> c = Channel();

julia> isready(c)  # 値を入れるために待機しているタスクはありません!
false

julia> task = Task(() -> put!(c, 1));

julia> schedule(task);  # put! タスクをスケジュールする

julia> isready(c)
true
source
Base.fetchMethod
fetch(c::Channel)

Channelから最初に利用可能なアイテムを待機して返します(削除せずに)。注意: fetchはバッファなし(サイズ0)のChannelではサポートされていません。

バッファ付きチャネル:

julia> c = Channel(3) do ch
           foreach(i -> put!(ch, i), 1:3)
       end;

julia> fetch(c)
1

julia> collect(c)  # アイテムは削除されません
3-element Vector{Any}:
 1
 2
 3
source
Base.closeMethod
close(c::Channel[, excp::Exception])

チャネルを閉じます。例外(オプションでexcpによって指定される)は、次の操作によってスローされます:

  • put! が閉じたチャネルで呼び出された場合。
  • take! および fetch が空の閉じたチャネルで呼び出された場合。
source
Base.bindMethod
bind(chnl::Channel, task::Task)

chnlのライフタイムをタスクに関連付けます。タスクが終了すると、Channel chnlは自動的に閉じられます。タスク内で捕捉されない例外は、chnlのすべての待機者に伝播されます。

chnlオブジェクトは、タスクの終了に関係なく明示的に閉じることができます。終了したタスクは、すでに閉じられたChannelオブジェクトには影響を与えません。

チャネルが複数のタスクにバインドされている場合、最初に終了したタスクがチャネルを閉じます。複数のチャネルが同じタスクにバインドされている場合、タスクの終了はすべてのバインドされたチャネルを閉じます。

julia> c = Channel(0);

julia> task = @async foreach(i->put!(c, i), 1:4);

julia> bind(c,task);

julia> for i in c
           @show i
       end;
i = 1
i = 2
i = 3
i = 4

julia> isopen(c)
false
julia> c = Channel(0);

julia> task = @async (put!(c, 1); error("foo"));

julia> bind(c, task);

julia> take!(c)
1

julia> put!(c, 1);
ERROR: TaskFailedException
Stacktrace:
[...]
    nested task error: foo
[...]
source

Low-level synchronization using schedule and wait

scheduleの最も簡単で正しい使用法は、まだ開始されていない(スケジュールされた)Task上です。しかし、4d61726b646f776e2e436f64652822222c20227363686564756c652229_40726566waitを、同期インターフェースを構築するための非常に低レベルのビルディングブロックとして使用することも可能です。schedule(task)を呼び出すための重要な前提条件は、呼び出し元がtaskを「所有」している必要があることです。つまり、schedule(task)を呼び出すコードが、指定されたtask内でwaitが発生する場所を知っている必要があります。このような前提条件を確保するための戦略の一つは、以下の例に示すようにアトミックを使用することです。

@enum OWEState begin
    OWE_EMPTY
    OWE_WAITING
    OWE_NOTIFYING
end

mutable struct OneWayEvent
    @atomic state::OWEState
    task::Task
    OneWayEvent() = new(OWE_EMPTY)
end

function Base.notify(ev::OneWayEvent)
    state = @atomic ev.state
    while state !== OWE_NOTIFYING
        # Spin until we successfully update the state to OWE_NOTIFYING:
        state, ok = @atomicreplace(ev.state, state => OWE_NOTIFYING)
        if ok
            if state == OWE_WAITING
                # OWE_WAITING -> OWE_NOTIFYING transition means that the waiter task is
                # already waiting or about to call `wait`. The notifier task must wake up
                # the waiter task.
                schedule(ev.task)
            else
                @assert state == OWE_EMPTY
                # Since we are assuming that there is only one notifier task (for
                # simplicity), we know that the other possible case here is OWE_EMPTY.
                # We do not need to do anything because we know that the waiter task has
                # not called `wait(ev::OneWayEvent)` yet.
            end
            break
        end
    end
    return
end

function Base.wait(ev::OneWayEvent)
    ev.task = current_task()
    state, ok = @atomicreplace(ev.state, OWE_EMPTY => OWE_WAITING)
    if ok
        # OWE_EMPTY -> OWE_WAITING transition means that the notifier task is guaranteed to
        # invoke OWE_WAITING -> OWE_NOTIFYING transition.  The waiter task must call
        # `wait()` immediately.  In particular, it MUST NOT invoke any function that may
        # yield to the scheduler at this point in code.
        wait()
    else
        @assert state == OWE_NOTIFYING
        # Otherwise, the `state` must have already been moved to OWE_NOTIFYING by the
        # notifier task.
    end
    return
end

ev = OneWayEvent()
@sync begin
    @async begin
        wait(ev)
        println("done")
    end
    println("notifying...")
    notify(ev)
end

# output
notifying...
done

OneWayEventは、1つのタスクが別のタスクのnotifywaitすることを可能にします。これは、waitが単一のタスクから1回しか使用できないため、制限された通信インターフェースです(ev.taskの非原子的な割り当てに注意してください)。

この例では、notify(ev::OneWayEvent)は、それが状態をOWE_WAITINGからOWE_NOTIFYINGに変更する場合に限り、schedule(ev.task)を呼び出すことが許可されています。これにより、wait(ev::OneWayEvent)を実行しているタスクが現在okブランチにあり、他のタスクが@atomicreplace(ev.state, state => OWE_NOTIFYING)を失敗させるため、schedule(ev.task)を試みることができないことがわかります。