Tasks
Core.Task
— TypeTask(func)
指定された関数 func
(引数なしで呼び出せる必要があります)を実行する Task
(すなわちコルーチン)を作成します。このタスクは、この関数が戻ると終了します。タスクは、schedule
d で構築時の親の「ワールドエイジ」で実行されます。
デフォルトでは、タスクにはスティッキービットが true t.sticky
に設定されます。これは、@async
の歴史的なデフォルトをモデル化しています。スティッキーなタスクは、最初にスケジュールされたワーカースレッドでのみ実行でき、スケジュールされると、スケジュール元のタスクもスティッキーになります。Threads.@spawn
の動作を得るには、スティッキービットを手動で false
に設定してください。
例
julia> a() = sum(i for i in 1:1000);
julia> b = Task(a);
この例では、b
はまだ開始されていない実行可能な Task
です。
Base.@task
— Macro@task
式をTask
でラップし、実行せずにTask
を返します。これはタスクを作成するだけで、実行はしません。
デフォルトでは、タスクにはスティッキービットがtrue t.sticky
に設定されます。これは@async
の歴史的なデフォルトをモデル化しています。スティッキーなタスクは、最初にスケジュールされたワーカースレッドでのみ実行でき、スケジュールされると、スケジュール元のタスクもスティッキーになります。Threads.@spawn
の動作を得るには、スティッキービットを手動でfalse
に設定してください。
例
julia> a1() = sum(i for i in 1:1000);
julia> b = @task a1();
julia> istaskstarted(b)
false
julia> schedule(b);
julia> yield();
julia> istaskdone(b)
true
Base.@async
— Macro@async
式を Task
でラップし、ローカルマシンのスケジューラーキューに追加します。
値は $
を介して @async
に補間でき、これは構築された基盤のクロージャに値を直接コピーします。これにより、変数の値を挿入でき、非同期コードを現在のタスクにおける変数の値の変更から隔離できます。
公に配布されるライブラリでは、並列性が必要ない場合でも常に @async
よりも Threads.@spawn
を優先することを強く推奨します。これは、@async
の使用が現在のJuliaの実装において親タスクのワーカースレッド間の移行を無効にするためです。したがって、ライブラリ関数内での @async
の見かけ上無害な使用が、ユーザーアプリケーションの非常に異なる部分のパフォーマンスに大きな影響を与える可能性があります。
$
を介した値の補間は、Julia 1.4以降で利用可能です。
Base.asyncmap
— Functionasyncmap(f, c...; ntasks=0, batch_size=nothing)
コレクション(または複数の同じ長さのコレクション)に対して f
をマッピングするために、複数の同時タスクを使用します。複数のコレクション引数がある場合、f
は要素ごとに適用されます。
ntasks
は同時に実行するタスクの数を指定します。コレクションの長さに応じて、ntasks
が指定されていない場合、最大100のタスクが同時マッピングに使用されます。
ntasks
はゼロ引数関数としても指定できます。この場合、各要素を処理する前に並行して実行するタスクの数がチェックされ、ntasks_func
の値が現在のタスク数を超えている場合に新しいタスクが開始されます。
batch_size
が指定されている場合、コレクションはバッチモードで処理されます。この場合、f
は引数タプルの Vector
を受け入れ、結果のベクターを返す関数でなければなりません。入力ベクターの長さは batch_size
以下になります。
以下の例は、マッピング関数が実行されるタスクの objectid
を返すことによって、異なるタスクでの実行を強調しています。
まず、ntasks
が未定義の場合、各要素は異なるタスクで処理されます。
julia> tskoid() = objectid(current_task());
julia> asyncmap(x->tskoid(), 1:5)
5-element Array{UInt64,1}:
0x6e15e66c75c75853
0x440f8819a1baa682
0x9fb3eeadd0c83985
0xebd3e35fe90d4050
0x29efc93edce2b961
julia> length(unique(asyncmap(x->tskoid(), 1:5)))
5
ntasks=2
の場合、すべての要素は2つのタスクで処理されます。
julia> asyncmap(x->tskoid(), 1:5; ntasks=2)
5-element Array{UInt64,1}:
0x027ab1680df7ae94
0xa23d2f80cd7cf157
0x027ab1680df7ae94
0xa23d2f80cd7cf157
0x027ab1680df7ae94
julia> length(unique(asyncmap(x->tskoid(), 1:5; ntasks=2)))
2
batch_size
が定義されている場合、マッピング関数は引数タプルの配列を受け入れ、結果の配列を返すように変更する必要があります。これを達成するために、修正されたマッピング関数では map
が使用されます。
julia> batch_func(input) = map(x->string("args_tuple: ", x, ", element_val: ", x[1], ", task: ", tskoid()), input)
batch_func (generic function with 1 method)
julia> asyncmap(batch_func, 1:5; ntasks=2, batch_size=2)
5-element Array{String,1}:
"args_tuple: (1,), element_val: 1, task: 9118321258196414413"
"args_tuple: (2,), element_val: 2, task: 4904288162898683522"
"args_tuple: (3,), element_val: 3, task: 9118321258196414413"
"args_tuple: (4,), element_val: 4, task: 4904288162898683522"
"args_tuple: (5,), element_val: 5, task: 9118321258196414413"
Base.asyncmap!
— Functionasyncmap!(f, results, c...; ntasks=0, batch_size=nothing)
asyncmap
と同様ですが、コレクションを返すのではなく、results
に出力を格納します。
変更された引数が他の引数とメモリを共有している場合、動作が予期しないものになることがあります。
Base.current_task
— Functioncurrent_task()
現在実行中のTask
を取得します。
Base.istaskdone
— Functionistaskdone(t::Task) -> Bool
タスクが終了したかどうかを判断します。
例
julia> a2() = sum(i for i in 1:1000);
julia> b = Task(a2);
julia> istaskdone(b)
false
julia> schedule(b);
julia> yield();
julia> istaskdone(b)
true
Base.istaskstarted
— Functionistaskstarted(t::Task) -> Bool
タスクが実行を開始したかどうかを判断します。
例
julia> a3() = sum(i for i in 1:1000);
julia> b = Task(a3);
julia> istaskstarted(b)
false
Base.istaskfailed
— Functionistaskfailed(t::Task) -> Bool
タスクが例外によって終了したかどうかを判断します。
例
julia> a4() = error("タスクが失敗しました");
julia> b = Task(a4);
julia> istaskfailed(b)
false
julia> schedule(b);
julia> yield();
julia> istaskfailed(b)
true
この関数は少なくともJulia 1.3が必要です。
Base.task_local_storage
— Methodtask_local_storage(key)
現在のタスクのタスクローカルストレージ内のキーの値を取得します。
Base.task_local_storage
— Methodtask_local_storage(key, value)
現在のタスクのタスクローカルストレージにキーに値を割り当てます。
Base.task_local_storage
— Methodtask_local_storage(body, key, value)
body
関数を修正されたタスクローカルストレージで呼び出します。このストレージでは value
が key
に割り当てられ、key
の以前の値、またはそれが存在しない場合はその状態がその後に復元されます。動的スコープをエミュレートするのに便利です。
Scheduling
Base.yield
— Functionyield()
スケジューラに切り替えて、他のスケジュールされたタスクが実行できるようにします。この関数を呼び出すタスクはまだ実行可能であり、他に実行可能なタスクがない場合はすぐに再起動されます。
yield(t::Task, arg = nothing)
t
に即座に制御を渡す、スケジューラを呼び出す前にschedule(t, arg); yield()
を実行する、高速で不公平なスケジューリングバージョンです。
Base.yieldto
— Functionyieldto(t::Task, arg = nothing)
指定されたタスクに切り替えます。タスクに初めて切り替えると、タスクの関数が引数なしで呼び出されます。以降の切り替えでは、arg
がタスクの最後のyieldto
呼び出しから返されます。これは低レベルの呼び出しであり、タスクの切り替えのみを行い、状態やスケジューリングを考慮しません。その使用は推奨されません。
Base.sleep
— Functionsleep(seconds)
現在のタスクを指定された秒数だけブロックします。最小スリープ時間は1ミリ秒または0.001
の入力です。
Base.schedule
— Functionschedule(t::Task, [val]; error=false)
Task
をスケジューラのキューに追加します。これにより、タスクはシステムがアイドル状態のときに常に実行されますが、タスクがwait
のようなブロッキング操作を行う場合は除きます。
第二引数val
が提供されると、タスクが再度実行されるときに(yieldto
の戻り値を介して)タスクに渡されます。error
がtrue
の場合、値は起こされたタスク内で例外として発生します。
開始された任意のTask
に対してschedule
を使用するのは不正です。詳細についてはAPIリファレンスを参照してください。
デフォルトでは、タスクにはスティッキービットがtrue
に設定されますt.sticky
。これは@async
の歴史的なデフォルトをモデル化しています。スティッキーなタスクは、最初にスケジュールされたワーカースレッドでのみ実行でき、スケジュールされると、スケジュール元のタスクもスティッキーになります。Threads.@spawn
の動作を得るには、スティッキービットを手動でfalse
に設定してください。
例
julia> a5() = sum(i for i in 1:1000);
julia> b = Task(a5);
julia> istaskstarted(b)
false
julia> schedule(b);
julia> yield();
julia> istaskstarted(b)
true
julia> istaskdone(b)
true
Synchronization
Base.errormonitor
— Functionerrormonitor(t::Task)
タスクt
が失敗した場合、stderr
にエラーログを出力します。
例
julia> Base._wait(errormonitor(Threads.@spawn error("タスクが失敗しました")))
Unhandled Task ERROR: タスクが失敗しました
Stacktrace:
[...]
Base.@sync
— Macro@sync
すべての字句的に囲まれた @async
、@spawn
、Distributed.@spawnat
および Distributed.@distributed
の使用が完了するまで待機します。囲まれた非同期操作によってスローされたすべての例外は収集され、CompositeException
としてスローされます。
例
julia> Threads.nthreads()
4
julia> @sync begin
Threads.@spawn println("Thread-id $(Threads.threadid()), task 1")
Threads.@spawn println("Thread-id $(Threads.threadid()), task 2")
end;
Thread-id 3, task 1
Thread-id 1, task 2
Base.wait
— FunctionThreads.Condition
に関する特別な注意:
呼び出し元は、このメソッドを呼び出す前にThreads.Condition
を所有するlock
を保持している必要があります。呼び出しタスクは、通常、同じThreads.Condition
オブジェクトでnotify
を呼び出すことによって他のタスクがそれを起こすまでブロックされます。ブロック時にロックは原子的に解放され(再帰的にロックされていた場合でも)、戻る前に再取得されます。
wait(r::Future)
指定された Future
の値が利用可能になるまで待機します。
wait(r::RemoteChannel, args...)
指定された RemoteChannel
で値が利用可能になるまで待機します。
wait([x])
現在のタスクを、引数のタイプに応じて何らかのイベントが発生するまでブロックします:
Channel
: チャンネルに値が追加されるのを待ちます。Condition
: 条件でnotify
が呼ばれるのを待ち、notify
に渡されたval
パラメータを返します。条件で待機する場合、first=true
を渡すことができ、これにより待機者が通常の先入れ先出しの動作の代わりにnotify
で起こすための最初の位置に置かれます。Process
: プロセスまたはプロセスチェーンが終了するのを待ちます。プロセスのexitcode
フィールドを使用して成功または失敗を判断できます。Task
:Task
が終了するのを待ちます。タスクが例外で失敗した場合、TaskFailedException
(失敗したタスクをラップします)がスローされます。RawFD
: ファイルディスクリプタの変更を待ちます(FileWatching
パッケージを参照)。
引数が渡されない場合、タスクは未定義の期間ブロックされます。タスクは、schedule
またはyieldto
への明示的な呼び出しによってのみ再起動できます。
しばしばwait
はwhile
ループ内で呼び出され、待機している条件が満たされるまで進行しないようにします。
wait(c::Channel)
Channel
isready
になるまでブロックします。
julia> c = Channel(1);
julia> isready(c)
false
julia> task = Task(() -> wait(c));
julia> schedule(task);
julia> istaskdone(task) # タスクはチャネルが準備できていないためブロックされています
false
julia> put!(c, 1);
julia> istaskdone(task) # タスクは今やブロック解除されました
true
Base.fetch
— Methodfetch(t::Task)
Task
が完了するのを待ってから、その結果の値を返します。タスクが例外で失敗した場合、失敗したタスクをラップした TaskFailedException
がスローされます。
Base.fetch
— Methodfetch(x::Any)
x
を返します。
Base.timedwait
— Functiontimedwait(testcb, timeout::Real; pollint::Real=0.1)
testcb()
が true
を返すか、timeout
秒が経過するまで待機します。どちらか早い方が優先されます。テスト関数は毎 pollint
秒ごとにポーリングされます。pollint
の最小値は 0.001 秒、すなわち 1 ミリ秒です。
:ok
または :timed_out
を返します。
例
julia> cb() = (sleep(5); return);
julia> t = @async cb();
julia> timedwait(()->istaskdone(t), 1)
:timed_out
julia> timedwait(()->istaskdone(t), 6.5)
:ok
Base.Condition
— TypeCondition()
タスクが待機できるエッジトリガーイベントソースを作成します。Condition
上でwait
を呼び出すタスクは、一時停止され、キューに入れられます。後でCondition
上でnotify
が呼び出されると、タスクが起こされます。条件で待機することは、値を返すか、notify
のオプション引数が使用されている場合にエラーを発生させることがあります。エッジトリガーは、notify
が呼び出された時点で待機しているタスクのみが起こされることを意味します。レベルトリガー通知の場合、通知が発生したかどうかを追跡するために追加の状態を保持する必要があります。Channel
およびThreads.Event
タイプはこれを行い、レベルトリガーイベントに使用できます。
このオブジェクトはスレッドセーフではありません。スレッドセーフなバージョンについては、Threads.Condition
を参照してください。
Base.Threads.Condition
— TypeThreads.Condition([lock])
Base.Condition
のスレッドセーフなバージョンです。
Threads.Condition
で wait
または notify
を呼び出すには、まずそれに対して lock
を呼び出す必要があります。wait
が呼び出されると、ブロッキング中にロックが原子的に解放され、wait
が戻る前に再取得されます。したがって、Threads.Condition
c
の慣用的な使用法は次のようになります。
lock(c)
try
while !thing_we_are_waiting_for
wait(c)
end
finally
unlock(c)
end
この機能は少なくとも Julia 1.2 が必要です。
Base.Event
— TypeEvent([autoreset=false])
レベルトリガーイベントソースを作成します。Event
上でwait
を呼び出すタスクは、notify
がEvent
上で呼び出されるまで、一時停止されてキューに入れられます。notify
が呼び出されると、Event
はシグナル状態のままとなり、reset
が呼び出されるまで、タスクはそれを待っている間にブロックされなくなります。
autoreset
がtrueの場合、notify
の呼び出しごとに、wait
から解放されるタスクは最大で1つになります。
これは、notify/waitに対して取得と解放のメモリ順序を提供します。
この機能は、少なくともJulia 1.1が必要です。
autoreset
機能とメモリ順序の保証には、少なくともJulia 1.8が必要です。
Base.notify
— Functionnotify(condition, val=nothing; all=true, error=false)
条件を待っているタスクを起こし、val
を渡します。all
がtrue
(デフォルト)であれば、すべての待機中のタスクが起こされ、そうでなければ1つだけが起こされます。error
がtrue
であれば、渡された値が起こされたタスクで例外として発生します。
起こされたタスクの数を返します。condition
で待機中のタスクがない場合は0を返します。
Base.reset
— MethodBase.Semaphore
— TypeSemaphore(sem_size)
sem_size
の同時取得数を最大とするカウントセマフォを作成します。各取得はリリースと対になっている必要があります。
これにより、取得/リリース呼び出しに対して取得とリリースのメモリ順序が提供されます。
Base.acquire
— Functionacquire(s::Semaphore)
sem_size
の許可のいずれかが利用可能になるまで待機し、取得できるまでブロックします。
acquire(f, s::Semaphore)
Semaphore s
から取得した後に f
を実行し、完了またはエラー時に release
します。
例えば、同時に foo
の呼び出しが2回だけアクティブであることを保証する do-block 形式は次の通りです:
s = Base.Semaphore(2)
@sync for _ in 1:100
Threads.@spawn begin
Base.acquire(s) do
foo()
end
end
end
このメソッドは少なくとも Julia 1.8 を必要とします。
Base.release
— Functionrelease(s::Semaphore)
プールに1つの許可を返し、別のタスクがそれを取得して実行を再開できるようにする可能性があります。
Base.AbstractLock
— TypeBase.lock
— Functionlock(lock)
lock
が利用可能になったときに取得します。ロックが別のタスク/スレッドによってすでにロックされている場合は、利用可能になるまで待機します。
各lock
はunlock
で対応する必要があります。
lock(f::Function, lock)
lock
を取得し、lock
を保持した状態でf
を実行し、f
が返るときにlock
を解放します。もしlock
が別のタスク/スレッドによってすでにロックされている場合は、それが利用可能になるまで待機します。
この関数が返るとき、lock
は解放されているため、呼び出し元はそれをunlock
しようとしないでください。
参照: @lock
.
第二引数としてChannel
を使用するには、Julia 1.7以降が必要です。
lock(f::Function, l::Lockable)
l
に関連付けられたロックを取得し、ロックを保持した状態でf
を実行し、f
が返るとロックを解放します。f
は1つの位置引数を受け取ります:l
によってラップされた値です。ロックが別のタスク/スレッドによってすでにロックされている場合は、それが利用可能になるまで待機します。この関数が返ると、lock
は解放されるため、呼び出し元はそれをunlock
しようとしないでください。
Julia 1.11以上が必要です。
Base.unlock
— Functionunlock(lock)
lock
の所有権を解放します。
これは以前に取得された再帰的ロックである場合、内部カウンタを減少させてすぐに戻ります。
Base.trylock
— Functiontrylock(lock) -> 成功 (Boolean)
ロックが利用可能な場合はロックを取得し、成功した場合は true
を返します。ロックがすでに別のタスク/スレッドによってロックされている場合は false
を返します。
各成功した trylock
は unlock
によって対応する必要があります。
関数 trylock
は islocked
と組み合わせて、テスト・アンド・テスト・アンド・セットまたは指数バックオフアルゴリズムを書くために使用できます (typeof(lock)
によってサポートされている場合)(そのドキュメントを参照してください)。
Base.islocked
— Functionislocked(lock) -> ステータス (ブール値)
lock
が任意のタスク/スレッドによって保持されているかどうかを確認します。この関数単独では同期に使用すべきではありません。しかし、islocked
をtrylock
と組み合わせることで、typeof(lock)
がサポートしている場合に限り、テスト・アンド・テスト・アンド・セットや指数バックオフアルゴリズムを書くために使用できます(そのドキュメントを参照してください)。
拡張ヘルプ
例えば、以下のようにlock
の実装が下記のプロパティを満たす場合、指数バックオフを実装できます。
nspins = 0
while true
while islocked(lock)
GC.safepoint()
nspins += 1
nspins > LIMIT && error("タイムアウト")
end
trylock(lock) && break
backoff()
end
実装
ロックの実装は、以下のプロパティを持つislocked
を定義し、そのことをドキュメントに記載することを推奨します。
islocked(lock)
はデータ競合がありません。islocked(lock)
がfalse
を返す場合、他のタスクからの干渉がなければ、trylock(lock)
の即時呼び出しは成功しなければなりません(true
を返す)。
Base.ReentrantLock
— TypeReentrantLock()
再入可能なロックを作成し、Task
sの同期を行います。同じタスクは必要に応じてロックを何度でも取得できます(これが「再入可能」という名前の意味です)。各lock
はunlock
と対になっている必要があります。
lock
を呼び出すと、そのスレッド上でのファイナライザの実行が対応するunlock
まで抑制されます。以下に示す標準的なロックパターンの使用は自然にサポートされるべきですが、try/lockの順序を逆にしたり、tryブロックを完全に欠落させたりすることには注意が必要です(例:ロックを保持したまま戻ろうとすること)。
これは、ロック/アンロック呼び出しに対して取得/解放のメモリ順序を提供します。
lock(l)
try
<atomic work>
finally
unlock(l)
end
!islocked(lck::ReentrantLock)
が成立する場合、trylock(lck)
は、他のタスクが「同時に」ロックを保持しようとしていない限り成功します。
Base.@lock
— Macro@lock l expr
lock(f, l::AbstractLock)
のマクロ版ですが、f
関数の代わりにexpr
を使用します。次のように展開されます:
lock(l)
try
expr
finally
unlock(l)
end
これは、lock
をdo
ブロックで使用するのと似ていますが、クロージャを作成せず、したがってパフォーマンスを向上させることができます。
@lock
はJulia 1.3で追加され、Julia 1.10でエクスポートされました。
Base.Lockable
— TypeLockable(value, lock = ReentrantLock())
value
をラップし、提供されたlock
に関連付けるLockable
オブジェクトを作成します。このオブジェクトは@lock
、lock
、trylock
、unlock
をサポートします。値にアクセスするには、ロックを保持しながらロック可能オブジェクトのインデックスを指定します。
Julia 1.11以上が必要です。
例
julia> locked_list = Base.Lockable(Int[]);
julia> @lock(locked_list, push!(locked_list[], 1)) # 値にアクセスするにはロックを保持する必要があります
1-element Vector{Int64}:
1
julia> lock(summary, locked_list)
"1-element Vector{Int64}"
Channels
Base.AbstractChannel
— TypeAbstractChannel{T}
型 T
のオブジェクトを渡すチャネルの表現。
Base.Channel
— TypeChannel{T=Any}(size::Int=0)
T
型のオブジェクトを最大size
個保持できる内部バッファを持つChannel
を構築します。 put!
が満杯のチャネルで呼び出されると、take!
でオブジェクトが削除されるまでブロックします。
Channel(0)
はバッファなしのチャネルを構築します。put!
は対応するtake!
が呼ばれるまでブロックします。そしてその逆も同様です。
他のコンストラクタ:
Channel()
: デフォルトコンストラクタで、Channel{Any}(0)
と同等です。Channel(Inf)
:Channel{Any}(typemax(Int))
と同等です。Channel(sz)
:Channel{Any}(sz)
と同等です。
デフォルトコンストラクタChannel()
とデフォルトsize=0
はJulia 1.3で追加されました。
Base.Channel
— MethodChannel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false, threadpool=nothing)
func
から新しいタスクを作成し、それを型T
およびサイズsize
の新しいチャネルにバインドし、タスクをスケジュールします。すべてが1回の呼び出しで行われます。タスクが終了すると、チャネルは自動的に閉じられます。
func
は、バインドされたチャネルを唯一の引数として受け入れる必要があります。
作成されたタスクへの参照が必要な場合は、キーワード引数taskref
を介してRef{Task}
オブジェクトを渡してください。
spawn=true
の場合、func
のために作成されたTask
は、別のスレッドで並行してスケジュールされる可能性があり、これはThreads.@spawn
を介してタスクを作成することに相当します。
spawn=true
で、threadpool
引数が設定されていない場合、デフォルトは:default
です。
threadpool
引数が設定されている場合(:default
または:interactive
に)、これはspawn=true
を意味し、新しいタスクは指定されたスレッドプールにスパーンされます。
Channel
を返します。
例
julia> chnl = Channel() do ch
foreach(i -> put!(ch, i), 1:4)
end;
julia> typeof(chnl)
Channel{Any}
julia> for i in chnl
@show i
end;
i = 1
i = 2
i = 3
i = 4
作成されたタスクを参照する:
julia> taskref = Ref{Task}();
julia> chnl = Channel(taskref=taskref) do ch
println(take!(ch))
end;
julia> istaskdone(taskref[])
false
julia> put!(chnl, "Hello");
Hello
julia> istaskdone(taskref[])
true
spawn=
パラメータはJulia 1.3で追加されました。このコンストラクタはJulia 1.3で追加されました。以前のバージョンのJuliaでは、Channelはキーワード引数を使用してsize
とT
を設定していましたが、それらのコンストラクタは非推奨です。
threadpool=
引数はJulia 1.9で追加されました。
julia> chnl = Channel{Char}(1, spawn=true) do ch
for c in "hello world"
put!(ch, c)
end
end
Channel{Char}(1) (2 items available)
julia> String(collect(chnl))
"hello world"
Base.put!
— Methodput!(c::Channel, v)
アイテム v
をチャネル c
に追加します。チャネルが満杯の場合はブロックします。
バッファなしチャネルの場合、別のタスクによって take!
が実行されるまでブロックします。
v
は、put!
が呼び出されるときに convert
を使ってチャネルの型に変換されます。
Base.take!
— Methodtake!(c::Channel)
Channel
から値を順番に削除して返します。データが利用可能になるまでブロックします。バッファなしのチャネルの場合、別のタスクによって put!
が実行されるまでブロックします。
例
バッファ付きチャネル:
julia> c = Channel(1);
julia> put!(c, 1);
julia> take!(c)
1
バッファなしチャネル:
julia> c = Channel(0);
julia> task = Task(() -> put!(c, 1));
julia> schedule(task);
julia> take!(c)
1
Base.isready
— Methodisready(c::Channel)
Channel
に値が格納されているかどうかを判断します。即座に返され、ブロックしません。
バッファなしのチャネルの場合、put!
を待機しているタスクがある場合は true
を返します。
例
バッファ付きチャネル:
julia> c = Channel(1);
julia> isready(c)
false
julia> put!(c, 1);
julia> isready(c)
true
バッファなしチャネル:
julia> c = Channel();
julia> isready(c) # 値を入れるために待機しているタスクはありません!
false
julia> task = Task(() -> put!(c, 1));
julia> schedule(task); # put! タスクをスケジュールする
julia> isready(c)
true
Base.fetch
— Methodfetch(c::Channel)
Channel
から最初に利用可能なアイテムを待機して返します(削除せずに)。注意: fetch
はバッファなし(サイズ0)のChannel
ではサポートされていません。
例
バッファ付きチャネル:
julia> c = Channel(3) do ch
foreach(i -> put!(ch, i), 1:3)
end;
julia> fetch(c)
1
julia> collect(c) # アイテムは削除されません
3-element Vector{Any}:
1
2
3
Base.close
— Methodclose(c::Channel[, excp::Exception])
チャネルを閉じます。例外(オプションでexcp
によって指定される)は、次の操作によってスローされます:
Base.bind
— Methodbind(chnl::Channel, task::Task)
chnl
のライフタイムをタスクに関連付けます。タスクが終了すると、Channel
chnl
は自動的に閉じられます。タスク内で捕捉されない例外は、chnl
のすべての待機者に伝播されます。
chnl
オブジェクトは、タスクの終了に関係なく明示的に閉じることができます。終了したタスクは、すでに閉じられたChannel
オブジェクトには影響を与えません。
チャネルが複数のタスクにバインドされている場合、最初に終了したタスクがチャネルを閉じます。複数のチャネルが同じタスクにバインドされている場合、タスクの終了はすべてのバインドされたチャネルを閉じます。
例
julia> c = Channel(0);
julia> task = @async foreach(i->put!(c, i), 1:4);
julia> bind(c,task);
julia> for i in c
@show i
end;
i = 1
i = 2
i = 3
i = 4
julia> isopen(c)
false
julia> c = Channel(0);
julia> task = @async (put!(c, 1); error("foo"));
julia> bind(c, task);
julia> take!(c)
1
julia> put!(c, 1);
ERROR: TaskFailedException
Stacktrace:
[...]
nested task error: foo
[...]
Low-level synchronization using schedule
and wait
schedule
の最も簡単で正しい使用法は、まだ開始されていない(スケジュールされた)Task
上です。しかし、4d61726b646f776e2e436f64652822222c20227363686564756c652229_40726566
とwait
を、同期インターフェースを構築するための非常に低レベルのビルディングブロックとして使用することも可能です。schedule(task)
を呼び出すための重要な前提条件は、呼び出し元がtask
を「所有」している必要があることです。つまり、schedule(task)
を呼び出すコードが、指定されたtask
内でwait
が発生する場所を知っている必要があります。このような前提条件を確保するための戦略の一つは、以下の例に示すようにアトミックを使用することです。
@enum OWEState begin
OWE_EMPTY
OWE_WAITING
OWE_NOTIFYING
end
mutable struct OneWayEvent
@atomic state::OWEState
task::Task
OneWayEvent() = new(OWE_EMPTY)
end
function Base.notify(ev::OneWayEvent)
state = @atomic ev.state
while state !== OWE_NOTIFYING
# Spin until we successfully update the state to OWE_NOTIFYING:
state, ok = @atomicreplace(ev.state, state => OWE_NOTIFYING)
if ok
if state == OWE_WAITING
# OWE_WAITING -> OWE_NOTIFYING transition means that the waiter task is
# already waiting or about to call `wait`. The notifier task must wake up
# the waiter task.
schedule(ev.task)
else
@assert state == OWE_EMPTY
# Since we are assuming that there is only one notifier task (for
# simplicity), we know that the other possible case here is OWE_EMPTY.
# We do not need to do anything because we know that the waiter task has
# not called `wait(ev::OneWayEvent)` yet.
end
break
end
end
return
end
function Base.wait(ev::OneWayEvent)
ev.task = current_task()
state, ok = @atomicreplace(ev.state, OWE_EMPTY => OWE_WAITING)
if ok
# OWE_EMPTY -> OWE_WAITING transition means that the notifier task is guaranteed to
# invoke OWE_WAITING -> OWE_NOTIFYING transition. The waiter task must call
# `wait()` immediately. In particular, it MUST NOT invoke any function that may
# yield to the scheduler at this point in code.
wait()
else
@assert state == OWE_NOTIFYING
# Otherwise, the `state` must have already been moved to OWE_NOTIFYING by the
# notifier task.
end
return
end
ev = OneWayEvent()
@sync begin
@async begin
wait(ev)
println("done")
end
println("notifying...")
notify(ev)
end
# output
notifying...
done
OneWayEvent
は、1つのタスクが別のタスクのnotify
をwait
することを可能にします。これは、wait
が単一のタスクから1回しか使用できないため、制限された通信インターフェースです(ev.task
の非原子的な割り当てに注意してください)。
この例では、notify(ev::OneWayEvent)
は、それが状態をOWE_WAITING
からOWE_NOTIFYING
に変更する場合に限り、schedule(ev.task)
を呼び出すことが許可されています。これにより、wait(ev::OneWayEvent)
を実行しているタスクが現在ok
ブランチにあり、他のタスクが@atomicreplace(ev.state, state => OWE_NOTIFYING)
を失敗させるため、schedule(ev.task)
を試みることができないことがわかります。