Tasks
Core.Task — TypeTask(func)创建一个 Task(即协程)来执行给定的函数 func(该函数必须可以无参数调用)。当此函数返回时,任务将退出。任务将在构造时从父级的“世界年龄”中运行,当 scheduled。
默认情况下,任务的粘性位将设置为 true t.sticky。这模拟了 @async 的历史默认值。粘性任务只能在它们首次调度的工作线程上运行,并且在调度时将使它们从中调度的任务变为粘性。要获得 Threads.@spawn 的行为,请手动将粘性位设置为 false。
示例
julia> a() = sum(i for i in 1:1000);
julia> b = Task(a);在这个例子中,b 是一个可运行的 Task,但尚未开始。
Base.@task — Macro@task将一个表达式包装在一个 Task 中而不执行它,并返回该 Task。这仅仅创建一个任务,而不运行它。
默认情况下,任务的粘性位将被设置为 true t.sticky。这模拟了 @async 的历史默认值。粘性任务只能在它们首次调度的工作线程上运行,并且在调度时会使它们从中调度的任务变为粘性。要获得 Threads.@spawn 的行为,请手动将粘性位设置为 false。
示例
julia> a1() = sum(i for i in 1:1000);
julia> b = @task a1();
julia> istaskstarted(b)
false
julia> schedule(b);
julia> yield();
julia> istaskdone(b)
trueBase.@async — Macro@async将一个表达式包装在一个 Task 中,并将其添加到本地机器的调度器队列中。
值可以通过 $ 插入到 @async 中,这会将值直接复制到构造的底层闭包中。这允许你插入变量的 值,将异步代码与当前任务中变量值的变化隔离开来。
强烈建议始终优先使用 Threads.@spawn 而不是 @async 即使不需要并行性,特别是在公开分发的库中。这是因为在当前的 Julia 实现中,使用 @async 会禁用 父 任务在工作线程之间的迁移。因此,在库函数中看似无害的 @async 使用可能会对用户应用程序的非常不同部分的性能产生重大影响。
从 Julia 1.4 开始,可以通过 $ 插入值。
Base.asyncmap — Functionasyncmap(f, c...; ntasks=0, batch_size=nothing)使用多个并发任务对集合(或多个相同长度的集合)进行 f 的映射。对于多个集合参数,f 是逐元素应用的。
ntasks 指定要并发运行的任务数量。根据集合的长度,如果未指定 ntasks,则最多将使用 100 个任务进行并发映射。
ntasks 也可以指定为一个无参数的函数。在这种情况下,在处理每个元素之前会检查并行运行的任务数量,如果 ntasks_func 的值大于当前任务数量,则会启动一个新任务。
如果指定了 batch_size,则集合将以批处理模式进行处理。此时,f 必须是一个接受参数元组的 Vector 的函数,并且必须返回一个结果的向量。输入向量的长度将为 batch_size 或更少。
以下示例通过返回映射函数执行的任务的 objectid 来突出不同任务中的执行。
首先,在 ntasks 未定义的情况下,每个元素在不同的任务中处理。
julia> tskoid() = objectid(current_task());
julia> asyncmap(x->tskoid(), 1:5)
5-element Array{UInt64,1}:
 0x6e15e66c75c75853
 0x440f8819a1baa682
 0x9fb3eeadd0c83985
 0xebd3e35fe90d4050
 0x29efc93edce2b961
julia> length(unique(asyncmap(x->tskoid(), 1:5)))
5当 ntasks=2 时,所有元素在 2 个任务中处理。
julia> asyncmap(x->tskoid(), 1:5; ntasks=2)
5-element Array{UInt64,1}:
 0x027ab1680df7ae94
 0xa23d2f80cd7cf157
 0x027ab1680df7ae94
 0xa23d2f80cd7cf157
 0x027ab1680df7ae94
