Multi-Threading

Посетите этот blog post для презентации возможностей многопоточности Julia.

Starting Julia with multiple threads

По умолчанию, Julia запускается с одним потоком выполнения. Это можно проверить, используя команду Threads.nthreads():

julia> Threads.nthreads()
1

Количество потоков выполнения контролируется либо с помощью аргумента командной строки -t/--threads, либо с помощью переменной окружения JULIA_NUM_THREADS. Когда указаны оба, то приоритет имеет -t/--threads.

Количество потоков можно указать либо в виде целого числа (--threads=4), либо как auto (--threads=auto), где auto пытается определить полезное значение по умолчанию для количества потоков (см. Command-line Options для получения дополнительной информации).

Julia 1.5

Аргумент командной строки -t/--threads требует как минимум Julia 1.5. В более старых версиях вы должны использовать переменную окружения вместо этого.

Julia 1.7

Использование auto в качестве значения переменной окружения JULIA_NUM_THREADS требует как минимум Julia 1.7. В более старых версиях это значение игнорируется.

Давайте начнем Julia с 4 потоками:

$ julia --threads 4

Давайте проверим, что у нас в распоряжении 4 потока.

julia> Threads.nthreads()
4

Но мы в настоящее время на главной ветке. Чтобы проверить, мы используем функцию Threads.threadid

julia> Threads.threadid()
1
Note

Если вы предпочитаете использовать переменную окружения, вы можете установить её следующим образом в Bash (Linux/macOS):

export JULIA_NUM_THREADS=4

C оболочка на Linux/macOS, CMD на Windows:

set JULIA_NUM_THREADS=4

Powershell на Windows:

$env:JULIA_NUM_THREADS=4

Обратите внимание, что это должно быть сделано до начала работы с Julia.

Note

Количество потоков, указанных с помощью -t/--threads, передается рабочим процессам, которые создаются с использованием параметров командной строки -p/--procs или --machine-file. Например, julia -p2 -t2 создает 1 основной процесс с 2 рабочими процессами, и все три процесса имеют 2 включенных потока. Для более тонкого управления рабочими потоками используйте addprocs и передайте -t/--threads как exeflags.

Multiple GC Threads

Сборщик мусора (GC) может использовать несколько потоков. Количество используемых потоков составляет либо половину от числа потоков вычислительных работников, либо настраивается с помощью аргумента командной строки --gcthreads или с использованием переменной окружения JULIA_NUM_GC_THREADS.

Julia 1.10

Аргумент командной строки --gcthreads требует как минимум Julia 1.10.

Threadpools

Когда потоки программы заняты выполнением множества задач, задачи могут испытывать задержки, что может негативно сказаться на отзывчивости и интерактивности программы. Чтобы решить эту проблему, вы можете указать, что задача является интерактивной, когда вы Threads.@spawn это:

using Base.Threads
@spawn :interactive f()

Интерактивные задачи должны избегать выполнения операций с высокой задержкой, и если они являются длительными задачами, должны часто уступать.

Julia может быть запущена с одним или несколькими потоками, зарезервированными для выполнения интерактивных задач:

$ julia --threads 3,1

Переменная окружения JULIA_NUM_THREADS также может быть использована аналогично:

export JULIA_NUM_THREADS=3,1

Это запускает Julia с 3 потоками в пуле потоков :default и 1 потоком в пуле потоков :interactive:

julia> using Base.Threads

julia> nthreadpools()
2

julia> threadpool() # the main thread is in the interactive thread pool
:interactive

julia> nthreads(:default)
3

julia> nthreads(:interactive)
1

julia> nthreads()
3
Note

Версия nthreads без аргументов возвращает количество потоков в пуле по умолчанию.

Note

В зависимости от того, была ли Julia запущена с интерактивными потоками, основной поток находится либо в пуле потоков по умолчанию, либо в интерактивном пуле потоков.

Любое из чисел или оба могут быть заменены на слово auto, что заставляет Julia выбрать разумное значение по умолчанию.

The @threads Macro

Давайте рассмотрим простой пример, используя наши родные потоки. Создадим массив нулей:

julia> a = zeros(10)
10-element Vector{Float64}:
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0

Давайте одновременно обработаем этот массив, используя 4 потока. Каждый поток запишет свой идентификатор потока в каждую ячейку.

Julia поддерживает параллельные циклы с помощью макроса Threads.@threads. Этот макрос размещается перед циклом for, чтобы указать Julia, что цикл является многопоточной областью:

julia> Threads.@threads for i = 1:10
           a[i] = Threads.threadid()
       end

