Tasks

Core.TaskType
Task(func)

创建一个 Task(即协程)来执行给定的函数 func(该函数必须可以无参数调用)。当此函数返回时,任务将退出。任务将在构造时从父级的“世界年龄”中运行,当 scheduled。

Warning

默认情况下,任务的粘性位将设置为 true t.sticky。这模拟了 @async 的历史默认值。粘性任务只能在它们首次调度的工作线程上运行,并且在调度时将使它们从中调度的任务变为粘性。要获得 Threads.@spawn 的行为,请手动将粘性位设置为 false

示例

julia> a() = sum(i for i in 1:1000);

julia> b = Task(a);

在这个例子中,b 是一个可运行的 Task,但尚未开始。

source
Base.@taskMacro
@task

将一个表达式包装在一个 Task 中而不执行它,并返回该 Task。这仅仅创建一个任务,而不运行它。

Warning

默认情况下,任务的粘性位将被设置为 true t.sticky。这模拟了 @async 的历史默认值。粘性任务只能在它们首次调度的工作线程上运行,并且在调度时会使它们从中调度的任务变为粘性。要获得 Threads.@spawn 的行为,请手动将粘性位设置为 false

示例

julia> a1() = sum(i for i in 1:1000);

julia> b = @task a1();

julia> istaskstarted(b)
false

julia> schedule(b);

julia> yield();

julia> istaskdone(b)
true
source
Base.@asyncMacro
@async

将一个表达式包装在一个 Task 中,并将其添加到本地机器的调度器队列中。

值可以通过 $ 插入到 @async 中,这会将值直接复制到构造的底层闭包中。这允许你插入变量的 ,将异步代码与当前任务中变量值的变化隔离开来。

Warning

强烈建议始终优先使用 Threads.@spawn 而不是 @async 即使不需要并行性,特别是在公开分发的库中。这是因为在当前的 Julia 实现中,使用 @async 会禁用 任务在工作线程之间的迁移。因此,在库函数中看似无害的 @async 使用可能会对用户应用程序的非常不同部分的性能产生重大影响。

Julia 1.4

从 Julia 1.4 开始,可以通过 $ 插入值。

source
Base.asyncmapFunction
asyncmap(f, c...; ntasks=0, batch_size=nothing)

使用多个并发任务对集合(或多个相同长度的集合)进行 f 的映射。对于多个集合参数,f 是逐元素应用的。

ntasks 指定要并发运行的任务数量。根据集合的长度,如果未指定 ntasks,则最多将使用 100 个任务进行并发映射。

ntasks 也可以指定为一个无参数的函数。在这种情况下,在处理每个元素之前会检查并行运行的任务数量,如果 ntasks_func 的值大于当前任务数量,则会启动一个新任务。

如果指定了 batch_size,则集合将以批处理模式进行处理。此时,f 必须是一个接受参数元组的 Vector 的函数,并且必须返回一个结果的向量。输入向量的长度将为 batch_size 或更少。

以下示例通过返回映射函数执行的任务的 objectid 来突出不同任务中的执行。

首先,在 ntasks 未定义的情况下,每个元素在不同的任务中处理。

julia> tskoid() = objectid(current_task());

julia> asyncmap(x->tskoid(), 1:5)
5-element Array{UInt64,1}:
 0x6e15e66c75c75853
 0x440f8819a1baa682
 0x9fb3eeadd0c83985
 0xebd3e35fe90d4050
 0x29efc93edce2b961

julia> length(unique(asyncmap(x->tskoid(), 1:5)))
5

ntasks=2 时,所有元素在 2 个任务中处理。

julia> asyncmap(x->tskoid(), 1:5; ntasks=2)
5-element Array{UInt64,1}:
 0x027ab1680df7ae94
 0xa23d2f80cd7cf157
 0x027ab1680df7ae94
 0xa23d2f80cd7cf157
 0x027ab1680df7ae94

julia> length(unique(asyncmap(x->tskoid(), 1:5; ntasks=2)))
2

