Multi-Threading

Base.Threads.@threadsMacro
Threads.@threads [schedule] for ... end

一个宏,用于并行执行 for 循环。迭代空间被分配给粗粒度的任务。此策略可以通过 schedule 参数指定。循环的执行会等待所有迭代的评估。

另请参见: @spawnDistributed 中的 pmap

扩展帮助

语义

除非调度选项指定了更强的保证,否则由 @threads 宏执行的循环具有以下语义。

@threads 宏以未指定的顺序和潜在的并发方式执行循环体。它不指定任务和工作线程的确切分配。每次执行的分配可能不同。循环体代码(包括从中递归调用的任何代码)不得对迭代分配给任务或执行它们的工作线程的分布做出任何假设。每次迭代的循环体必须能够独立于其他迭代向前推进,并且不受数据竞争的影响。因此,跨迭代的无效同步可能导致死锁,而未同步的内存访问可能导致未定义的行为。

例如,上述条件意味着:

  • 在一个迭代中获取的锁 必须 在同一迭代中释放。
  • 使用阻塞原语如 Channel 在迭代之间进行通信是不正确的。
  • 仅写入在迭代之间不共享的位置(除非使用锁或原子操作)。
  • 除非使用 :static 调度,否则 threadid() 的值可能在单个迭代内发生变化。请参见 Task Migration

调度器

如果没有调度器参数,则确切的调度是未指定的,并且在 Julia 版本之间有所不同。目前,当未指定调度器时,使用 :dynamic

Julia 1.5

从 Julia 1.5 开始,schedule 参数可用。

:dynamic(默认)

:dynamic 调度器动态地将迭代分配给可用的工作线程。当前实现假设每个迭代的工作负载是均匀的。然而,这一假设在未来可能会被移除。

此调度选项仅仅是对底层执行机制的提示。然而,可以预期一些属性。:dynamic 调度器使用的 Task 数量受可用工作线程数量的一个小常数倍的限制(Threads.threadpoolsize())。每个任务处理迭代空间的连续区域。因此,@threads :dynamic for x in xs; f(x); end 通常比 @sync for x in xs; @spawn f(x); end 更高效,前提是 length(xs) 显著大于工作线程的数量,并且 f(x) 的运行时间相对小于生成和同步任务的成本(通常小于 10 微秒)。

Julia 1.8

从 Julia 1.8 开始,schedule 参数的 :dynamic 选项可用且为默认值。

:greedy

:greedy 调度器最多生成 Threads.threadpoolsize() 个任务,每个任务贪婪地处理生成的给定迭代值。只要一个任务完成其工作,它就会从迭代器中获取下一个值。任何单个任务完成的工作不一定是来自迭代器的连续值。给定的迭代器可能会无限生成值,只需要迭代器接口(不需要索引)。

如果单个迭代的工作负载不均匀/有较大差异,这个调度选项通常是一个不错的选择。

Julia 1.11

从 Julia 1.11 开始,schedule 参数的 :greedy 选项可用。

:static

:static 调度器为每个线程创建一个任务,并在它们之间均匀分配迭代,将每个任务专门分配给每个线程。特别地,threadid() 的值在一个迭代内是恒定的。如果在另一个 @threads 循环内部或从线程 1 以外的线程使用,则指定 :static 是错误的。

Note

:static 调度的存在是为了支持在 Julia 1.3 之前编写的代码的过渡。在新编写的库函数中,不鼓励使用 :static 调度,因为使用此选项的函数不能从任意工作线程调用。

示例

为了说明不同的调度策略,考虑以下包含非让步定时循环的函数 busywait,该循环运行给定的秒数。

julia> function busywait(seconds)
            tstart = time_ns()
            while (time_ns() - tstart) / 1e9 < seconds
            end
        end

julia> @time begin
            Threads.@spawn busywait(5)
            Threads.@threads :static for i in 1:Threads.threadpoolsize()
                busywait(1)
            end
        end
6.003001 seconds (16.33 k allocations: 899.255 KiB, 0.25% compilation time)

julia> @time begin
            Threads.@spawn busywait(5)
            Threads.@threads :dynamic for i in 1:Threads.threadpoolsize()
                busywait(1)
            end
        end
2.012056 seconds (16.05 k allocations: 883.919 KiB, 0.66% compilation time)

:dynamic 示例耗时 2 秒,因为一个未占用的线程能够运行两个 1 秒的迭代以完成 for 循环。

source
Base.Threads.foreachFunction
Threads.foreach(f, channel::Channel;
                schedule::Threads.AbstractSchedule=Threads.FairSchedule(),
                ntasks=Threads.threadpoolsize())

