Tasks

Core.TaskType
Task(func[, reserved_stack::Int])

指定された関数 func(引数なしで呼び出せる必要があります)を実行する Task(すなわちコルーチン)を作成します。このタスクは、この関数が戻ると終了します。タスクは、scheduled で構築時の親からの「ワールドエイジ」で実行されます。

オプションの reserved_stack 引数は、このタスクに利用可能なスタックのサイズをバイト単位で指定します。デフォルトの 0 は、システム依存のスタックサイズのデフォルトを使用します。

Warning

デフォルトでは、タスクにはスティッキービットが true に設定されます t.sticky。これは、@async の歴史的なデフォルトをモデル化しています。スティッキーなタスクは、最初にスケジュールされたワーカースレッドでのみ実行でき、スケジュールされると、スケジュール元のタスクもスティッキーになります。Threads.@spawn の動作を得るには、スティッキービットを手動で false に設定してください。

julia> a() = sum(i for i in 1:1000);

julia> b = Task(a);

この例では、b はまだ開始されていない実行可能な Task です。

source
Base.@taskMacro
@task

式をTaskでラップし、実行せずにTaskを返します。これはタスクを作成するだけで、実行はしません。

Warning

デフォルトでは、タスクにはスティッキービットがtrue t.stickyに設定されます。これは@asyncの歴史的なデフォルトをモデル化しています。スティッキーなタスクは、最初にスケジュールされたワーカースレッドでのみ実行でき、スケジュールされると、スケジュール元のタスクもスティッキーになります。Threads.@spawnの動作を得るには、スティッキービットを手動でfalseに設定してください。

julia> a1() = sum(i for i in 1:1000);

julia> b = @task a1();

julia> istaskstarted(b)
false

julia> schedule(b);

julia> yield();

julia> istaskdone(b)
true
source
Base.@asyncMacro
@async

式を Task でラップし、ローカルマシンのスケジューラーキューに追加します。

値は $ を介して @async に補間でき、これは構築された基盤のクロージャに値を直接コピーします。これにより、変数のを挿入でき、非同期コードを現在のタスクにおける変数の値の変更から隔離できます。

Warning

@async よりも常に Threads.@spawn を優先することが強く推奨されます 並列性が必要ない場合でも、特に公開されているライブラリでは。これは、@async の使用が現在の Julia の実装においてタスクのワーカースレッド間の移行を無効にするためです。したがって、ライブラリ関数内での @async の見かけ上無害な使用が、ユーザーアプリケーションの非常に異なる部分のパフォーマンスに大きな影響を与える可能性があります。

Julia 1.4

$ を介して値を補間することは、Julia 1.4 以降で利用可能です。

source
Base.asyncmapFunction
asyncmap(f, c...; ntasks=0, batch_size=nothing)

複数の同時タスクを使用して、コレクション(または複数の同じ長さのコレクション)に対して f をマッピングします。複数のコレクション引数の場合、f は要素ごとに適用されます。

出力は、コレクション c の要素と同じ順序であることが保証されています。

ntasks は同時に実行するタスクの数を指定します。コレクションの長さに応じて、ntasks が指定されていない場合、最大100のタスクが同時マッピングに使用されます。

ntasks はゼロ引数関数としても指定できます。この場合、各要素を処理する前に並行して実行するタスクの数がチェックされ、ntasks_func の値が現在のタスク数を超えている場合に新しいタスクが開始されます。

batch_size が指定されている場合、コレクションはバッチモードで処理されます。この場合、f は引数タプルの Vector を受け入れ、結果のベクターを返す関数でなければなりません。入力ベクターの長さは batch_size 以下になります。

以下の例は、マッピング関数が実行されるタスクの objectid を返すことによって、異なるタスクでの実行を強調しています。

まず、ntasks が未定義の場合、各要素は異なるタスクで処理されます。

julia> tskoid() = objectid(current_task());

julia> asyncmap(x->tskoid(), 1:5)
5-element Vector{UInt64}:
 0x6e15e66c75c75853
 0x440f8819a1baa682
 0x9fb3eeadd0c83985
 0xebd3e35fe90d4050
 0x29efc93edce2b961

julia> length(unique(asyncmap(x->tskoid(), 1:5)))
5

ntasks=2 の場合、すべての要素は2つのタスクで処理されます。