julia> length(unique(asyncmap(x->tskoid(), 1:5; ntasks=2)))
2定义 batch_size 后,映射函数需要更改为接受参数元组的数组并返回结果的数组。修改后的映射函数中使用 map 来实现这一点。
julia> batch_func(input) = map(x->string("args_tuple: ", x, ", element_val: ", x[1], ", task: ", tskoid()), input)
batch_func (generic function with 1 method)
julia> asyncmap(batch_func, 1:5; ntasks=2, batch_size=2)
5-element Array{String,1}:
 "args_tuple: (1,), element_val: 1, task: 9118321258196414413"
 "args_tuple: (2,), element_val: 2, task: 4904288162898683522"
 "args_tuple: (3,), element_val: 3, task: 9118321258196414413"
 "args_tuple: (4,), element_val: 4, task: 4904288162898683522"
 "args_tuple: (5,), element_val: 5, task: 9118321258196414413"Base.asyncmap! — Functionasyncmap!(f, results, c...; ntasks=0, batch_size=nothing)像 asyncmap 一样,但将输出存储在 results 中,而不是返回一个集合。
当任何被修改的参数与其他参数共享内存时,行为可能会出乎意料。
Base.current_task — Functioncurrent_task()获取当前正在运行的 Task。
Base.istaskdone — Functionistaskdone(t::Task) -> Bool确定任务是否已退出。
示例
julia> a2() = sum(i for i in 1:1000);
julia> b = Task(a2);
julia> istaskdone(b)
false
julia> schedule(b);
julia> yield();
julia> istaskdone(b)
trueBase.istaskstarted — Functionistaskstarted(t::Task) -> Bool确定任务是否已开始执行。
示例
julia> a3() = sum(i for i in 1:1000);
julia> b = Task(a3);
julia> istaskstarted(b)
falseBase.istaskfailed — Functionistaskfailed(t::Task) -> Bool确定一个任务是否因为抛出异常而退出。
示例
julia> a4() = error("任务失败");
julia> b = Task(a4);
julia> istaskfailed(b)
false
julia> schedule(b);
julia> yield();
julia> istaskfailed(b)
true此函数至少需要 Julia 1.3。
Base.task_local_storage — Methodtask_local_storage(key)在当前任务的任务本地存储中查找键的值。
Base.task_local_storage — Methodtask_local_storage(key, value)将一个值分配给当前任务的任务局部存储中的一个键。
Base.task_local_storage — Methodtask_local_storage(body, key, value)调用函数 body,使用修改后的任务局部存储,其中 value 被分配给 key;key 的先前值(或缺失)在之后恢复。这对于模拟动态作用域非常有用。
Scheduling
Base.yield — Functionyield()切换到调度器以允许另一个已调度的任务运行。调用此函数的任务仍然可以运行,如果没有其他可运行的任务,它将立即重新启动。
yield(t::Task, arg = nothing)一个快速的、不公平调度的 schedule(t, arg); yield() 版本,它在调用调度器之前立即让出控制权给 t。
Base.yieldto — Functionyieldto(t::Task, arg = nothing)切换到给定的任务。第一次切换到一个任务时,任务的函数会被调用而不带参数。在后续的切换中,arg 是从任务上一次调用 yieldto 返回的。这是一个低级调用,仅切换任务,不考虑状态或调度。建议避免使用。
Base.sleep — Functionsleep(seconds)阻塞当前任务指定的秒数。最小睡眠时间为1毫秒或输入0.001。
Base.schedule — Functionschedule(t::Task, [val]; error=false)将一个 Task 添加到调度器的队列中。这会导致任务在系统其他空闲时不断运行,除非该任务执行了阻塞操作,例如 wait。
如果提供了第二个参数 val,它将在任务再次运行时通过 yieldto 的返回值传递给任务。如果 error 为 true,则该值将在被唤醒的任务中作为异常抛出。
在已经启动的任意 Task 上使用 schedule 是不正确的。有关更多信息,请参见 API 参考。
默认情况下,任务的粘性位将设置为 true t.sticky。这模拟了 @async 的历史默认值。粘性任务只能在它们首次调度的工作线程上运行,并且在调度时会使它们被调度的任务变为粘性。要获得 Threads.@spawn 的行为,请手动将粘性位设置为 false。
示例
julia> a5() = sum(i for i in 1:1000);
julia> b = Task(a5);
julia> istaskstarted(b)
false
julia> schedule(b);
julia> yield();
julia> istaskstarted(b)
true
julia> istaskdone(b)
trueSynchronization
Base.errormonitor — Functionerrormonitor(t::Task)如果任务 t 失败,则将错误日志打印到 stderr。
示例
julia> Base._wait(errormonitor(Threads.@spawn error("任务失败")))
未处理的任务错误:任务失败
堆栈跟踪:
[...]Base.@sync — Macro@sync等待所有词法上封闭的 @async、@spawn、Distributed.@spawnat 和 Distributed.@distributed 的使用完成。所有由封闭的异步操作抛出的异常都会被收集并作为 CompositeException 抛出。
示例
julia> Threads.nthreads()
4
julia> @sync begin
           Threads.@spawn println("Thread-id $(Threads.threadid()), task 1")
           Threads.@spawn println("Thread-id $(Threads.threadid()), task 2")
       end;
