Multi-Threading
Base.Threads.@threads
— MacroThreads.@threads [schedule] for ... end
一个宏,用于并行执行 for
循环。迭代空间被分配给粗粒度的任务。此策略可以通过 schedule
参数指定。循环的执行会等待所有迭代的评估。
另请参见: @spawn
和 Distributed
中的 pmap
。
扩展帮助
语义
除非调度选项指定了更强的保证,否则由 @threads
宏执行的循环具有以下语义。
@threads
宏以未指定的顺序和潜在的并发方式执行循环体。它不指定任务和工作线程的确切分配。每次执行的分配可能不同。循环体代码(包括从中递归调用的任何代码)不得对迭代分配给任务或执行它们的工作线程的分布做出任何假设。每次迭代的循环体必须能够独立于其他迭代向前推进,并且不受数据竞争的影响。因此,跨迭代的无效同步可能导致死锁,而未同步的内存访问可能导致未定义的行为。
例如,上述条件意味着:
- 在一个迭代中获取的锁 必须 在同一迭代中释放。
- 使用阻塞原语如
Channel
在迭代之间进行通信是不正确的。 - 仅写入在迭代之间不共享的位置(除非使用锁或原子操作)。
- 除非使用
:static
调度,否则threadid()
的值可能在单个迭代内发生变化。请参见Task Migration
。
调度器
如果没有调度器参数,则确切的调度是未指定的,并且在 Julia 版本之间有所不同。目前,当未指定调度器时,使用 :dynamic
。
从 Julia 1.5 开始,schedule
参数可用。
:dynamic
(默认)
:dynamic
调度器动态地将迭代分配给可用的工作线程。当前实现假设每个迭代的工作负载是均匀的。然而,这一假设在未来可能会被移除。
此调度选项仅仅是对底层执行机制的提示。然而,可以预期一些属性。:dynamic
调度器使用的 Task
数量受可用工作线程数量的一个小常数倍的限制(Threads.threadpoolsize()
)。每个任务处理迭代空间的连续区域。因此,@threads :dynamic for x in xs; f(x); end
通常比 @sync for x in xs; @spawn f(x); end
更高效,前提是 length(xs)
显著大于工作线程的数量,并且 f(x)
的运行时间相对小于生成和同步任务的成本(通常小于 10 微秒)。
从 Julia 1.8 开始,schedule
参数的 :dynamic
选项可用且为默认值。
:greedy
:greedy
调度器最多生成 Threads.threadpoolsize()
个任务,每个任务贪婪地处理生成的给定迭代值。只要一个任务完成其工作,它就会从迭代器中获取下一个值。任何单个任务完成的工作不一定是来自迭代器的连续值。给定的迭代器可能会无限生成值,只需要迭代器接口(不需要索引)。
如果单个迭代的工作负载不均匀/有较大差异,这个调度选项通常是一个不错的选择。
从 Julia 1.11 开始,schedule
参数的 :greedy
选项可用。
:static
:static
调度器为每个线程创建一个任务,并在它们之间均匀分配迭代,将每个任务专门分配给每个线程。特别地,threadid()
的值在一个迭代内是恒定的。如果在另一个 @threads
循环内部或从线程 1 以外的线程使用,则指定 :static
是错误的。
:static
调度的存在是为了支持在 Julia 1.3 之前编写的代码的过渡。在新编写的库函数中,不鼓励使用 :static
调度,因为使用此选项的函数不能从任意工作线程调用。
示例
为了说明不同的调度策略,考虑以下包含非让步定时循环的函数 busywait
,该循环运行给定的秒数。
julia> function busywait(seconds)
tstart = time_ns()
while (time_ns() - tstart) / 1e9 < seconds
end
end
julia> @time begin
Threads.@spawn busywait(5)
Threads.@threads :static for i in 1:Threads.threadpoolsize()
busywait(1)
end
end
6.003001 seconds (16.33 k allocations: 899.255 KiB, 0.25% compilation time)
julia> @time begin
Threads.@spawn busywait(5)
Threads.@threads :dynamic for i in 1:Threads.threadpoolsize()
busywait(1)
end
end
2.012056 seconds (16.05 k allocations: 883.919 KiB, 0.66% compilation time)
:dynamic
示例耗时 2 秒,因为一个未占用的线程能够运行两个 1 秒的迭代以完成 for 循环。
Base.Threads.foreach
— FunctionThreads.foreach(f, channel::Channel;
schedule::Threads.AbstractSchedule=Threads.FairSchedule(),
ntasks=Threads.threadpoolsize())
类似于 foreach(f, channel)
,但对 channel
的迭代和对 f
的调用是在 Threads.@spawn
生成的 ntasks
任务之间分配的。此函数将在所有内部生成的任务完成之前等待返回。
如果 schedule isa FairSchedule
,Threads.foreach
将尝试以一种方式生成任务,使得 Julia 的调度器能够更自由地在线程之间负载平衡工作项。这种方法通常具有更高的每项开销,但在与其他多线程工作负载并发时可能表现得比 StaticSchedule
更好。
如果 schedule isa StaticSchedule
,Threads.foreach
将以一种产生比 FairSchedule
更低的每项开销的方式生成任务,但不太适合负载平衡。因此,这种方法可能更适合细粒度、均匀的工作负载,但在与其他多线程工作负载并发时可能表现得比 FairSchedule
更差。
示例
julia> n = 20
julia> c = Channel{Int}(ch -> foreach(i -> put!(ch, i), 1:n), 1)
julia> d = Channel{Int}(n) do ch
f = i -> put!(ch, i^2)
Threads.foreach(f, c)
end
julia> collect(d)
collect(d) = [1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400]
此函数需要 Julia 1.6 或更高版本。
Base.Threads.@spawn
— MacroThreads.@spawn [:default|:interactive] expr
创建一个 Task
并将其 schedule
到指定线程池中的任何可用线程(如果未指定,则为 :default
)。一旦有线程可用,任务就会被分配到该线程。要等待任务完成,请在此宏的结果上调用 wait
,或调用 fetch
以等待并获取其返回值。
可以通过 $
将值插入到 @spawn
中,这会将值直接复制到构造的底层闭包中。这允许您插入变量的 值,将异步代码与当前任务中变量值的变化隔离开来。
任务运行的线程可能会在任务让出时改变,因此 threadid()
不应被视为任务的常量。请参见 Task Migration
,以及更广泛的 multi-threading 手册以获取更多重要注意事项。另请参见关于 threadpools 的章节。
此宏自 Julia 1.3 起可用。
自 Julia 1.4 起可通过 $
插入值。
自 Julia 1.9 起可以指定线程池。
示例
julia> t() = println("Hello from ", Threads.threadid());
julia> tasks = fetch.([Threads.@spawn t() for i in 1:4]);
Hello from 1
Hello from 1
Hello from 3
Hello from 4
Base.Threads.threadid
— FunctionThreads.threadid() -> Int
获取当前执行线程的 ID 号。主线程的 ID 为 1
。
示例
julia> Threads.threadid()
1
julia> Threads.@threads for i in 1:4
println(Threads.threadid())
end
4
2
5
4
!!! 注意 任务运行的线程可能会在任务让出时发生变化,这被称为 Task Migration
。因此,在大多数情况下,使用 threadid()
来索引,例如,缓冲区或状态对象的向量是不安全的。
Base.Threads.maxthreadid
— FunctionThreads.maxthreadid() -> Int
获取可用于 Julia 进程的线程数量的下限(跨所有线程池),具有原子获取语义。结果将始终大于或等于 threadid()
以及 threadid(task)
,对于您在调用 maxthreadid
之前能够观察到的任何任务。
Base.Threads.nthreads
— FunctionThreads.nthreads(:default | :interactive) -> Int
获取指定线程池中当前的线程数量。:interactive
中的线程具有 id 编号 1:nthreads(:interactive)
,而 :default
中的线程具有 id 编号 nthreads(:interactive) .+ (1:nthreads(:default))
。
另请参见 [BLAS.get_num_threads
] 和 [BLAS.set_num_threads
] 在 LinearAlgebra
标准库中,以及 [nprocs()
] 在 Distributed
标准库和 Threads.maxthreadid()
。
Base.Threads.threadpool
— FunctionThreads.threadpool(tid = threadid()) -> Symbol
返回指定线程的线程池;可以是 :default
、:interactive
或 :foreign
。
Base.Threads.nthreadpools
— FunctionThreads.nthreadpools() -> Int
返回当前配置的线程池数量。
Base.Threads.threadpoolsize
— FunctionThreads.threadpoolsize(pool::Symbol = :default) -> Int
获取可用于默认线程池(或指定线程池)的线程数量。
另请参见标准库中的 BLAS.get_num_threads
和 BLAS.set_num_threads
,以及 Distributed
标准库中的 nprocs()
。
Base.Threads.ngcthreads
— FunctionThreads.ngcthreads() -> Int
返回当前配置的 GC 线程数量。这包括标记线程和并发清扫线程。
另请参见 Multi-Threading。
Atomic operations
atomic
— Keyword不安全的指针操作与在 C11 和 C++23 中声明为 _Atomic
和 std::atomic
类型的指针的加载和存储是兼容的。如果不支持原子加载 Julia 类型 T
,可能会抛出错误。
另请参见: unsafe_load
, unsafe_modify!
, unsafe_replace!
, unsafe_store!
, unsafe_swap!
Base.@atomic
— Macro@atomic var
@atomic order ex
将 var
或 ex
标记为以原子方式执行,如果 ex
是一个支持的表达式。如果未指定 order
,则默认为 :sequentially_consistent。
@atomic a.b.x = new
@atomic a.b.x += addend
@atomic :release a.b.x = new
@atomic :acquire_release a.b.x += addend
以原子方式执行右侧表达的存储操作并返回新值。
使用 =
时,此操作转换为 setproperty!(a.b, :x, new)
调用。使用任何运算符时,此操作也转换为 modifyproperty!(a.b, :x, +, addend)[2]
调用。
@atomic a.b.x max arg2
@atomic a.b.x + arg2
@atomic max(a.b.x, arg2)
@atomic :acquire_release max(a.b.x, arg2)
@atomic :acquire_release a.b.x + arg2
@atomic :acquire_release a.b.x max arg2
以原子方式执行右侧表达的二元操作。将结果存储到第一个参数中的字段,并返回值 (old, new)
。
此操作转换为 modifyproperty!(a.b, :x, func, arg2)
调用。
有关更多详细信息,请参见手册中的 Per-field atomics 部分。
示例
julia> mutable struct Atomic{T}; @atomic x::T; end
julia> a = Atomic(1)
Atomic{Int64}(1)
julia> @atomic a.x # 以顺序一致性获取 a 的字段 x
1
julia> @atomic :sequentially_consistent a.x = 2 # 以顺序一致性设置 a 的字段 x
2
julia> @atomic a.x += 1 # 以顺序一致性递增 a 的字段 x
3
julia> @atomic a.x + 1 # 以顺序一致性递增 a 的字段 x
3 => 4
julia> @atomic a.x # 以顺序一致性获取 a 的字段 x
4
julia> @atomic max(a.x, 10) # 以顺序一致性将 a 的字段 x 更改为最大值
4 => 10
julia> @atomic a.x max 5 # 再次以顺序一致性将 a 的字段 x 更改为最大值
10 => 10
此功能需要至少 Julia 1.7。
Base.@atomicswap
— Macro@atomicswap a.b.x = new
@atomicswap :sequentially_consistent a.b.x = new
将 new
存储到 a.b.x
中,并返回 a.b.x
的旧值。
此操作转换为 swapproperty!(a.b, :x, new)
调用。
有关更多详细信息,请参见手册中的 Per-field atomics 部分。
示例
julia> mutable struct Atomic{T}; @atomic x::T; end
julia> a = Atomic(1)
Atomic{Int64}(1)
julia> @atomicswap a.x = 2+2 # 用 4 替换 a 的字段 x,具有顺序一致性
1
julia> @atomic a.x # 获取 a 的字段 x,具有顺序一致性
4
此功能至少需要 Julia 1.7。
Base.@atomicreplace
— Macro@atomicreplace a.b.x 预期 => 期望
@atomicreplace :sequentially_consistent a.b.x 预期 => 期望
@atomicreplace :sequentially_consistent :monotonic a.b.x 预期 => 期望
执行成对的条件替换,原子性地返回值 (old, success::Bool)
。其中 success
表示替换是否完成。
此操作转换为 replaceproperty!(a.b, :x, 预期, 期望)
调用。
有关更多详细信息,请参见手册中的 Per-field atomics 部分。
示例
julia> mutable struct Atomic{T}; @atomic x::T; end
julia> a = Atomic(1)
Atomic{Int64}(1)
julia> @atomicreplace a.x 1 => 2 # 如果 a 的字段 x 为 1,则用 2 替换,具有顺序一致性
(old = 1, success = true)
julia> @atomic a.x # 以顺序一致性获取 a 的字段 x
2
julia> @atomicreplace a.x 1 => 2 # 如果 a 的字段 x 为 1,则用 2 替换,具有顺序一致性
(old = 2, success = false)
julia> xchg = 2 => 0; # 如果 a 的字段 x 为 2,则用 0 替换,具有顺序一致性
julia> @atomicreplace a.x xchg
(old = 2, success = true)
julia> @atomic a.x # 以顺序一致性获取 a 的字段 x
0
此功能至少需要 Julia 1.7。
Base.@atomiconce
— Macro@atomiconce a.b.x = value
@atomiconce :sequentially_consistent a.b.x = value
@atomiconce :sequentially_consistent :monotonic a.b.x = value
如果之前未设置,则以原子方式执行值的条件赋值,返回值 success::Bool
。其中 success
表示赋值是否完成。
此操作转换为 setpropertyonce!(a.b, :x, value)
调用。
有关更多详细信息,请参见手册中的 Per-field atomics 部分。
示例
julia> mutable struct AtomicOnce
@atomic x
AtomicOnce() = new()
end
julia> a = AtomicOnce()
AtomicOnce(#undef)
julia> @atomiconce a.x = 1 # 如果未设置,则以顺序一致性将 a 的字段 x 设置为 1
true
julia> @atomic a.x # 以顺序一致性获取 a 的字段 x
1
julia> @atomiconce a.x = 1 # 如果未设置,则以顺序一致性将 a 的字段 x 设置为 1
false
此功能至少需要 Julia 1.11。
Core.AtomicMemory
— TypeAtomicMemory{T} == GenericMemory{:atomic, T, Core.CPU}
固定大小的 DenseVector{T}
。对其任何元素的访问都是原子的(使用 :monotonic
排序)。设置任何元素必须使用 @atomic
宏并明确指定排序。
每个元素在访问时都是独立的原子,不能以非原子的方式设置。目前 @atomic
宏和更高级的接口尚未完成,但未来实现的构建块是内部内置函数 Core.memoryrefget
、Core.memoryrefset!
、Core.memoryref_isassigned
、Core.memoryrefswap!
、Core.memoryrefmodify!
和 Core.memoryrefreplace!
。
有关详细信息,请参见 Atomic Operations
此类型需要 Julia 1.11 或更高版本。
还有可选的内存排序参数用于 unsafe
函数集,如果指定该参数,将选择与 C/C++ 兼容的这些原子操作版本,具体包括 unsafe_load
、unsafe_store!
、unsafe_swap!
、unsafe_replace!
和 unsafe_modify!
。
以下API已被弃用,但对它们的支持可能会在多个版本中继续存在。
Base.Threads.Atomic
— TypeThreads.Atomic{T}
持有类型为 T
的对象的引用,确保它仅以原子方式访问,即以线程安全的方式。
只有某些“简单”类型可以原子地使用,即原始布尔、整数和浮点类型。这些类型包括 Bool
、Int8
...Int128
、UInt8
...UInt128
,以及 Float16
...Float64
。
新的原子对象可以从非原子值创建;如果未指定,则原子对象初始化为零。
可以使用 []
符号访问原子对象:
示例
julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)
julia> x[] = 1
1
julia> x[]
1
原子操作使用 atomic_
前缀,例如 atomic_add!
、atomic_xchg!
等。
Base.Threads.atomic_cas!
— FunctionThreads.atomic_cas!(x::Atomic{T}, cmp::T, newval::T) where T
原子比较并设置 x
原子地将 x
中的值与 cmp
进行比较。如果相等,则将 newval
写入 x
。否则,保持 x
不变。返回 x
中的旧值。通过将返回的值与 cmp
进行比较(通过 ===
),可以知道 x
是否被修改,并且现在是否持有新值 newval
。
有关更多详细信息,请参见 LLVM 的 cmpxchg
指令。
此函数可用于实现事务语义。在事务之前,记录 x
中的值。在事务之后,仅在此期间 x
未被修改的情况下存储新值。
示例
julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)
julia> Threads.atomic_cas!(x, 4, 2);
julia> x
Base.Threads.Atomic{Int64}(3)
julia> Threads.atomic_cas!(x, 3, 2);
julia> x
Base.Threads.Atomic{Int64}(2)
Base.Threads.atomic_xchg!
— FunctionThreads.atomic_xchg!(x::Atomic{T}, newval::T) where T
原子性地交换 x
中的值
原子性地将 x
中的值与 newval
交换。返回旧值。
有关更多详细信息,请参见 LLVM 的 atomicrmw xchg
指令。
示例
julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)
julia> Threads.atomic_xchg!(x, 2)
3
julia> x[]
2
Base.Threads.atomic_add!
— FunctionThreads.atomic_add!(x::Atomic{T}, val::T) where T <: ArithmeticTypes
原子地将 val
加到 x
上
以原子方式执行 x[] += val
。返回 旧 值。未定义于 Atomic{Bool}
。
有关更多详细信息,请参见 LLVM 的 atomicrmw add
指令。
示例
julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)
julia> Threads.atomic_add!(x, 2)
3
julia> x[]
5
Base.Threads.atomic_sub!
— FunctionThreads.atomic_sub!(x::Atomic{T}, val::T) where T <: ArithmeticTypes
原子地从 x
中减去 val
原子地执行 x[] -= val
。返回旧值。未定义于 Atomic{Bool}
。
有关更多详细信息,请参见 LLVM 的 atomicrmw sub
指令。
示例
julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)
julia> Threads.atomic_sub!(x, 2)
3
julia> x[]
1
Base.Threads.atomic_and!
— FunctionThreads.atomic_and!(x::Atomic{T}, val::T) where T
原子性地对 x
和 val
进行按位与操作
原子性地执行 x[] &= val
。返回旧值。
有关更多详细信息,请参见 LLVM 的 atomicrmw and
指令。
示例
julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)
julia> Threads.atomic_and!(x, 2)
3
julia> x[]
2
Base.Threads.atomic_nand!
— FunctionThreads.atomic_nand!(x::Atomic{T}, val::T) where T
原子性地对 x
和 val
进行按位 NAND(非与)操作
原子性地执行 x[] = ~(x[] & val)
。返回旧值。
有关更多详细信息,请参见 LLVM 的 atomicrmw nand
指令。
示例
julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)
julia> Threads.atomic_nand!(x, 2)
3
julia> x[]
-3
Base.Threads.atomic_or!
— FunctionThreads.atomic_or!(x::Atomic{T}, val::T) where T
原子性地对 x
和 val
进行按位或操作
原子性地执行 x[] |= val
。返回旧值。
有关更多详细信息,请参见 LLVM 的 atomicrmw or
指令。
示例
julia> x = Threads.Atomic{Int}(5)
Base.Threads.Atomic{Int64}(5)
julia> Threads.atomic_or!(x, 7)
5
julia> x[]
7
Base.Threads.atomic_xor!
— FunctionThreads.atomic_xor!(x::Atomic{T}, val::T) where T
原子性地对 x
和 val
进行按位异或(exclusive-or)
原子性地执行 x[] $= val
。返回旧值。
有关更多详细信息,请参见 LLVM 的 atomicrmw xor
指令。
示例
julia> x = Threads.Atomic{Int}(5)
Base.Threads.Atomic{Int64}(5)
julia> Threads.atomic_xor!(x, 7)
5
julia> x[]
2
Base.Threads.atomic_max!
— FunctionThreads.atomic_max!(x::Atomic{T}, val::T) where T
原子性地将 x
和 val
的最大值存储在 x
中
原子性地执行 x[] = max(x[], val)
。返回旧值。
有关更多详细信息,请参见 LLVM 的 atomicrmw max
指令。
示例
julia> x = Threads.Atomic{Int}(5)
Base.Threads.Atomic{Int64}(5)
julia> Threads.atomic_max!(x, 7)
5
julia> x[]
7
Base.Threads.atomic_min!
— FunctionThreads.atomic_min!(x::Atomic{T}, val::T) where T
原子性地将 x
和 val
的最小值存储在 x
中
原子性地执行 x[] = min(x[], val)
。返回旧值。
有关更多详细信息,请参见 LLVM 的 atomicrmw min
指令。
示例
julia> x = Threads.Atomic{Int}(7)
Base.Threads.Atomic{Int64}(7)
julia> Threads.atomic_min!(x, 5)
7
julia> x[]
5
Base.Threads.atomic_fence
— FunctionThreads.atomic_fence()
插入一个顺序一致性内存屏障
插入一个具有顺序一致性排序语义的内存屏障。有些算法需要这样做,即获取/释放排序不足的情况。
这可能是一个非常昂贵的操作。考虑到Julia中所有其他原子操作已经具有获取/释放语义,显式屏障在大多数情况下是不必要的。
有关更多详细信息,请参见LLVM的fence
指令。
ccall using a libuv threadpool (Experimental)
Base.@threadcall
— Macro@threadcall((cfunc, clib), rettype, (argtypes...), argvals...)
@threadcall
宏的调用方式与 ccall
相同,但在不同的线程中执行。这在您想要调用一个阻塞的 C 函数而不导致当前 julia
线程被阻塞时非常有用。并发性受到 libuv 线程池大小的限制,默认情况下为 4 个线程,但可以通过设置 UV_THREADPOOL_SIZE
环境变量并重新启动 julia
进程来增加。
请注意,被调用的函数绝不应回调到 Julia。
Low-level synchronization primitives
这些构建块用于创建常规同步对象。
Base.Threads.SpinLock
— TypeSpinLock()
创建一个非重入的测试-测试-设置自旋锁。递归使用将导致死锁。这种锁应该仅在执行时间很短且不阻塞的代码周围使用(例如,执行 I/O)。一般来说,应该使用 ReentrantLock
代替。
每个 lock
必须与一个 unlock
匹配。如果 !islocked(lck::SpinLock)
为真,则 trylock(lck)
成功,除非有其他任务试图“同时”持有锁。
测试-测试-设置自旋锁在大约 30 个竞争线程之前是最快的。如果竞争超过这个数量,则应该考虑不同的同步方法。