类似于 foreach(f, channel),但对 channel 的迭代和对 f 的调用是在 Threads.@spawn 生成的 ntasks 任务之间分配的。此函数将在所有内部生成的任务完成之前等待返回。

如果 schedule isa FairScheduleThreads.foreach 将尝试以一种方式生成任务,使得 Julia 的调度器能够更自由地在线程之间负载平衡工作项。这种方法通常具有更高的每项开销,但在与其他多线程工作负载并发时可能表现得比 StaticSchedule 更好。

如果 schedule isa StaticScheduleThreads.foreach 将以一种产生比 FairSchedule 更低的每项开销的方式生成任务,但不太适合负载平衡。因此,这种方法可能更适合细粒度、均匀的工作负载,但在与其他多线程工作负载并发时可能表现得比 FairSchedule 更差。

示例

julia> n = 20

julia> c = Channel{Int}(ch -> foreach(i -> put!(ch, i), 1:n), 1)

julia> d = Channel{Int}(n) do ch
           f = i -> put!(ch, i^2)
           Threads.foreach(f, c)
       end

julia> collect(d)
collect(d) = [1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400]
Julia 1.6

此函数需要 Julia 1.6 或更高版本。

source
Base.Threads.@spawnMacro
Threads.@spawn [:default|:interactive] expr

创建一个 Task 并将其 schedule 到指定线程池中的任何可用线程(如果未指定,则为 :default)。一旦有线程可用,任务就会被分配到该线程。要等待任务完成,请在此宏的结果上调用 wait,或调用 fetch 以等待并获取其返回值。

可以通过 $ 将值插入到 @spawn 中,这会将值直接复制到构造的底层闭包中。这允许您插入变量的 ,将异步代码与当前任务中变量值的变化隔离开来。

Note

任务运行的线程可能会在任务让出时改变,因此 threadid() 不应被视为任务的常量。请参见 Task Migration,以及更广泛的 multi-threading 手册以获取更多重要注意事项。另请参见关于 threadpools 的章节。

Julia 1.3

此宏自 Julia 1.3 起可用。

Julia 1.4

自 Julia 1.4 起可通过 $ 插入值。

Julia 1.9

自 Julia 1.9 起可以指定线程池。

示例

julia> t() = println("Hello from ", Threads.threadid());

julia> tasks = fetch.([Threads.@spawn t() for i in 1:4]);
Hello from 1
Hello from 1
Hello from 3
Hello from 4
source
Base.Threads.threadidFunction
Threads.threadid() -> Int

获取当前执行线程的 ID 号。主线程的 ID 为 1

示例

julia> Threads.threadid()
1

julia> Threads.@threads for i in 1:4
          println(Threads.threadid())
       end
4
2
5
4

!!! 注意 任务运行的线程可能会在任务让出时发生变化,这被称为 Task Migration。因此,在大多数情况下,使用 threadid() 来索引,例如,缓冲区或状态对象的向量是不安全的。

source
Base.Threads.maxthreadidFunction
Threads.maxthreadid() -> Int

获取可用于 Julia 进程的线程数量的下限(跨所有线程池),具有原子获取语义。结果将始终大于或等于 threadid() 以及 threadid(task),对于您在调用 maxthreadid 之前能够观察到的任何任务。

source
Base.Threads.nthreadsFunction
Threads.nthreads(:default | :interactive) -> Int

获取指定线程池中当前的线程数量。:interactive 中的线程具有 id 编号 1:nthreads(:interactive),而 :default 中的线程具有 id 编号 nthreads(:interactive) .+ (1:nthreads(:default))

另请参见 [BLAS.get_num_threads] 和 [BLAS.set_num_threads] 在 LinearAlgebra 标准库中,以及 [nprocs()] 在 Distributed 标准库和 Threads.maxthreadid()

source
Base.Threads.threadpoolFunction
Threads.threadpool(tid = threadid()) -> Symbol

返回指定线程的线程池;可以是 :default:interactive:foreign

source
Base.Threads.threadpoolsizeFunction
Threads.threadpoolsize(pool::Symbol = :default) -> Int

获取可用于默认线程池(或指定线程池)的线程数量。

另请参见标准库中的 BLAS.get_num_threadsBLAS.set_num_threads,以及 Distributed 标准库中的 nprocs()

source
Base.Threads.ngcthreadsFunction
Threads.ngcthreads() -> Int

返回当前配置的 GC 线程数量。这包括标记线程和并发清扫线程。

source