Итерационное пространство разделяется между потоками, после чего каждый поток записывает свой идентификатор потока в назначенные ему места:

julia> a
10-element Vector{Float64}:
 1.0
 1.0
 1.0
 2.0
 2.0
 2.0
 3.0
 3.0
 4.0
 4.0

Обратите внимание, что Threads.@threads не имеет необязательного параметра сокращения, как @distributed.

Using @threads without data-races

Концепция гонки данных подробно рассматривается в "Communication and data races between threads". На данный момент просто знайте, что гонка данных может привести к неправильным результатам и опасным ошибкам.

Давайте скажем, что мы хотим сделать функцию sum_single ниже многопоточной.

julia> function sum_single(a)
           s = 0
           for i in a
               s += i
           end
           s
       end
sum_single (generic function with 1 method)

julia> sum_single(1:1_000_000)
500000500000

Простое добавление @threads приводит к гонке данных, когда несколько потоков одновременно читают и записывают s.

julia> function sum_multi_bad(a)
           s = 0
           Threads.@threads for i in a
               s += i
           end
           s
       end
sum_multi_bad (generic function with 1 method)

julia> sum_multi_bad(1:1_000_000)
70140554652

Обратите внимание, что результат не равен 500000500000, как и должно быть, и, скорее всего, будет изменяться при каждой оценке.

Чтобы исправить это, можно использовать буферы, специфичные для задачи, чтобы сегментировать сумму на части, свободные от гонок. Здесь sum_single повторно используется с собственным внутренним буфером s. Входной вектор a разбивается на nthreads() частей для параллельной работы. Затем мы используем Threads.@spawn, чтобы создать задачи, которые индивидуально суммируют каждую часть. Наконец, мы снова суммируем результаты из каждой задачи, используя sum_single:

julia> function sum_multi_good(a)
           chunks = Iterators.partition(a, length(a) ÷ Threads.nthreads())
           tasks = map(chunks) do chunk
               Threads.@spawn sum_single(chunk)
           end
           chunk_sums = fetch.(tasks)
           return sum_single(chunk_sums)
       end
sum_multi_good (generic function with 1 method)

julia> sum_multi_good(1:1_000_000)
500000500000
Note

Буферы не должны управляться на основе threadid(), т.е. buffers = zeros(Threads.nthreads()), потому что параллельные задачи могут приостанавливать выполнение, что означает, что несколько параллельных задач могут использовать один и тот же буфер на данном потоке, что создает риск гонок данных. Более того, когда доступно более одного потока, задачи могут менять поток в точках приостановки, что известно как task migration.

Другим вариантом является использование атомарных операций над переменными, разделяемыми между задачами/потоками, что может быть более производительным в зависимости от характеристик операций.

Communication and data-races between threads

Хотя потоки Julia могут общаться через общую память, писать корректный код с многопоточностью, свободный от гонок данных, крайне сложно. Channel в Julia являются потокобезопасными и могут использоваться для безопасного общения. Также ниже есть разделы, которые объясняют, как использовать locks и atomics, чтобы избежать гонок данных.

Data-race freedom

Вы полностью ответственны за то, чтобы ваша программа была свободна от гонок данных, и ничего, что здесь обещано, не может быть предположено, если вы не соблюдаете это требование. Наблюдаемые результаты могут быть крайне неинтуитивными.

Если возникают гонки данных, Julia не является безопасной с точки зрения памяти. Будьте очень осторожны при чтении любых данных, если другой поток может их записывать, так как это может привести к ошибкам сегментации или еще хуже. Ниже приведены несколько небезопасных способов доступа к глобальным переменным из разных потоков:

Thread 1:
global b = false
global a = rand()
global b = true

Thread 2:
while !b; end
bad_read1(a) # it is NOT safe to access `a` here!

Thread 3:
while !@isdefined(a); end
bad_read2(a) # it is NOT safe to access `a` here

Using locks to avoid data-races

Важным инструментом для избежания гонок данных и, таким образом, написания потокобезопасного кода является концепция "замка". Замок может быть заблокирован и разблокирован. Если поток заблокировал замок и не разблокировал его, говорят, что он "удерживает" замок. Если есть только один замок, и мы пишем код, который требует удержания замка для доступа к некоторым данным, мы можем гарантировать, что несколько потоков никогда не будут одновременно получать доступ к одним и тем же данным. Обратите внимание, что связь между замком и переменной устанавливается программистом, а не программой.

Например, мы можем создать замок my_lock и заблокировать его, пока изменяем переменную my_variable. Это делается проще всего с помощью макроса @lock:

julia> my_lock = ReentrantLock();

julia> my_variable = [1, 2, 3];