定义 batch_size 后,映射函数需要更改为接受参数元组的数组并返回结果的数组。修改后的映射函数中使用 map 来实现这一点。

julia> batch_func(input) = map(x->string("args_tuple: ", x, ", element_val: ", x[1], ", task: ", tskoid()), input)
batch_func (generic function with 1 method)

julia> asyncmap(batch_func, 1:5; ntasks=2, batch_size=2)
5-element Array{String,1}:
 "args_tuple: (1,), element_val: 1, task: 9118321258196414413"
 "args_tuple: (2,), element_val: 2, task: 4904288162898683522"
 "args_tuple: (3,), element_val: 3, task: 9118321258196414413"
 "args_tuple: (4,), element_val: 4, task: 4904288162898683522"
 "args_tuple: (5,), element_val: 5, task: 9118321258196414413"
source
Base.asyncmap!Function
asyncmap!(f, results, c...; ntasks=0, batch_size=nothing)

asyncmap 一样,但将输出存储在 results 中,而不是返回一个集合。

Warning

当任何被修改的参数与其他参数共享内存时,行为可能会出乎意料。

source
Base.istaskdoneFunction
istaskdone(t::Task) -> Bool

确定任务是否已退出。

示例

julia> a2() = sum(i for i in 1:1000);

julia> b = Task(a2);

julia> istaskdone(b)
false

julia> schedule(b);

julia> yield();

julia> istaskdone(b)
true
source
Base.istaskstartedFunction
istaskstarted(t::Task) -> Bool

确定任务是否已开始执行。

示例

julia> a3() = sum(i for i in 1:1000);

julia> b = Task(a3);

julia> istaskstarted(b)
false
source
Base.istaskfailedFunction
istaskfailed(t::Task) -> Bool

确定一个任务是否因为抛出异常而退出。

示例

julia> a4() = error("任务失败");

julia> b = Task(a4);

julia> istaskfailed(b)
false

julia> schedule(b);

julia> yield();

julia> istaskfailed(b)
true
Julia 1.3

此函数至少需要 Julia 1.3。

source
Base.task_local_storageMethod
task_local_storage(body, key, value)

调用函数 body,使用修改后的任务局部存储,其中 value 被分配给 keykey 的先前值(或缺失)在之后恢复。这对于模拟动态作用域非常有用。

source

Scheduling

Base.yieldFunction
yield()

切换到调度器以允许另一个已调度的任务运行。调用此函数的任务仍然可以运行,如果没有其他可运行的任务,它将立即重新启动。

source
yield(t::Task, arg = nothing)

一个快速的、不公平调度的 schedule(t, arg); yield() 版本,它在调用调度器之前立即让出控制权给 t

source
Base.yieldtoFunction
yieldto(t::Task, arg = nothing)

切换到给定的任务。第一次切换到一个任务时,任务的函数会被调用而不带参数。在后续的切换中,arg 是从任务上一次调用 yieldto 返回的。这是一个低级调用,仅切换任务,不考虑状态或调度。建议避免使用。

source
Base.sleepFunction
sleep(seconds)

阻塞当前任务指定的秒数。最小睡眠时间为1毫秒或输入0.001

source
Base.scheduleFunction
schedule(t::Task, [val]; error=false)

将一个 Task 添加到调度器的队列中。这会导致任务在系统其他空闲时不断运行,除非该任务执行了阻塞操作,例如 wait

如果提供了第二个参数 val,它将在任务再次运行时通过 yieldto 的返回值传递给任务。如果 errortrue,则该值将在被唤醒的任务中作为异常抛出。

Warning

在已经启动的任意 Task 上使用 schedule 是不正确的。有关更多信息,请参见 API 参考

Warning

默认情况下,任务的粘性位将设置为 true t.sticky。这模拟了 @async 的历史默认值。粘性任务只能在它们首次调度的工作线程上运行,并且在调度时会使它们被调度的任务变为粘性。要获得 Threads.@spawn 的行为,请手动将粘性位设置为 false

示例