julia> asyncmap(x->tskoid(), 1:5; ntasks=2)
5-element Vector{UInt64}:
 0x027ab1680df7ae94
 0xa23d2f80cd7cf157
 0x027ab1680df7ae94
 0xa23d2f80cd7cf157
 0x027ab1680df7ae94

julia> length(unique(asyncmap(x->tskoid(), 1:5; ntasks=2)))
2

batch_size が定義されている場合、マッピング関数は引数タプルの配列を受け入れ、結果の配列を返すように変更する必要があります。これを達成するために、修正されたマッピング関数では map が使用されます。

julia> batch_func(input) = map(x->string("args_tuple: ", x, ", element_val: ", x[1], ", task: ", tskoid()), input)
batch_func (generic function with 1 method)

julia> asyncmap(batch_func, 1:5; ntasks=2, batch_size=2)
5-element Vector{String}:
 "args_tuple: (1,), element_val: 1, task: 9118321258196414413"
 "args_tuple: (2,), element_val: 2, task: 4904288162898683522"
 "args_tuple: (3,), element_val: 3, task: 9118321258196414413"
 "args_tuple: (4,), element_val: 4, task: 4904288162898683522"
 "args_tuple: (5,), element_val: 5, task: 9118321258196414413"
source
Base.asyncmap!Function
asyncmap!(f, results, c...; ntasks=0, batch_size=nothing)

asyncmapと同様ですが、コレクションを返すのではなく、resultsに出力を格納します。

Warning

変更された引数が他の引数とメモリを共有している場合、動作が予期しないものになることがあります。

source
Base.istaskdoneFunction
istaskdone(t::Task) -> Bool

タスクが終了したかどうかを判断します。

julia> a2() = sum(i for i in 1:1000);

julia> b = Task(a2);

julia> istaskdone(b)
false

julia> schedule(b);

julia> yield();

julia> istaskdone(b)
true
source
Base.istaskstartedFunction
istaskstarted(t::Task) -> Bool

タスクが実行を開始したかどうかを判断します。

julia> a3() = sum(i for i in 1:1000);

julia> b = Task(a3);

julia> istaskstarted(b)
false
source
Base.istaskfailedFunction
istaskfailed(t::Task) -> Bool

タスクが例外がスローされたために終了したかどうかを判断します。

julia> a4() = error("task failed");

julia> b = Task(a4);

julia> istaskfailed(b)
false

julia> schedule(b);

julia> yield();

julia> istaskfailed(b)
true
Julia 1.3

この関数は少なくともJulia 1.3が必要です。

source
Base.task_local_storageMethod
task_local_storage(key)

現在のタスクのタスクローカルストレージでキーの値を調べます。

source
Base.task_local_storageMethod
task_local_storage(key, value)

現在のタスクのタスクローカルストレージにキーに値を割り当てます。

source
Base.task_local_storageMethod
task_local_storage(body, key, value)

body 関数を修正されたタスクローカルストレージで呼び出します。このとき、valuekey に割り当てられます。key の以前の値、またはそれが存在しない場合はその状態がその後に復元されます。動的スコープをエミュレートするのに便利です。

source
Core.ConcurrencyViolationErrorType
ConcurrencyViolationError(msg) <: Exception

同時セマンティクスの検出可能な違反が発生したときにスローされるエラーです。