另请参见 Multi-Threading

Atomic operations

Base.@atomicMacro
@atomic var
@atomic order ex

varex 标记为以原子方式执行,如果 ex 是一个支持的表达式。如果未指定 order,则默认为 :sequentially_consistent。

@atomic a.b.x = new
@atomic a.b.x += addend
@atomic :release a.b.x = new
@atomic :acquire_release a.b.x += addend

以原子方式执行右侧表达的存储操作并返回新值。

使用 = 时,此操作转换为 setproperty!(a.b, :x, new) 调用。使用任何运算符时,此操作也转换为 modifyproperty!(a.b, :x, +, addend)[2] 调用。

@atomic a.b.x max arg2
@atomic a.b.x + arg2
@atomic max(a.b.x, arg2)
@atomic :acquire_release max(a.b.x, arg2)
@atomic :acquire_release a.b.x + arg2
@atomic :acquire_release a.b.x max arg2

以原子方式执行右侧表达的二元操作。将结果存储到第一个参数中的字段,并返回值 (old, new)

此操作转换为 modifyproperty!(a.b, :x, func, arg2) 调用。

有关更多详细信息,请参见手册中的 Per-field atomics 部分。

示例

julia> mutable struct Atomic{T}; @atomic x::T; end

julia> a = Atomic(1)
Atomic{Int64}(1)

julia> @atomic a.x # 以顺序一致性获取 a 的字段 x
1

julia> @atomic :sequentially_consistent a.x = 2 # 以顺序一致性设置 a 的字段 x
2

julia> @atomic a.x += 1 # 以顺序一致性递增 a 的字段 x
3

julia> @atomic a.x + 1 # 以顺序一致性递增 a 的字段 x
3 => 4

julia> @atomic a.x # 以顺序一致性获取 a 的字段 x
4

julia> @atomic max(a.x, 10) # 以顺序一致性将 a 的字段 x 更改为最大值
4 => 10

julia> @atomic a.x max 5 # 再次以顺序一致性将 a 的字段 x 更改为最大值
10 => 10
Julia 1.7

此功能需要至少 Julia 1.7。

source
Base.@atomicswapMacro
@atomicswap a.b.x = new
@atomicswap :sequentially_consistent a.b.x = new

new 存储到 a.b.x 中,并返回 a.b.x 的旧值。

此操作转换为 swapproperty!(a.b, :x, new) 调用。

有关更多详细信息,请参见手册中的 Per-field atomics 部分。

示例

julia> mutable struct Atomic{T}; @atomic x::T; end

julia> a = Atomic(1)
Atomic{Int64}(1)

julia> @atomicswap a.x = 2+2 # 用 4 替换 a 的字段 x,具有顺序一致性
1

julia> @atomic a.x # 获取 a 的字段 x,具有顺序一致性
4
Julia 1.7

此功能至少需要 Julia 1.7。

source
Base.@atomicreplaceMacro
@atomicreplace a.b.x 预期 => 期望
@atomicreplace :sequentially_consistent a.b.x 预期 => 期望
@atomicreplace :sequentially_consistent :monotonic a.b.x 预期 => 期望

执行成对的条件替换,原子性地返回值 (old, success::Bool)。其中 success 表示替换是否完成。

此操作转换为 replaceproperty!(a.b, :x, 预期, 期望) 调用。

有关更多详细信息,请参见手册中的 Per-field atomics 部分。

示例

julia> mutable struct Atomic{T}; @atomic x::T; end

julia> a = Atomic(1)
Atomic{Int64}(1)

julia> @atomicreplace a.x 1 => 2 # 如果 a 的字段 x 为 1,则用 2 替换,具有顺序一致性
(old = 1, success = true)

julia> @atomic a.x # 以顺序一致性获取 a 的字段 x
2

julia> @atomicreplace a.x 1 => 2 # 如果 a 的字段 x 为 1,则用 2 替换,具有顺序一致性
(old = 2, success = false)

julia> xchg = 2 => 0; # 如果 a 的字段 x 为 2,则用 0 替换,具有顺序一致性

julia> @atomicreplace a.x xchg
(old = 2, success = true)

julia> @atomic a.x # 以顺序一致性获取 a 的字段 x
0
Julia 1.7

此功能至少需要 Julia 1.7。

source
Base.@atomiconceMacro
@atomiconce a.b.x = value
@atomiconce :sequentially_consistent a.b.x = value
@atomiconce :sequentially_consistent :monotonic a.b.x = value

如果之前未设置,则以原子方式执行值的条件赋值,返回值 success::Bool。其中 success 表示赋值是否完成。