Thread-id 3, task 1
Thread-id 1, task 2Base.wait — Function关于 Threads.Condition 的特别说明:
调用者必须持有拥有 Threads.Condition 的 lock,才能调用此方法。调用任务将被阻塞,直到其他任务唤醒它,通常是通过在同一 Threads.Condition 对象上调用 notify。在阻塞时,锁将被原子释放(即使它是递归锁定的),并将在返回之前重新获取。
wait(r::Future)等待指定的 Future 可用的值。
wait(r::RemoteChannel, args...)等待在指定的 RemoteChannel 上可用的值。
wait([x])阻塞当前任务,直到某个事件发生,具体取决于参数的类型:
Channel:等待一个值被附加到通道中。Condition:等待条件上的notify并返回传递给notify的val参数。等待条件时,还可以传递first=true,这会使等待者在notify时被放在第一位,而不是通常的先进先出行为。Process:等待一个进程或进程链退出。可以使用进程的exitcode字段来确定成功或失败。Task:等待一个Task完成。如果任务因异常失败,将抛出TaskFailedException(它包装了失败的任务)。RawFD:等待文件描述符上的变化(参见FileWatching包)。
如果没有传递参数,任务将阻塞一个未定义的时间。任务只能通过显式调用schedule或yieldto来重新启动。
通常在while循环中调用wait以确保在继续之前满足等待的条件。
wait(c::Channel)阻塞直到 Channel isready。
julia> c = Channel(1);
julia> isready(c)
false
julia> task = Task(() -> wait(c));
julia> schedule(task);
julia> istaskdone(task)  # 任务被阻塞,因为通道未准备好
false
julia> put!(c, 1);
julia> istaskdone(task)  # 任务现在已解除阻塞
trueBase.fetch — Methodfetch(t::Task)等待一个 Task 完成,然后返回其结果值。如果任务因异常而失败,将抛出一个 TaskFailedException(它包装了失败的任务)。
Base.fetch — Methodfetch(x::Any)返回 x。
Base.timedwait — Functiontimedwait(testcb, timeout::Real; pollint::Real=0.1)等待直到 testcb() 返回 true 或者超时 timeout 秒,以先到者为准。测试函数每 pollint 秒被轮询一次。pollint 的最小值为 0.001 秒,即 1 毫秒。
返回 :ok 或 :timed_out。
示例
julia> cb() = (sleep(5); return);
julia> t = @async cb();
julia> timedwait(()->istaskdone(t), 1)
:timed_out
julia> timedwait(()->istaskdone(t), 6.5)
:okBase.Condition — TypeCondition()创建一个边缘触发的事件源,任务可以在其上等待。调用 wait 的 Condition 上的任务会被挂起并排队。当稍后在 Condition 上调用 notify 时,任务会被唤醒。等待条件时,如果使用了 notify 的可选参数,可以返回一个值或引发错误。边缘触发意味着只有在调用 notify 时等待的任务才能被唤醒。对于水平触发的通知,您必须保持额外的状态以跟踪通知是否发生。Channel 和 Threads.Event 类型可以做到这一点,并可用于水平触发事件。
此对象不是线程安全的。有关线程安全版本,请参见 Threads.Condition。
Base.Threads.Condition — TypeThreads.Condition([lock])一个线程安全的 Base.Condition 版本。
要在 Threads.Condition 上调用 wait 或 notify,您必须首先在其上调用 lock。当调用 wait 时,锁在阻塞期间会被原子释放,并将在 wait 返回之前重新获取。因此,Threads.Condition c 的惯用用法如下所示:
lock(c)
try
    while !thing_we_are_waiting_for
        wait(c)
    end