julia> a5() = sum(i for i in 1:1000);

julia> b = Task(a5);

julia> istaskstarted(b)
false

julia> schedule(b);

julia> yield();

julia> istaskstarted(b)
true

julia> istaskdone(b)
true
source

Synchronization

Base.errormonitorFunction
errormonitor(t::Task)

如果任务 t 失败,则将错误日志打印到 stderr

示例

julia> Base._wait(errormonitor(Threads.@spawn error("任务失败")))
未处理的任务错误:任务失败
堆栈跟踪:
[...]
source
Base.@syncMacro
@sync

等待所有词法上封闭的 @async@spawnDistributed.@spawnatDistributed.@distributed 的使用完成。所有由封闭的异步操作抛出的异常都会被收集并作为 CompositeException 抛出。

示例

julia> Threads.nthreads()
4

julia> @sync begin
           Threads.@spawn println("Thread-id $(Threads.threadid()), task 1")
           Threads.@spawn println("Thread-id $(Threads.threadid()), task 2")
       end;
Thread-id 3, task 1
Thread-id 1, task 2
source
Base.waitFunction

关于 Threads.Condition 的特别说明:

调用者必须持有拥有 Threads.Conditionlock,才能调用此方法。调用任务将被阻塞,直到其他任务唤醒它,通常是通过在同一 Threads.Condition 对象上调用 notify。在阻塞时,锁将被原子释放(即使它是递归锁定的),并将在返回之前重新获取。

source
wait(r::Future)

等待指定的 Future 可用的值。

source
wait(r::RemoteChannel, args...)

等待在指定的 RemoteChannel 上可用的值。

source
wait([x])

阻塞当前任务,直到某个事件发生,具体取决于参数的类型:

  • Channel:等待一个值被附加到通道中。
  • Condition:等待条件上的notify并返回传递给notifyval参数。等待条件时,还可以传递first=true,这会使等待者在notify时被放在第一位,而不是通常的先进先出行为。
  • Process:等待一个进程或进程链退出。可以使用进程的exitcode字段来确定成功或失败。
  • Task:等待一个Task完成。如果任务因异常失败,将抛出TaskFailedException(它包装了失败的任务)。
  • RawFD:等待文件描述符上的变化(参见FileWatching包)。

如果没有传递参数,任务将阻塞一个未定义的时间。任务只能通过显式调用scheduleyieldto来重新启动。

通常在while循环中调用wait以确保在继续之前满足等待的条件。

source
wait(c::Channel)

阻塞直到 Channel isready

julia> c = Channel(1);

julia> isready(c)
false

julia> task = Task(() -> wait(c));

julia> schedule(task);

julia> istaskdone(task)  # 任务被阻塞,因为通道未准备好
false

julia> put!(c, 1);

julia> istaskdone(task)  # 任务现在已解除阻塞
true
source
Base.fetchMethod
fetch(t::Task)

等待一个 Task 完成,然后返回其结果值。如果任务因异常而失败,将抛出一个 TaskFailedException(它包装了失败的任务)。

source
Base.timedwaitFunction
timedwait(testcb, timeout::Real; pollint::Real=0.1)

等待直到 testcb() 返回 true 或者超时 timeout 秒,以先到者为准。测试函数每 pollint 秒被轮询一次。pollint 的最小值为 0.001 秒,即 1 毫秒。

返回 :ok:timed_out

示例

julia> cb() = (sleep(5); return);

julia> t = @async cb();

julia> timedwait(()->istaskdone(t), 1)
:timed_out

julia> timedwait(()->istaskdone(t), 6.5)
:ok
source
Base.ConditionType
Condition()

创建一个边缘触发的事件源,任务可以在其上等待。调用 waitCondition 上的任务会被挂起并排队。当稍后在 Condition 上调用 notify 时,任务会被唤醒。等待条件时,如果使用了 notify 的可选参数,可以返回一个值或引发错误。边缘触发意味着只有在调用 notify 时等待的任务才能被唤醒。对于水平触发的通知,您必须保持额外的状态以跟踪通知是否发生。ChannelThreads.Event 类型可以做到这一点,并可用于水平触发事件。

