Multi-Threading
Base.Threads.@threads — MacroThreads.@threads [schedule] for ... end一个宏,用于并行执行 for 循环。迭代空间被分配给粗粒度的任务。此策略可以通过 schedule 参数指定。循环的执行会等待所有迭代的评估。
另请参见: @spawn 和 Distributed 中的 pmap。
扩展帮助
语义
除非调度选项指定了更强的保证,否则由 @threads 宏执行的循环具有以下语义。
@threads 宏以未指定的顺序和潜在的并发方式执行循环体。它不指定任务和工作线程的确切分配。每次执行的分配可能不同。循环体代码(包括从中递归调用的任何代码)不得对迭代分配给任务或执行它们的工作线程的分布做出任何假设。每次迭代的循环体必须能够独立于其他迭代向前推进,并且不受数据竞争的影响。因此,跨迭代的无效同步可能导致死锁,而未同步的内存访问可能导致未定义的行为。
例如,上述条件意味着:
- 在一个迭代中获取的锁 必须 在同一迭代中释放。
- 使用阻塞原语如
Channel在迭代之间进行通信是不正确的。 - 仅写入在迭代之间不共享的位置(除非使用锁或原子操作)。
- 除非使用
:static调度,否则threadid()的值可能在单个迭代内发生变化。请参见Task Migration。
调度器
如果没有调度器参数,则确切的调度是未指定的,并且在 Julia 版本之间有所不同。目前,当未指定调度器时,使用 :dynamic。
从 Julia 1.5 开始,schedule 参数可用。
:dynamic(默认)
:dynamic 调度器动态地将迭代分配给可用的工作线程。当前实现假设每个迭代的工作负载是均匀的。然而,这一假设在未来可能会被移除。
此调度选项仅仅是对底层执行机制的提示。然而,可以预期一些属性。:dynamic 调度器使用的 Task 数量受可用工作线程数量的一个小常数倍的限制(Threads.threadpoolsize())。每个任务处理迭代空间的连续区域。因此,@threads :dynamic for x in xs; f(x); end 通常比 @sync for x in xs; @spawn f(x); end 更高效,前提是 length(xs) 显著大于工作线程的数量,并且 f(x) 的运行时间相对小于生成和同步任务的成本(通常小于 10 微秒)。
从 Julia 1.8 开始,schedule 参数的 :dynamic 选项可用且为默认值。
:greedy
:greedy 调度器最多生成 Threads.threadpoolsize() 个任务,每个任务贪婪地处理生成的给定迭代值。只要一个任务完成其工作,它就会从迭代器中获取下一个值。任何单个任务完成的工作不一定是来自迭代器的连续值。给定的迭代器可能会无限生成值,只需要迭代器接口(不需要索引)。
如果单个迭代的工作负载不均匀/有较大差异,这个调度选项通常是一个不错的选择。
从 Julia 1.11 开始,schedule 参数的 :greedy 选项可用。
:static
:static 调度器为每个线程创建一个任务,并在它们之间均匀分配迭代,将每个任务专门分配给每个线程。特别地,threadid() 的值在一个迭代内是恒定的。如果在另一个 @threads 循环内部或从线程 1 以外的线程使用,则指定 :static 是错误的。
:static 调度的存在是为了支持在 Julia 1.3 之前编写的代码的过渡。在新编写的库函数中,不鼓励使用 :static 调度,因为使用此选项的函数不能从任意工作线程调用。
示例
为了说明不同的调度策略,考虑以下包含非让步定时循环的函数 busywait,该循环运行给定的秒数。
julia> function busywait(seconds)
tstart = time_ns()
while (time_ns() - tstart) / 1e9 < seconds
end
end
julia> @time begin
Threads.@spawn busywait(5)
Threads.@threads :static for i in 1:Threads.threadpoolsize()
busywait(1)
end
end
6.003001 seconds (16.33 k allocations: 899.255 KiB, 0.25% compilation time)
julia> @time begin
Threads.@spawn busywait(5)
Threads.@threads :dynamic for i in 1:Threads.threadpoolsize()
busywait(1)
end
end
2.012056 seconds (16.05 k allocations: 883.919 KiB, 0.66% compilation time):dynamic 示例耗时 2 秒,因为一个未占用的线程能够运行两个 1 秒的迭代以完成 for 循环。
Base.Threads.foreach — FunctionThreads.foreach(f, channel::Channel;
schedule::Threads.AbstractSchedule=Threads.FairSchedule(),
ntasks=Threads.threadpoolsize())类似于 foreach(f, channel),但对 channel 的迭代和对 f 的调用是在 Threads.@spawn 生成的 ntasks 任务之间分配的。此函数将在所有内部生成的任务完成之前等待返回。
如果 schedule isa FairSchedule,Threads.foreach 将尝试以一种方式生成任务,使得 Julia 的调度器能够更自由地在线程之间负载平衡工作项。这种方法通常具有更高的每项开销,但在与其他多线程工作负载并发时可能表现得比 StaticSchedule 更好。
如果 schedule isa StaticSchedule,Threads.foreach 将以一种产生比 FairSchedule 更低的每项开销的方式生成任务,但不太适合负载平衡。因此,这种方法可能更适合细粒度、均匀的工作负载,但在与其他多线程工作负载并发时可能表现得比 FairSchedule 更差。
示例
julia> n = 20
julia> c = Channel{Int}(ch -> foreach(i -> put!(ch, i), 1:n), 1)
julia> d = Channel{Int}(n) do ch
f = i -> put!(ch, i^2)
Threads.foreach(f, c)
end
julia> collect(d)
collect(d) = [1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400]此函数需要 Julia 1.6 或更高版本。
Base.Threads.@spawn — MacroThreads.@spawn [:default|:interactive] expr创建一个 Task 并将其 schedule 到指定线程池中的任何可用线程(如果未指定,则为 :default)。一旦有线程可用,任务就会被分配到该线程。要等待任务完成,请在此宏的结果上调用 wait,或调用 fetch 以等待并获取其返回值。
可以通过 $ 将值插入到 @spawn 中,这会将值直接复制到构造的底层闭包中。这允许您插入变量的 值,将异步代码与当前任务中变量值的变化隔离开来。
任务运行的线程可能会在任务让出时改变,因此 threadid() 不应被视为任务的常量。请参见 Task Migration,以及更广泛的 multi-threading 手册以获取更多重要注意事项。另请参见关于 threadpools 的章节。
此宏自 Julia 1.3 起可用。
自 Julia 1.4 起可通过 $ 插入值。
自 Julia 1.9 起可以指定线程池。
示例
julia> t() = println("Hello from ", Threads.threadid());
julia> tasks = fetch.([Threads.@spawn t() for i in 1:4]);
Hello from 1
Hello from 1
Hello from 3
Hello from 4Base.Threads.threadid — FunctionThreads.threadid() -> Int获取当前执行线程的 ID 号。主线程的 ID 为 1。
示例
julia> Threads.threadid()
1
julia> Threads.@threads for i in 1:4
println(Threads.threadid())
end
4
2
5
4!!! 注意 任务运行的线程可能会在任务让出时发生变化,这被称为 Task Migration。因此,在大多数情况下,使用 threadid() 来索引,例如,缓冲区或状态对象的向量是不安全的。
Base.Threads.maxthreadid — FunctionThreads.maxthreadid() -> Int获取可用于 Julia 进程的线程数量的下限(跨所有线程池),具有原子获取语义。结果将始终大于或等于 threadid() 以及 threadid(task),对于您在调用 maxthreadid 之前能够观察到的任何任务。
Base.Threads.nthreads — FunctionThreads.nthreads(:default | :interactive) -> Int获取指定线程池中当前的线程数量。:interactive 中的线程具有 id 编号 1:nthreads(:interactive),而 :default 中的线程具有 id 编号 nthreads(:interactive) .+ (1:nthreads(:default))。
另请参见 [BLAS.get_num_threads] 和 [BLAS.set_num_threads] 在 LinearAlgebra 标准库中,以及 [nprocs()] 在 Distributed 标准库和 Threads.maxthreadid()。
Base.Threads.threadpool — FunctionThreads.threadpool(tid = threadid()) -> Symbol返回指定线程的线程池;可以是 :default、:interactive 或 :foreign。
Base.Threads.nthreadpools — FunctionThreads.nthreadpools() -> Int返回当前配置的线程池数量。
Base.Threads.threadpoolsize — FunctionThreads.threadpoolsize(pool::Symbol = :default) -> Int获取可用于默认线程池(或指定线程池)的线程数量。
另请参见标准库中的 BLAS.get_num_threads 和 BLAS.set_num_threads,以及 Distributed 标准库中的 nprocs()。
Base.Threads.ngcthreads — FunctionThreads.ngcthreads() -> Int返回当前配置的 GC 线程数量。这包括标记线程和并发清扫线程。
另请参见 Multi-Threading。
Atomic operations
atomic — Keyword不安全的指针操作与在 C11 和 C++23 中声明为 _Atomic 和 std::atomic 类型的指针的加载和存储是兼容的。如果不支持原子加载 Julia 类型 T,可能会抛出错误。
另请参见: unsafe_load, unsafe_modify!, unsafe_replace!, unsafe_store!, unsafe_swap!
Base.@atomic — Macro@atomic var
@atomic order ex将 var 或 ex 标记为以原子方式执行,如果 ex 是一个支持的表达式。如果未指定 order,则默认为 :sequentially_consistent。
@atomic a.b.x = new
@atomic a.b.x += addend
@atomic :release a.b.x = new
@atomic :acquire_release a.b.x += addend以原子方式执行右侧表达的存储操作并返回新值。
使用 = 时,此操作转换为 setproperty!(a.b, :x, new) 调用。使用任何运算符时,此操作也转换为 modifyproperty!(a.b, :x, +, addend)[2] 调用。
@atomic a.b.x max arg2
@atomic a.b.x + arg2
@atomic max(a.b.x, arg2)
@atomic :acquire_release max(a.b.x, arg2)
@atomic :acquire_release a.b.x + arg2
@atomic :acquire_release a.b.x max arg2以原子方式执行右侧表达的二元操作。将结果存储到第一个参数中的字段,并返回值 (old, new)。
此操作转换为 modifyproperty!(a.b, :x, func, arg2) 调用。
有关更多详细信息,请参见手册中的 Per-field atomics 部分。
示例
julia> mutable struct Atomic{T}; @atomic x::T; end
julia> a = Atomic(1)
Atomic{Int64}(1)
julia> @atomic a.x # 以顺序一致性获取 a 的字段 x
1
julia> @atomic :sequentially_consistent a.x = 2 # 以顺序一致性设置 a 的字段 x
2
julia> @atomic a.x += 1 # 以顺序一致性递增 a 的字段 x
3
julia> @atomic a.x + 1 # 以顺序一致性递增 a 的字段 x
3 => 4
julia> @atomic a.x # 以顺序一致性获取 a 的字段 x
4
julia> @atomic max(a.x, 10) # 以顺序一致性将 a 的字段 x 更改为最大值
4 => 10
julia> @atomic a.x max 5 # 再次以顺序一致性将 a 的字段 x 更改为最大值
10 => 10此功能需要至少 Julia 1.7。
Base.@atomicswap — Macro@atomicswap a.b.x = new
@atomicswap :sequentially_consistent a.b.x = new将 new 存储到 a.b.x 中,并返回 a.b.x 的旧值。
此操作转换为 swapproperty!(a.b, :x, new) 调用。
有关更多详细信息,请参见手册中的 Per-field atomics 部分。
示例
julia> mutable struct Atomic{T}; @atomic x::T; end
julia> a = Atomic(1)
Atomic{Int64}(1)
julia> @atomicswap a.x = 2+2 # 用 4 替换 a 的字段 x,具有顺序一致性
1
julia> @atomic a.x # 获取 a 的字段 x,具有顺序一致性
4此功能至少需要 Julia 1.7。
Base.@atomicreplace — Macro@atomicreplace a.b.x 预期 => 期望
@atomicreplace :sequentially_consistent a.b.x 预期 => 期望
@atomicreplace :sequentially_consistent :monotonic a.b.x 预期 => 期望执行成对的条件替换,原子性地返回值 (old, success::Bool)。其中 success 表示替换是否完成。
此操作转换为 replaceproperty!(a.b, :x, 预期, 期望) 调用。
有关更多详细信息,请参见手册中的 Per-field atomics 部分。
示例
julia> mutable struct Atomic{T}; @atomic x::T; end
julia> a = Atomic(1)
Atomic{Int64}(1)
julia> @atomicreplace a.x 1 => 2 # 如果 a 的字段 x 为 1,则用 2 替换,具有顺序一致性
(old = 1, success = true)
julia> @atomic a.x # 以顺序一致性获取 a 的字段 x
2
julia> @atomicreplace a.x 1 => 2 # 如果 a 的字段 x 为 1,则用 2 替换,具有顺序一致性
(old = 2, success = false)
julia> xchg = 2 => 0; # 如果 a 的字段 x 为 2,则用 0 替换,具有顺序一致性
julia> @atomicreplace a.x xchg
(old = 2, success = true)
julia> @atomic a.x # 以顺序一致性获取 a 的字段 x
0此功能至少需要 Julia 1.7。
Base.@atomiconce — Macro@atomiconce a.b.x = value
@atomiconce :sequentially_consistent a.b.x = value
@atomiconce :sequentially_consistent :monotonic a.b.x = value如果之前未设置,则以原子方式执行值的条件赋值,返回值 success::Bool。其中 success 表示赋值是否完成。
此操作转换为 setpropertyonce!(a.b, :x, value) 调用。
有关更多详细信息,请参见手册中的 Per-field atomics 部分。
示例
julia> mutable struct AtomicOnce
@atomic x
AtomicOnce() = new()
end
julia> a = AtomicOnce()
AtomicOnce(#undef)
julia> @atomiconce a.x = 1 # 如果未设置,则以顺序一致性将 a 的字段 x 设置为 1
true
julia> @atomic a.x # 以顺序一致性获取 a 的字段 x
1
julia> @atomiconce a.x = 1 # 如果未设置,则以顺序一致性将 a 的字段 x 设置为 1
false此功能至少需要 Julia 1.11。
Core.AtomicMemory — TypeAtomicMemory{T} == GenericMemory{:atomic, T, Core.CPU}固定大小的 DenseVector{T}。对其任何元素的访问都是原子的(使用 :monotonic 排序)。设置任何元素必须使用 @atomic 宏并明确指定排序。
每个元素在访问时都是独立的原子,不能以非原子的方式设置。目前 @atomic 宏和更高级的接口尚未完成,但未来实现的构建块是内部内置函数 Core.memoryrefget、Core.memoryrefset!、Core.memoryref_isassigned、Core.memoryrefswap!、Core.memoryrefmodify! 和 Core.memoryrefreplace!。
有关详细信息,请参见 Atomic Operations
此类型需要 Julia 1.11 或更高版本。
还有可选的内存排序参数用于 unsafe 函数集,如果指定该参数,将选择与 C/C++ 兼容的这些原子操作版本,具体包括 unsafe_load、unsafe_store!、unsafe_swap!、unsafe_replace! 和 unsafe_modify!。
以下API已被弃用,但对它们的支持可能会在多个版本中继续存在。
Base.Threads.Atomic — TypeThreads.Atomic{T}持有类型为 T 的对象的引用,确保它仅以原子方式访问,即以线程安全的方式。
只有某些“简单”类型可以原子地使用,即原始布尔、整数和浮点类型。这些类型包括 Bool、Int8...Int128、UInt8...UInt128,以及 Float16...Float64。
新的原子对象可以从非原子值创建;如果未指定,则原子对象初始化为零。
可以使用 [] 符号访问原子对象:
示例
julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)
julia> x[] = 1
1
julia> x[]
1原子操作使用 atomic_ 前缀,例如 atomic_add!、atomic_xchg! 等。
Base.Threads.atomic_cas! — FunctionThreads.atomic_cas!(x::Atomic{T}, cmp::T, newval::T) where T原子比较并设置 x
原子地将 x 中的值与 cmp 进行比较。如果相等,则将 newval 写入 x。否则,保持 x 不变。返回 x 中的旧值。通过将返回的值与 cmp 进行比较(通过 ===),可以知道 x 是否被修改,并且现在是否持有新值 newval。
有关更多详细信息,请参见 LLVM 的 cmpxchg 指令。
此函数可用于实现事务语义。在事务之前,记录 x 中的值。在事务之后,仅在此期间 x 未被修改的情况下存储新值。
示例
julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)
julia> Threads.atomic_cas!(x, 4, 2);
julia> x
Base.Threads.Atomic{Int64}(3)
julia> Threads.atomic_cas!(x, 3, 2);
julia> x
Base.Threads.Atomic{Int64}(2)Base.Threads.atomic_xchg! — FunctionThreads.atomic_xchg!(x::Atomic{T}, newval::T) where T原子性地交换 x 中的值
原子性地将 x 中的值与 newval 交换。返回旧值。
有关更多详细信息,请参见 LLVM 的 atomicrmw xchg 指令。
示例
julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)
julia> Threads.atomic_xchg!(x, 2)
3
julia> x[]
2Base.Threads.atomic_add! — FunctionThreads.atomic_add!(x::Atomic{T}, val::T) where T <: ArithmeticTypes原子地将 val 加到 x 上
以原子方式执行 x[] += val。返回 旧 值。未定义于 Atomic{Bool}。
有关更多详细信息,请参见 LLVM 的 atomicrmw add 指令。
示例
julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)
julia> Threads.atomic_add!(x, 2)
3
julia> x[]
5Base.Threads.atomic_sub! — FunctionThreads.atomic_sub!(x::Atomic{T}, val::T) where T <: ArithmeticTypes原子地从 x 中减去 val
原子地执行 x[] -= val。返回旧值。未定义于 Atomic{Bool}。
有关更多详细信息,请参见 LLVM 的 atomicrmw sub 指令。
示例
julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)
julia> Threads.atomic_sub!(x, 2)
3
julia> x[]
1Base.Threads.atomic_and! — FunctionThreads.atomic_and!(x::Atomic{T}, val::T) where T原子性地对 x 和 val 进行按位与操作
原子性地执行 x[] &= val。返回旧值。
有关更多详细信息,请参见 LLVM 的 atomicrmw and 指令。
示例
julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)
julia> Threads.atomic_and!(x, 2)
3
julia> x[]
2Base.Threads.atomic_nand! — FunctionThreads.atomic_nand!(x::Atomic{T}, val::T) where T原子性地对 x 和 val 进行按位 NAND(非与)操作
原子性地执行 x[] = ~(x[] & val)。返回旧值。
有关更多详细信息,请参见 LLVM 的 atomicrmw nand 指令。
示例
julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)
julia> Threads.atomic_nand!(x, 2)
3
julia> x[]
-3Base.Threads.atomic_or! — FunctionThreads.atomic_or!(x::Atomic{T}, val::T) where T原子性地对 x 和 val 进行按位或操作
原子性地执行 x[] |= val。返回旧值。
有关更多详细信息,请参见 LLVM 的 atomicrmw or 指令。
示例
julia> x = Threads.Atomic{Int}(5)
Base.Threads.Atomic{Int64}(5)
julia> Threads.atomic_or!(x, 7)
5
julia> x[]
7Base.Threads.atomic_xor! — FunctionThreads.atomic_xor!(x::Atomic{T}, val::T) where T原子性地对 x 和 val 进行按位异或(exclusive-or)
原子性地执行 x[] $= val。返回旧值。
有关更多详细信息,请参见 LLVM 的 atomicrmw xor 指令。
示例
julia> x = Threads.Atomic{Int}(5)
Base.Threads.Atomic{Int64}(5)
julia> Threads.atomic_xor!(x, 7)
5
julia> x[]
2Base.Threads.atomic_max! — FunctionThreads.atomic_max!(x::Atomic{T}, val::T) where T原子性地将 x 和 val 的最大值存储在 x 中
原子性地执行 x[] = max(x[], val)。返回旧值。
有关更多详细信息,请参见 LLVM 的 atomicrmw max 指令。
示例
julia> x = Threads.Atomic{Int}(5)
Base.Threads.Atomic{Int64}(5)
julia> Threads.atomic_max!(x, 7)
5
julia> x[]
7Base.Threads.atomic_min! — FunctionThreads.atomic_min!(x::Atomic{T}, val::T) where T原子性地将 x 和 val 的最小值存储在 x 中
原子性地执行 x[] = min(x[], val)。返回旧值。
有关更多详细信息,请参见 LLVM 的 atomicrmw min 指令。
示例
julia> x = Threads.Atomic{Int}(7)
Base.Threads.Atomic{Int64}(7)
julia> Threads.atomic_min!(x, 5)
7
julia> x[]
5Base.Threads.atomic_fence — FunctionThreads.atomic_fence()插入一个顺序一致性内存屏障
插入一个具有顺序一致性排序语义的内存屏障。有些算法需要这样做,即获取/释放排序不足的情况。
这可能是一个非常昂贵的操作。考虑到Julia中所有其他原子操作已经具有获取/释放语义,显式屏障在大多数情况下是不必要的。
有关更多详细信息,请参见LLVM的fence指令。
ccall using a libuv threadpool (Experimental)
Base.@threadcall — Macro@threadcall((cfunc, clib), rettype, (argtypes...), argvals...)@threadcall 宏的调用方式与 ccall 相同,但在不同的线程中执行。这在您想要调用一个阻塞的 C 函数而不导致当前 julia 线程被阻塞时非常有用。并发性受到 libuv 线程池大小的限制,默认情况下为 4 个线程,但可以通过设置 UV_THREADPOOL_SIZE 环境变量并重新启动 julia 进程来增加。
请注意,被调用的函数绝不应回调到 Julia。
Low-level synchronization primitives
这些构建块用于创建常规同步对象。
Base.Threads.SpinLock — TypeSpinLock()创建一个非重入的测试-测试-设置自旋锁。递归使用将导致死锁。这种锁应该仅在执行时间很短且不阻塞的代码周围使用(例如,执行 I/O)。一般来说,应该使用 ReentrantLock 代替。
每个 lock 必须与一个 unlock 匹配。如果 !islocked(lck::SpinLock) 为真,则 trylock(lck) 成功,除非有其他任务试图“同时”持有锁。
测试-测试-设置自旋锁在大约 30 个竞争线程之前是最快的。如果竞争超过这个数量,则应该考虑不同的同步方法。