これが使用される例の非網羅的なリストには以下が含まれます:

  • デッドロックが検出されたときにスローされる(例:wait(current_task())
  • 知られている安全でない動作が試みられたとき(例:yield(current_task)
  • 複数の同時タスクから変更されようとしている既知のスレッドセーフでないデータ構造
  • このタスクによってロックされていないロックが解除されている
source

Scheduling

Base.yieldFunction
yield(t::Task, arg = nothing)

schedule(t, arg); yield() の高速で不公平なスケジューリングバージョンで、スケジューラを呼び出す前に t に即座に制御を渡します。

t が現在実行中のタスクである場合、ConcurrencyViolationError をスローします。

source
yield()

スケジューラに切り替えて、他のスケジュールされたタスクが実行できるようにします。この関数を呼び出すタスクはまだ実行可能であり、他に実行可能なタスクがない場合はすぐに再起動されます。

source
Base.yieldtoFunction
yieldto(t::Task, arg = nothing)

指定されたタスクに切り替えます。タスクに初めて切り替えると、タスクの関数が引数なしで呼び出されます。以降の切り替えでは、arg がタスクの最後の yieldto 呼び出しから返されます。これは低レベルの呼び出しであり、タスクの切り替えのみを行い、状態やスケジューリングを考慮しません。その使用は推奨されません。

source
Base.sleepFunction
sleep(seconds)

指定された秒数だけ現在のタスクをブロックします。最小スリープ時間は1ミリ秒または0.001の入力です。

source
Base.scheduleFunction
schedule(t::Task, [val]; error=false)

Taskをスケジューラのキューに追加します。これにより、タスクはシステムがアイドル状態のときに常に実行されますが、タスクがwaitのようなブロッキング操作を行う場合は除きます。

第二引数valが提供されると、タスクが再び実行されるときに(yieldtoの戻り値を介して)タスクに渡されます。errortrueの場合、値は起こされたタスクで例外として発生します。

Warning

開始された任意のTaskに対してscheduleを使用するのは不正です。詳細についてはAPIリファレンスを参照してください。

Warning

デフォルトでは、タスクにはスティッキービットがtrueに設定されますt.sticky。これは@asyncの歴史的なデフォルトをモデル化しています。スティッキーなタスクは、最初にスケジュールされたワーカースレッドでのみ実行でき、スケジュールされると、スケジュール元のタスクもスティッキーになります。Threads.@spawnの動作を得るには、スティッキービットを手動でfalseに設定してください。

julia> a5() = sum(i for i in 1:1000);

julia> b = Task(a5);

julia> istaskstarted(b)
false

julia> schedule(b);

julia> yield();

julia> istaskstarted(b)
true

julia> istaskdone(b)
true
source

Synchronization

Base.errormonitorFunction
errormonitor(t::Task)

タスク t が失敗した場合、stderr にエラーログを出力します。

julia> wait(errormonitor(Threads.@spawn error("task failed")); throw = false)
Unhandled Task ERROR: task failed
Stacktrace:
[...]
source
Base.@syncMacro
@sync

すべての字句的に囲まれた @async@spawnDistributed.@spawnat および Distributed.@distributed の使用が完了するまで待機します。囲まれた非同期操作によってスローされたすべての例外は収集され、CompositeException としてスローされます。

julia> Threads.nthreads()
4

julia> @sync begin
           Threads.@spawn println("Thread-id $(Threads.threadid()), task 1")
           Threads.@spawn println("Thread-id $(Threads.threadid()), task 2")
       end;
Thread-id 3, task 1
Thread-id 1, task 2
source
Base.waitFunction
wait([x])

現在のタスクを、何らかのイベントが発生するまでブロックします。

  • Channel: チャンネルに値が追加されるのを待ちます。
  • Condition: 条件でnotifyが呼び出されるのを待ち、notifyに渡されたvalパラメータを返します。正確な動作については、waitCondition特有のドキュメントを参照してください。
  • Process: プロセスまたはプロセスチェーンが終了するのを待ちます。プロセスのexitcodeフィールドを使用して、成功または失敗を判断できます。
  • Task: Taskが終了するのを待ちます。正確な動作については、waitTask特有のドキュメントを参照してください。
  • RawFD: ファイルディスクリプタの変更を待ちます(FileWatchingパッケージを参照)。

引数が渡されない場合、タスクは未定義の期間ブロックされます。タスクは、scheduleまたはyieldtoへの明示的な呼び出しによってのみ再起動できます。

しばしばwaitwhileループ内で呼び出され、待機している条件が満たされるまで進行しないようにします。

source
Base.waitanyFunction
waitany(tasks; throw=true) -> (done_tasks, remaining_tasks)

与えられたタスクのうち、少なくとも1つが完了するまで待機します。

throwtrueの場合、完了したタスクの1つが例外で完了したときにCompositeExceptionをスローします。

戻り値は2つのタスクベクターから構成されます。最初のベクターは完了したタスクで構成され、もう1つは未完了のタスクで構成されます。

Warning

これは、各タスクが直列に実行される複数の個別タスクを使用するコードを書くことと比較して、スケールが悪くなる可能性があります。なぜなら、これを呼び出すたびにtasksのリストをスキャンし、各タスクと同期する必要があるからです。あるいは、代わりにwaitall(tasks; failfast=true)を使用することを検討してください。

source
Base.waitallFunction
waitall(tasks; failfast=true, throw=true) -> (done_tasks, remaining_tasks)

与えられたすべてのタスクが完了するまで待機します。

failfasttrue の場合、関数は与えられたタスクのうち少なくとも1つが例外によって終了したときに戻ります。throwtrue の場合、完了したタスクの1つが失敗したときに CompositeException をスローします。

failfastthrow のキーワード引数は独立して機能します。throw=true のみが指定された場合、この関数はすべてのタスクが完了するまで待機します。

戻り値は2つのタスクベクターから構成されます。最初のベクターは完了したタスクで構成され、もう1つは未完了のタスクで構成されます。

source
Base.fetchMethod
fetch(t::Task)

Task が終了するのを待ってから、その結果の値を返します。タスクが例外で失敗した場合、失敗したタスクをラップした TaskFailedException がスローされます。

source
Base.timedwaitFunction
timedwait(testcb, timeout::Real; pollint::Real=0.1)

testcb()true を返すか、timeout 秒が経過するまで待機します。どちらか早い方が優先されます。テスト関数は毎 pollint 秒ごとにポーリングされます。pollint の最小値は 0.001 秒、すなわち 1 ミリ秒です。

:ok または :timed_out を返します。

julia> cb() = (sleep(5); return);

julia> t = @async cb();

julia> timedwait(()->istaskdone(t), 1)
:timed_out

julia> timedwait(()->istaskdone(t), 6.5)
:ok
source
Base.ConditionType
Condition()

タスクが待機できるエッジトリガーイベントソースを作成します。Condition上でwaitを呼び出すタスクは、一時停止され、キューに入れられます。後でCondition上でnotifyが呼び出されると、タスクが起こされます。条件で待機することは、値を返すか、notifyのオプション引数が使用されるとエラーを発生させることがあります。エッジトリガーは、notifyが呼び出された時点で待機しているタスクのみが起こされることを意味します。レベルトリガー通知の場合、通知が発生したかどうかを追跡するために追加の状態を保持する必要があります。ChannelおよびThreads.Event型はこれを行い、レベルトリガーイベントに使用できます。

このオブジェクトはスレッドセーフではありません。スレッドセーフなバージョンについてはThreads.Conditionを参照してください。

source
Base.Threads.ConditionType
Threads.Condition([lock])

Base.Condition のスレッドセーフなバージョンです。

Threads.Conditionwait または notify を呼び出すには、最初にそれに対して lock を呼び出す必要があります。wait が呼び出されると、ロックはブロッキング中に原子的に解放され、wait が戻る前に再取得されます。したがって、Threads.Condition c の慣用的な使用法は次のようになります。

lock(c)
try
    while !thing_we_are_waiting_for
        wait(c)
    end
finally
    unlock(c)
end
Julia 1.2

この機能は少なくとも Julia 1.2 が必要です。

source
Base.EventType
Event([autoreset=false])

レベルトリガーイベントソースを作成します。Event上でwaitを呼び出すタスクは、一時停止され、notifyEvent上で呼び出されるまでキューに入れられます。notifyが呼び出されると、Eventはシグナル状態のままとなり、resetが呼び出されるまでタスクは待機中にブロックされなくなります。

autoresetがtrueの場合、notifyの呼び出しごとに、waitから解放されるタスクは最大で1つになります。

これにより、notify/waitに対する取得と解放のメモリ順序が提供されます。

Julia 1.1

この機能は少なくともJulia 1.1が必要です。

Julia 1.8

autoreset機能とメモリ順序の保証には、少なくともJulia 1.8が必要です。

source
Base.notifyFunction
notify(condition, val=nothing; all=true, error=false)

条件を待っているタスクを起こし、valを渡します。alltrue(デフォルト)であれば、すべての待機中のタスクが起こされ、そうでなければ1つだけが起こされます。errortrueであれば、渡された値が起こされたタスクで例外として発生します。

起こされたタスクの数を返します。conditionで待機中のタスクがない場合は0を返します。

source
Base.resetMethod
reset(::Event)

Event を未設定の状態にリセットします。その後、wait への将来の呼び出しは、再度 notify が呼び出されるまでブロックされます。

source
Base.SemaphoreType
Semaphore(sem_size)

カウントセマフォを作成し、同時に使用できるのは最大で sem_size の取得のみです。各取得は解放と一致する必要があります。

これにより、取得/解放呼び出しに対して取得と解放のメモリ順序が提供されます。

source
Base.acquireFunction
acquire(f, s::Semaphore)

Semaphore s から取得した後に f を実行し、完了またはエラー時に release します。

例えば、同時に foo の呼び出しが2回だけアクティブであることを保証する do-block 形式の例:

s = Base.Semaphore(2)
@sync for _ in 1:100
    Threads.@spawn begin
        Base.acquire(s) do
            foo()
        end
    end
end
Julia 1.8

このメソッドは少なくとも Julia 1.8 を必要とします。

source
acquire(s::Semaphore)

sem_size の許可のいずれかが利用可能になるまで待機し、取得できるまでブロックします。

source
Base.releaseFunction
release(s::Semaphore)

プールに1つの許可を返し、別のタスクがそれを取得して実行を再開できるようにする可能性があります。

source
Base.lockFunction
lock(f::Function, l::Lockable)

lに関連付けられたロックを取得し、ロックを保持した状態でfを実行し、fが返るとロックを解放します。fは1つの位置引数を受け取ります:lによってラップされた値です。ロックが別のタスク/スレッドによってすでにロックされている場合は、それが利用可能になるまで待機します。この関数が返ると、lockは解放されているため、呼び出し元はそれをunlockしようとすべきではありません。

Julia 1.11

Julia 1.11以上が必要です。

source
lock(f::Function, lock)

lockを取得し、lockが保持されている状態でfを実行し、fが返るときにlockを解放します。ロックが別のタスク/スレッドによってすでにロックされている場合は、それが利用可能になるまで待機します。

この関数が返るとき、lockは解放されているため、呼び出し元はそれをunlockしようとすべきではありません。

参照: @lock.

Julia 1.7

第二引数としてChannelを使用するには、Julia 1.7以降が必要です。

source
lock(lock)

lockが利用可能になったときに取得します。ロックが別のタスク/スレッドによってすでにロックされている場合は、利用可能になるまで待機します。

lockunlockで対応する必要があります。

source
Base.unlockFunction
unlock(lock)

lockの所有権を解放します。

これは以前に取得された再帰的ロックである場合、内部カウンタを減少させてすぐに戻ります。

source
Base.trylockFunction
trylock(lock) -> Success (Boolean)

ロックが利用可能な場合はロックを取得し、成功した場合は true を返します。ロックが別のタスク/スレッドによってすでにロックされている場合は false を返します。

各成功した trylockunlock によって対応する必要があります。

関数 trylockislocked と組み合わせて、ロックの typeof(lock) がサポートされている場合 にテスト・アンド・テスト・アンド・セットまたは指数バックオフアルゴリズムを書くために使用できます(そのドキュメントを参照してください)。

source
Base.islockedFunction
islocked(lock) -> Status (Boolean)

lockが任意のタスク/スレッドによって保持されているかどうかを確認します。この関数単独では同期に使用すべきではありません。しかし、islockedtrylockと組み合わせることで、typeof(lock)がサポートしている場合にテスト・アンド・テスト・アンド・セットや指数バックオフアルゴリズムを書くために使用できます(そのドキュメントを参照してください)。

Extended help

例えば、以下のように指数バックオフを実装することができます、もしlockの実装が以下に文書化された特性を満たしている場合。

nspins = 0
while true
    while islocked(lock)
        GC.safepoint()
        nspins += 1
        nspins > LIMIT && error("timeout")
    end
    trylock(lock) && break
    backoff()
end

Implementation

ロックの実装は、以下の特性を持つislockedを定義し、そのドキュメントに記載することを推奨します。

  • islocked(lock)はデータ競合がありません。
  • islocked(lock)falseを返す場合、他のタスクからの干渉がなければ、trylock(lock)の即時呼び出しは成功しなければなりません(trueを返します)。
source
Base.ReentrantLockType
ReentrantLock()

再入ロックを作成して、Tasksを同期させます。同じタスクは必要な回数だけロックを取得できます(これが名前の「再入可能」という部分を意味します)。各lockunlockと対になっている必要があります。

lockを呼び出すと、そのスレッドでのファイナライザの実行が対応するunlockまで抑制されます。以下に示す標準的なロックパターンの使用は自然にサポートされるべきですが、try/lockの順序を逆にしたり、tryブロックを完全に欠落させたりすることには注意してください(例えば、ロックを保持したまま戻ろうとすること)。

これは、ロック/アンロック呼び出しに対して取得/解放のメモリ順序を提供します。

lock(l)
try
    <atomic work>
finally
    unlock(l)
end

!islocked(lck::ReentrantLock)が成立する場合、trylock(lck)は、他のタスクが「同時に」ロックを保持しようとしていない限り成功します。

source
Base.@lockMacro
@lock l expr

lock(f, l::AbstractLock)のマクロ版ですが、f関数の代わりにexprを使用します。次のように展開されます:

lock(l)
try
    expr
finally
    unlock(l)
end

これは、doブロックを使用したlockと似ていますが、クロージャを作成せず、したがってパフォーマンスを向上させることができます。

Compat

@lockはJulia 1.3で追加され、Julia 1.10でエクスポートされました。

source
Base.LockableType
Lockable(value, lock = ReentrantLock())

valueをラップし、提供されたlockに関連付けられたLockableオブジェクトを作成します。このオブジェクトは@locklocktrylockunlockをサポートします。値にアクセスするには、ロックを保持しながらロック可能オブジェクトをインデックスします。

Julia 1.11

Julia 1.11以上が必要です。

julia> locked_list = Base.Lockable(Int[]);

julia> @lock(locked_list, push!(locked_list[], 1)) # 値にアクセスするにはロックを保持する必要があります
1-element Vector{Int64}:
 1

julia> lock(summary, locked_list)
"1-element Vector{Int64}"
source

Channels

Base.ChannelType
Channel{T=Any}(size::Int=0)

T型の最大sizeオブジェクトを保持できる内部バッファを持つChannelを構築します。 put!が満杯のチャネルで呼び出されると、take!でオブジェクトが削除されるまでブロックします。

Channel(0)はバッファなしのチャネルを構築します。put!は対応するtake!が呼ばれるまでブロックします。そしてその逆も同様です。

他のコンストラクタ:

  • Channel(): デフォルトコンストラクタで、Channel{Any}(0)と同等です。
  • Channel(Inf): Channel{Any}(typemax(Int))と同等です。
  • Channel(sz): Channel{Any}(sz)と同等です。
Julia 1.3

デフォルトコンストラクタChannel()とデフォルトsize=0はJulia 1.3で追加されました。

source
Base.ChannelMethod
Channel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false, threadpool=nothing)

funcから新しいタスクを作成し、型Tとサイズsizeの新しいチャネルにbindし、タスクをスケジュールします。すべてが1回の呼び出しで行われます。タスクが終了すると、チャネルは自動的に閉じられます。

funcは、バウンドチャネルを唯一の引数として受け取る必要があります。

作成されたタスクへの参照が必要な場合は、キーワード引数taskrefを介してRef{Task}オブジェクトを渡してください。

spawn=trueの場合、funcのために作成されたTaskは、Threads.@spawnを介してタスクを作成するのと同等に、別のスレッドで並行してスケジュールされる可能性があります。

spawn=truethreadpool引数が設定されていない場合、デフォルトは:defaultです。

threadpool引数が設定されている場合(:defaultまたは:interactiveに)、これはspawn=trueを意味し、新しいタスクは指定されたスレッドプールにスパーンされます。

Channelを返します。

julia> chnl = Channel() do ch
           foreach(i -> put!(ch, i), 1:4)
       end;

julia> typeof(chnl)
Channel{Any}

julia> for i in chnl
           @show i
       end;
i = 1
i = 2
i = 3
i = 4

作成されたタスクを参照する:

julia> taskref = Ref{Task}();

julia> chnl = Channel(taskref=taskref) do ch
           println(take!(ch))
       end;

julia> istaskdone(taskref[])
false

julia> put!(chnl, "Hello");
Hello

julia> istaskdone(taskref[])
true
Julia 1.3

spawn=パラメータはJulia 1.3で追加されました。このコンストラクタはJulia 1.3で追加されました。以前のバージョンのJuliaでは、Channelはキーワード引数を使用してsizeTを設定しましたが、それらのコンストラクタは非推奨です。

Julia 1.9

threadpool=引数はJulia 1.9で追加されました。

julia> chnl = Channel{Char}(1, spawn=true) do ch
           for c in "hello world"
               put!(ch, c)
           end
       end;

julia> String(collect(chnl))
"hello world"
source
Base.put!Method
put!(c::Channel, v)

チャネル c にアイテム v を追加します。チャネルが満杯の場合はブロックします。

バッファなしのチャネルの場合、別のタスクによって take! が実行されるまでブロックします。

Julia 1.1

v は、put! が呼び出されるときに convert を使ってチャネルの型に変換されます。

source
Base.take!Method
take!(c::Channel)

Channel から値を順番に削除して返します。データが利用可能になるまでブロックします。バッファなしのチャネルの場合、別のタスクによって put! が実行されるまでブロックします。

バッファ付きチャネル:

julia> c = Channel(1);

julia> put!(c, 1);

julia> take!(c)
1

バッファなしチャネル:

julia> c = Channel(0);

julia> task = Task(() -> put!(c, 1));

julia> schedule(task);

julia> take!(c)
1
source
Base.isfullMethod
isfull(c::Channel)

Channel が満杯かどうかを判断します。つまり、put!(c, some_value) を呼び出すとブロックされることになります。即座に返し、ブロックしません。

このメソッドが true を返した後、put! がブロックされないことが頻繁にあることに注意してください。ユーザーは、このメソッドを呼び出すことでコードにライブロックバグを誤って作成しないように注意する必要があります。これらは一般的にデッドロックよりもデバッグが難しいです。また、複数のプロデューサタスクが並行して put! を呼び出している場合、この呼び出しが false を返した後に put! がブロックされる可能性もあります。

バッファ付きチャネル:

julia> c = Channel(1); # capacity = 1

julia> isfull(c)
false

julia> put!(c, 1);

julia> isfull(c)
true

バッファなしチャネル:

julia> c = Channel(); # capacity = 0

julia> isfull(c) # バッファなしチャネルは常に満杯
true
source
Base.isreadyMethod
isready(c::Channel)

Channel に値が格納されているかどうかを判断します。即座に返され、ブロックしません。

バッファなしのチャネルの場合、put! を待機しているタスクがある場合は true を返します。

バッファ付きチャネル:

julia> c = Channel(1);

julia> isready(c)
false

julia> put!(c, 1);

julia> isready(c)
true

バッファなしチャネル:

julia> c = Channel();

julia> isready(c)  # 値を入れるために待機しているタスクはありません!
false

julia> task = Task(() -> put!(c, 1));

julia> schedule(task);  # put! タスクをスケジュールする

julia> isready(c)
true
source
Base.isopenMethod
isopen(c::Channel)

新しい put! 操作のために Channel が開いているかどうかを判断します。Channel は閉じていても、take! で消費できるバッファされた要素を持っている可能性があることに注意してください。

タスクを持つバッファ付きチャネル:

julia> c = Channel(ch -> put!(ch, 1), 1);

julia> isopen(c) # チャネルは新しい `put!` に対して閉じています
false

julia> isready(c) # チャネルは閉じていますが、まだ要素を含んでいます
true

julia> take!(c)
1

julia> isready(c)
false

バッファなしチャネル:

julia> c = Channel{Int}();

julia> isopen(c)
true

julia> close(c)

julia> isopen(c)
false
source
Base.fetchMethod
fetch(c::Channel)

Channelから最初に利用可能なアイテムを待機して返します(削除せずに)。注意: fetchはバッファなし(サイズ0)のChannelではサポートされていません。

バッファ付きチャネル:

julia> c = Channel(3) do ch
           foreach(i -> put!(ch, i), 1:3)
       end;

julia> fetch(c)
1

julia> collect(c)  # アイテムは削除されません
3-element Vector{Any}:
 1
 2
 3
source
Base.closeMethod
close(c::Channel[, excp::Exception])

チャネルを閉じます。例外(オプションでexcpによって指定される)は、次のようにスローされます:

  • put! が閉じたチャネルで呼び出されたとき。
  • take! および fetch が空の閉じたチャネルで呼び出されたとき。
source
Base.bindMethod
bind(chnl::Channel, task::Task)

chnlのライフタイムをタスクに関連付けます。タスクが終了すると、Channel chnlは自動的に閉じられます。タスク内で捕捉されない例外は、chnlのすべての待機者に伝播されます。

chnlオブジェクトは、タスクの終了とは独立して明示的に閉じることができます。終了したタスクは、すでに閉じられたChannelオブジェクトには影響を与えません。

チャネルが複数のタスクにバインドされている場合、最初に終了したタスクがチャネルを閉じます。同じタスクに複数のチャネルがバインドされている場合、タスクの終了はすべてのバインドされたチャネルを閉じます。

julia> c = Channel(0);

julia> task = @async foreach(i->put!(c, i), 1:4);

julia> bind(c,task);

julia> for i in c
           @show i
       end;
i = 1
i = 2
i = 3
i = 4

julia> isopen(c)
false
julia> c = Channel(0);

julia> task = @async (put!(c, 1); error("foo"));

julia> bind(c, task);

julia> take!(c)
1

julia> put!(c, 1);
ERROR: TaskFailedException
Stacktrace:
[...]
    nested task error: foo
[...]
source

Low-level synchronization using schedule and wait

scheduleの最も簡単で正しい使用法は、まだ開始されていない(スケジュールされた)Task上で行うことです。しかし、4d61726b646f776e2e436f64652822222c20227363686564756c652229_40726566waitを、同期インターフェースを構築するための非常に低レベルのビルディングブロックとして使用することも可能です。schedule(task)を呼び出すための重要な前提条件は、呼び出し元がtaskを「所有」している必要があることです。つまり、schedule(task)を呼び出すコードが、指定されたtask内でwaitが発生する場所を知っている必要があります。このような前提条件を確保するための戦略の一つは、以下の例で示されているようにアトミックを使用することです。

@enum OWEState begin
    OWE_EMPTY
    OWE_WAITING
    OWE_NOTIFYING
end

mutable struct OneWayEvent
    @atomic state::OWEState
    task::Task
    OneWayEvent() = new(OWE_EMPTY)
end

function Base.notify(ev::OneWayEvent)
    state = @atomic ev.state
    while state !== OWE_NOTIFYING
        # Spin until we successfully update the state to OWE_NOTIFYING:
        state, ok = @atomicreplace(ev.state, state => OWE_NOTIFYING)
        if ok
            if state == OWE_WAITING
                # OWE_WAITING -> OWE_NOTIFYING transition means that the waiter task is
                # already waiting or about to call `wait`. The notifier task must wake up
                # the waiter task.
                schedule(ev.task)
            else
                @assert state == OWE_EMPTY
                # Since we are assuming that there is only one notifier task (for
                # simplicity), we know that the other possible case here is OWE_EMPTY.
                # We do not need to do anything because we know that the waiter task has
                # not called `wait(ev::OneWayEvent)` yet.
            end
            break
        end
    end
    return
end

function Base.wait(ev::OneWayEvent)
    ev.task = current_task()
    state, ok = @atomicreplace(ev.state, OWE_EMPTY => OWE_WAITING)
    if ok
        # OWE_EMPTY -> OWE_WAITING transition means that the notifier task is guaranteed to
        # invoke OWE_WAITING -> OWE_NOTIFYING transition. The waiter task must call
        # `wait()` immediately. In particular, it MUST NOT invoke any function that may
        # yield to the scheduler at this point in code.
        wait()
    else
        @assert state == OWE_NOTIFYING
        # Otherwise, the `state` must have already been moved to OWE_NOTIFYING by the
        # notifier task.
    end
    return
end

ev = OneWayEvent()
@sync begin
    Threads.@spawn begin
        wait(ev)
        println("done")
    end
    println("notifying...")
    notify(ev)
end

# output
notifying...
done

OneWayEventは、1つのタスクが別のタスクのnotifywaitすることを可能にします。これは、waitが単一のタスクから1回しか使用できないため、制限された通信インターフェースです(ev.taskの非原子的な割り当てに注意してください)。

この例では、notify(ev::OneWayEvent)は、それが状態をOWE_WAITINGからOWE_NOTIFYINGに変更する場合に限り、schedule(ev.task)を呼び出すことが許可されています。これにより、wait(ev::OneWayEvent)を実行しているタスクが現在okブランチにあり、他のタスクがschedule(ev.task)を試みることはできないことがわかります。なぜなら、それらの@atomicreplace(ev.state, state => OWE_NOTIFYING)は失敗するからです。