finally
    unlock(c)
end此功能至少需要 Julia 1.2。
Base.Event — TypeEvent([autoreset=false])创建一个级别触发的事件源。调用 wait 的任务在 Event 上被挂起并排队,直到在 Event 上调用 notify。在调用 notify 后,Event 保持在已信号状态,任务在等待它时将不再阻塞,直到调用 reset。
如果 autoreset 为真,则每次调用 notify 时,最多会释放一个任务从 wait 中。
这提供了在 notify/wait 上的获取和释放内存顺序。
此功能至少需要 Julia 1.1。
autoreset 功能和内存顺序保证至少需要 Julia 1.8。
Base.notify — Functionnotify(condition, val=nothing; all=true, error=false)唤醒等待条件的任务,并传递给它们 val。如果 all 为 true(默认值),则唤醒所有等待的任务,否则只唤醒一个。如果 error 为 true,则在被唤醒的任务中将传递的值作为异常抛出。
返回唤醒的任务数量。如果没有任务在 condition 上等待,则返回 0。
Base.reset — MethodBase.Semaphore — TypeSemaphore(sem_size)创建一个计数信号量,允许最多 sem_size 次获取在任何时候使用。每次获取必须与一次释放相匹配。
这为获取/释放调用提供了获取和释放的内存顺序。
Base.acquire — Functionacquire(s::Semaphore)等待可用的 sem_size 许可之一,阻塞直到可以获取一个。
acquire(f, s::Semaphore)在从信号量 s 获取后执行 f,并在完成或出错时 release。
例如,确保同时只有 2 次 foo 调用处于活动状态的 do-block 形式:
s = Base.Semaphore(2)
@sync for _ in 1:100
    Threads.@spawn begin
        Base.acquire(s) do
            foo()
        end
    end
end此方法至少需要 Julia 1.8。
Base.release — Functionrelease(s::Semaphore)将一个许可返回到池中,可能允许另一个任务获取它并恢复执行。
Base.AbstractLock — TypeBase.lock — Functionlock(f::Function, lock)获取 lock,在持有 lock 的情况下执行 f,并在 f 返回时释放 lock。如果 lock 已被其他任务/线程锁定,则等待其变为可用。
当此函数返回时,lock 已被释放,因此调用者不应尝试 unlock 它。
另见: @lock。
将 Channel 作为第二个参数需要 Julia 1.7 或更高版本。
lock(f::Function, l::Lockable)
获取与 l 相关联的锁,执行持锁的 f,并在 f 返回时释放锁。f 将接收一个位置参数:被 l 包裹的值。如果锁已经被其他任务/线程锁定,则等待其变为可用。当此函数返回时,lock 已被释放,因此调用者不应尝试 unlock 它。
至少需要 Julia 1.11。
Base.unlock — Functionunlock(lock)释放对 lock 的拥有权。
如果这是一个之前已被获取的递归锁,则递减内部计数器并立即返回。
Base.trylock — Functiontrylock(lock) -> Success (Boolean)如果锁可用,则获取锁,并在成功时返回 true。如果锁已被其他任务/线程锁定,则返回 false。
每个成功的 trylock 必须与 unlock 匹配。
函数 trylock 结合 islocked 可用于编写测试-测试-设置或指数退避算法 如果 typeof(lock) 支持此功能(请阅读其文档)。
Base.islocked — Functionislocked(lock) -> 状态 (布尔值)检查 lock 是否被任何任务/线程持有。此函数本身不应用于同步。然而,islocked 结合 trylock 可以用于编写测试-测试-设置或指数退避算法 如果 typeof(lock) 支持的话(请阅读其文档)。
扩展帮助
例如,如果 lock 的实现满足下面文档中记录的属性,可以实现指数退避,如下所示。
nspins = 0
while true
    while islocked(lock)
        GC.safepoint()
        nspins += 1
        nspins > LIMIT && error("超时")
    end
    trylock(lock) && break
    backoff()