此对象不是线程安全的。有关线程安全版本,请参见 Threads.Condition

source
Base.Threads.ConditionType
Threads.Condition([lock])

一个线程安全的 Base.Condition 版本。

要在 Threads.Condition 上调用 waitnotify,您必须首先在其上调用 lock。当调用 wait 时,锁在阻塞期间会被原子释放,并将在 wait 返回之前重新获取。因此,Threads.Condition c 的惯用用法如下所示:

lock(c)
try
    while !thing_we_are_waiting_for
        wait(c)
    end
finally
    unlock(c)
end
Julia 1.2

此功能至少需要 Julia 1.2。

source
Base.EventType
Event([autoreset=false])

创建一个级别触发的事件源。调用 wait 的任务在 Event 上被挂起并排队,直到在 Event 上调用 notify。在调用 notify 后,Event 保持在已信号状态,任务在等待它时将不再阻塞,直到调用 reset

如果 autoreset 为真,则每次调用 notify 时,最多会释放一个任务从 wait 中。

这提供了在 notify/wait 上的获取和释放内存顺序。

Julia 1.1

此功能至少需要 Julia 1.1。

Julia 1.8

autoreset 功能和内存顺序保证至少需要 Julia 1.8。

source
Base.notifyFunction
notify(condition, val=nothing; all=true, error=false)

唤醒等待条件的任务,并传递给它们 val。如果 alltrue(默认值),则唤醒所有等待的任务,否则只唤醒一个。如果 errortrue,则在被唤醒的任务中将传递的值作为异常抛出。

返回唤醒的任务数量。如果没有任务在 condition 上等待,则返回 0。

source
Base.resetMethod
reset(::Event)

Event重置为未设置状态。然后,任何未来对wait的调用将会阻塞,直到再次调用notify

source
Base.SemaphoreType
Semaphore(sem_size)

创建一个计数信号量,允许最多 sem_size 次获取在任何时候使用。每次获取必须与一次释放相匹配。

这为获取/释放调用提供了获取和释放的内存顺序。

source
Base.acquireFunction
acquire(s::Semaphore)

等待可用的 sem_size 许可之一,阻塞直到可以获取一个。

source
acquire(f, s::Semaphore)

在从信号量 s 获取后执行 f,并在完成或出错时 release

例如,确保同时只有 2 次 foo 调用处于活动状态的 do-block 形式:

s = Base.Semaphore(2)
@sync for _ in 1:100
    Threads.@spawn begin
        Base.acquire(s) do
            foo()
        end
    end
end
Julia 1.8

此方法至少需要 Julia 1.8。

source
Base.releaseFunction
release(s::Semaphore)

将一个许可返回到池中,可能允许另一个任务获取它并恢复执行。

source
Base.lockFunction
lock(lock)

lock 可用时获取它。如果锁已经被其他任务/线程锁定,则等待它变得可用。

每个 lock 必须与一个 unlock 匹配。

source
lock(f::Function, lock)

获取 lock,在持有 lock 的情况下执行 f,并在 f 返回时释放 lock。如果 lock 已被其他任务/线程锁定,则等待其变为可用。

当此函数返回时,lock 已被释放,因此调用者不应尝试 unlock 它。

另见: @lock

Julia 1.7

Channel 作为第二个参数需要 Julia 1.7 或更高版本。

source

lock(f::Function, l::Lockable)

获取与 l 相关联的锁,执行持锁的 f,并在 f 返回时释放锁。f 将接收一个位置参数:被 l 包裹的值。如果锁已经被其他任务/线程锁定,则等待其变为可用。当此函数返回时,lock 已被释放,因此调用者不应尝试 unlock 它。

Julia 1.11

至少需要 Julia 1.11。

source
Base.unlockFunction
unlock(lock)

释放对 lock 的拥有权。

如果这是一个之前已被获取的递归锁,则递减内部计数器并立即返回。

