Tasks
Core.Task
— TypeTask(func)
创建一个 Task
(即协程)来执行给定的函数 func
(该函数必须可以无参数调用)。当此函数返回时,任务将退出。任务将在构造时从父级的“世界年龄”中运行,当 schedule
d。
默认情况下,任务的粘性位将设置为 true t.sticky
。这模拟了 @async
的历史默认值。粘性任务只能在它们首次调度的工作线程上运行,并且在调度时将使它们从中调度的任务变为粘性。要获得 Threads.@spawn
的行为,请手动将粘性位设置为 false
。
示例
julia> a() = sum(i for i in 1:1000);
julia> b = Task(a);
在这个例子中,b
是一个可运行的 Task
,但尚未开始。
Base.@task
— Macro@task
将一个表达式包装在一个 Task
中而不执行它,并返回该 Task
。这仅仅创建一个任务,而不运行它。
默认情况下,任务的粘性位将被设置为 true t.sticky
。这模拟了 @async
的历史默认值。粘性任务只能在它们首次调度的工作线程上运行,并且在调度时会使它们从中调度的任务变为粘性。要获得 Threads.@spawn
的行为,请手动将粘性位设置为 false
。
示例
julia> a1() = sum(i for i in 1:1000);
julia> b = @task a1();
julia> istaskstarted(b)
false
julia> schedule(b);
julia> yield();
julia> istaskdone(b)
true
Base.@async
— Macro@async
将一个表达式包装在一个 Task
中,并将其添加到本地机器的调度器队列中。
值可以通过 $
插入到 @async
中,这会将值直接复制到构造的底层闭包中。这允许你插入变量的 值,将异步代码与当前任务中变量值的变化隔离开来。
强烈建议始终优先使用 Threads.@spawn
而不是 @async
即使不需要并行性,特别是在公开分发的库中。这是因为在当前的 Julia 实现中,使用 @async
会禁用 父 任务在工作线程之间的迁移。因此,在库函数中看似无害的 @async
使用可能会对用户应用程序的非常不同部分的性能产生重大影响。
从 Julia 1.4 开始,可以通过 $
插入值。
Base.asyncmap
— Functionasyncmap(f, c...; ntasks=0, batch_size=nothing)
使用多个并发任务对集合(或多个相同长度的集合)进行 f
的映射。对于多个集合参数,f
是逐元素应用的。
ntasks
指定要并发运行的任务数量。根据集合的长度,如果未指定 ntasks
,则最多将使用 100 个任务进行并发映射。
ntasks
也可以指定为一个无参数的函数。在这种情况下,在处理每个元素之前会检查并行运行的任务数量,如果 ntasks_func
的值大于当前任务数量,则会启动一个新任务。
如果指定了 batch_size
,则集合将以批处理模式进行处理。此时,f
必须是一个接受参数元组的 Vector
的函数,并且必须返回一个结果的向量。输入向量的长度将为 batch_size
或更少。
以下示例通过返回映射函数执行的任务的 objectid
来突出不同任务中的执行。
首先,在 ntasks
未定义的情况下,每个元素在不同的任务中处理。
julia> tskoid() = objectid(current_task());
julia> asyncmap(x->tskoid(), 1:5)
5-element Array{UInt64,1}:
0x6e15e66c75c75853
0x440f8819a1baa682
0x9fb3eeadd0c83985
0xebd3e35fe90d4050
0x29efc93edce2b961
julia> length(unique(asyncmap(x->tskoid(), 1:5)))
5
当 ntasks=2
时,所有元素在 2 个任务中处理。
julia> asyncmap(x->tskoid(), 1:5; ntasks=2)
5-element Array{UInt64,1}:
0x027ab1680df7ae94
0xa23d2f80cd7cf157
0x027ab1680df7ae94
0xa23d2f80cd7cf157
0x027ab1680df7ae94
julia> length(unique(asyncmap(x->tskoid(), 1:5; ntasks=2)))
2
定义 batch_size
后,映射函数需要更改为接受参数元组的数组并返回结果的数组。修改后的映射函数中使用 map
来实现这一点。
julia> batch_func(input) = map(x->string("args_tuple: ", x, ", element_val: ", x[1], ", task: ", tskoid()), input)
batch_func (generic function with 1 method)
julia> asyncmap(batch_func, 1:5; ntasks=2, batch_size=2)
5-element Array{String,1}:
"args_tuple: (1,), element_val: 1, task: 9118321258196414413"
"args_tuple: (2,), element_val: 2, task: 4904288162898683522"
"args_tuple: (3,), element_val: 3, task: 9118321258196414413"
"args_tuple: (4,), element_val: 4, task: 4904288162898683522"
"args_tuple: (5,), element_val: 5, task: 9118321258196414413"
Base.asyncmap!
— Functionasyncmap!(f, results, c...; ntasks=0, batch_size=nothing)
像 asyncmap
一样,但将输出存储在 results
中,而不是返回一个集合。
当任何被修改的参数与其他参数共享内存时,行为可能会出乎意料。
Base.current_task
— Functioncurrent_task()
获取当前正在运行的 Task
。
Base.istaskdone
— Functionistaskdone(t::Task) -> Bool
确定任务是否已退出。
示例
julia> a2() = sum(i for i in 1:1000);
julia> b = Task(a2);
julia> istaskdone(b)
false
julia> schedule(b);
julia> yield();
julia> istaskdone(b)
true
Base.istaskstarted
— Functionistaskstarted(t::Task) -> Bool
确定任务是否已开始执行。
示例
julia> a3() = sum(i for i in 1:1000);
julia> b = Task(a3);
julia> istaskstarted(b)
false
Base.istaskfailed
— Functionistaskfailed(t::Task) -> Bool
确定一个任务是否因为抛出异常而退出。
示例
julia> a4() = error("任务失败");
julia> b = Task(a4);
julia> istaskfailed(b)
false
julia> schedule(b);
julia> yield();
julia> istaskfailed(b)
true
此函数至少需要 Julia 1.3。
Base.task_local_storage
— Methodtask_local_storage(key)
在当前任务的任务本地存储中查找键的值。
Base.task_local_storage
— Methodtask_local_storage(key, value)
将一个值分配给当前任务的任务局部存储中的一个键。
Base.task_local_storage
— Methodtask_local_storage(body, key, value)
调用函数 body
,使用修改后的任务局部存储,其中 value
被分配给 key
;key
的先前值(或缺失)在之后恢复。这对于模拟动态作用域非常有用。
Scheduling
Base.yield
— Functionyield()
切换到调度器以允许另一个已调度的任务运行。调用此函数的任务仍然可以运行,如果没有其他可运行的任务,它将立即重新启动。
yield(t::Task, arg = nothing)
一个快速的、不公平调度的 schedule(t, arg); yield()
版本,它在调用调度器之前立即让出控制权给 t
。
Base.yieldto
— Functionyieldto(t::Task, arg = nothing)
切换到给定的任务。第一次切换到一个任务时,任务的函数会被调用而不带参数。在后续的切换中,arg
是从任务上一次调用 yieldto
返回的。这是一个低级调用,仅切换任务,不考虑状态或调度。建议避免使用。
Base.sleep
— Functionsleep(seconds)
阻塞当前任务指定的秒数。最小睡眠时间为1毫秒或输入0.001
。
Base.schedule
— Functionschedule(t::Task, [val]; error=false)
将一个 Task
添加到调度器的队列中。这会导致任务在系统其他空闲时不断运行,除非该任务执行了阻塞操作,例如 wait
。
如果提供了第二个参数 val
,它将在任务再次运行时通过 yieldto
的返回值传递给任务。如果 error
为 true
,则该值将在被唤醒的任务中作为异常抛出。
在已经启动的任意 Task
上使用 schedule
是不正确的。有关更多信息,请参见 API 参考。
默认情况下,任务的粘性位将设置为 true t.sticky
。这模拟了 @async
的历史默认值。粘性任务只能在它们首次调度的工作线程上运行,并且在调度时会使它们被调度的任务变为粘性。要获得 Threads.@spawn
的行为,请手动将粘性位设置为 false
。
示例
julia> a5() = sum(i for i in 1:1000);
julia> b = Task(a5);
julia> istaskstarted(b)
false
julia> schedule(b);
julia> yield();
julia> istaskstarted(b)
true
julia> istaskdone(b)
true
Synchronization
Base.errormonitor
— Functionerrormonitor(t::Task)
如果任务 t
失败,则将错误日志打印到 stderr
。
示例
julia> Base._wait(errormonitor(Threads.@spawn error("任务失败")))
未处理的任务错误:任务失败
堆栈跟踪:
[...]
Base.@sync
— Macro@sync
等待所有词法上封闭的 @async
、@spawn
、Distributed.@spawnat
和 Distributed.@distributed
的使用完成。所有由封闭的异步操作抛出的异常都会被收集并作为 CompositeException
抛出。
示例
julia> Threads.nthreads()
4
julia> @sync begin
Threads.@spawn println("Thread-id $(Threads.threadid()), task 1")
Threads.@spawn println("Thread-id $(Threads.threadid()), task 2")
end;
Thread-id 3, task 1
Thread-id 1, task 2
Base.wait
— Function关于 Threads.Condition
的特别说明:
调用者必须持有拥有 Threads.Condition
的 lock
,才能调用此方法。调用任务将被阻塞,直到其他任务唤醒它,通常是通过在同一 Threads.Condition
对象上调用 notify
。在阻塞时,锁将被原子释放(即使它是递归锁定的),并将在返回之前重新获取。
wait(r::Future)
等待指定的 Future
可用的值。
wait(r::RemoteChannel, args...)
等待在指定的 RemoteChannel
上可用的值。
wait([x])
阻塞当前任务,直到某个事件发生,具体取决于参数的类型:
Channel
:等待一个值被附加到通道中。Condition
:等待条件上的notify
并返回传递给notify
的val
参数。等待条件时,还可以传递first=true
,这会使等待者在notify
时被放在第一位,而不是通常的先进先出行为。Process
:等待一个进程或进程链退出。可以使用进程的exitcode
字段来确定成功或失败。Task
:等待一个Task
完成。如果任务因异常失败,将抛出TaskFailedException
(它包装了失败的任务)。RawFD
:等待文件描述符上的变化(参见FileWatching
包)。
如果没有传递参数,任务将阻塞一个未定义的时间。任务只能通过显式调用schedule
或yieldto
来重新启动。
通常在while
循环中调用wait
以确保在继续之前满足等待的条件。
wait(c::Channel)
阻塞直到 Channel
isready
。
julia> c = Channel(1);
julia> isready(c)
false
julia> task = Task(() -> wait(c));
julia> schedule(task);
julia> istaskdone(task) # 任务被阻塞,因为通道未准备好
false
julia> put!(c, 1);
julia> istaskdone(task) # 任务现在已解除阻塞
true
Base.fetch
— Methodfetch(t::Task)
等待一个 Task
完成,然后返回其结果值。如果任务因异常而失败,将抛出一个 TaskFailedException
(它包装了失败的任务)。
Base.fetch
— Methodfetch(x::Any)
返回 x
。
Base.timedwait
— Functiontimedwait(testcb, timeout::Real; pollint::Real=0.1)
等待直到 testcb()
返回 true
或者超时 timeout
秒,以先到者为准。测试函数每 pollint
秒被轮询一次。pollint
的最小值为 0.001 秒,即 1 毫秒。
返回 :ok
或 :timed_out
。
示例
julia> cb() = (sleep(5); return);
julia> t = @async cb();
julia> timedwait(()->istaskdone(t), 1)
:timed_out
julia> timedwait(()->istaskdone(t), 6.5)
:ok
Base.Condition
— TypeCondition()
创建一个边缘触发的事件源,任务可以在其上等待。调用 wait
的 Condition
上的任务会被挂起并排队。当稍后在 Condition
上调用 notify
时,任务会被唤醒。等待条件时,如果使用了 notify
的可选参数,可以返回一个值或引发错误。边缘触发意味着只有在调用 notify
时等待的任务才能被唤醒。对于水平触发的通知,您必须保持额外的状态以跟踪通知是否发生。Channel
和 Threads.Event
类型可以做到这一点,并可用于水平触发事件。
此对象不是线程安全的。有关线程安全版本,请参见 Threads.Condition
。
Base.Threads.Condition
— TypeThreads.Condition([lock])
一个线程安全的 Base.Condition
版本。
要在 Threads.Condition
上调用 wait
或 notify
,您必须首先在其上调用 lock
。当调用 wait
时,锁在阻塞期间会被原子释放,并将在 wait
返回之前重新获取。因此,Threads.Condition
c
的惯用用法如下所示:
lock(c)
try
while !thing_we_are_waiting_for
wait(c)
end
finally
unlock(c)
end
此功能至少需要 Julia 1.2。
Base.Event
— TypeEvent([autoreset=false])
创建一个级别触发的事件源。调用 wait
的任务在 Event
上被挂起并排队,直到在 Event
上调用 notify
。在调用 notify
后,Event
保持在已信号状态,任务在等待它时将不再阻塞,直到调用 reset
。
如果 autoreset
为真,则每次调用 notify
时,最多会释放一个任务从 wait
中。
这提供了在 notify/wait 上的获取和释放内存顺序。
此功能至少需要 Julia 1.1。
autoreset
功能和内存顺序保证至少需要 Julia 1.8。
Base.notify
— Functionnotify(condition, val=nothing; all=true, error=false)
唤醒等待条件的任务,并传递给它们 val
。如果 all
为 true
(默认值),则唤醒所有等待的任务,否则只唤醒一个。如果 error
为 true
,则在被唤醒的任务中将传递的值作为异常抛出。
返回唤醒的任务数量。如果没有任务在 condition
上等待,则返回 0。
Base.reset
— MethodBase.Semaphore
— TypeSemaphore(sem_size)
创建一个计数信号量,允许最多 sem_size
次获取在任何时候使用。每次获取必须与一次释放相匹配。
这为获取/释放调用提供了获取和释放的内存顺序。
Base.acquire
— Functionacquire(s::Semaphore)
等待可用的 sem_size
许可之一,阻塞直到可以获取一个。
acquire(f, s::Semaphore)
在从信号量 s
获取后执行 f
,并在完成或出错时 release
。
例如,确保同时只有 2 次 foo
调用处于活动状态的 do-block 形式:
s = Base.Semaphore(2)
@sync for _ in 1:100
Threads.@spawn begin
Base.acquire(s) do
foo()
end
end
end
此方法至少需要 Julia 1.8。
Base.release
— Functionrelease(s::Semaphore)
将一个许可返回到池中,可能允许另一个任务获取它并恢复执行。
Base.AbstractLock
— TypeBase.lock
— Functionlock(f::Function, lock)
获取 lock
,在持有 lock
的情况下执行 f
,并在 f
返回时释放 lock
。如果 lock
已被其他任务/线程锁定,则等待其变为可用。
当此函数返回时,lock
已被释放,因此调用者不应尝试 unlock
它。
另见: @lock
。
将 Channel
作为第二个参数需要 Julia 1.7 或更高版本。
lock(f::Function, l::Lockable)
获取与 l
相关联的锁,执行持锁的 f
,并在 f
返回时释放锁。f
将接收一个位置参数:被 l
包裹的值。如果锁已经被其他任务/线程锁定,则等待其变为可用。当此函数返回时,lock
已被释放,因此调用者不应尝试 unlock
它。
至少需要 Julia 1.11。
Base.unlock
— Functionunlock(lock)
释放对 lock
的拥有权。
如果这是一个之前已被获取的递归锁,则递减内部计数器并立即返回。
Base.trylock
— Functiontrylock(lock) -> Success (Boolean)
如果锁可用,则获取锁,并在成功时返回 true
。如果锁已被其他任务/线程锁定,则返回 false
。
每个成功的 trylock
必须与 unlock
匹配。
函数 trylock
结合 islocked
可用于编写测试-测试-设置或指数退避算法 如果 typeof(lock)
支持此功能(请阅读其文档)。
Base.islocked
— Functionislocked(lock) -> 状态 (布尔值)
检查 lock
是否被任何任务/线程持有。此函数本身不应用于同步。然而,islocked
结合 trylock
可以用于编写测试-测试-设置或指数退避算法 如果 typeof(lock)
支持的话(请阅读其文档)。
扩展帮助
例如,如果 lock
的实现满足下面文档中记录的属性,可以实现指数退避,如下所示。
nspins = 0
while true
while islocked(lock)
GC.safepoint()
nspins += 1
nspins > LIMIT && error("超时")
end
trylock(lock) && break
backoff()
end
实现
建议锁的实现定义 islocked
,并在其文档字符串中注明以下属性。
islocked(lock)
是无数据竞争的。- 如果
islocked(lock)
返回false
,则在没有其他任务干扰的情况下,立即调用trylock(lock)
必须成功(返回true
)。
Base.ReentrantLock
— TypeReentrantLock()
创建一个可重入锁用于同步 Task
s。相同的任务可以根据需要多次获取锁(这就是名称中“可重入”部分的意思)。每个 lock
必须与一个 unlock
匹配。
调用 lock
还会抑制该线程上终结器的运行,直到相应的 unlock
。下面所示的标准锁模式的使用应该自然得到支持,但要小心反转尝试/锁定顺序或完全遗漏尝试块(例如,尝试在仍持有锁的情况下返回):
这提供了锁/解锁调用的获取/释放内存顺序。
lock(l)
try
<atomic work>
finally
unlock(l)
end
如果 !islocked(lck::ReentrantLock)
为真,则 trylock(lck)
成功,除非有其他任务试图“同时”持有锁。
Base.@lock
— Macro@lock l expr
lock(f, l::AbstractLock)
的宏版本,但使用 expr
而不是 f
函数。展开为:
lock(l)
try
expr
finally
unlock(l)
end
这类似于使用 lock
和 do
块,但避免了创建闭包,从而可以提高性能。
@lock
在 Julia 1.3 中添加,并在 Julia 1.10 中导出。
Base.Lockable
— TypeLockable(value, lock = ReentrantLock())
创建一个 Lockable
对象,该对象包装 value
并将其与提供的 lock
关联。此对象支持 @lock
、lock
、trylock
、unlock
。要访问值,必须在持有锁的情况下索引可锁对象。
至少需要 Julia 1.11。
示例
julia> locked_list = Base.Lockable(Int[]);
julia> @lock(locked_list, push!(locked_list[], 1)) # 必须持有锁才能访问值
1-element Vector{Int64}:
1
julia> lock(summary, locked_list)
"1-element Vector{Int64}"
Channels
Base.AbstractChannel
— TypeAbstractChannel{T}
表示传递类型为 T
的对象的通道。
Base.Channel
— TypeChannel{T=Any}(size::Int=0)
构造一个 Channel
,其内部缓冲区最多可以容纳 size
个类型为 T
的对象。对满的通道调用 put!
会阻塞,直到通过 take!
移除一个对象。
Channel(0)
构造一个无缓冲通道。put!
会阻塞,直到调用匹配的 take!
。反之亦然。
其他构造函数:
Channel()
: 默认构造函数,相当于Channel{Any}(0)
Channel(Inf)
: 相当于Channel{Any}(typemax(Int))
Channel(sz)
: 相当于Channel{Any}(sz)
默认构造函数 Channel()
和默认 size=0
是在 Julia 1.3 中添加的。
Base.Channel
— MethodChannel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false, threadpool=nothing)
从 func
创建一个新任务,将其绑定到类型为 T
和大小为 size
的新通道,并在一次调用中调度该任务。当任务终止时,通道会自动关闭。
func
必须将绑定的通道作为其唯一参数。
如果您需要对创建的任务的引用,请通过关键字参数 taskref
传递一个 Ref{Task}
对象。
如果 spawn=true
,为 func
创建的 Task
可能会在另一个线程上并行调度,相当于通过 Threads.@spawn
创建任务。
如果 spawn=true
且未设置 threadpool
参数,则默认为 :default
。
如果设置了 threadpool
参数(为 :default
或 :interactive
),这意味着 spawn=true
,并且新的任务被分配到指定的线程池。
返回一个 Channel
。
示例
julia> chnl = Channel() do ch
foreach(i -> put!(ch, i), 1:4)
end;
julia> typeof(chnl)
Channel{Any}
julia> for i in chnl
@show i
end;
i = 1
i = 2
i = 3
i = 4
引用创建的任务:
julia> taskref = Ref{Task}();
julia> chnl = Channel(taskref=taskref) do ch
println(take!(ch))
end;
julia> istaskdone(taskref[])
false
julia> put!(chnl, "Hello");
Hello
julia> istaskdone(taskref[])
true
spawn=
参数是在 Julia 1.3 中添加的。此构造函数是在 Julia 1.3 中添加的。在早期版本的 Julia 中,Channel 使用关键字参数来设置 size
和 T
,但这些构造函数已被弃用。
threadpool=
参数是在 Julia 1.9 中添加的。
julia> chnl = Channel{Char}(1, spawn=true) do ch
for c in "hello world"
put!(ch, c)
end
end
Channel{Char}(1) (2 items available)
julia> String(collect(chnl))
"hello world"
Base.put!
— Methodput!(c::Channel, v)
将项 v
添加到通道 c
。如果通道已满,则会阻塞。
对于无缓冲通道,直到其他任务执行 take!
时才会阻塞。
当调用 put!
时,v
现在会通过 convert
转换为通道的类型。
Base.take!
— Methodtake!(c::Channel)
从 Channel
中按顺序移除并返回一个值。阻塞直到数据可用。对于无缓冲通道,阻塞直到其他任务执行 put!
。
示例
缓冲通道:
julia> c = Channel(1);
julia> put!(c, 1);
julia> take!(c)
1
无缓冲通道:
julia> c = Channel(0);
julia> task = Task(() -> put!(c, 1));
julia> schedule(task);
julia> take!(c)
1
Base.isready
— Methodisready(c::Channel)
确定一个 Channel
是否有值存储在其中。立即返回,不会阻塞。
对于无缓冲通道,如果有任务在等待 put!
,则返回 true
。
示例
缓冲通道:
julia> c = Channel(1);
julia> isready(c)
false
julia> put!(c, 1);
julia> isready(c)
true
无缓冲通道:
julia> c = Channel();
julia> isready(c) # 没有任务在等待放入!
false
julia> task = Task(() -> put!(c, 1));
julia> schedule(task); # 安排一个 put! 任务
julia> isready(c)
true
Base.fetch
— Methodfetch(c::Channel)
等待并返回(不移除)Channel
中第一个可用的项目。注意:fetch
在无缓冲(0大小)Channel
上不受支持。
示例
缓冲通道:
julia> c = Channel(3) do ch
foreach(i -> put!(ch, i), 1:3)
end;
julia> fetch(c)
1
julia> collect(c) # 项目未被移除
3-element Vector{Any}:
1
2
3
Base.close
— Methodclose(c::Channel[, excp::Exception])
关闭一个通道。一个异常(可选地由 excp
给出)会在以下情况下抛出:
Base.bind
— Methodbind(chnl::Channel, task::Task)
将 chnl
的生命周期与一个任务关联。当任务终止时,Channel
chnl
会自动关闭。任务中未捕获的任何异常会传播给所有在 chnl
上等待的任务。
chnl
对象可以在任务终止时显式关闭。终止的任务对已经关闭的 Channel
对象没有影响。
当一个通道绑定到多个任务时,第一个终止的任务将关闭该通道。当多个通道绑定到同一个任务时,任务的终止将关闭所有绑定的通道。
示例
julia> c = Channel(0);
julia> task = @async foreach(i->put!(c, i), 1:4);
julia> bind(c,task);
julia> for i in c
@show i
end;
i = 1
i = 2
i = 3
i = 4
julia> isopen(c)
false
julia> c = Channel(0);
julia> task = @async (put!(c, 1); error("foo"));
julia> bind(c, task);
julia> take!(c)
1
julia> put!(c, 1);
ERROR: TaskFailedException
Stacktrace:
[...]
nested task error: foo
[...]
Low-level synchronization using schedule
and wait
schedule
的最简单正确用法是在尚未开始(已调度)的 Task
上。然而,可以将 4d61726b646f776e2e436f64652822222c20227363686564756c652229_40726566
和 wait
作为构建同步接口的非常低级的构建块。 调用 schedule(task)
的一个关键前提是调用者必须“拥有”该 task
;即,它必须知道在调用 schedule(task)
的代码已知的位置中,给定 task
中的 wait
调用正在发生。 确保此前提的一种策略是使用原子操作,如以下示例所示:
@enum OWEState begin
OWE_EMPTY
OWE_WAITING
OWE_NOTIFYING
end
mutable struct OneWayEvent
@atomic state::OWEState
task::Task
OneWayEvent() = new(OWE_EMPTY)
end
function Base.notify(ev::OneWayEvent)
state = @atomic ev.state
while state !== OWE_NOTIFYING
# Spin until we successfully update the state to OWE_NOTIFYING:
state, ok = @atomicreplace(ev.state, state => OWE_NOTIFYING)
if ok
if state == OWE_WAITING
# OWE_WAITING -> OWE_NOTIFYING transition means that the waiter task is
# already waiting or about to call `wait`. The notifier task must wake up
# the waiter task.
schedule(ev.task)
else
@assert state == OWE_EMPTY
# Since we are assuming that there is only one notifier task (for
# simplicity), we know that the other possible case here is OWE_EMPTY.
# We do not need to do anything because we know that the waiter task has
# not called `wait(ev::OneWayEvent)` yet.
end
break
end
end
return
end
function Base.wait(ev::OneWayEvent)
ev.task = current_task()
state, ok = @atomicreplace(ev.state, OWE_EMPTY => OWE_WAITING)
if ok
# OWE_EMPTY -> OWE_WAITING transition means that the notifier task is guaranteed to
# invoke OWE_WAITING -> OWE_NOTIFYING transition. The waiter task must call
# `wait()` immediately. In particular, it MUST NOT invoke any function that may
# yield to the scheduler at this point in code.
wait()
else
@assert state == OWE_NOTIFYING
# Otherwise, the `state` must have already been moved to OWE_NOTIFYING by the
# notifier task.
end
return
end
ev = OneWayEvent()
@sync begin
@async begin
wait(ev)
println("done")
end
println("notifying...")
notify(ev)
end
# output
notifying...
done
OneWayEvent
允许一个任务 wait
另一个任务的 notify
。这是一个有限的通信接口,因为 wait
只能从单个任务中使用一次(注意 ev.task
的非原子赋值)。
在这个例子中,notify(ev::OneWayEvent)
仅在 它 将状态从 OWE_WAITING
修改为 OWE_NOTIFYING
时被允许调用 schedule(ev.task)
。这让我们知道执行 wait(ev::OneWayEvent)
的任务现在处于 ok
分支,并且不会有其他任务尝试 schedule(ev.task)
,因为它们的 @atomicreplace(ev.state, state => OWE_NOTIFYING)
将会失败。