end实现
建议锁的实现定义 islocked,并在其文档字符串中注明以下属性。
islocked(lock)是无数据竞争的。- 如果 
islocked(lock)返回false,则在没有其他任务干扰的情况下,立即调用trylock(lock)必须成功(返回true)。 
Base.ReentrantLock — TypeReentrantLock()创建一个可重入锁用于同步 Tasks。相同的任务可以根据需要多次获取锁(这就是名称中“可重入”部分的意思)。每个 lock 必须与一个 unlock 匹配。
调用 lock 还会抑制该线程上终结器的运行,直到相应的 unlock。下面所示的标准锁模式的使用应该自然得到支持,但要小心反转尝试/锁定顺序或完全遗漏尝试块(例如,尝试在仍持有锁的情况下返回):
这提供了锁/解锁调用的获取/释放内存顺序。
lock(l)
try
    <atomic work>
finally
    unlock(l)
end如果 !islocked(lck::ReentrantLock) 为真,则 trylock(lck) 成功,除非有其他任务试图“同时”持有锁。
Base.@lock — Macro@lock l exprlock(f, l::AbstractLock) 的宏版本,但使用 expr 而不是 f 函数。展开为:
lock(l)
try
    expr
finally
    unlock(l)
end这类似于使用 lock 和 do 块,但避免了创建闭包,从而可以提高性能。
@lock 在 Julia 1.3 中添加,并在 Julia 1.10 中导出。
Base.Lockable — TypeLockable(value, lock = ReentrantLock())
创建一个 Lockable 对象,该对象包装 value 并将其与提供的 lock 关联。此对象支持 @lock、lock、trylock、unlock。要访问值,必须在持有锁的情况下索引可锁对象。
至少需要 Julia 1.11。
示例
julia> locked_list = Base.Lockable(Int[]);
julia> @lock(locked_list, push!(locked_list[], 1)) # 必须持有锁才能访问值
1-element Vector{Int64}:
 1
julia> lock(summary, locked_list)
"1-element Vector{Int64}"Channels
Base.AbstractChannel — TypeAbstractChannel{T}表示传递类型为 T 的对象的通道。
Base.Channel — TypeChannel{T=Any}(size::Int=0)构造一个 Channel,其内部缓冲区最多可以容纳 size 个类型为 T 的对象。对满的通道调用 put! 会阻塞,直到通过 take! 移除一个对象。
Channel(0) 构造一个无缓冲通道。put! 会阻塞,直到调用匹配的 take!。反之亦然。
其他构造函数:
Channel(): 默认构造函数,相当于Channel{Any}(0)Channel(Inf): 相当于Channel{Any}(typemax(Int))Channel(sz): 相当于Channel{Any}(sz)
默认构造函数 Channel() 和默认 size=0 是在 Julia 1.3 中添加的。
Base.Channel — MethodChannel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false, threadpool=nothing)从 func 创建一个新任务,将其绑定到类型为 T 和大小为 size 的新通道,并在一次调用中调度该任务。当任务终止时,通道会自动关闭。
func 必须将绑定的通道作为其唯一参数。
如果您需要对创建的任务的引用,请通过关键字参数 taskref 传递一个 Ref{Task} 对象。
如果 spawn=true,为 func 创建的 Task 可能会在另一个线程上并行调度,相当于通过 Threads.@spawn 创建任务。
如果 spawn=true 且未设置 threadpool 参数,则默认为 :default。
如果设置了 threadpool 参数(为 :default 或 :interactive),这意味着 spawn=true,并且新的任务被分配到指定的线程池。
返回一个 Channel。
示例
julia> chnl = Channel() do ch
           foreach(i -> put!(ch, i), 1:4)
       end;