source
Base.trylockFunction
trylock(lock) -> Success (Boolean)

如果锁可用,则获取锁,并在成功时返回 true。如果锁已被其他任务/线程锁定,则返回 false

每个成功的 trylock 必须与 unlock 匹配。

函数 trylock 结合 islocked 可用于编写测试-测试-设置或指数退避算法 如果 typeof(lock) 支持此功能(请阅读其文档)。

source
Base.islockedFunction
islocked(lock) -> 状态 (布尔值)

检查 lock 是否被任何任务/线程持有。此函数本身不应用于同步。然而,islocked 结合 trylock 可以用于编写测试-测试-设置或指数退避算法 如果 typeof(lock) 支持的话(请阅读其文档)。

扩展帮助

例如,如果 lock 的实现满足下面文档中记录的属性,可以实现指数退避,如下所示。

nspins = 0
while true
    while islocked(lock)
        GC.safepoint()
        nspins += 1
        nspins > LIMIT && error("超时")
    end
    trylock(lock) && break
    backoff()
end

实现

建议锁的实现定义 islocked,并在其文档字符串中注明以下属性。

  • islocked(lock) 是无数据竞争的。
  • 如果 islocked(lock) 返回 false,则在没有其他任务干扰的情况下,立即调用 trylock(lock) 必须成功(返回 true)。
source
Base.ReentrantLockType
ReentrantLock()

创建一个可重入锁用于同步 Tasks。相同的任务可以根据需要多次获取锁(这就是名称中“可重入”部分的意思)。每个 lock 必须与一个 unlock 匹配。

调用 lock 还会抑制该线程上终结器的运行,直到相应的 unlock。下面所示的标准锁模式的使用应该自然得到支持,但要小心反转尝试/锁定顺序或完全遗漏尝试块(例如,尝试在仍持有锁的情况下返回):

这提供了锁/解锁调用的获取/释放内存顺序。

lock(l)
try
    <atomic work>
finally
    unlock(l)
end

如果 !islocked(lck::ReentrantLock) 为真,则 trylock(lck) 成功,除非有其他任务试图“同时”持有锁。

source
Base.@lockMacro
@lock l expr

lock(f, l::AbstractLock) 的宏版本,但使用 expr 而不是 f 函数。展开为:

lock(l)
try
    expr
finally
    unlock(l)
end

这类似于使用 lockdo 块,但避免了创建闭包,从而可以提高性能。

Compat

@lock 在 Julia 1.3 中添加,并在 Julia 1.10 中导出。

source
Base.LockableType

Lockable(value, lock = ReentrantLock())

创建一个 Lockable 对象,该对象包装 value 并将其与提供的 lock 关联。此对象支持 @locklocktrylockunlock。要访问值,必须在持有锁的情况下索引可锁对象。

Julia 1.11

至少需要 Julia 1.11。

示例

julia> locked_list = Base.Lockable(Int[]);

julia> @lock(locked_list, push!(locked_list[], 1)) # 必须持有锁才能访问值
1-element Vector{Int64}:
 1

julia> lock(summary, locked_list)
"1-element Vector{Int64}"
source

Channels

Base.ChannelType
Channel{T=Any}(size::Int=0)

构造一个 Channel,其内部缓冲区最多可以容纳 size 个类型为 T 的对象。对满的通道调用 put! 会阻塞,直到通过 take! 移除一个对象。

Channel(0) 构造一个无缓冲通道。put! 会阻塞,直到调用匹配的 take!。反之亦然。

其他构造函数:

  • Channel(): 默认构造函数,相当于 Channel{Any}(0)
  • Channel(Inf): 相当于 Channel{Any}(typemax(Int))
  • Channel(sz): 相当于 Channel{Any}(sz)
Julia 1.3

默认构造函数 Channel() 和默认 size=0 是在 Julia 1.3 中添加的。

source
Base.ChannelMethod
Channel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false, threadpool=nothing)

func 创建一个新任务,将其绑定到类型为 T 和大小为 size 的新通道,并在一次调用中调度该任务。当任务终止时,通道会自动关闭。