julia> @lock my_lock my_variable[1] = 100
100

Используя аналогичный шаблон с тем же замком и переменной, но в другом потоке, операции свободны от гонок данных.

Мы могли бы выполнить операцию выше с функциональной версией lock следующими двумя способами:

julia> lock(my_lock) do
           my_variable[1] = 100
       end
100

julia> begin
           lock(my_lock)
           try
               my_variable[1] = 100
           finally
               unlock(my_lock)
           end
       end
100

Все три варианта эквивалентны. Обратите внимание, что финальная версия требует явного блока try, чтобы гарантировать, что блокировка всегда будет снята, в то время как первые два варианта делают это внутренне. Всегда следует использовать приведенный выше шаблон блокировки при изменении данных (например, при присвоении глобальной или замыкательной переменной), к которым обращаются другие потоки. Невыполнение этого может иметь непредвиденные и серьезные последствия.

Atomic Operations

Julia supports accessing and modifying values atomically, that is, in a thread-safe way to avoid race conditions. A value (which must be of a primitive type) can be wrapped as Threads.Atomic to indicate it must be accessed in this way. Here we can see an example:

julia> i = Threads.Atomic{Int}(0);

julia> ids = zeros(4);

julia> old_is = zeros(4);

julia> Threads.@threads for id in 1:4
           old_is[id] = Threads.atomic_add!(i, id)
           ids[id] = id
       end

julia> old_is
4-element Vector{Float64}:
 0.0
 1.0
 7.0
 3.0

julia> i[]
 10

julia> ids
4-element Vector{Float64}:
 1.0
 2.0
 3.0
 4.0

Если бы мы попытались выполнить сложение без атомарного тега, мы могли бы получить неправильный ответ из-за состояния гонки. Пример того, что произошло бы, если бы мы не избежали гонки:

julia> using Base.Threads

julia> Threads.nthreads()
4

julia> acc = Ref(0)
Base.RefValue{Int64}(0)

julia> @threads for i in 1:1000
          acc[] += 1
       end

julia> acc[]
926

julia> acc = Atomic{Int64}(0)
Atomic{Int64}(0)

julia> @threads for i in 1:1000
          atomic_add!(acc, 1)
       end

julia> acc[]
1000

Per-field atomics

Мы также можем использовать атомарные операции на более детальном уровне, используя макросы @atomic, @atomicswap, @atomicreplace и @atomiconce.

Конкретные детали модели памяти и другие детали дизайна записаны в Julia Atomics Manifesto, которые позже будут официально опубликованы.

Любое поле в объявлении структуры может быть украшено @atomic, и тогда любое запись также должна быть помечена @atomic и должна использовать один из определенных атомарных порядков (:monotonic, :acquire, :release, :acquire_release или :sequentially_consistent). Любое чтение атомарного поля также может быть аннотировано ограничением атомарного порядка или будет выполнено с монотонным (ослабленным) порядком, если не указано.

Julia 1.7

Переписка по полям требует как минимум Julia 1.7.

Side effects and mutable function arguments

При использовании многопоточности необходимо быть осторожным при использовании функций, которые не являются pure, так как мы можем получить неправильный ответ. Например, функции, которые имеют name ending with !, по соглашению изменяют свои аргументы и, следовательно, не являются чистыми.

@threadcall

Внешние библиотеки, такие как те, что вызываются через ccall, представляют собой проблему для механизма ввода-вывода на основе задач в Julia. Если библиотека C выполняет блокирующую операцию, это предотвращает выполнение планировщиком Julia любых других задач до тех пор, пока вызов не вернется. (Исключения составляют вызовы в пользовательский код C, который вызывает обратно в Julia, что может затем приостановить выполнение, или код C, который вызывает jl_yield(), эквивалент C для yield.)

Макрос @threadcall предоставляет способ избежать задержки выполнения в такой ситуации. Он планирует выполнение функции C в отдельном потоке. Для этого используется пул потоков с размером по умолчанию 4. Размер пула потоков контролируется с помощью переменной окружения UV_THREADPOOL_SIZE. В то время как ожидается свободный поток, и во время выполнения функции, как только поток становится доступным, запрашиваемая задача (в основном цикле событий Julia) передает управление другим задачам. Обратите внимание, что @threadcall не возвращает управление, пока выполнение не завершится. С точки зрения пользователя это, следовательно, блокирующий вызов, как и другие API Julia.

Очень важно, чтобы вызываемая функция не вызывала обратно в Julia, так как это приведет к ошибке сегментации.

@threadcall может быть удален/изменен в будущих версиях Julia.

Caveats