julia> typeof(chnl)
Channel{Any}
julia> for i in chnl
           @show i
       end;
i = 1
i = 2
i = 3
i = 4引用创建的任务:
julia> taskref = Ref{Task}();
julia> chnl = Channel(taskref=taskref) do ch
           println(take!(ch))
       end;
julia> istaskdone(taskref[])
false
julia> put!(chnl, "Hello");
Hello
julia> istaskdone(taskref[])
truespawn= 参数是在 Julia 1.3 中添加的。此构造函数是在 Julia 1.3 中添加的。在早期版本的 Julia 中,Channel 使用关键字参数来设置 size 和 T,但这些构造函数已被弃用。
threadpool= 参数是在 Julia 1.9 中添加的。
julia> chnl = Channel{Char}(1, spawn=true) do ch
           for c in "hello world"
               put!(ch, c)
           end
       end
Channel{Char}(1) (2 items available)
julia> String(collect(chnl))
"hello world"Base.put! — Methodput!(c::Channel, v)将项 v 添加到通道 c。如果通道已满,则会阻塞。
对于无缓冲通道,直到其他任务执行 take! 时才会阻塞。
当调用 put! 时,v 现在会通过 convert 转换为通道的类型。
Base.take! — Methodtake!(c::Channel)从 Channel 中按顺序移除并返回一个值。阻塞直到数据可用。对于无缓冲通道,阻塞直到其他任务执行 put!。
示例
缓冲通道:
julia> c = Channel(1);
julia> put!(c, 1);
julia> take!(c)
1无缓冲通道:
julia> c = Channel(0);
julia> task = Task(() -> put!(c, 1));
julia> schedule(task);
julia> take!(c)
1Base.isready — Methodisready(c::Channel)确定一个 Channel 是否有值存储在其中。立即返回,不会阻塞。
对于无缓冲通道,如果有任务在等待 put!,则返回 true。
示例
缓冲通道:
julia> c = Channel(1);
julia> isready(c)
false
julia> put!(c, 1);
julia> isready(c)
true无缓冲通道:
julia> c = Channel();
julia> isready(c)  # 没有任务在等待放入!
false
julia> task = Task(() -> put!(c, 1));
julia> schedule(task);  # 安排一个 put! 任务
julia> isready(c)
trueBase.fetch — Methodfetch(c::Channel)等待并返回(不移除)Channel 中第一个可用的项目。注意:fetch 在无缓冲(0大小)Channel 上不受支持。
示例
缓冲通道:
julia> c = Channel(3) do ch
           foreach(i -> put!(ch, i), 1:3)
       end;
julia> fetch(c)
1
julia> collect(c)  # 项目未被移除
3-element Vector{Any}:
 1
 2
 3Base.close — Methodclose(c::Channel[, excp::Exception])关闭一个通道。一个异常(可选地由 excp 给出)会在以下情况下抛出:
Base.bind — Methodbind(chnl::Channel, task::Task)将 chnl 的生命周期与一个任务关联。当任务终止时,Channel chnl 会自动关闭。任务中未捕获的任何异常会传播给所有在 chnl 上等待的任务。
chnl 对象可以在任务终止时显式关闭。终止的任务对已经关闭的 Channel 对象没有影响。
当一个通道绑定到多个任务时,第一个终止的任务将关闭该通道。当多个通道绑定到同一个任务时,任务的终止将关闭所有绑定的通道。
示例
julia> c = Channel(0);
julia> task = @async foreach(i->put!(c, i), 1:4);
julia> bind(c,task);
julia> for i in c
           @show i
       end;
i = 1
i = 2
i = 3
i = 4
julia> isopen(c)
falsejulia> c = Channel(0);
julia> task = @async (put!(c, 1); error("foo"));
julia> bind(c, task);
julia> take!(c)
1
julia> put!(c, 1);
ERROR: TaskFailedException
Stacktrace:
[...]
    nested task error: foo
[...]Low-level synchronization using schedule and wait
schedule 的最简单正确用法是在尚未开始(已调度)的 Task 上。然而,可以将 4d61726b646f776e2e436f64652822222c20227363686564756c652229_40726566 和 wait 作为构建同步接口的非常低级的构建块。 调用 schedule(task) 的一个关键前提是调用者必须“拥有”该 task;即,它必须知道在调用 schedule(task) 的代码已知的位置中,给定 task 中的 wait 调用正在发生。 确保此前提的一种策略是使用原子操作,如以下示例所示:
@enum OWEState begin
    OWE_EMPTY
    OWE_WAITING
    OWE_NOTIFYING
end
mutable struct OneWayEvent
    @atomic state::OWEState
    task::Task
    OneWayEvent() = new(OWE_EMPTY)
end
function Base.notify(ev::OneWayEvent)
    state = @atomic ev.state
    while state !== OWE_NOTIFYING
        # Spin until we successfully update the state to OWE_NOTIFYING:
        state, ok = @atomicreplace(ev.state, state => OWE_NOTIFYING)
        if ok
            if state == OWE_WAITING
                # OWE_WAITING -> OWE_NOTIFYING transition means that the waiter task is
                # already waiting or about to call `wait`. The notifier task must wake up
                # the waiter task.
                schedule(ev.task)
            else
                @assert state == OWE_EMPTY
                # Since we are assuming that there is only one notifier task (for
                # simplicity), we know that the other possible case here is OWE_EMPTY.
                # We do not need to do anything because we know that the waiter task has
                # not called `wait(ev::OneWayEvent)` yet.
            end
            break
        end
    end
    return
end
function Base.wait(ev::OneWayEvent)
    ev.task = current_task()
    state, ok = @atomicreplace(ev.state, OWE_EMPTY => OWE_WAITING)
    if ok
        # OWE_EMPTY -> OWE_WAITING transition means that the notifier task is guaranteed to
        # invoke OWE_WAITING -> OWE_NOTIFYING transition.  The waiter task must call
        # `wait()` immediately.  In particular, it MUST NOT invoke any function that may
        # yield to the scheduler at this point in code.
        wait()
    else
        @assert state == OWE_NOTIFYING
        # Otherwise, the `state` must have already been moved to OWE_NOTIFYING by the
        # notifier task.
    end
    return
end
ev = OneWayEvent()
@sync begin
    @async begin
        wait(ev)
        println("done")
    end
    println("notifying...")
    notify(ev)
end
# output
notifying...
doneOneWayEvent 允许一个任务 wait 另一个任务的 notify。这是一个有限的通信接口,因为 wait 只能从单个任务中使用一次(注意 ev.task 的非原子赋值)。
在这个例子中,notify(ev::OneWayEvent) 仅在 它 将状态从 OWE_WAITING 修改为 OWE_NOTIFYING 时被允许调用 schedule(ev.task)。这让我们知道执行 wait(ev::OneWayEvent) 的任务现在处于 ok 分支,并且不会有其他任务尝试 schedule(ev.task),因为它们的 @atomicreplace(ev.state, state => OWE_NOTIFYING) 将会失败。