func 必须将绑定的通道作为其唯一参数。

如果您需要对创建的任务的引用,请通过关键字参数 taskref 传递一个 Ref{Task} 对象。

如果 spawn=true,为 func 创建的 Task 可能会在另一个线程上并行调度,相当于通过 Threads.@spawn 创建任务。

如果 spawn=true 且未设置 threadpool 参数,则默认为 :default

如果设置了 threadpool 参数(为 :default:interactive),这意味着 spawn=true,并且新的任务被分配到指定的线程池。

返回一个 Channel

示例

julia> chnl = Channel() do ch
           foreach(i -> put!(ch, i), 1:4)
       end;

julia> typeof(chnl)
Channel{Any}

julia> for i in chnl
           @show i
       end;
i = 1
i = 2
i = 3
i = 4

引用创建的任务:

julia> taskref = Ref{Task}();

julia> chnl = Channel(taskref=taskref) do ch
           println(take!(ch))
       end;

julia> istaskdone(taskref[])
false

julia> put!(chnl, "Hello");
Hello

julia> istaskdone(taskref[])
true
Julia 1.3

spawn= 参数是在 Julia 1.3 中添加的。此构造函数是在 Julia 1.3 中添加的。在早期版本的 Julia 中,Channel 使用关键字参数来设置 sizeT,但这些构造函数已被弃用。

Julia 1.9

threadpool= 参数是在 Julia 1.9 中添加的。

julia> chnl = Channel{Char}(1, spawn=true) do ch
           for c in "hello world"
               put!(ch, c)
           end
       end
Channel{Char}(1) (2 items available)

julia> String(collect(chnl))
"hello world"
source
Base.put!Method
put!(c::Channel, v)

将项 v 添加到通道 c。如果通道已满,则会阻塞。

对于无缓冲通道,直到其他任务执行 take! 时才会阻塞。

Julia 1.1

当调用 put! 时,v 现在会通过 convert 转换为通道的类型。

source
Base.take!Method
take!(c::Channel)

Channel 中按顺序移除并返回一个值。阻塞直到数据可用。对于无缓冲通道,阻塞直到其他任务执行 put!

示例

缓冲通道:

julia> c = Channel(1);

julia> put!(c, 1);

julia> take!(c)
1

无缓冲通道:

julia> c = Channel(0);

julia> task = Task(() -> put!(c, 1));

julia> schedule(task);

julia> take!(c)
1
source
Base.isreadyMethod
isready(c::Channel)

确定一个 Channel 是否有值存储在其中。立即返回,不会阻塞。

对于无缓冲通道,如果有任务在等待 put!,则返回 true

示例

缓冲通道:

julia> c = Channel(1);

julia> isready(c)
false

julia> put!(c, 1);

julia> isready(c)
true

无缓冲通道:

julia> c = Channel();

julia> isready(c)  # 没有任务在等待放入!
false

julia> task = Task(() -> put!(c, 1));

julia> schedule(task);  # 安排一个 put! 任务

julia> isready(c)
true
source
Base.fetchMethod
fetch(c::Channel)

等待并返回(不移除)Channel 中第一个可用的项目。注意:fetch 在无缓冲(0大小)Channel 上不受支持。

示例

缓冲通道:

julia> c = Channel(3) do ch
           foreach(i -> put!(ch, i), 1:3)
       end;

julia> fetch(c)
1

julia> collect(c)  # 项目未被移除
3-element Vector{Any}:
 1
 2
 3
source
Base.closeMethod
close(c::Channel[, excp::Exception])

关闭一个通道。一个异常(可选地由 excp 给出)会在以下情况下抛出:

  • 在一个已关闭的通道上使用 put!
  • 在一个空的、已关闭的通道上使用 take!fetch
source
Base.bindMethod
bind(chnl::Channel, task::Task)

chnl 的生命周期与一个任务关联。当任务终止时,Channel chnl 会自动关闭。任务中未捕获的任何异常会传播给所有在 chnl 上等待的任务。