В настоящее время большинство операций в среде выполнения Julia и стандартных библиотеках могут использоваться безопасно для потоков, если пользовательский код свободен от гонок данных. Однако в некоторых областях работа по стабилизации поддержки потоков продолжается. Многопоточное программирование имеет много врожденных трудностей, и если программа, использующая потоки, проявляет необычное или нежелательное поведение (например, сбои или загадочные результаты), взаимодействия потоков обычно следует подозревать в первую очередь.

Существует несколько конкретных ограничений и предупреждений, о которых следует помнить при использовании потоков в Julia:

  • Типы базовых коллекций требуют ручной блокировки, если они используются одновременно несколькими потоками, где хотя бы один поток изменяет коллекцию (распространенные примеры включают push! для массивов или вставку элементов в Dict).
  • Расписание, используемое @spawn, является недетерминированным и на него не следует полагаться.
  • Задачи, связанные с вычислениями и не выделяющие память, могут предотвратить выполнение сборки мусора в других потоках, которые выделяют память. В этих случаях может потребоваться вставить ручной вызов GC.safepoint(), чтобы разрешить выполнение сборки мусора. Это ограничение будет снято в будущем.
  • Избегайте выполнения операций верхнего уровня, например, include или eval определений типа, метода и модуля параллельно.
  • Имейте в виду, что финализаторы, зарегистрированные библиотекой, могут не работать, если потоки включены. Это может потребовать некоторой переходной работы по всей экосистеме, прежде чем многопоточность сможет быть широко принята с уверенностью. См. раздел the safe use of finalizers для получения дополнительной информации.

Task Migration

После того как задача начинает выполняться на определенном потоке, она может перейти на другой поток, если задача уступает.

Такие задачи могли быть начаты с @spawn или @threads, хотя опция расписания :static для @threads действительно замораживает threadid.

Это означает, что в большинстве случаев threadid() не следует рассматривать как константу в рамках задачи, и, следовательно, не следует использовать для индексации в векторе буферов или объектов с состоянием.

Julia 1.7

Миграция задач была введена в Julia 1.7. Ранее эти задачи всегда оставались на том же потоке, на котором они были запущены.

Safe use of Finalizers

Поскольку финализаторы могут прерывать любой код, они должны быть очень осторожны в том, как они взаимодействуют с любым глобальным состоянием. К сожалению, главная причина, по которой используются финализаторы, заключается в обновлении глобального состояния (чистая функция обычно довольно бессмысленна в качестве финализатора). Это приводит нас к небольшой головоломке. Существует несколько подходов к решению этой проблемы:

  1. Когда код выполняется в однопоточном режиме, он может вызвать внутреннюю C-функцию jl_gc_enable_finalizers, чтобы предотвратить планирование финализаторов внутри критической области. Внутренне это используется в некоторых функциях (таких как наши C-блокировки), чтобы предотвратить рекурсию при выполнении определенных операций (инкрементальная загрузка пакетов, генерация кода и т. д.). Комбинация блокировки и этого флага может быть использована для обеспечения безопасности финализаторов.

  2. Вторая стратегия, используемая Base в нескольких местах, заключается в явной задержке финализатора до тех пор, пока он не сможет получить свой замок нерекурсивно. Следующий пример демонстрирует, как эта стратегия может быть применена к Distributed.finalize_ref:

    julia function finalize_ref(r::AbstractRemoteRef) if r.where > 0 # Check if the finalizer is already run if islocked(client_refs) || !trylock(client_refs) # delay finalizer for later if we aren't free to acquire the lock finalizer(finalize_ref, r) return nothing end try # `lock` should always be followed by `try` if r.where > 0 # Must check again here # Do actual cleanup here r.where = 0 end finally unlock(client_refs) end end nothing end

  3. Связанная третья стратегия заключается в использовании очереди без ожидания. В данный момент у нас нет реализованной очереди без блокировок в Base, но Base.IntrusiveLinkedListSynchronized{T} подходит. Это может часто быть хорошей стратегией для кода с циклами событий. Например, эта стратегия используется в Gtk.jl для управления подсчетом ссылок на время жизни. В этом подходе мы не выполняем никаких явных действий внутри finalizer, а вместо этого добавляем его в очередь для выполнения в более безопасное время. На самом деле, планировщик задач Julia уже использует это, поэтому определение финализатора как x -> @spawn do_cleanup(x) является одним из примеров этого подхода. Однако стоит отметить, что это не контролирует, на каком потоке выполняется do_cleanup, поэтому do_cleanup все равно нужно будет захватить блокировку. Это не обязательно, если вы реализуете свою собственную очередь, так как вы можете явно опустошать эту очередь только из вашего потока.