Multi-processing and Distributed Computing
Реализация параллельных вычислений с распределенной памятью предоставляется модулем Distributed
как частью стандартной библиотеки, поставляемой с Julia.
Большинство современных компьютеров имеют более одного ЦП, и несколько компьютеров могут быть объединены в кластер. Использование мощности этих нескольких ЦП позволяет выполнять многие вычисления быстрее. Существует два основных фактора, влияющих на производительность: скорость самих ЦП и скорость их доступа к памяти. В кластере довольно очевидно, что данный ЦП будет иметь самый быстрый доступ к ОЗУ внутри одного и того же компьютера (узла). Возможно, более удивительно, что аналогичные проблемы актуальны на типичном многопоточном ноутбуке из-за различий в скорости основной памяти и cache. Следовательно, хорошая среда для многопроцессорной обработки должна позволять контролировать "право собственности" на участок памяти конкретным ЦП. Julia предоставляет среду для многопроцессорной обработки на основе передачи сообщений, чтобы программы могли выполняться на нескольких процессах в отдельных доменах памяти одновременно.
Реализация передачи сообщений в Julia отличается от других сред, таких как MPI[1]. Связь в Julia, как правило, "односторонняя", что означает, что программисту нужно явно управлять только одним процессом в операции с двумя процессами. Более того, эти операции обычно не выглядят как "отправка сообщения" и "получение сообщения", а скорее напоминают операции более высокого уровня, такие как вызовы пользовательских функций.
Распределенное программирование в Julia основано на двух примитивах: удаленные ссылки и удаленные вызовы. Удаленная ссылка — это объект, который может использоваться из любого процесса для ссылки на объект, хранящийся на определенном процессе. Удаленный вызов — это запрос одного процесса на вызов определенной функции с определенными аргументами на другом (возможно, том же) процессе.
Удаленные ссылки бывают двух видов: Future
и RemoteChannel
.
Удаленный вызов возвращает Future
в качестве результата. Удаленные вызовы возвращаются немедленно; процесс, который сделал вызов, продолжает свою следующую операцию, пока удаленный вызов происходит где-то еще. Вы можете дождаться завершения удаленного вызова, вызвав wait
на возвращенном 4d61726b646f776e2e436f64652822222c20224675747572652229_407265662044697374726962757465642e467574757265
, и вы можете получить полное значение результата, используя fetch
.
С другой стороны, RemoteChannel
можно перезаписывать. Например, несколько процессов могут координировать свою обработку, ссылаясь на один и тот же удаленный Channel
.
Каждый процесс имеет связанный идентификатор. Процесс, предоставляющий интерактивный запрос Julia, всегда имеет id
, равный 1. Процессы, используемые по умолчанию для параллельных операций, называются "рабочими". Когда существует только один процесс, процесс 1 считается рабочим. В противном случае рабочими считаются все процессы, кроме процесса 1. В результате добавление 2 или более процессов необходимо для получения преимуществ от методов параллельной обработки, таких как pmap
. Добавление одного процесса полезно, если вы просто хотите заниматься другими делами в основном процессе, пока длительное вычисление выполняется на рабочем.
Давайте попробуем это. Начало с julia -p n
предоставляет n
рабочих процессов на локальной машине. Обычно имеет смысл, чтобы n
равнялось количеству потоков ЦП (логических ядер) на машине. Обратите внимание, что аргумент -p
неявно загружает модуль Distributed
.
$ julia -p 2
julia> r = remotecall(rand, 2, 2, 2)
Future(2, 1, 4, nothing)
julia> s = @spawnat 2 1 .+ fetch(r)
Future(2, 1, 5, nothing)
julia> fetch(s)
2×2 Array{Float64,2}:
1.18526 1.50912
1.16296 1.60607
Первый аргумент к remotecall
— это функция, которую нужно вызвать. Большинство параллельных программ в Julia не ссылаются на конкретные процессы или количество доступных процессов, но 4d61726b646f776e2e436f64652822222c202272656d6f746563616c6c2229_40726566
считается интерфейсом низкого уровня, предоставляющим более тонкий контроль. Второй аргумент к 4d61726b646f776e2e436f64652822222c202272656d6f746563616c6c2229_40726566
— это id
процесса, который будет выполнять работу, а оставшиеся аргументы будут переданы вызываемой функции.
Как вы можете видеть, в первой строке мы попросили процесс 2 создать случайную матрицу 2 на 2, а во второй строке мы попросили его добавить 1 к ней. Результат обоих вычислений доступен в двух futures, r
и s
. Макрос @spawnat
вычисляет выражение во втором аргументе на процессе, указанном в первом аргументе.
Иногда вам может понадобиться немедленно получить значение, вычисленное удаленно. Это обычно происходит, когда вы читаете из удаленного объекта, чтобы получить данные, необходимые для следующей локальной операции. Функция remotecall_fetch
существует для этой цели. Она эквивалентна fetch(remotecall(...))
, но более эффективна.
julia> remotecall_fetch(r-> fetch(r)[1, 1], 2, r)
0.18526337335308085
Это извлекает массив на рабочем 2 и возвращает первое значение. Обратите внимание, что fetch
в этом случае не перемещает никаких данных, так как он выполняется на рабочем, который владеет массивом. Также можно написать:
julia> remotecall_fetch(getindex, 2, r, 1, 1)
0.10824216411304866
Помните, что getindex(r,1,1)
это equivalent для r[1,1]
, так что этот вызов извлекает первый элемент будущего r
.
Чтобы упростить задачу, символ :any
можно передать в @spawnat
, который выбирает, где выполнять операцию за вас:
julia> r = @spawnat :any rand(2,2)
Future(2, 1, 4, nothing)
julia> s = @spawnat :any 1 .+ fetch(r)
Future(3, 1, 5, nothing)
julia> fetch(s)
2×2 Array{Float64,2}:
1.38854 1.9098
1.20939 1.57158
Обратите внимание, что мы использовали 1 .+ fetch(r)
вместо 1 .+ r
. Это связано с тем, что мы не знаем, где будет выполняться код, поэтому в общем случае может потребоваться fetch
, чтобы переместить r
в процесс, выполняющий сложение. В этом случае @spawnat
достаточно умён, чтобы выполнить вычисление в процессе, которому принадлежит r
, поэтому 4d61726b646f776e2e436f64652822222c202266657463682229_40726566
будет бездействием (никакая работа не выполняется).
(Стоит отметить, что @spawnat
не является встроенным, а определено в Julia как macro. Возможно определить свои собственные такие конструкции.)
Важно помнить, что, после получения, Future
будет кэшировать свое значение локально. Дальнейшие вызовы fetch
не требуют сетевого перехода. Как только все ссылающиеся на 4d61726b646f776e2e436f64652822222c20224675747572652229_407265662044697374726962757465642e467574757265
получат данные, удаляется удаленное хранимое значение.
@async
похож на @spawnat
, но выполняет задачи только в локальном процессе. Мы используем его для создания задачи "подачи" для каждого процесса. Каждая задача выбирает следующий индекс, который нужно вычислить, затем ждет, пока ее процесс завершится, и повторяет это, пока не исчерпает индексы. Обратите внимание, что задачи подачи не начинают выполняться, пока основная задача не достигнет конца блока @sync
, в этот момент она передает управление и ждет, пока все локальные задачи завершатся, прежде чем вернуться из функции. Что касается версии 0.7 и выше, задачи подачи могут делиться состоянием через nextidx
, потому что они все выполняются в одном и том же процессе. Даже если Tasks
планируются кооперативно, блокировка может все еще потребоваться в некоторых контекстах, как в asynchronous I/O. Это означает, что переключения контекста происходят только в четко определенных точках: в данном случае, когда вызывается remotecall_fetch
. Это текущее состояние реализации, и оно может измениться в будущих версиях Julia, так как оно предназначено для того, чтобы сделать возможным выполнение до N Tasks
на M Process
, также известном как M:N Threading. Затем потребуется модель захвата/освобождения блокировки для nextidx
, так как небезопасно позволять нескольким процессам одновременно читать и записывать ресурс.
Code Availability and Loading Packages
Ваш код должен быть доступен в любом процессе, который его выполняет. Например, введите следующее в командной строке Julia:
julia> function rand2(dims...)
return 2*rand(dims...)
end
julia> rand2(2,2)
2×2 Array{Float64,2}:
0.153756 0.368514
1.15119 0.918912
julia> fetch(@spawnat :any rand2(2,2))
ERROR: RemoteException(2, CapturedException(UndefVarError(Symbol("#rand2"))))
Stacktrace:
[...]
Процесс 1 знал о функции rand2
, но процесс 2 не знал.
Чаще всего вы будете загружать код из файлов или пакетов, и у вас есть значительная гибкость в контроле над тем, какие процессы загружают код. Рассмотрим файл DummyModule.jl
, содержащий следующий код:
module DummyModule
export MyType, f
mutable struct MyType
a::Int
end
f(x) = x^2+1
println("loaded")
end
Чтобы ссылаться на MyType
во всех процессах, DummyModule.jl
необходимо загрузить на каждом процессе. Вызов include("DummyModule.jl")
загружает его только на одном процессе. Чтобы загрузить его на каждом процессе, используйте макрос @everywhere
(запустив Julia с julia -p 2
):
julia> @everywhere include("DummyModule.jl")
loaded
From worker 3: loaded
From worker 2: loaded
Как обычно, это не приводит DummyModule
в область видимости ни одного из процессов, который требует using
или import
. Более того, когда DummyModule
вводится в область видимости в одном процессе, он не находится в области видимости ни в каком другом:
julia> using .DummyModule
julia> MyType(7)
MyType(7)
julia> fetch(@spawnat 2 MyType(7))
ERROR: On worker 2:
UndefVarError: `MyType` not defined in `Main`
⋮
julia> fetch(@spawnat 2 DummyModule.MyType(7))
MyType(7)
Тем не менее, все еще возможно, например, отправить MyType
в процесс, который загрузил DummyModule
, даже если он не в области видимости:
julia> put!(RemoteChannel(2), MyType(7))
RemoteChannel{Channel{Any}}(2, 1, 13)
Файл также может быть предварительно загружен в несколько процессов при запуске с помощью флага -L
, и может быть использован скрипт драйвера для управления вычислениями:
julia -p <n> -L file1.jl -L file2.jl driver.jl
Процесс Julia, выполняющий скрипт драйвера в приведенном выше примере, имеет id
, равный 1, так же как и процесс, предоставляющий интерактивный интерфейс.
Наконец, если DummyModule.jl
не является отдельным файлом, а пакетом, то using DummyModule
загрузит DummyModule.jl
на всех процессах, но только введет его в область видимости на процессе, где был вызван using
.
Starting and managing worker processes
Базовая установка Julia имеет встроенную поддержку для двух типов кластеров:
- Локальный кластер, указанный с помощью опции
-p
, как показано выше. - Кластер, охватывающий машины, с использованием опции
--machine-file
. Это использует безпарольный входssh
для запуска процессов рабочих Julia (из того же пути, что и текущий хост) на указанных машинах. Каждое определение машины имеет форму[count*][user@]host[:port] [bind_addr[:port]]
.user
по умолчанию равен текущему пользователю,port
— стандартному порту ssh.count
— это количество рабочих, которые нужно запустить на узле, и по умолчанию равно 1. Необязательныйbind-to bind_addr[:port]
указывает IP-адрес и порт, которые другие рабочие должны использовать для подключения к этому рабочему.
Хотя Julia в целом стремится к обратной совместимости, распределение кода на рабочие процессы зависит от Serialization.serialize
. Как указано в соответствующей документации, это не может быть гарантировано для работы на разных версиях Julia, поэтому рекомендуется, чтобы все рабочие процессы на всех машинах использовали одну и ту же версию.
Функции addprocs
, rmprocs
, workers
и другие доступны как программный способ добавления, удаления и запроса процессов в кластере.
julia> using Distributed
julia> addprocs(2)
2-element Array{Int64,1}:
2
3
Модуль Distributed
должен быть явно загружен в главном процессе перед вызовом addprocs
. Он автоматически становится доступным в рабочих процессах.
Обратите внимание, что рабочие процессы не выполняют скрипт инициализации ~/.julia/config/startup.jl
, и они не синхронизируют свое глобальное состояние (например, параметры командной строки, глобальные переменные, определения новых методов и загруженные модули) с другими запущенными процессами. Вы можете использовать addprocs(exeflags="--project")
, чтобы инициализировать рабочий процесс с определенной средой, а затем @everywhere using <modulename>
или @everywhere include("file.jl")
.
Другие типы кластеров могут поддерживаться путем написания собственного пользовательского ClusterManager
, как описано ниже в разделе ClusterManagers.
Data Movement
Отправка сообщений и перемещение данных составляют большую часть накладных расходов в распределенной программе. Снижение количества сообщений и объема передаваемых данных имеет решающее значение для достижения производительности и масштабируемости. С этой целью важно понять перемещение данных, выполняемое различными конструкциями распределенного программирования Julia.
fetch
может рассматриваться как явная операция перемещения данных, поскольку она напрямую запрашивает перемещение объекта на локальную машину. @spawnat
(и несколько связанных конструкций) также перемещает данные, но это не так очевидно, поэтому это можно назвать неявной операцией перемещения данных. Рассмотрим эти два подхода к построению и возведению в квадрат случайной матрицы:
Метод 1:
julia> A = rand(1000,1000);
julia> Bref = @spawnat :any A^2;
[...]
julia> fetch(Bref);
Метод 2:
julia> Bref = @spawnat :any rand(1000,1000)^2;
[...]
julia> fetch(Bref);
Разница кажется тривиальной, но на самом деле она довольно значительна из-за поведения @spawnat
. В первом методе случайная матрица создается локально, а затем отправляется в другой процесс, где она возводится в квадрат. Во втором методе случайная матрица как создается, так и возводится в квадрат в другом процессе. Поэтому второй метод отправляет гораздо меньше данных, чем первый.
В этом игрушечном примере два метода легко различить и выбрать. Однако в реальной программе проектирование перемещения данных может потребовать больше размышлений и, вероятно, некоторых измерений. Например, если первому процессу нужна матрица A
, то первый метод может быть лучше. Или, если вычисление A
дорогостоящее и только текущий процесс его имеет, то перемещение его в другой процесс может быть неизбежным. Или, если текущему процессу почти нечего делать между @spawnat
и fetch(Bref)
, может быть лучше полностью устранить параллелизм. Или представьте, что rand(1000,1000)
заменяется на более дорогостоящую операцию. Тогда может иметь смысл добавить еще одно выражение 4d61726b646f776e2e436f64652822222c202240737061776e61742229_40726566
только для этого шага.
Global variables
Выражения, выполняемые удаленно через @spawnat
, или замыкания, указанные для удаленного выполнения с использованием remotecall
, могут ссылаться на глобальные переменные. Глобальные привязки в модуле Main
обрабатываются немного иначе по сравнению с глобальными привязками в других модулях. Рассмотрите следующий фрагмент кода:
A = rand(10,10)
remotecall_fetch(()->sum(A), 2)
В этом случае sum
ДОЛЖЕН быть определен в удаленном процессе. Обратите внимание, что A
является глобальной переменной, определенной в локальном рабочем пространстве. У рабочего 2 нет переменной с именем A
в Main
. Акт отправки замыкания ()->sum(A)
рабочему 2 приводит к тому, что Main.A
определяется на 2. Main.A
продолжает существовать на рабочем 2 даже после того, как вызов remotecall_fetch
возвращается. Удаленные вызовы с встроенными глобальными ссылками (только в модуле Main
) управляют глобальными переменными следующим образом:
Новые глобальные привязки создаются на рабочих узлах назначения, если они упоминаются в рамках удаленного вызова.
Глобальные константы также объявляются как константы на удаленных узлах.
Глобальные переменные отправляются на рабочий узел назначения только в контексте удаленного вызова, и только если их значение изменилось. Кроме того, кластер не синхронизирует глобальные привязки между узлами. Например:
A = rand(10,10) remotecall_fetch(()->sum(A), 2) # worker 2 A = rand(10,10) remotecall_fetch(()->sum(A), 3) # worker 3 A = nothing
Выполнение приведенного выше фрагмента кода приводит к тому, что
Main.A
на рабочем узле 2 имеет значение, отличное отMain.A
на рабочем узле 3, в то время как значениеMain.A
на узле 1 установлено вnothing
.
Как вы могли заметить, память, связанная с глобальными переменными, может быть собрана, когда они переназначаются на главном узле, но такое действие не выполняется на рабочих узлах, так как привязки продолжают оставаться действительными. clear!
может быть использован для ручного переназначения конкретных глобальных переменных на удаленных узлах на nothing
, как только они больше не требуются. Это освободит любую память, связанную с ними, в рамках обычного цикла сборки мусора.
Таким образом, программы должны быть осторожны при ссылках на глобальные переменные в удаленных вызовах. На самом деле, предпочтительнее избегать их вовсе, если это возможно. Если вам необходимо ссылаться на глобальные переменные, рассмотрите возможность использования блоков let
для локализации глобальных переменных.
Например:
julia> A = rand(10,10);
julia> remotecall_fetch(()->A, 2);
julia> B = rand(10,10);
julia> let B = B
remotecall_fetch(()->B, 2)
end;
julia> @fetchfrom 2 InteractiveUtils.varinfo()
name size summary
––––––––– ––––––––– ––––––––––––––––––––––
A 800 bytes 10×10 Array{Float64,2}
Base Module
Core Module
Main Module
Как видно, глобальная переменная A
определена на рабочем узле 2, но B
захвачена как локальная переменная, и, следовательно, связывание для B
не существует на рабочем узле 2.
Parallel Map and Loops
К счастью, многие полезные параллельные вычисления не требуют перемещения данных. Общим примером является симуляция Монте-Карло, где несколько процессов могут одновременно обрабатывать независимые испытания симуляции. Мы можем использовать @spawnat
для подбрасывания монет на двух процессах. Сначала напишите следующую функцию в count_heads.jl
:
function count_heads(n)
c::Int = 0
for i = 1:n
c += rand(Bool)
end
c
end
Функция count_heads
просто складывает n
случайных битов. Вот как мы можем провести несколько испытаний на двух машинах и сложить результаты:
julia> @everywhere include_string(Main, $(read("count_heads.jl", String)), "count_heads.jl")
julia> a = @spawnat :any count_heads(100000000)
Future(2, 1, 6, nothing)
julia> b = @spawnat :any count_heads(100000000)
Future(3, 1, 7, nothing)
julia> fetch(a)+fetch(b)
100001564
Этот пример демонстрирует мощный и часто используемый шаблон параллельного программирования. Многие итерации выполняются независимо в нескольких процессах, а затем их результаты объединяются с помощью некоторой функции. Процесс объединения называется редукцией, поскольку он, как правило, уменьшает ранг тензора: вектор чисел сводится к одному числу, или матрица сводится к одной строке или столбцу и т. д. В коде это обычно выглядит как шаблон x = f(x,v[i])
, где x
— это аккумулятор, f
— функция редукции, а v[i]
— это элементы, которые редуцируются. Желательно, чтобы f
была ассоциативной, чтобы не имело значения, в каком порядке выполняются операции.
Обратите внимание, что наше использование этого шаблона с count_heads
можно обобщить. Мы использовали два явных @spawnat
оператора, что ограничивает параллелизм двумя процессами. Чтобы запустить на любом количестве процессов, мы можем использовать параллельный цикл for, работающий в распределенной памяти, который можно записать на Julia, используя @distributed
следующим образом:
nheads = @distributed (+) for i = 1:200000000
Int(rand(Bool))
end
Этот конструкция реализует шаблон назначения итераций нескольким процессам и их объединения с указанным редукцией (в данном случае (+)
). Результат каждой итерации принимается как значение последнего выражения внутри цикла. Все выражение параллельного цикла само по себе оценивается как окончательный ответ.
Обратите внимание, что хотя параллельные циклы for выглядят как последовательные циклы for, их поведение кардинально отличается. В частности, итерации не происходят в определенном порядке, и записи в переменные или массивы не будут глобально видимыми, поскольку итерации выполняются на разных процессах. Любые переменные, используемые внутри параллельного цикла, будут скопированы и переданы каждому процессу.
Например, следующий код не будет работать как задумано:
a = zeros(100000)
@distributed for i = 1:100000
a[i] = i
end
Этот код не инициализирует весь a
, так как каждый процесс будет иметь отдельную его копию. Такие параллельные циклы for следует избегать. К счастью, Shared Arrays можно использовать, чтобы обойти это ограничение:
using SharedArrays
a = SharedArray{Float64}(10)
@distributed for i = 1:10
a[i] = i
end
Использование переменных "вне" в параллельных циклах вполне разумно, если переменные являются только для чтения:
a = randn(1000)
@distributed (+) for i = 1:100000
f(a[rand(1:end)])
end
Здесь на каждой итерации функция f
применяется к случайно выбранному образцу из вектора a
, который общий для всех процессов.
Как вы могли заметить, оператор редукции можно опустить, если он не нужен. В этом случае цикл выполняется асинхронно, т.е. он создает независимые задачи на всех доступных рабочих и сразу возвращает массив Future
, не дожидаясь завершения. Вызывающий может дождаться завершения 4d61726b646f776e2e436f64652822222c20224675747572652229_407265662044697374726962757465642e467574757265
позже, вызвав fetch
на них, или дождаться завершения в конце цикла, добавив перед ним @sync
, как @sync @distributed for
.
В некоторых случаях оператор редукции не нужен, и мы просто хотим применить функцию ко всем целым числам в некотором диапазоне (или, более общо, ко всем элементам в некоторой коллекции). Это еще одна полезная операция, называемая параллельным отображением, реализованная в Julia как функция pmap
. Например, мы могли бы вычислить сингулярные значения нескольких больших случайных матриц параллельно следующим образом:
julia> M = Matrix{Float64}[rand(1000,1000) for i = 1:10];
julia> pmap(svdvals, M);
Julia's pmap
предназначен для случаев, когда каждый вызов функции выполняет большое количество работы. В отличие от этого, @distributed for
может обрабатывать ситуации, когда каждая итерация мала, возможно, просто складывая два числа. Только рабочие процессы используются как 4d61726b646f776e2e436f64652822222c2022706d61702229_40726566
, так и @distributed for
для параллельных вычислений. В случае @distributed for
окончательное сокращение выполняется на вызывающем процессе.
Remote References and AbstractChannels
Удаленные ссылки всегда ссылаются на реализацию AbstractChannel
.
Конкретная реализация AbstractChannel
(например, Channel
) должна реализовать put!
, take!
, fetch
, isready
и wait
. Удаленный объект, на который ссылается Future
, хранится в Channel{Any}(1)
, т.е. в Channel
размером 1, способном хранить объекты типа Any
.
RemoteChannel
, который можно переписать, может указывать на любой тип и размер каналов или любую другую реализацию AbstractChannel
.
Конструктор RemoteChannel(f::Function, pid)()
позволяет нам создавать ссылки на каналы, содержащие более одного значения определенного типа. f
— это функция, выполняемая на pid
, и она должна возвращать AbstractChannel
.
Например, RemoteChannel(()->Channel{Int}(10), pid)
, вернет ссылку на канал типа Int
и размера 10. Канал существует на рабочем pid
.
Методы put!
, take!
, fetch
, isready
и wait
на RemoteChannel
проксируются на хранилище данных в удаленном процессе.
RemoteChannel
таким образом может быть использован для ссылки на реализованные пользователем объекты AbstractChannel
. Простой пример этого - следующий DictChannel
, который использует словарь в качестве своего удаленного хранилища:
julia> struct DictChannel{T} <: AbstractChannel{T}
d::Dict
cond_take::Threads.Condition # waiting for data to become available
DictChannel{T}() where {T} = new(Dict(), Threads.Condition())
DictChannel() = DictChannel{Any}()
end
julia> begin
function Base.put!(D::DictChannel, k, v)
@lock D.cond_take begin
D.d[k] = v
notify(D.cond_take)
end
return D
end
function Base.take!(D::DictChannel, k)
@lock D.cond_take begin
v = fetch(D, k)
delete!(D.d, k)
return v
end
end
Base.isready(D::DictChannel) = @lock D.cond_take !isempty(D.d)
Base.isready(D::DictChannel, k) = @lock D.cond_take haskey(D.d, k)
function Base.fetch(D::DictChannel, k)
@lock D.cond_take begin
wait(D, k)
return D.d[k]
end
end
function Base.wait(D::DictChannel, k)
@lock D.cond_take begin
while !isready(D, k)
wait(D.cond_take)
end
end
end
end;
julia> d = DictChannel();
julia> isready(d)
false
julia> put!(d, :k, :v);
julia> isready(d, :k)
true
julia> fetch(d, :k)
:v
julia> wait(d, :k)
julia> take!(d, :k)
:v
julia> isready(d, :k)
false
Channels and RemoteChannels
Channel
локален для процесса. Рабочий 2 не может напрямую ссылаться на4d61726b646f776e2e436f64652822222c20224368616e6e656c2229_40726566
на рабочем 3 и наоборот. ОднакоRemoteChannel
может передавать и получать значения между рабочими.- A
RemoteChannel
можно рассматривать как идентификатор дляChannel
. - Идентификатор процесса,
pid
, связанный сRemoteChannel
, определяет процесс, в котором существует резервное хранилище, т.е. резервныйChannel
. - Любой процесс с ссылкой на
RemoteChannel
может помещать и забирать предметы из канала. Данные автоматически отправляются (или извлекаются) из процесса, с которым ассоциирован4d61726b646f776e2e436f64652822222c202252656d6f74654368616e6e656c2229_40726566
. - Сериализация
Channel
также сериализует любые данные, присутствующие в канале. Десериализация, следовательно, фактически создает копию оригинального объекта. - С другой стороны, сериализация
RemoteChannel
включает только сериализацию идентификатора, который определяет местоположение и экземплярChannel
, на который ссылается дескриптор. Таким образом, десериализованный объект4d61726b646f776e2e436f64652822222c202252656d6f74654368616e6e656c2229_40726566
(на любом рабочем узле) также указывает на тот же резервный хранилище, что и оригинал.
Пример каналов, приведенный выше, можно модифицировать для межпроцессного взаимодействия, как показано ниже.
Мы запускаем 4 рабочих для обработки одного удаленного канала jobs
. Задания, идентифицируемые по id (job_id
), записываются в канал. Каждая удаленно выполняемая задача в этой симуляции считывает job_id
, ждет случайное количество времени и записывает обратно кортеж из job_id
, затраченного времени и своего собственного pid
в канал результатов. В конце все results
выводятся на мастер-процесс.
julia> addprocs(4); # add worker processes
julia> const jobs = RemoteChannel(()->Channel{Int}(32));
julia> const results = RemoteChannel(()->Channel{Tuple}(32));
julia> @everywhere function do_work(jobs, results) # define work function everywhere
while true
job_id = take!(jobs)
exec_time = rand()
sleep(exec_time) # simulates elapsed time doing actual work
put!(results, (job_id, exec_time, myid()))
end
end
julia> function make_jobs(n)
for i in 1:n
put!(jobs, i)
end
end;
julia> n = 12;
julia> errormonitor(@async make_jobs(n)); # feed the jobs channel with "n" jobs
julia> for p in workers() # start tasks on the workers to process requests in parallel
remote_do(do_work, p, jobs, results)
end
julia> @elapsed while n > 0 # print out results
job_id, exec_time, where = take!(results)
println("$job_id finished in $(round(exec_time; digits=2)) seconds on worker $where")
global n = n - 1
end
1 finished in 0.18 seconds on worker 4
2 finished in 0.26 seconds on worker 5
6 finished in 0.12 seconds on worker 4
7 finished in 0.18 seconds on worker 4
5 finished in 0.35 seconds on worker 5
4 finished in 0.68 seconds on worker 2
3 finished in 0.73 seconds on worker 3
11 finished in 0.01 seconds on worker 3
12 finished in 0.02 seconds on worker 3
9 finished in 0.26 seconds on worker 5
8 finished in 0.57 seconds on worker 4
10 finished in 0.58 seconds on worker 2
0.055971741
Remote References and Distributed Garbage Collection
Объекты, на которые ссылаются удаленные ссылки, могут быть освобождены только тогда, когда все удерживаемые ссылки в кластере удалены.
Узел, в котором хранится значение, отслеживает, какие из рабочих имеют на него ссылку. Каждый раз, когда RemoteChannel
или (не загруженный) Future
сериализуется для рабочего, узел, на который указывает ссылка, получает уведомление. И каждый раз, когда 4d61726b646f776e2e436f64652822222c202252656d6f74654368616e6e656c2229_40726566
или (не загруженный) 4d61726b646f776e2e436f64652822222c20224675747572652229_407265662044697374726962757465642e467574757265
собирается мусорщиком локально, узел, владеющий значением, снова получает уведомление. Это реализовано в внутреннем сериализаторе, учитывающем кластер. Удаленные ссылки действительны только в контексте работающего кластера. Сериализация и десериализация ссылок на и из обычных IO
объектов не поддерживается.
Уведомления выполняются путем отправки сообщений "отслеживания" – сообщения "добавить ссылку", когда ссылка сериализуется в другой процесс, и сообщения "удалить ссылку", когда ссылка локально собирается мусором.
Поскольку Future
являются одноразовыми и кэшируются локально, действие fetch
записи 4d61726b646f776e2e436f64652822222c20224675747572652229_407265662044697374726962757465642e467574757265
также обновляет информацию отслеживания ссылок на узле, владеющем значением.
Узел, который владеет значением, освобождает его, как только все ссылки на него очищены.
С помощью Future
, сериализация уже полученного 4d61726b646f776e2e436f64652822222c20224675747572652229_407265662044697374726962757465642e467574757265
на другой узел также отправляет значение, так как оригинальный удаленный хранилище могло собрать значение к этому времени.
Важно отметить, что когда объект будет локально собран сборщиком мусора, зависит от размера объекта и текущего давления на память в системе.
В случае удаленных ссылок размер локального объекта ссылки довольно мал, в то время как значение, хранящееся на удаленном узле, может быть довольно большим. Поскольку локальный объект может не быть собран немедленно, хорошей практикой является явный вызов finalize
на локальных экземплярах RemoteChannel
, или на невыгруженных Future
. Поскольку вызов fetch
на 4d61726b646f776e2e436f64652822222c20224675747572652229_407265662044697374726962757465642e467574757265
также удаляет его ссылку из удаленного хранилища, это не требуется для выгруженных 4d61726b646f776e2e436f64652822222c20224675747572652229_407265662044697374726962757465642e467574757265
. Явный вызов 4d61726b646f776e2e436f64652822222c202266696e616c697a652229_40726566
приводит к немедленному отправлению сообщения на удаленный узел с просьбой удалить его ссылку на значение.
После завершения ссылка становится недействительной и не может быть использована в дальнейших вызовах.
Local invocations
Данные обязательно копируются на удаленный узел для выполнения. Это касается как удаленных вызовов, так и случаев, когда данные хранятся в RemoteChannel
/ Future
на другом узле. Как и ожидалось, это приводит к созданию копии сериализованных объектов на удаленном узле. Однако, когда целевой узел является локальным узлом, т.е. идентификатор вызывающего процесса совпадает с идентификатором удаленного узла, он выполняется как локальный вызов. Обычно (но не всегда) он выполняется в другой задаче - но сериализация/десериализация данных не происходит. Следовательно, вызов ссылается на те же экземпляры объектов, что и переданные - копии не создаются. Это поведение подчеркивается ниже:
julia> using Distributed;
julia> rc = RemoteChannel(()->Channel(3)); # RemoteChannel created on local node
julia> v = [0];
julia> for i in 1:3
v[1] = i # Reusing `v`
put!(rc, v)
end;
julia> result = [take!(rc) for _ in 1:3];
julia> println(result);
Array{Int64,1}[[3], [3], [3]]
julia> println("Num Unique objects : ", length(unique(map(objectid, result))));
Num Unique objects : 1
julia> addprocs(1);
julia> rc = RemoteChannel(()->Channel(3), workers()[1]); # RemoteChannel created on remote node
julia> v = [0];
julia> for i in 1:3
v[1] = i
put!(rc, v)
end;
julia> result = [take!(rc) for _ in 1:3];
julia> println(result);
Array{Int64,1}[[1], [2], [3]]
julia> println("Num Unique objects : ", length(unique(map(objectid, result))));
Num Unique objects : 3
Как видно, put!
на локально принадлежащем RemoteChannel
с тем же объектом v
, измененным между вызовами, приводит к тому, что хранится один и тот же экземпляр объекта. В отличие от случаев, когда копии v
создаются, если узел, владеющий rc
, является другим узлом.
Следует отметить, что это, как правило, не является проблемой. Это следует учитывать только в том случае, если объект хранится локально и изменяется после вызова. В таких случаях может быть целесообразно сохранить deepcopy
объекта.
Это также верно для удаленных вызовов на локальном узле, как видно в следующем примере:
julia> using Distributed; addprocs(1);
julia> v = [0];
julia> v2 = remotecall_fetch(x->(x[1] = 1; x), myid(), v); # Executed on local node
julia> println("v=$v, v2=$v2, ", v === v2);
v=[1], v2=[1], true
julia> v = [0];
julia> v2 = remotecall_fetch(x->(x[1] = 1; x), workers()[1], v); # Executed on remote node
julia> println("v=$v, v2=$v2, ", v === v2);
v=[0], v2=[1], false
Как можно снова увидеть, удаленный вызов на локальном узле ведет себя так же, как и прямой вызов. Вызов изменяет локальные объекты, переданные в качестве аргументов. В удаленном вызове он работает с копией аргументов.
Чтобы повторить, в общем это не является проблемой. Если локальный узел также используется как вычислительный узел, и аргументы, используемые после вызова, необходимо учитывать это поведение, и, если требуется, глубокие копии аргументов должны быть переданы в вызов, инициируемый на локальном узле. Вызовы на удаленных узлах всегда будут работать с копиями аргументов.
Shared Arrays
Shared Arrays используют системную общую память для отображения одного и того же массива в нескольких процессах. SharedArray
является хорошим выбором, когда вы хотите иметь большой объем данных, совместно доступных для двух или более процессов на одной машине. Поддержка Shared Array доступна через модуль SharedArrays
, который необходимо явно загрузить на всех участвующих рабочих.
Дополнительная структура данных предоставляется внешним пакетом DistributedArrays.jl
в форме DArray
. Хотя есть некоторые сходства с SharedArray
, поведение DArray
довольно отличается. В 4d61726b646f776e2e436f64652822222c202253686172656441727261792229_40726566
каждый "участвующий" процесс имеет доступ ко всему массиву; в отличие от этого, в 4d61726b646f776e2e436f64652822222c20224441727261792229_68747470733a2f2f6769746875622e636f6d2f4a756c6961506172616c6c656c2f44697374726962757465644172726179732e6a6c
каждый процесс имеет локальный доступ только к части данных, и ни два процесса не делят одну и ту же часть.
SharedArray
индексация (назначение и доступ к значениям) работает так же, как и с обычными массивами, и эффективна, потому что базовая память доступна локальному процессу. Поэтому большинство алгоритмов естественно работают с 4d61726b646f776e2e436f64652822222c202253686172656441727261792229_40726566
, хотя в однопроцессорном режиме. В случаях, когда алгоритм настаивает на входе Array
, базовый массив можно получить из 4d61726b646f776e2e436f64652822222c202253686172656441727261792229_40726566
, вызвав sdata
. Для других типов AbstractArray
4d61726b646f776e2e436f64652822222c202273646174612229_40726566
просто возвращает сам объект, поэтому безопасно использовать 4d61726b646f776e2e436f64652822222c202273646174612229_40726566
для любого объекта типа Array
.
Конструктор для общего массива имеет следующую форму:
SharedArray{T,N}(dims::NTuple; init=false, pids=Int[])
который создает N
-мерный общий массив типа T
и размера dims
для процессов, указанных в pids
. В отличие от распределенных массивов, общий массив доступен только для тех рабочих процессов, которые указаны в аргументе pids
(и для создающего процесса, если он находится на том же хосте). Обратите внимание, что в SharedArray поддерживаются только элементы, которые являются isbits
.
Если указана функция init
с сигнатурой initfn(S::SharedArray)
, она вызывается на всех участвующих рабочих. Вы можете указать, что каждый рабочий выполняет функцию init
на отдельной части массива, тем самым параллелизируя инициализацию.
Вот краткий пример:
julia> using Distributed
julia> addprocs(3)
3-element Array{Int64,1}:
2
3
4
julia> @everywhere using SharedArrays
julia> S = SharedArray{Int,2}((3,4), init = S -> S[localindices(S)] = repeat([myid()], length(localindices(S))))
3×4 SharedArray{Int64,2}:
2 2 3 4
2 3 3 4
2 3 4 4
julia> S[3,2] = 7
7
julia> S
3×4 SharedArray{Int64,2}:
2 2 3 4
2 3 3 4
2 7 4 4
SharedArrays.localindices
предоставляет разрозненные одномерные диапазоны индексов и иногда удобно для разделения задач между процессами. Вы, конечно, можете разделить работу любым способом, который вам нравится:
julia> S = SharedArray{Int,2}((3,4), init = S -> S[indexpids(S):length(procs(S)):length(S)] = repeat([myid()], length( indexpids(S):length(procs(S)):length(S))))
3×4 SharedArray{Int64,2}:
2 2 2 2
3 3 3 3
4 4 4 4
Поскольку все процессы имеют доступ к основным данным, вам нужно быть осторожным, чтобы не создать конфликты. Например:
@sync begin
for p in procs(S)
@async begin
remotecall_wait(fill!, p, S, p)
end
end
end
это приведет к неопределенному поведению. Поскольку каждый процесс заполняет весь массив своим pid
, тот процесс, который последним выполнит (для любого конкретного элемента S
), сохранит свой pid
.
В качестве более расширенного и сложного примера рассмотрим выполнение следующего "ядра" параллельно:
q[i,j,t+1] = q[i,j,t] + u[i,j,t]
В этом случае, если мы попытаемся разделить работу, используя одномерный индекс, мы, вероятно, столкнемся с проблемами: если q[i,j,t]
находится вблизи конца блока, назначенного одному работнику, а q[i,j,t+1]
находится вблизи начала блока, назначенного другому, то очень вероятно, что q[i,j,t]
не будет готово в момент, когда оно потребуется для вычисления q[i,j,t+1]
. В таких случаях лучше вручную разбить массив на части. Давайте разделим по второму измерению. Определите функцию, которая возвращает индексы (irange, jrange)
, назначенные этому работнику:
julia> @everywhere function myrange(q::SharedArray)
idx = indexpids(q)
if idx == 0 # This worker is not assigned a piece
return 1:0, 1:0
end
nchunks = length(procs(q))
splits = [round(Int, s) for s in range(0, stop=size(q,2), length=nchunks+1)]
1:size(q,1), splits[idx]+1:splits[idx+1]
end
Далее определите ядро:
julia> @everywhere function advection_chunk!(q, u, irange, jrange, trange)
@show (irange, jrange, trange) # display so we can see what's happening
for t in trange, j in jrange, i in irange
q[i,j,t+1] = q[i,j,t] + u[i,j,t]
end
q
end
Мы также определяем удобную обертку для реализации SharedArray
julia> @everywhere advection_shared_chunk!(q, u) =
advection_chunk!(q, u, myrange(q)..., 1:size(q,3)-1)
Теперь давайте сравним три разные версии, одна из которых работает в одном процессе:
julia> advection_serial!(q, u) = advection_chunk!(q, u, 1:size(q,1), 1:size(q,2), 1:size(q,3)-1);
один, который использует @distributed
:
julia> function advection_parallel!(q, u)
for t = 1:size(q,3)-1
@sync @distributed for j = 1:size(q,2)
for i = 1:size(q,1)
q[i,j,t+1]= q[i,j,t] + u[i,j,t]
end
end
end
q
end;
и один, который делегирует по частям:
julia> function advection_shared!(q, u)
@sync begin
for p in procs(q)
@async remotecall_wait(advection_shared_chunk!, p, q, u)
end
end
q
end;
Если мы создадим SharedArray
и замерим время выполнения этих функций, мы получим следующие результаты (с julia -p 4
):
julia> q = SharedArray{Float64,3}((500,500,500));
julia> u = SharedArray{Float64,3}((500,500,500));
Запустите функции один раз для JIT-компиляции и @time
их на втором запуске:
julia> @time advection_serial!(q, u);
(irange,jrange,trange) = (1:500,1:500,1:499)
830.220 milliseconds (216 allocations: 13820 bytes)
julia> @time advection_parallel!(q, u);
2.495 seconds (3999 k allocations: 289 MB, 2.09% gc time)
julia> @time advection_shared!(q,u);
From worker 2: (irange,jrange,trange) = (1:500,1:125,1:499)
From worker 4: (irange,jrange,trange) = (1:500,251:375,1:499)
From worker 3: (irange,jrange,trange) = (1:500,126:250,1:499)
From worker 5: (irange,jrange,trange) = (1:500,376:500,1:499)
238.119 milliseconds (2264 allocations: 169 KB)
Главное преимущество advection_shared!
заключается в том, что оно минимизирует трафик между рабочими, позволяя каждому работать в течение длительного времени над назначенной частью.
Shared Arrays and Distributed Garbage Collection
Как и удаленные ссылки, общие массивы также зависят от сборки мусора на создающем узле для освобождения ссылок от всех участвующих рабочих. Код, который создает множество краткоживущих объектов общих массивов, выиграет от явного завершения этих объектов как можно скорее. Это приведет к более раннему освобождению как памяти, так и дескрипторов файлов, сопоставляющих общий сегмент.
ClusterManagers
Запуск, управление и сетевое взаимодействие процессов Julia в логическом кластере осуществляется через менеджеры кластеров. ClusterManager
отвечает за
- запуск рабочих процессов в кластерной среде
- управление событиями в течение жизни каждого работника
- опционально, предоставляя транспорт данных
Кластер Julia имеет следующие характеристики:
- Начальный процесс Julia, также называемый
master
, является специальным и имеетid
1. - Только процесс
master
может добавлять или удалять рабочие процессы. - Все процессы могут напрямую общаться друг с другом.
Соединения между рабочими (с использованием встроенного транспортного протокола TCP/IP) устанавливаются следующим образом:
addprocs
вызывается в главном процессе с объектомClusterManager
.addprocs
вызывает соответствующий методlaunch
, который создает необходимое количество рабочих процессов на соответствующих машинах.- Каждый работник начинает прослушивание на свободном порту и записывает информацию о своем хосте и порте в
stdout
. - Менеджер кластера захватывает
stdout
каждого рабочего процесса и делает его доступным для главного процесса. - Мастер-процесс анализирует эту информацию и устанавливает TCP/IP соединения с каждым рабочим.
- Каждый работник также уведомляется о других работниках в кластере.
- Каждый работник подключается ко всем работникам, чей
id
меньше, чем его собственныйid
. - Таким образом, создается сетевое соединение, в котором каждый работник напрямую соединен с каждым другим работником.
Хотя по умолчанию транспортный уровень использует простой TCPSocket
, кластер Julia может предоставить свой собственный транспорт.
Julia предоставляет два встроенных менеджера кластеров:
LocalManager
, используемый, когда вызываютсяaddprocs()
илиaddprocs(np::Integer)
SSHManager
, используемый, когдаaddprocs(hostnames::Array)
вызывается с списком имен хостов
LocalManager
используется для запуска дополнительных рабочих процессов на том же хосте, тем самым используя многоядерное и многопроцессорное оборудование.
Таким образом, минимальный менеджер кластера должен:
- быть подтипом абстрактного
ClusterManager
- реализовать
launch
, метод, ответственный за запуск новых работников - реализовать
manage
, который вызывается на различных событиях в течение жизни работника (например, отправка сигнала прерывания)
addprocs(manager::FooManager)
требует, чтобы FooManager
реализовал:
function launch(manager::FooManager, params::Dict, launched::Array, c::Condition)
[...]
end
function manage(manager::FooManager, id::Integer, config::WorkerConfig, op::Symbol)
[...]
end
В качестве примера давайте посмотрим, как реализован LocalManager
, менеджер, ответственный за запуск рабочих процессов на одном хосте:
struct LocalManager <: ClusterManager
np::Integer
end
function launch(manager::LocalManager, params::Dict, launched::Array, c::Condition)
[...]
end
function manage(manager::LocalManager, id::Integer, config::WorkerConfig, op::Symbol)
[...]
end
Метод launch
принимает следующие аргументы:
manager::ClusterManager
: кластерный менеджер, с которым вызываетсяaddprocs
params::Dict
: все именованные аргументы, переданные вaddprocs
launched::Array
: массив, в который нужно добавить один или несколько объектовWorkerConfig
c::Condition
: переменная условия, которая будет уведомлена по мере запуска рабочих процессов
Метод launch
вызывается асинхронно в отдельной задаче. Завершение этой задачи сигнализирует о том, что все запрошенные рабочие запущены. Следовательно, функция 4d61726b646f776e2e436f64652822222c20226c61756e63682229_40726566
ДОЛЖНА завершиться, как только все запрошенные рабочие будут запущены.
Новые запущенные рабочие соединены друг с другом и с мастер-процессом в режиме "все ко всем". Указание аргумента командной строки --worker[=<cookie>]
приводит к тому, что запущенные процессы инициализируют себя как рабочие, и соединения устанавливаются через сокеты TCP/IP.
Все работники в кластере используют тот же cookie, что и мастер. Когда куки не указаны, т.е. с опцией --worker
, работник пытается прочитать его из своего стандартного ввода. LocalManager
и SSHManager
оба передают куки вновь запущенным работникам через их стандартные вводы.
По умолчанию рабочий процесс будет слушать на свободном порту по адресу, возвращаемому вызовом getipaddr()
. Конкретный адрес для прослушивания может быть указан с помощью необязательного аргумента --bind-to bind_addr[:port]
. Это полезно для многопортовых хостов.
В качестве примера транспорта, не основанного на TCP/IP, реализация может выбрать использование MPI, в этом случае --worker
НЕ должен быть указан. Вместо этого вновь запущенные рабочие процессы должны вызывать init_worker(cookie)
перед использованием любых параллельных конструкций.
Для каждого запущенного работника метод launch
должен добавлять объект WorkerConfig
(с инициализированными соответствующими полями) в launched
mutable struct WorkerConfig
# Common fields relevant to all cluster managers
io::Union{IO, Nothing}
host::Union{AbstractString, Nothing}
port::Union{Integer, Nothing}
# Used when launching additional workers at a host
count::Union{Int, Symbol, Nothing}
exename::Union{AbstractString, Cmd, Nothing}
exeflags::Union{Cmd, Nothing}
# External cluster managers can use this to store information at a per-worker level
# Can be a dict if multiple fields need to be stored.
userdata::Any
# SSHManager / SSH tunnel connections to workers
tunnel::Union{Bool, Nothing}
bind_addr::Union{AbstractString, Nothing}
sshflags::Union{Cmd, Nothing}
max_parallel::Union{Integer, Nothing}
# Used by Local/SSH managers
connect_at::Any
[...]
end
Большинство полей в WorkerConfig
используются встроенными менеджерами. Пользовательские менеджеры кластера обычно указывают только io
или host
/ port
:
Если указано
io
, оно используется для чтения информации о хосте/порту. Рабочий процесс Julia выводит свой адрес привязки и порт при запуске. Это позволяет рабочим процессам Julia слушать на любом свободном порту, доступном вместо того, чтобы требовать ручной настройки портов рабочих процессов.Если
io
не указан, используютсяhost
иport
для подключения.count
,exename
иexeflags
имеют значение для запуска дополнительных рабочих процессов из рабочего процесса. Например, менеджер кластера может запустить один рабочий процесс на узел и использовать его для запуска дополнительных рабочих процессов.count
с целочисленным значениемn
запустит в общей сложностиn
рабочих.count
со значением:auto
запустит столько рабочих процессов, сколько потоков ЦП (логических ядер) на этом компьютере.exename
— это имя исполняемого файлаjulia
, включая полный путь.exeflags
должны быть установлены на необходимые аргументы командной строки для новых рабочих.
tunnel
,bind_addr
,sshflags
иmax_parallel
используются, когда требуется ssh-туннель для подключения к рабочим процессам из главного процесса.userdata
предоставляется для пользовательских менеджеров кластеров, чтобы хранить собственную информацию, специфичную для рабочих узлов.
manage(manager::FooManager, id::Integer, config::WorkerConfig, op::Symbol)
вызывается в разное время в течение жизни рабочего с соответствующими значениями op
:
- с
:register
/:deregister
, когда рабочий добавляется / удаляется из пула рабочих Julia. - с
:interrupt
, когда вызываетсяinterrupt(workers)
.ClusterManager
должен послать соответствующему рабочему сигнал прерывания. - с
:finalize
для целей очистки.
Cluster Managers with Custom Transports
Замена стандартных соединений сокетов TCP/IP на пользовательский транспортный уровень немного сложнее. Каждый процесс Julia имеет столько же задач по коммуникации, сколько рабочих, к которым он подключен. Например, рассмотрим кластер Julia из 32 процессов в сети "все ко всем":
- Каждый процесс Julia имеет 31 задачу по коммуникации.
- Каждая задача обрабатывает все входящие сообщения от одного удаленного работника в цикле обработки сообщений.
- Цикл обработки сообщений ожидает объект
IO
(например,TCPSocket
в стандартной реализации), считывает целое сообщение, обрабатывает его и ожидает следующего. - Отправка сообщений в процесс осуществляется напрямую из любой задачи Julia — не только из задач связи — снова через соответствующий объект
IO
.
Замена стандартного транспорта требует от новой реализации настройки соединений с удаленными рабочими и предоставления соответствующих объектов IO
, на которые могут ожидать циклы обработки сообщений. Специфические для менеджера обратные вызовы, которые необходимо реализовать, это:
connect(manager::FooManager, pid::Integer, config::WorkerConfig)
kill(manager::FooManager, pid::Int, config::WorkerConfig)
Стандартная реализация (которая использует сокеты TCP/IP) реализована как connect(manager::ClusterManager, pid::Integer, config::WorkerConfig)
.
connect
должен возвращать пару объектов IO
, один для чтения данных, отправленных от рабочего pid
, и другой для записи данных, которые необходимо отправить рабочему pid
. Пользовательские менеджеры кластеров могут использовать поток BufferStream
в памяти в качестве соединения для проксирования данных между пользовательским, возможно, не-IO
транспортом и встроенной параллельной инфраструктурой Julia.
BufferStream
— это поток в памяти IOBuffer
, который ведет себя как IO
— это поток, который можно обрабатывать асинхронно.
Папка clustermanager/0mq
в Examples repository содержит пример использования ZeroMQ для подключения рабочих процессов Julia в звездной топологии с брокером 0MQ посередине. Примечание: Процессы Julia по-прежнему логически связаны друг с другом — любой рабочий процесс может отправить сообщение любому другому рабочему процессу напрямую, не осознавая, что 0MQ используется в качестве транспортного слоя.
При использовании пользовательских транспортов:
- Рабочие процессы Julia не должны запускаться с
--worker
. Запуск с--worker
приведет к тому, что вновь запущенные рабочие процессы будут по умолчанию использовать реализацию транспортного протокола TCP/IP. - Для каждого входящего логического соединения с рабочим
Base.process_messages(rd::IO, wr::IO)()
должен быть вызван. Это запускает новую задачу, которая обрабатывает чтение и запись сообщений от/к рабочему, представленному объектамиIO
. init_worker(cookie, manager::FooManager)
должен быть вызван в рамках инициализации рабочего процесса.- Поле
connect_at::Any
вWorkerConfig
может быть установлено менеджером кластера, когда вызываетсяlaunch
. Значение этого поля передается во всехconnect
обратных вызовах. Обычно оно содержит информацию о том, как подключиться к рабочему. Например, транспорт TCP/IP использует это поле для указания кортежа(host, port)
, по которому следует подключиться к рабочему.
kill(manager, pid, config)
вызывается для удаления рабочего из кластера. В главном процессе соответствующие объекты IO
должны быть закрыты реализацией для обеспечения правильной очистки. Стандартная реализация просто выполняет вызов exit()
на указанном удаленном рабочем.
Папка Examples clustermanager/simple
является примером, который демонстрирует простую реализацию с использованием сокетов домена UNIX для настройки кластера.
Network Requirements for LocalManager and SSHManager
Кластеры Julia предназначены для выполнения в уже защищенных средах на инфраструктуре, такой как локальные ноутбуки, департаментские кластеры или даже в облаке. Этот раздел охватывает требования к сетевой безопасности для встроенных LocalManager
и SSHManager
:
Мастер-процесс не слушает ни на одном порту. Он только подключается к рабочим процессам.
Каждый рабочий процесс привязывается только к одному из локальных интерфейсов и слушает на временном номере порта, назначенном операционной системой.
LocalManager
, используемый вaddprocs(N)
, по умолчанию связывается только с интерфейсом обратной связи. Это означает, что рабочие процессы, запущенные позже на удаленных хостах (или кем-либо с злонамеренными намерениями), не могут подключиться к кластеру. Вызовaddprocs(4)
, за которым следуетaddprocs(["remote_host"])
, завершится неудачей. Некоторые пользователи могут захотеть создать кластер, состоящий из их локальной системы и нескольких удаленных систем. Это можно сделать, явно запросивLocalManager
связаться с внешним сетевым интерфейсом через аргумент ключевого словаrestrict
:addprocs(4; restrict=false)
.SSHManager
, используемый вaddprocs(list_of_remote_hosts)
, запускает рабочие процессы на удаленных хостах через SSH. По умолчанию SSH используется только для запуска рабочих процессов Julia. Последующие соединения между мастером и рабочими процессами, а также между рабочими процессами используют обычные, незащищенные TCP/IP сокеты. Удаленные хосты должны иметь включенный вход без пароля. Дополнительные флаги SSH или учетные данные могут быть указаны через аргумент ключевого словаsshflags
.addprocs(list_of_remote_hosts; tunnel=true, sshflags=<ssh keys and other flags>)
полезен, когда мы хотим использовать SSH-соединения для мастера-работника. Типичный сценарий для этого - локальный ноутбук, работающий с Julia REPL (т.е. мастер), с остальной частью кластера в облаке, скажем, на Amazon EC2. В этом случае необходимо открыть только порт 22 на удаленном кластере в сочетании с клиентом SSH, аутентифицированным через инфраструктуру открытых ключей (PKI). Учетные данные для аутентификации могут быть предоставлены черезsshflags
, напримерsshflags=`-i <keyfile>`
.В топологии "все к всем" (по умолчанию) все рабочие узлы соединяются друг с другом через обычные TCP-сокеты. Политика безопасности на узлах кластера должна обеспечивать свободное соединение между рабочими узлами для диапазона временных портов (варьируется в зависимости от ОС).
Защита и шифрование всего трафика между рабочими узлами (через SSH) или шифрование отдельных сообщений может быть выполнено с помощью пользовательского
ClusterManager
.Если вы укажете
multiplex=true
в качестве параметра дляaddprocs
, будет использовано SSH мультиплексирование для создания туннеля между мастером и рабочими. Если вы настроили SSH мультиплексирование самостоятельно и соединение уже установлено, SSH мультиплексирование будет использоваться независимо от параметраmultiplex
. Если мультиплексирование включено, пересылка устанавливается с использованием существующего соединения (параметр-O forward
в ssh). Это полезно, если ваши серверы требуют аутентификации по паролю; вы можете избежать аутентификации в Julia, войдя на сервер заранее перед4d61726b646f776e2e436f64652822222c202261646470726f63732229_40726566
. Управляющий сокет будет находиться по адресу~/.ssh/julia-%r@%h:%p
в течение сессии, если не используется существующее соединение мультиплексирования. Обратите внимание, что пропускная способность может быть ограничена, если вы создаете несколько процессов на узле и включаете мультиплексирование, потому что в этом случае процессы делят одно TCP соединение мультиплексирования.
Cluster Cookie
Все процессы в кластере используют одно и то же cookie, который по умолчанию является случайно сгенерированной строкой в главном процессе:
cluster_cookie()
возвращает куки, в то время какcluster_cookie(cookie)()
устанавливает их и возвращает новые куки.- Все соединения аутентифицированы с обеих сторон, чтобы гарантировать, что только рабочие процессы, запущенные мастером, могут подключаться друг к другу.
- Куки могут быть переданы рабочим при запуске через аргумент
--worker=<cookie>
. Если аргумент--worker
указан без куки, рабочий пытается прочитать куки из своего стандартного ввода (stdin
). Стандартный ввод закрывается сразу после получения куки. ClusterManager
s могут получить куки на мастере, вызвавcluster_cookie()
. Менеджеры кластеров, не использующие транспорт по умолчанию TCP/IP (и, следовательно, не указывающие--worker
), должны вызватьinit_worker(cookie, manager)
с тем же куки, что и на мастере.
Обратите внимание, что среды, требующие более высокого уровня безопасности, могут реализовать это с помощью пользовательского ClusterManager
. Например, куки могут быть предварительно поделены и, следовательно, не указаны в качестве аргумента запуска.
Specifying Network Topology (Experimental)
Ключевой аргумент topology
, переданный в addprocs
, используется для указания того, как работники должны быть связаны друг с другом:
:all_to_all
, по умолчанию: все работники соединены друг с другом.:master_worker
: только процесс драйвера, т.е.pid
1, имеет соединения с рабочими.:custom
: методlaunch
менеджера кластера указывает топологию соединения через поляident
иconnect_idents
вWorkerConfig
. Рабочий с идентичностьюident
, предоставленной менеджером кластера, будет подключаться ко всем рабочим, указанным вconnect_idents
.
Ключевой аргумент lazy=true|false
влияет только на опцию topology
:all_to_all
. Если true
, кластер начинает с того, что мастер подключен ко всем рабочим. Конкретные соединения между рабочими устанавливаются при первом удаленном вызове между двумя рабочими. Это помогает сократить начальные ресурсы, выделяемые для внутрикластерной связи. Соединения настраиваются в зависимости от требований времени выполнения параллельной программы. Значение по умолчанию для lazy
— true
.
В настоящее время отправка сообщения между несвязанными рабочими приводит к ошибке. Это поведение, как и функциональность и интерфейс, следует считать экспериментальным по своей природе и оно может измениться в будущих релизах.
Noteworthy external packages
Вне параллелизма Julia существует множество внешних пакетов, которые следует упомянуть. Например, MPI.jl
является оберткой Julia для протокола MPI
, Dagger.jl
предоставляет функциональность, аналогичную Dask на Python, а DistributedArrays.jl
предоставляет операции с массивами, распределенные между рабочими, как outlined above.
Необходимо упомянуть экосистему программирования на GPU Julia, которая включает:
- CUDA.jl оборачивает различные библиотеки CUDA и поддерживает компиляцию ядер Julia для графических процессоров Nvidia.
- oneAPI.jl оборачивает унифицированную модель программирования oneAPI и поддерживает выполнение ядер Julia на поддерживаемых ускорителях. В настоящее время поддерживается только Linux.
- AMDGPU.jl оборачивает библиотеки AMD ROCm и поддерживает компиляцию ядер Julia для графических процессоров AMD. В настоящее время поддерживается только Linux.
- Библиотеки высокого уровня, такие как KernelAbstractions.jl, Tullio.jl и ArrayFire.jl.
В следующем примере мы будем использовать как DistributedArrays.jl
, так и CUDA.jl
, чтобы распределить массив между несколькими процессами, сначала преобразовав его с помощью distribute()
и CuArray()
.
Помните, когда импортируете DistributedArrays.jl
, чтобы импортировать его на всех процессах, используя @everywhere
$ ./julia -p 4
julia> addprocs()
julia> @everywhere using DistributedArrays
julia> using CUDA
julia> B = ones(10_000) ./ 2;
julia> A = ones(10_000) .* π;
julia> C = 2 .* A ./ B;
julia> all(C .≈ 4*π)
true
julia> typeof(C)
Array{Float64,1}
julia> dB = distribute(B);
julia> dA = distribute(A);
julia> dC = 2 .* dA ./ dB;
julia> all(dC .≈ 4*π)
true
julia> typeof(dC)
DistributedArrays.DArray{Float64,1,Array{Float64,1}}
julia> cuB = CuArray(B);
julia> cuA = CuArray(A);
julia> cuC = 2 .* cuA ./ cuB;
julia> all(cuC .≈ 4*π);
true
julia> typeof(cuC)
CuArray{Float64,1}
В следующем примере мы будем использовать как DistributedArrays.jl
, так и CUDA.jl
, чтобы распределить массив между несколькими процессами и вызвать на нем универсальную функцию.
function power_method(M, v)
for i in 1:100
v = M*v
v /= norm(v)
end
return v, norm(M*v) / norm(v) # or (M*v) ./ v
end
power_method
повторно создает новый вектор и нормализует его. Мы не указали никакой типовой сигнатуры в объявлении функции, давайте посмотрим, будет ли это работать с вышеупомянутыми типами данных:
julia> M = [2. 1; 1 1];
julia> v = rand(2)
2-element Array{Float64,1}:
0.40395
0.445877
julia> power_method(M,v)
([0.850651, 0.525731], 2.618033988749895)
julia> cuM = CuArray(M);
julia> cuv = CuArray(v);
julia> curesult = power_method(cuM, cuv);
julia> typeof(curesult)
CuArray{Float64,1}
julia> dM = distribute(M);
julia> dv = distribute(v);
julia> dC = power_method(dM, dv);
julia> typeof(dC)
Tuple{DistributedArrays.DArray{Float64,1,Array{Float64,1}},Float64}
Чтобы завершить это краткое знакомство с внешними пакетами, мы можем рассмотреть MPI.jl
, обертку Julia для протокола MPI. Поскольку было бы слишком долго рассматривать каждую внутреннюю функцию, лучше просто оценить подход, использованный для реализации протокола.
Рассмотрим этот игрушечный скрипт, который просто вызывает каждый подпроцесс, создает его ранг, а когда достигается главный процесс, выполняет сумму рангов.
import MPI
MPI.Init()
comm = MPI.COMM_WORLD
MPI.Barrier(comm)
root = 0
r = MPI.Comm_rank(comm)
sr = MPI.Reduce(r, MPI.SUM, root, comm)
if(MPI.Comm_rank(comm) == root)
@printf("sum of ranks: %s\n", sr)
end
MPI.Finalize()
mpirun -np 4 ./julia example.jl
- 1In this context, MPI refers to the MPI-1 standard. Beginning with MPI-2, the MPI standards committee introduced a new set of communication mechanisms, collectively referred to as Remote Memory Access (RMA). The motivation for adding rma to the MPI standard was to facilitate one-sided communication patterns. For additional information on the latest MPI standard, see https://mpi-forum.org/docs.