chnl 对象可以在任务终止时显式关闭。终止的任务对已经关闭的 Channel 对象没有影响。

当一个通道绑定到多个任务时,第一个终止的任务将关闭该通道。当多个通道绑定到同一个任务时,任务的终止将关闭所有绑定的通道。

示例

julia> c = Channel(0);

julia> task = @async foreach(i->put!(c, i), 1:4);

julia> bind(c,task);

julia> for i in c
           @show i
       end;
i = 1
i = 2
i = 3
i = 4

julia> isopen(c)
false
julia> c = Channel(0);

julia> task = @async (put!(c, 1); error("foo"));

julia> bind(c, task);

julia> take!(c)
1

julia> put!(c, 1);
ERROR: TaskFailedException
Stacktrace:
[...]
    nested task error: foo
[...]
source

Low-level synchronization using schedule and wait

schedule 的最简单正确用法是在尚未开始(已调度)的 Task 上。然而,可以将 4d61726b646f776e2e436f64652822222c20227363686564756c652229_40726566wait 作为构建同步接口的非常低级的构建块。 调用 schedule(task) 的一个关键前提是调用者必须“拥有”该 task;即,它必须知道在调用 schedule(task) 的代码已知的位置中,给定 task 中的 wait 调用正在发生。 确保此前提的一种策略是使用原子操作,如以下示例所示:

@enum OWEState begin
    OWE_EMPTY
    OWE_WAITING
    OWE_NOTIFYING
end

mutable struct OneWayEvent
    @atomic state::OWEState
    task::Task
    OneWayEvent() = new(OWE_EMPTY)
end

function Base.notify(ev::OneWayEvent)
    state = @atomic ev.state
    while state !== OWE_NOTIFYING
        # Spin until we successfully update the state to OWE_NOTIFYING:
        state, ok = @atomicreplace(ev.state, state => OWE_NOTIFYING)
        if ok
            if state == OWE_WAITING
                # OWE_WAITING -> OWE_NOTIFYING transition means that the waiter task is
                # already waiting or about to call `wait`. The notifier task must wake up
                # the waiter task.
                schedule(ev.task)
            else
                @assert state == OWE_EMPTY
                # Since we are assuming that there is only one notifier task (for
                # simplicity), we know that the other possible case here is OWE_EMPTY.
                # We do not need to do anything because we know that the waiter task has
                # not called `wait(ev::OneWayEvent)` yet.
            end
            break
        end
    end
    return
end

function Base.wait(ev::OneWayEvent)
    ev.task = current_task()
    state, ok = @atomicreplace(ev.state, OWE_EMPTY => OWE_WAITING)
    if ok
        # OWE_EMPTY -> OWE_WAITING transition means that the notifier task is guaranteed to
        # invoke OWE_WAITING -> OWE_NOTIFYING transition.  The waiter task must call
        # `wait()` immediately.  In particular, it MUST NOT invoke any function that may
        # yield to the scheduler at this point in code.
        wait()
    else
        @assert state == OWE_NOTIFYING
        # Otherwise, the `state` must have already been moved to OWE_NOTIFYING by the
        # notifier task.
    end
    return
end

ev = OneWayEvent()
@sync begin
    @async begin
        wait(ev)
        println("done")
    end
    println("notifying...")
    notify(ev)
end

# output
notifying...
done

OneWayEvent 允许一个任务 wait 另一个任务的 notify。这是一个有限的通信接口,因为 wait 只能从单个任务中使用一次(注意 ev.task 的非原子赋值)。

在这个例子中,notify(ev::OneWayEvent) 仅在 将状态从 OWE_WAITING 修改为 OWE_NOTIFYING 时被允许调用 schedule(ev.task)。这让我们知道执行 wait(ev::OneWayEvent) 的任务现在处于 ok 分支,并且不会有其他任务尝试 schedule(ev.task),因为它们的 @atomicreplace(ev.state, state => OWE_NOTIFYING) 将会失败。