此操作转换为 setpropertyonce!(a.b, :x, value) 调用。

有关更多详细信息,请参见手册中的 Per-field atomics 部分。

示例

julia> mutable struct AtomicOnce
           @atomic x
           AtomicOnce() = new()
       end

julia> a = AtomicOnce()
AtomicOnce(#undef)

julia> @atomiconce a.x = 1 # 如果未设置,则以顺序一致性将 a 的字段 x 设置为 1
true

julia> @atomic a.x # 以顺序一致性获取 a 的字段 x
1

julia> @atomiconce a.x = 1 # 如果未设置,则以顺序一致性将 a 的字段 x 设置为 1
false
Julia 1.11

此功能至少需要 Julia 1.11。

source
Core.AtomicMemoryType
AtomicMemory{T} == GenericMemory{:atomic, T, Core.CPU}

固定大小的 DenseVector{T}。对其任何元素的访问都是原子的(使用 :monotonic 排序)。设置任何元素必须使用 @atomic 宏并明确指定排序。

Warning

每个元素在访问时都是独立的原子,不能以非原子的方式设置。目前 @atomic 宏和更高级的接口尚未完成,但未来实现的构建块是内部内置函数 Core.memoryrefgetCore.memoryrefset!Core.memoryref_isassignedCore.memoryrefswap!Core.memoryrefmodify!Core.memoryrefreplace!

有关详细信息,请参见 Atomic Operations

Julia 1.11

此类型需要 Julia 1.11 或更高版本。

source

还有可选的内存排序参数用于 unsafe 函数集,如果指定该参数,将选择与 C/C++ 兼容的这些原子操作版本,具体包括 unsafe_loadunsafe_store!unsafe_swap!unsafe_replace!unsafe_modify!

Warning

以下API已被弃用,但对它们的支持可能会在多个版本中继续存在。

Base.Threads.AtomicType
Threads.Atomic{T}

持有类型为 T 的对象的引用,确保它仅以原子方式访问,即以线程安全的方式。

只有某些“简单”类型可以原子地使用,即原始布尔、整数和浮点类型。这些类型包括 BoolInt8...Int128UInt8...UInt128,以及 Float16...Float64

新的原子对象可以从非原子值创建;如果未指定,则原子对象初始化为零。

可以使用 [] 符号访问原子对象:

示例

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> x[] = 1
1

julia> x[]
1

原子操作使用 atomic_ 前缀,例如 atomic_add!atomic_xchg! 等。

source
Base.Threads.atomic_cas!Function
Threads.atomic_cas!(x::Atomic{T}, cmp::T, newval::T) where T

原子比较并设置 x

原子地将 x 中的值与 cmp 进行比较。如果相等,则将 newval 写入 x。否则,保持 x 不变。返回 x 中的旧值。通过将返回的值与 cmp 进行比较(通过 ===),可以知道 x 是否被修改,并且现在是否持有新值 newval

有关更多详细信息,请参见 LLVM 的 cmpxchg 指令。

此函数可用于实现事务语义。在事务之前,记录 x 中的值。在事务之后,仅在此期间 x 未被修改的情况下存储新值。

示例

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_cas!(x, 4, 2);

julia> x
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_cas!(x, 3, 2);

julia> x
Base.Threads.Atomic{Int64}(2)
source
Base.Threads.atomic_xchg!Function
Threads.atomic_xchg!(x::Atomic{T}, newval::T) where T

原子性地交换 x 中的值

原子性地将 x 中的值与 newval 交换。返回值。

有关更多详细信息,请参见 LLVM 的 atomicrmw xchg 指令。

示例

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_xchg!(x, 2)
3

julia> x[]
2
source
Base.Threads.atomic_add!Function
Threads.atomic_add!(x::Atomic{T}, val::T) where T <: ArithmeticTypes

原子地将 val 加到 x

以原子方式执行 x[] += val。返回 值。未定义于 Atomic{Bool}

有关更多详细信息,请参见 LLVM 的 atomicrmw add 指令。

示例

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_add!(x, 2)
3

julia> x[]
5
source
Base.Threads.atomic_sub!Function
Threads.atomic_sub!(x::Atomic{T}, val::T) where T <: ArithmeticTypes

原子地从 x 中减去 val

原子地执行 x[] -= val。返回值。未定义于 Atomic{Bool}

有关更多详细信息,请参见 LLVM 的 atomicrmw sub 指令。

示例

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_sub!(x, 2)
3

julia> x[]
1
source
Base.Threads.atomic_and!Function
Threads.atomic_and!(x::Atomic{T}, val::T) where T

原子性地对 xval 进行按位与操作

原子性地执行 x[] &= val。返回值。

有关更多详细信息,请参见 LLVM 的 atomicrmw and 指令。

示例

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_and!(x, 2)
3

julia> x[]
2
source
Base.Threads.atomic_nand!Function
Threads.atomic_nand!(x::Atomic{T}, val::T) where T

原子性地对 xval 进行按位 NAND(非与)操作

原子性地执行 x[] = ~(x[] & val)。返回值。

有关更多详细信息,请参见 LLVM 的 atomicrmw nand 指令。

示例

julia> x = Threads.Atomic{Int}(3)
Base.Threads.Atomic{Int64}(3)

julia> Threads.atomic_nand!(x, 2)
3

julia> x[]
-3
source
Base.Threads.atomic_or!Function
Threads.atomic_or!(x::Atomic{T}, val::T) where T

原子性地对 xval 进行按位或操作

原子性地执行 x[] |= val。返回值。

有关更多详细信息,请参见 LLVM 的 atomicrmw or 指令。

示例

julia> x = Threads.Atomic{Int}(5)
Base.Threads.Atomic{Int64}(5)

julia> Threads.atomic_or!(x, 7)
5

julia> x[]
7
source
Base.Threads.atomic_xor!Function
Threads.atomic_xor!(x::Atomic{T}, val::T) where T

原子性地对 xval 进行按位异或(exclusive-or)

原子性地执行 x[] $= val。返回值。

有关更多详细信息,请参见 LLVM 的 atomicrmw xor 指令。

示例

julia> x = Threads.Atomic{Int}(5)
Base.Threads.Atomic{Int64}(5)

julia> Threads.atomic_xor!(x, 7)
5

julia> x[]
2
source
Base.Threads.atomic_max!Function
Threads.atomic_max!(x::Atomic{T}, val::T) where T

原子性地将 xval 的最大值存储在 x

原子性地执行 x[] = max(x[], val)。返回值。

有关更多详细信息,请参见 LLVM 的 atomicrmw max 指令。

示例

julia> x = Threads.Atomic{Int}(5)
Base.Threads.Atomic{Int64}(5)

julia> Threads.atomic_max!(x, 7)
5

julia> x[]
7
source
Base.Threads.atomic_min!Function
Threads.atomic_min!(x::Atomic{T}, val::T) where T

原子性地将 xval 的最小值存储在 x

原子性地执行 x[] = min(x[], val)。返回值。

有关更多详细信息,请参见 LLVM 的 atomicrmw min 指令。

示例

julia> x = Threads.Atomic{Int}(7)
Base.Threads.Atomic{Int64}(7)

julia> Threads.atomic_min!(x, 5)
7

julia> x[]
5
source
Base.Threads.atomic_fenceFunction
Threads.atomic_fence()

插入一个顺序一致性内存屏障

插入一个具有顺序一致性排序语义的内存屏障。有些算法需要这样做,即获取/释放排序不足的情况。

这可能是一个非常昂贵的操作。考虑到Julia中所有其他原子操作已经具有获取/释放语义,显式屏障在大多数情况下是不必要的。

有关更多详细信息,请参见LLVM的fence指令。

source

ccall using a libuv threadpool (Experimental)

Base.@threadcallMacro
@threadcall((cfunc, clib), rettype, (argtypes...), argvals...)

@threadcall 宏的调用方式与 ccall 相同,但在不同的线程中执行。这在您想要调用一个阻塞的 C 函数而不导致当前 julia 线程被阻塞时非常有用。并发性受到 libuv 线程池大小的限制,默认情况下为 4 个线程,但可以通过设置 UV_THREADPOOL_SIZE 环境变量并重新启动 julia 进程来增加。

请注意,被调用的函数绝不应回调到 Julia。

source

Low-level synchronization primitives

这些构建块用于创建常规同步对象。

Base.Threads.SpinLockType
SpinLock()

创建一个非重入的测试-测试-设置自旋锁。递归使用将导致死锁。这种锁应该仅在执行时间很短且不阻塞的代码周围使用(例如,执行 I/O)。一般来说,应该使用 ReentrantLock 代替。

每个 lock 必须与一个 unlock 匹配。如果 !islocked(lck::SpinLock) 为真,则 trylock(lck) 成功,除非有其他任务试图“同时”持有锁。

测试-测试-设置自旋锁在大约 30 个竞争线程之前是最快的。如果竞争超过这个数量,则应该考虑不同的同步方法。

source