Multi-processing and Distributed Computing

Реализация параллельных вычислений с распределенной памятью предоставляется модулем Distributed как частью стандартной библиотеки, поставляемой с Julia.

Большинство современных компьютеров имеют более одного ЦП, и несколько компьютеров могут быть объединены в кластер. Использование мощности этих нескольких ЦП позволяет выполнять многие вычисления быстрее. Существует два основных фактора, влияющих на производительность: скорость самих ЦП и скорость их доступа к памяти. В кластере довольно очевидно, что данный ЦП будет иметь самый быстрый доступ к ОЗУ внутри одного и того же компьютера (узла). Возможно, более удивительно, что аналогичные проблемы актуальны на типичном многопоточном ноутбуке из-за различий в скорости основной памяти и cache. Следовательно, хорошая среда для многопроцессорной обработки должна позволять контролировать "право собственности" на участок памяти конкретным ЦП. Julia предоставляет среду для многопроцессорной обработки на основе передачи сообщений, чтобы программы могли выполняться на нескольких процессах в отдельных доменах памяти одновременно.

Реализация передачи сообщений в Julia отличается от других сред, таких как MPI[1]. Связь в Julia, как правило, "односторонняя", что означает, что программисту нужно явно управлять только одним процессом в операции с двумя процессами. Более того, эти операции обычно не выглядят как "отправка сообщения" и "получение сообщения", а скорее напоминают операции более высокого уровня, такие как вызовы пользовательских функций.

Распределенное программирование в Julia основано на двух примитивах: удаленные ссылки и удаленные вызовы. Удаленная ссылка — это объект, который может использоваться из любого процесса для ссылки на объект, хранящийся на определенном процессе. Удаленный вызов — это запрос одного процесса на вызов определенной функции с определенными аргументами на другом (возможно, том же) процессе.

Удаленные ссылки бывают двух видов: Future и RemoteChannel.

Удаленный вызов возвращает Future в качестве результата. Удаленные вызовы возвращаются немедленно; процесс, который сделал вызов, продолжает свою следующую операцию, пока удаленный вызов происходит где-то еще. Вы можете дождаться завершения удаленного вызова, вызвав wait на возвращенном 4d61726b646f776e2e436f64652822222c20224675747572652229_407265662044697374726962757465642e467574757265, и вы можете получить полное значение результата, используя fetch.

С другой стороны, RemoteChannel можно перезаписывать. Например, несколько процессов могут координировать свою обработку, ссылаясь на один и тот же удаленный Channel.

Каждый процесс имеет связанный идентификатор. Процесс, предоставляющий интерактивный запрос Julia, всегда имеет id, равный 1. Процессы, используемые по умолчанию для параллельных операций, называются "рабочими". Когда существует только один процесс, процесс 1 считается рабочим. В противном случае рабочими считаются все процессы, кроме процесса 1. В результате добавление 2 или более процессов необходимо для получения преимуществ от методов параллельной обработки, таких как pmap. Добавление одного процесса полезно, если вы просто хотите заниматься другими делами в основном процессе, пока длительное вычисление выполняется на рабочем.

Давайте попробуем это. Начало с julia -p n предоставляет n рабочих процессов на локальной машине. Обычно имеет смысл, чтобы n равнялось количеству потоков ЦП (логических ядер) на машине. Обратите внимание, что аргумент -p неявно загружает модуль Distributed.

$ julia -p 2

julia> r = remotecall(rand, 2, 2, 2)
Future(2, 1, 4, nothing)

julia> s = @spawnat 2 1 .+ fetch(r)
Future(2, 1, 5, nothing)

julia> fetch(s)
2×2 Array{Float64,2}:
 1.18526  1.50912
 1.16296  1.60607

Первый аргумент к remotecall — это функция, которую нужно вызвать. Большинство параллельных программ в Julia не ссылаются на конкретные процессы или количество доступных процессов, но 4d61726b646f776e2e436f64652822222c202272656d6f746563616c6c2229_40726566 считается интерфейсом низкого уровня, предоставляющим более тонкий контроль. Второй аргумент к 4d61726b646f776e2e436f64652822222c202272656d6f746563616c6c2229_40726566 — это id процесса, который будет выполнять работу, а оставшиеся аргументы будут переданы вызываемой функции.

Как вы можете видеть, в первой строке мы попросили процесс 2 создать случайную матрицу 2 на 2, а во второй строке мы попросили его добавить 1 к ней. Результат обоих вычислений доступен в двух futures, r и s. Макрос @spawnat вычисляет выражение во втором аргументе на процессе, указанном в первом аргументе.

Иногда вам может понадобиться немедленно получить значение, вычисленное удаленно. Это обычно происходит, когда вы читаете из удаленного объекта, чтобы получить данные, необходимые для следующей локальной операции. Функция remotecall_fetch существует для этой цели. Она эквивалентна fetch(remotecall(...)), но более эффективна.

julia> remotecall_fetch(r-> fetch(r)[1, 1], 2, r)
0.18526337335308085

Это извлекает массив на рабочем 2 и возвращает первое значение. Обратите внимание, что fetch в этом случае не перемещает никаких данных, так как он выполняется на рабочем, который владеет массивом. Также можно написать:

julia> remotecall_fetch(getindex, 2, r, 1, 1)
0.10824216411304866

Помните, что getindex(r,1,1) это equivalent для r[1,1], так что этот вызов извлекает первый элемент будущего r.

Чтобы упростить задачу, символ :any можно передать в @spawnat, который выбирает, где выполнять операцию за вас:

julia> r = @spawnat :any rand(2,2)
Future(2, 1, 4, nothing)

julia> s = @spawnat :any 1 .+ fetch(r)
Future(3, 1, 5, nothing)

julia> fetch(s)
2×2 Array{Float64,2}:
 1.38854  1.9098
 1.20939  1.57158

Обратите внимание, что мы использовали 1 .+ fetch(r) вместо 1 .+ r. Это связано с тем, что мы не знаем, где будет выполняться код, поэтому в общем случае может потребоваться fetch, чтобы переместить r в процесс, выполняющий сложение. В этом случае @spawnat достаточно умён, чтобы выполнить вычисление в процессе, которому принадлежит r, поэтому 4d61726b646f776e2e436f64652822222c202266657463682229_40726566 будет бездействием (никакая работа не выполняется).

(Стоит отметить, что @spawnat не является встроенным, а определено в Julia как macro. Возможно определить свои собственные такие конструкции.)

Важно помнить, что, после получения, Future будет кэшировать свое значение локально. Дальнейшие вызовы fetch не требуют сетевого перехода. Как только все ссылающиеся на 4d61726b646f776e2e436f64652822222c20224675747572652229_407265662044697374726962757465642e467574757265 получат данные, удаляется удаленное хранимое значение.

@async похож на @spawnat, но выполняет задачи только в локальном процессе. Мы используем его для создания задачи "подачи" для каждого процесса. Каждая задача выбирает следующий индекс, который нужно вычислить, затем ждет, пока ее процесс завершится, и повторяет это, пока не исчерпает индексы. Обратите внимание, что задачи подачи не начинают выполняться, пока основная задача не достигнет конца блока @sync, в этот момент она передает управление и ждет, пока все локальные задачи завершатся, прежде чем вернуться из функции. Что касается версии 0.7 и выше, задачи подачи могут делиться состоянием через nextidx, потому что они все выполняются в одном и том же процессе. Даже если Tasks планируются кооперативно, блокировка может все еще потребоваться в некоторых контекстах, как в asynchronous I/O. Это означает, что переключения контекста происходят только в четко определенных точках: в данном случае, когда вызывается remotecall_fetch. Это текущее состояние реализации, и оно может измениться в будущих версиях Julia, так как оно предназначено для того, чтобы сделать возможным выполнение до N Tasks на M Process, также известном как M:N Threading. Затем потребуется модель захвата/освобождения блокировки для nextidx, так как небезопасно позволять нескольким процессам одновременно читать и записывать ресурс.

Code Availability and Loading Packages

Ваш код должен быть доступен в любом процессе, который его выполняет. Например, введите следующее в командной строке Julia:

julia> function rand2(dims...)
           return 2*rand(dims...)
       end

julia> rand2(2,2)
2×2 Array{Float64,2}:
 0.153756  0.368514
 1.15119   0.918912

julia> fetch(@spawnat :any rand2(2,2))
ERROR: RemoteException(2, CapturedException(UndefVarError(Symbol("#rand2"))))
Stacktrace:
[...]

Процесс 1 знал о функции rand2, но процесс 2 не знал.

Чаще всего вы будете загружать код из файлов или пакетов, и у вас есть значительная гибкость в контроле над тем, какие процессы загружают код. Рассмотрим файл DummyModule.jl, содержащий следующий код:

module DummyModule

export MyType, f

mutable struct MyType
    a::Int
end

f(x) = x^2+1

println("loaded")

end

Чтобы ссылаться на MyType во всех процессах, DummyModule.jl необходимо загрузить на каждом процессе. Вызов include("DummyModule.jl") загружает его только на одном процессе. Чтобы загрузить его на каждом процессе, используйте макрос @everywhere (запустив Julia с julia -p 2):

julia> @everywhere include("DummyModule.jl")
loaded
      From worker 3:    loaded
      From worker 2:    loaded

Как обычно, это не приводит DummyModule в область видимости ни одного из процессов, который требует using или import. Более того, когда DummyModule вводится в область видимости в одном процессе, он не находится в области видимости ни в каком другом:

julia> using .DummyModule

julia> MyType(7)
MyType(7)

julia> fetch(@spawnat 2 MyType(7))
ERROR: On worker 2:
UndefVarError: `MyType` not defined in `Main`
⋮

julia> fetch(@spawnat 2 DummyModule.MyType(7))
MyType(7)

Тем не менее, все еще возможно, например, отправить MyType в процесс, который загрузил DummyModule, даже если он не в области видимости:

julia> put!(RemoteChannel(2), MyType(7))
RemoteChannel{Channel{Any}}(2, 1, 13)

Файл также может быть предварительно загружен в несколько процессов при запуске с помощью флага -L, и может быть использован скрипт драйвера для управления вычислениями:

julia -p <n> -L file1.jl -L file2.jl driver.jl

Процесс Julia, выполняющий скрипт драйвера в приведенном выше примере, имеет id, равный 1, так же как и процесс, предоставляющий интерактивный интерфейс.

Наконец, если DummyModule.jl не является отдельным файлом, а пакетом, то using DummyModule загрузит DummyModule.jl на всех процессах, но только введет его в область видимости на процессе, где был вызван using.

Starting and managing worker processes

Базовая установка Julia имеет встроенную поддержку для двух типов кластеров:

  • Локальный кластер, указанный с помощью опции -p, как показано выше.
  • Кластер, охватывающий машины, с использованием опции --machine-file. Это использует безпарольный вход ssh для запуска процессов рабочих Julia (из того же пути, что и текущий хост) на указанных машинах. Каждое определение машины имеет форму [count*][user@]host[:port] [bind_addr[:port]]. user по умолчанию равен текущему пользователю, port — стандартному порту ssh. count — это количество рабочих, которые нужно запустить на узле, и по умолчанию равно 1. Необязательный bind-to bind_addr[:port] указывает IP-адрес и порт, которые другие рабочие должны использовать для подключения к этому рабочему.
Note

Хотя Julia в целом стремится к обратной совместимости, распределение кода на рабочие процессы зависит от Serialization.serialize. Как указано в соответствующей документации, это не может быть гарантировано для работы на разных версиях Julia, поэтому рекомендуется, чтобы все рабочие процессы на всех машинах использовали одну и ту же версию.

Функции addprocs, rmprocs, workers и другие доступны как программный способ добавления, удаления и запроса процессов в кластере.

julia> using Distributed

julia> addprocs(2)
2-element Array{Int64,1}:
 2
 3

Модуль Distributed должен быть явно загружен в главном процессе перед вызовом addprocs. Он автоматически становится доступным в рабочих процессах.

Note

Обратите внимание, что рабочие процессы не выполняют скрипт инициализации ~/.julia/config/startup.jl, и они не синхронизируют свое глобальное состояние (например, параметры командной строки, глобальные переменные, определения новых методов и загруженные модули) с другими запущенными процессами. Вы можете использовать addprocs(exeflags="--project"), чтобы инициализировать рабочий процесс с определенной средой, а затем @everywhere using <modulename> или @everywhere include("file.jl").

Другие типы кластеров могут поддерживаться путем написания собственного пользовательского ClusterManager, как описано ниже в разделе ClusterManagers.

Data Movement

Отправка сообщений и перемещение данных составляют большую часть накладных расходов в распределенной программе. Снижение количества сообщений и объема передаваемых данных имеет решающее значение для достижения производительности и масштабируемости. С этой целью важно понять перемещение данных, выполняемое различными конструкциями распределенного программирования Julia.

fetch может рассматриваться как явная операция перемещения данных, поскольку она напрямую запрашивает перемещение объекта на локальную машину. @spawnat (и несколько связанных конструкций) также перемещает данные, но это не так очевидно, поэтому это можно назвать неявной операцией перемещения данных. Рассмотрим эти два подхода к построению и возведению в квадрат случайной матрицы:

Метод 1:

julia> A = rand(1000,1000);

julia> Bref = @spawnat :any A^2;

[...]

julia> fetch(Bref);

Метод 2:

julia> Bref = @spawnat :any rand(1000,1000)^2;

[...]

julia> fetch(Bref);

Разница кажется тривиальной, но на самом деле она довольно значительна из-за поведения @spawnat. В первом методе случайная матрица создается локально, а затем отправляется в другой процесс, где она возводится в квадрат. Во втором методе случайная матрица как создается, так и возводится в квадрат в другом процессе. Поэтому второй метод отправляет гораздо меньше данных, чем первый.

В этом игрушечном примере два метода легко различить и выбрать. Однако в реальной программе проектирование перемещения данных может потребовать больше размышлений и, вероятно, некоторых измерений. Например, если первому процессу нужна матрица A, то первый метод может быть лучше. Или, если вычисление A дорогостоящее и только текущий процесс его имеет, то перемещение его в другой процесс может быть неизбежным. Или, если текущему процессу почти нечего делать между @spawnat и fetch(Bref), может быть лучше полностью устранить параллелизм. Или представьте, что rand(1000,1000) заменяется на более дорогостоящую операцию. Тогда может иметь смысл добавить еще одно выражение 4d61726b646f776e2e436f64652822222c202240737061776e61742229_40726566 только для этого шага.

Global variables

Выражения, выполняемые удаленно через @spawnat, или замыкания, указанные для удаленного выполнения с использованием remotecall, могут ссылаться на глобальные переменные. Глобальные привязки в модуле Main обрабатываются немного иначе по сравнению с глобальными привязками в других модулях. Рассмотрите следующий фрагмент кода:

A = rand(10,10)
remotecall_fetch(()->sum(A), 2)

В этом случае sum ДОЛЖЕН быть определен в удаленном процессе. Обратите внимание, что A является глобальной переменной, определенной в локальном рабочем пространстве. У рабочего 2 нет переменной с именем A в Main. Акт отправки замыкания ()->sum(A) рабочему 2 приводит к тому, что Main.A определяется на 2. Main.A продолжает существовать на рабочем 2 даже после того, как вызов remotecall_fetch возвращается. Удаленные вызовы с встроенными глобальными ссылками (только в модуле Main) управляют глобальными переменными следующим образом:

  • Новые глобальные привязки создаются на рабочих узлах назначения, если они упоминаются в рамках удаленного вызова.

  • Глобальные константы также объявляются как константы на удаленных узлах.

  • Глобальные переменные отправляются на рабочий узел назначения только в контексте удаленного вызова, и только если их значение изменилось. Кроме того, кластер не синхронизирует глобальные привязки между узлами. Например:

    A = rand(10,10)
    remotecall_fetch(()->sum(A), 2) # worker 2
    A = rand(10,10)
    remotecall_fetch(()->sum(A), 3) # worker 3
    A = nothing

    Выполнение приведенного выше фрагмента кода приводит к тому, что Main.A на рабочем узле 2 имеет значение, отличное от Main.A на рабочем узле 3, в то время как значение Main.A на узле 1 установлено в nothing.

Как вы могли заметить, память, связанная с глобальными переменными, может быть собрана, когда они переназначаются на главном узле, но такое действие не выполняется на рабочих узлах, так как привязки продолжают оставаться действительными. clear! может быть использован для ручного переназначения конкретных глобальных переменных на удаленных узлах на nothing, как только они больше не требуются. Это освободит любую память, связанную с ними, в рамках обычного цикла сборки мусора.

Таким образом, программы должны быть осторожны при ссылках на глобальные переменные в удаленных вызовах. На самом деле, предпочтительнее избегать их вовсе, если это возможно. Если вам необходимо ссылаться на глобальные переменные, рассмотрите возможность использования блоков let для локализации глобальных переменных.

Например:

julia> A = rand(10,10);

julia> remotecall_fetch(()->A, 2);

julia> B = rand(10,10);

julia> let B = B
           remotecall_fetch(()->B, 2)
       end;

julia> @fetchfrom 2 InteractiveUtils.varinfo()
name           size summary
––––––––– ––––––––– ––––––––––––––––––––––
A         800 bytes 10×10 Array{Float64,2}
Base                Module
Core                Module
Main                Module

Как видно, глобальная переменная A определена на рабочем узле 2, но B захвачена как локальная переменная, и, следовательно, связывание для B не существует на рабочем узле 2.

Parallel Map and Loops

К счастью, многие полезные параллельные вычисления не требуют перемещения данных. Общим примером является симуляция Монте-Карло, где несколько процессов могут одновременно обрабатывать независимые испытания симуляции. Мы можем использовать @spawnat для подбрасывания монет на двух процессах. Сначала напишите следующую функцию в count_heads.jl:

function count_heads(n)
    c::Int = 0
    for i = 1:n
        c += rand(Bool)
    end
    c
end

Функция count_heads просто складывает n случайных битов. Вот как мы можем провести несколько испытаний на двух машинах и сложить результаты:

julia> @everywhere include_string(Main, $(read("count_heads.jl", String)), "count_heads.jl")

julia> a = @spawnat :any count_heads(100000000)
Future(2, 1, 6, nothing)

julia> b = @spawnat :any count_heads(100000000)
Future(3, 1, 7, nothing)

julia> fetch(a)+fetch(b)
100001564

Этот пример демонстрирует мощный и часто используемый шаблон параллельного программирования. Многие итерации выполняются независимо в нескольких процессах, а затем их результаты объединяются с помощью некоторой функции. Процесс объединения называется редукцией, поскольку он, как правило, уменьшает ранг тензора: вектор чисел сводится к одному числу, или матрица сводится к одной строке или столбцу и т. д. В коде это обычно выглядит как шаблон x = f(x,v[i]), где x — это аккумулятор, f — функция редукции, а v[i] — это элементы, которые редуцируются. Желательно, чтобы f была ассоциативной, чтобы не имело значения, в каком порядке выполняются операции.

Обратите внимание, что наше использование этого шаблона с count_heads можно обобщить. Мы использовали два явных @spawnat оператора, что ограничивает параллелизм двумя процессами. Чтобы запустить на любом количестве процессов, мы можем использовать параллельный цикл for, работающий в распределенной памяти, который можно записать на Julia, используя @distributed следующим образом:

nheads = @distributed (+) for i = 1:200000000
    Int(rand(Bool))
end

Этот конструкция реализует шаблон назначения итераций нескольким процессам и их объединения с указанным редукцией (в данном случае (+)). Результат каждой итерации принимается как значение последнего выражения внутри цикла. Все выражение параллельного цикла само по себе оценивается как окончательный ответ.

Обратите внимание, что хотя параллельные циклы for выглядят как последовательные циклы for, их поведение кардинально отличается. В частности, итерации не происходят в определенном порядке, и записи в переменные или массивы не будут глобально видимыми, поскольку итерации выполняются на разных процессах. Любые переменные, используемые внутри параллельного цикла, будут скопированы и переданы каждому процессу.

Например, следующий код не будет работать как задумано:

a = zeros(100000)
@distributed for i = 1:100000
    a[i] = i
end

Этот код не инициализирует весь a, так как каждый процесс будет иметь отдельную его копию. Такие параллельные циклы for следует избегать. К счастью, Shared Arrays можно использовать, чтобы обойти это ограничение:

using SharedArrays

a = SharedArray{Float64}(10)
@distributed for i = 1:10
    a[i] = i
end

Использование переменных "вне" в параллельных циклах вполне разумно, если переменные являются только для чтения:

a = randn(1000)
@distributed (+) for i = 1:100000
    f(a[rand(1:end)])
end

Здесь на каждой итерации функция f применяется к случайно выбранному образцу из вектора a, который общий для всех процессов.

Как вы могли заметить, оператор редукции можно опустить, если он не нужен. В этом случае цикл выполняется асинхронно, т.е. он создает независимые задачи на всех доступных рабочих и сразу возвращает массив Future, не дожидаясь завершения. Вызывающий может дождаться завершения 4d61726b646f776e2e436f64652822222c20224675747572652229_407265662044697374726962757465642e467574757265 позже, вызвав fetch на них, или дождаться завершения в конце цикла, добавив перед ним @sync, как @sync @distributed for.

В некоторых случаях оператор редукции не нужен, и мы просто хотим применить функцию ко всем целым числам в некотором диапазоне (или, более общо, ко всем элементам в некоторой коллекции). Это еще одна полезная операция, называемая параллельным отображением, реализованная в Julia как функция pmap. Например, мы могли бы вычислить сингулярные значения нескольких больших случайных матриц параллельно следующим образом:

julia> M = Matrix{Float64}[rand(1000,1000) for i = 1:10];

julia> pmap(svdvals, M);

Julia's pmap предназначен для случаев, когда каждый вызов функции выполняет большое количество работы. В отличие от этого, @distributed for может обрабатывать ситуации, когда каждая итерация мала, возможно, просто складывая два числа. Только рабочие процессы используются как 4d61726b646f776e2e436f64652822222c2022706d61702229_40726566, так и @distributed for для параллельных вычислений. В случае @distributed for окончательное сокращение выполняется на вызывающем процессе.

Remote References and AbstractChannels

Удаленные ссылки всегда ссылаются на реализацию AbstractChannel.

Конкретная реализация AbstractChannel (например, Channel) должна реализовать put!, take!, fetch, isready и wait. Удаленный объект, на который ссылается Future, хранится в Channel{Any}(1), т.е. в Channel размером 1, способном хранить объекты типа Any.

RemoteChannel, который можно переписать, может указывать на любой тип и размер каналов или любую другую реализацию AbstractChannel.

Конструктор RemoteChannel(f::Function, pid)() позволяет нам создавать ссылки на каналы, содержащие более одного значения определенного типа. f — это функция, выполняемая на pid, и она должна возвращать AbstractChannel.

Например, RemoteChannel(()->Channel{Int}(10), pid), вернет ссылку на канал типа Int и размера 10. Канал существует на рабочем pid.

Методы put!, take!, fetch, isready и wait на RemoteChannel проксируются на хранилище данных в удаленном процессе.

RemoteChannel таким образом может быть использован для ссылки на реализованные пользователем объекты AbstractChannel. Простой пример этого - следующий DictChannel, который использует словарь в качестве своего удаленного хранилища:

julia> struct DictChannel{T} <: AbstractChannel{T}
           d::Dict
           cond_take::Threads.Condition    # waiting for data to become available
           DictChannel{T}() where {T} = new(Dict(), Threads.Condition())
           DictChannel() = DictChannel{Any}()
       end

julia> begin
       function Base.put!(D::DictChannel, k, v)
           @lock D.cond_take begin
               D.d[k] = v
               notify(D.cond_take)
           end
           return D
       end
       function Base.take!(D::DictChannel, k)
           @lock D.cond_take begin
               v = fetch(D, k)
               delete!(D.d, k)
               return v
           end
       end
       Base.isready(D::DictChannel) = @lock D.cond_take !isempty(D.d)
       Base.isready(D::DictChannel, k) = @lock D.cond_take haskey(D.d, k)
       function Base.fetch(D::DictChannel, k)
           @lock D.cond_take begin
               wait(D, k)
               return D.d[k]
           end
       end
       function Base.wait(D::DictChannel, k)
           @lock D.cond_take begin
               while !isready(D, k)
                   wait(D.cond_take)
               end
           end
       end
       end;

julia> d = DictChannel();

julia> isready(d)
false

julia> put!(d, :k, :v);

julia> isready(d, :k)
true

julia> fetch(d, :k)
:v

julia> wait(d, :k)

julia> take!(d, :k)
:v

julia> isready(d, :k)
false

Channels and RemoteChannels

  • Channel локален для процесса. Рабочий 2 не может напрямую ссылаться на 4d61726b646f776e2e436f64652822222c20224368616e6e656c2229_40726566 на рабочем 3 и наоборот. Однако RemoteChannel может передавать и получать значения между рабочими.
  • A RemoteChannel можно рассматривать как идентификатор для Channel.
  • Идентификатор процесса, pid, связанный с RemoteChannel, определяет процесс, в котором существует резервное хранилище, т.е. резервный Channel.
  • Любой процесс с ссылкой на RemoteChannel может помещать и забирать предметы из канала. Данные автоматически отправляются (или извлекаются) из процесса, с которым ассоциирован 4d61726b646f776e2e436f64652822222c202252656d6f74654368616e6e656c2229_40726566.
  • Сериализация Channel также сериализует любые данные, присутствующие в канале. Десериализация, следовательно, фактически создает копию оригинального объекта.
  • С другой стороны, сериализация RemoteChannel включает только сериализацию идентификатора, который определяет местоположение и экземпляр Channel, на который ссылается дескриптор. Таким образом, десериализованный объект 4d61726b646f776e2e436f64652822222c202252656d6f74654368616e6e656c2229_40726566 (на любом рабочем узле) также указывает на тот же резервный хранилище, что и оригинал.

Пример каналов, приведенный выше, можно модифицировать для межпроцессного взаимодействия, как показано ниже.

Мы запускаем 4 рабочих для обработки одного удаленного канала jobs. Задания, идентифицируемые по id (job_id), записываются в канал. Каждая удаленно выполняемая задача в этой симуляции считывает job_id, ждет случайное количество времени и записывает обратно кортеж из job_id, затраченного времени и своего собственного pid в канал результатов. В конце все results выводятся на мастер-процесс.

julia> addprocs(4); # add worker processes

julia> const jobs = RemoteChannel(()->Channel{Int}(32));

julia> const results = RemoteChannel(()->Channel{Tuple}(32));

julia> @everywhere function do_work(jobs, results) # define work function everywhere
           while true
               job_id = take!(jobs)
               exec_time = rand()
               sleep(exec_time) # simulates elapsed time doing actual work
               put!(results, (job_id, exec_time, myid()))
           end
       end

julia> function make_jobs(n)
           for i in 1:n
               put!(jobs, i)
           end
       end;

julia> n = 12;

julia> errormonitor(@async make_jobs(n)); # feed the jobs channel with "n" jobs

julia> for p in workers() # start tasks on the workers to process requests in parallel
           remote_do(do_work, p, jobs, results)
       end

julia> @elapsed while n > 0 # print out results
           job_id, exec_time, where = take!(results)
           println("$job_id finished in $(round(exec_time; digits=2)) seconds on worker $where")
           global n = n - 1
       end
1 finished in 0.18 seconds on worker 4
2 finished in 0.26 seconds on worker 5
6 finished in 0.12 seconds on worker 4
7 finished in 0.18 seconds on worker 4
5 finished in 0.35 seconds on worker 5
4 finished in 0.68 seconds on worker 2
3 finished in 0.73 seconds on worker 3
11 finished in 0.01 seconds on worker 3
12 finished in 0.02 seconds on worker 3
9 finished in 0.26 seconds on worker 5
8 finished in 0.57 seconds on worker 4
10 finished in 0.58 seconds on worker 2
0.055971741

Remote References and Distributed Garbage Collection

Объекты, на которые ссылаются удаленные ссылки, могут быть освобождены только тогда, когда все удерживаемые ссылки в кластере удалены.

Узел, в котором хранится значение, отслеживает, какие из рабочих имеют на него ссылку. Каждый раз, когда RemoteChannel или (не загруженный) Future сериализуется для рабочего, узел, на который указывает ссылка, получает уведомление. И каждый раз, когда 4d61726b646f776e2e436f64652822222c202252656d6f74654368616e6e656c2229_40726566 или (не загруженный) 4d61726b646f776e2e436f64652822222c20224675747572652229_407265662044697374726962757465642e467574757265 собирается мусорщиком локально, узел, владеющий значением, снова получает уведомление. Это реализовано в внутреннем сериализаторе, учитывающем кластер. Удаленные ссылки действительны только в контексте работающего кластера. Сериализация и десериализация ссылок на и из обычных IO объектов не поддерживается.

Уведомления выполняются путем отправки сообщений "отслеживания" – сообщения "добавить ссылку", когда ссылка сериализуется в другой процесс, и сообщения "удалить ссылку", когда ссылка локально собирается мусором.

Поскольку Future являются одноразовыми и кэшируются локально, действие fetch записи 4d61726b646f776e2e436f64652822222c20224675747572652229_407265662044697374726962757465642e467574757265 также обновляет информацию отслеживания ссылок на узле, владеющем значением.

Узел, который владеет значением, освобождает его, как только все ссылки на него очищены.

С помощью Future, сериализация уже полученного 4d61726b646f776e2e436f64652822222c20224675747572652229_407265662044697374726962757465642e467574757265 на другой узел также отправляет значение, так как оригинальный удаленный хранилище могло собрать значение к этому времени.

Важно отметить, что когда объект будет локально собран сборщиком мусора, зависит от размера объекта и текущего давления на память в системе.

В случае удаленных ссылок размер локального объекта ссылки довольно мал, в то время как значение, хранящееся на удаленном узле, может быть довольно большим. Поскольку локальный объект может не быть собран немедленно, хорошей практикой является явный вызов finalize на локальных экземплярах RemoteChannel, или на невыгруженных Future. Поскольку вызов fetch на 4d61726b646f776e2e436f64652822222c20224675747572652229_407265662044697374726962757465642e467574757265 также удаляет его ссылку из удаленного хранилища, это не требуется для выгруженных 4d61726b646f776e2e436f64652822222c20224675747572652229_407265662044697374726962757465642e467574757265. Явный вызов 4d61726b646f776e2e436f64652822222c202266696e616c697a652229_40726566 приводит к немедленному отправлению сообщения на удаленный узел с просьбой удалить его ссылку на значение.

После завершения ссылка становится недействительной и не может быть использована в дальнейших вызовах.

Local invocations

Данные обязательно копируются на удаленный узел для выполнения. Это касается как удаленных вызовов, так и случаев, когда данные хранятся в RemoteChannel / Future на другом узле. Как и ожидалось, это приводит к созданию копии сериализованных объектов на удаленном узле. Однако, когда целевой узел является локальным узлом, т.е. идентификатор вызывающего процесса совпадает с идентификатором удаленного узла, он выполняется как локальный вызов. Обычно (но не всегда) он выполняется в другой задаче - но сериализация/десериализация данных не происходит. Следовательно, вызов ссылается на те же экземпляры объектов, что и переданные - копии не создаются. Это поведение подчеркивается ниже:

julia> using Distributed;

julia> rc = RemoteChannel(()->Channel(3));   # RemoteChannel created on local node

julia> v = [0];

julia> for i in 1:3
           v[1] = i                          # Reusing `v`
           put!(rc, v)
       end;

julia> result = [take!(rc) for _ in 1:3];

julia> println(result);
Array{Int64,1}[[3], [3], [3]]

julia> println("Num Unique objects : ", length(unique(map(objectid, result))));
Num Unique objects : 1

julia> addprocs(1);

julia> rc = RemoteChannel(()->Channel(3), workers()[1]);   # RemoteChannel created on remote node

julia> v = [0];

julia> for i in 1:3
           v[1] = i
           put!(rc, v)
       end;

julia> result = [take!(rc) for _ in 1:3];

julia> println(result);
Array{Int64,1}[[1], [2], [3]]

julia> println("Num Unique objects : ", length(unique(map(objectid, result))));
Num Unique objects : 3

Как видно, put! на локально принадлежащем RemoteChannel с тем же объектом v, измененным между вызовами, приводит к тому, что хранится один и тот же экземпляр объекта. В отличие от случаев, когда копии v создаются, если узел, владеющий rc, является другим узлом.

Следует отметить, что это, как правило, не является проблемой. Это следует учитывать только в том случае, если объект хранится локально и изменяется после вызова. В таких случаях может быть целесообразно сохранить deepcopy объекта.

Это также верно для удаленных вызовов на локальном узле, как видно в следующем примере:

julia> using Distributed; addprocs(1);

julia> v = [0];

julia> v2 = remotecall_fetch(x->(x[1] = 1; x), myid(), v);     # Executed on local node

julia> println("v=$v, v2=$v2, ", v === v2);
v=[1], v2=[1], true

julia> v = [0];

julia> v2 = remotecall_fetch(x->(x[1] = 1; x), workers()[1], v); # Executed on remote node

julia> println("v=$v, v2=$v2, ", v === v2);
v=[0], v2=[1], false

Как можно снова увидеть, удаленный вызов на локальном узле ведет себя так же, как и прямой вызов. Вызов изменяет локальные объекты, переданные в качестве аргументов. В удаленном вызове он работает с копией аргументов.

Чтобы повторить, в общем это не является проблемой. Если локальный узел также используется как вычислительный узел, и аргументы, используемые после вызова, необходимо учитывать это поведение, и, если требуется, глубокие копии аргументов должны быть переданы в вызов, инициируемый на локальном узле. Вызовы на удаленных узлах всегда будут работать с копиями аргументов.

Shared Arrays

Shared Arrays используют системную общую память для отображения одного и того же массива в нескольких процессах. SharedArray является хорошим выбором, когда вы хотите иметь большой объем данных, совместно доступных для двух или более процессов на одной машине. Поддержка Shared Array доступна через модуль SharedArrays, который необходимо явно загрузить на всех участвующих рабочих.

Дополнительная структура данных предоставляется внешним пакетом DistributedArrays.jl в форме DArray. Хотя есть некоторые сходства с SharedArray, поведение DArray довольно отличается. В 4d61726b646f776e2e436f64652822222c202253686172656441727261792229_40726566 каждый "участвующий" процесс имеет доступ ко всему массиву; в отличие от этого, в 4d61726b646f776e2e436f64652822222c20224441727261792229_68747470733a2f2f6769746875622e636f6d2f4a756c6961506172616c6c656c2f44697374726962757465644172726179732e6a6c каждый процесс имеет локальный доступ только к части данных, и ни два процесса не делят одну и ту же часть.

SharedArray индексация (назначение и доступ к значениям) работает так же, как и с обычными массивами, и эффективна, потому что базовая память доступна локальному процессу. Поэтому большинство алгоритмов естественно работают с 4d61726b646f776e2e436f64652822222c202253686172656441727261792229_40726566, хотя в однопроцессорном режиме. В случаях, когда алгоритм настаивает на входе Array, базовый массив можно получить из 4d61726b646f776e2e436f64652822222c202253686172656441727261792229_40726566, вызвав sdata. Для других типов AbstractArray 4d61726b646f776e2e436f64652822222c202273646174612229_40726566 просто возвращает сам объект, поэтому безопасно использовать 4d61726b646f776e2e436f64652822222c202273646174612229_40726566 для любого объекта типа Array.

Конструктор для общего массива имеет следующую форму:

SharedArray{T,N}(dims::NTuple; init=false, pids=Int[])

который создает N-мерный общий массив типа T и размера dims для процессов, указанных в pids. В отличие от распределенных массивов, общий массив доступен только для тех рабочих процессов, которые указаны в аргументе pids (и для создающего процесса, если он находится на том же хосте). Обратите внимание, что в SharedArray поддерживаются только элементы, которые являются isbits.

Если указана функция init с сигнатурой initfn(S::SharedArray), она вызывается на всех участвующих рабочих. Вы можете указать, что каждый рабочий выполняет функцию init на отдельной части массива, тем самым параллелизируя инициализацию.

Вот краткий пример:

julia> using Distributed

julia> addprocs(3)
3-element Array{Int64,1}:
 2
 3
 4

julia> @everywhere using SharedArrays

julia> S = SharedArray{Int,2}((3,4), init = S -> S[localindices(S)] = repeat([myid()], length(localindices(S))))
3×4 SharedArray{Int64,2}:
 2  2  3  4
 2  3  3  4
 2  3  4  4

julia> S[3,2] = 7
7

julia> S
3×4 SharedArray{Int64,2}:
 2  2  3  4
 2  3  3  4
 2  7  4  4

SharedArrays.localindices предоставляет разрозненные одномерные диапазоны индексов и иногда удобно для разделения задач между процессами. Вы, конечно, можете разделить работу любым способом, который вам нравится:

julia> S = SharedArray{Int,2}((3,4), init = S -> S[indexpids(S):length(procs(S)):length(S)] = repeat([myid()], length( indexpids(S):length(procs(S)):length(S))))
3×4 SharedArray{Int64,2}:
 2  2  2  2
 3  3  3  3
 4  4  4  4

Поскольку все процессы имеют доступ к основным данным, вам нужно быть осторожным, чтобы не создать конфликты. Например:

@sync begin
    for p in procs(S)
        @async begin
            remotecall_wait(fill!, p, S, p)
        end
    end
end

это приведет к неопределенному поведению. Поскольку каждый процесс заполняет весь массив своим pid, тот процесс, который последним выполнит (для любого конкретного элемента S), сохранит свой pid.

В качестве более расширенного и сложного примера рассмотрим выполнение следующего "ядра" параллельно:

q[i,j,t+1] = q[i,j,t] + u[i,j,t]

В этом случае, если мы попытаемся разделить работу, используя одномерный индекс, мы, вероятно, столкнемся с проблемами: если q[i,j,t] находится вблизи конца блока, назначенного одному работнику, а q[i,j,t+1] находится вблизи начала блока, назначенного другому, то очень вероятно, что q[i,j,t] не будет готово в момент, когда оно потребуется для вычисления q[i,j,t+1]. В таких случаях лучше вручную разбить массив на части. Давайте разделим по второму измерению. Определите функцию, которая возвращает индексы (irange, jrange), назначенные этому работнику:

julia> @everywhere function myrange(q::SharedArray)
           idx = indexpids(q)
           if idx == 0 # This worker is not assigned a piece
               return 1:0, 1:0
           end
           nchunks = length(procs(q))
           splits = [round(Int, s) for s in range(0, stop=size(q,2), length=nchunks+1)]
           1:size(q,1), splits[idx]+1:splits[idx+1]
       end

Далее определите ядро:

julia> @everywhere function advection_chunk!(q, u, irange, jrange, trange)
           @show (irange, jrange, trange)  # display so we can see what's happening
           for t in trange, j in jrange, i in irange
               q[i,j,t+1] = q[i,j,t] + u[i,j,t]
           end
           q
       end

Мы также определяем удобную обертку для реализации SharedArray

julia> @everywhere advection_shared_chunk!(q, u) =
           advection_chunk!(q, u, myrange(q)..., 1:size(q,3)-1)

Теперь давайте сравним три разные версии, одна из которых работает в одном процессе:

julia> advection_serial!(q, u) = advection_chunk!(q, u, 1:size(q,1), 1:size(q,2), 1:size(q,3)-1);

один, который использует @distributed:

julia> function advection_parallel!(q, u)
           for t = 1:size(q,3)-1
               @sync @distributed for j = 1:size(q,2)
                   for i = 1:size(q,1)
                       q[i,j,t+1]= q[i,j,t] + u[i,j,t]
                   end
               end
           end
           q
       end;

и один, который делегирует по частям:

julia> function advection_shared!(q, u)
           @sync begin
               for p in procs(q)
                   @async remotecall_wait(advection_shared_chunk!, p, q, u)
               end
           end
           q
       end;

Если мы создадим SharedArray и замерим время выполнения этих функций, мы получим следующие результаты (с julia -p 4):

julia> q = SharedArray{Float64,3}((500,500,500));

julia> u = SharedArray{Float64,3}((500,500,500));

Запустите функции один раз для JIT-компиляции и @time их на втором запуске:

julia> @time advection_serial!(q, u);
(irange,jrange,trange) = (1:500,1:500,1:499)
 830.220 milliseconds (216 allocations: 13820 bytes)

julia> @time advection_parallel!(q, u);
   2.495 seconds      (3999 k allocations: 289 MB, 2.09% gc time)

julia> @time advection_shared!(q,u);
        From worker 2:       (irange,jrange,trange) = (1:500,1:125,1:499)
        From worker 4:       (irange,jrange,trange) = (1:500,251:375,1:499)
        From worker 3:       (irange,jrange,trange) = (1:500,126:250,1:499)
        From worker 5:       (irange,jrange,trange) = (1:500,376:500,1:499)
 238.119 milliseconds (2264 allocations: 169 KB)

Главное преимущество advection_shared! заключается в том, что оно минимизирует трафик между рабочими, позволяя каждому работать в течение длительного времени над назначенной частью.

Shared Arrays and Distributed Garbage Collection

Как и удаленные ссылки, общие массивы также зависят от сборки мусора на создающем узле для освобождения ссылок от всех участвующих рабочих. Код, который создает множество краткоживущих объектов общих массивов, выиграет от явного завершения этих объектов как можно скорее. Это приведет к более раннему освобождению как памяти, так и дескрипторов файлов, сопоставляющих общий сегмент.

ClusterManagers

Запуск, управление и сетевое взаимодействие процессов Julia в логическом кластере осуществляется через менеджеры кластеров. ClusterManager отвечает за

  • запуск рабочих процессов в кластерной среде
  • управление событиями в течение жизни каждого работника
  • опционально, предоставляя транспорт данных

Кластер Julia имеет следующие характеристики:

  • Начальный процесс Julia, также называемый master, является специальным и имеет id 1.
  • Только процесс master может добавлять или удалять рабочие процессы.
  • Все процессы могут напрямую общаться друг с другом.

Соединения между рабочими (с использованием встроенного транспортного протокола TCP/IP) устанавливаются следующим образом:

  • addprocs вызывается в главном процессе с объектом ClusterManager.
  • addprocs вызывает соответствующий метод launch, который создает необходимое количество рабочих процессов на соответствующих машинах.
  • Каждый работник начинает прослушивание на свободном порту и записывает информацию о своем хосте и порте в stdout.
  • Менеджер кластера захватывает stdout каждого рабочего процесса и делает его доступным для главного процесса.
  • Мастер-процесс анализирует эту информацию и устанавливает TCP/IP соединения с каждым рабочим.
  • Каждый работник также уведомляется о других работниках в кластере.
  • Каждый работник подключается ко всем работникам, чей id меньше, чем его собственный id.
  • Таким образом, создается сетевое соединение, в котором каждый работник напрямую соединен с каждым другим работником.

Хотя по умолчанию транспортный уровень использует простой TCPSocket, кластер Julia может предоставить свой собственный транспорт.

Julia предоставляет два встроенных менеджера кластеров:

LocalManager используется для запуска дополнительных рабочих процессов на том же хосте, тем самым используя многоядерное и многопроцессорное оборудование.

Таким образом, минимальный менеджер кластера должен:

  • быть подтипом абстрактного ClusterManager
  • реализовать launch, метод, ответственный за запуск новых работников
  • реализовать manage, который вызывается на различных событиях в течение жизни работника (например, отправка сигнала прерывания)

addprocs(manager::FooManager) требует, чтобы FooManager реализовал:

function launch(manager::FooManager, params::Dict, launched::Array, c::Condition)
    [...]
end

function manage(manager::FooManager, id::Integer, config::WorkerConfig, op::Symbol)
    [...]
end

В качестве примера давайте посмотрим, как реализован LocalManager, менеджер, ответственный за запуск рабочих процессов на одном хосте:

struct LocalManager <: ClusterManager
    np::Integer
end

function launch(manager::LocalManager, params::Dict, launched::Array, c::Condition)
    [...]
end

function manage(manager::LocalManager, id::Integer, config::WorkerConfig, op::Symbol)
    [...]
end

Метод launch принимает следующие аргументы:

  • manager::ClusterManager: кластерный менеджер, с которым вызывается addprocs
  • params::Dict: все именованные аргументы, переданные в addprocs
  • launched::Array: массив, в который нужно добавить один или несколько объектов WorkerConfig
  • c::Condition: переменная условия, которая будет уведомлена по мере запуска рабочих процессов

Метод launch вызывается асинхронно в отдельной задаче. Завершение этой задачи сигнализирует о том, что все запрошенные рабочие запущены. Следовательно, функция 4d61726b646f776e2e436f64652822222c20226c61756e63682229_40726566 ДОЛЖНА завершиться, как только все запрошенные рабочие будут запущены.

Новые запущенные рабочие соединены друг с другом и с мастер-процессом в режиме "все ко всем". Указание аргумента командной строки --worker[=<cookie>] приводит к тому, что запущенные процессы инициализируют себя как рабочие, и соединения устанавливаются через сокеты TCP/IP.

Все работники в кластере используют тот же cookie, что и мастер. Когда куки не указаны, т.е. с опцией --worker, работник пытается прочитать его из своего стандартного ввода. LocalManager и SSHManager оба передают куки вновь запущенным работникам через их стандартные вводы.

По умолчанию рабочий процесс будет слушать на свободном порту по адресу, возвращаемому вызовом getipaddr(). Конкретный адрес для прослушивания может быть указан с помощью необязательного аргумента --bind-to bind_addr[:port]. Это полезно для многопортовых хостов.

В качестве примера транспорта, не основанного на TCP/IP, реализация может выбрать использование MPI, в этом случае --worker НЕ должен быть указан. Вместо этого вновь запущенные рабочие процессы должны вызывать init_worker(cookie) перед использованием любых параллельных конструкций.

Для каждого запущенного работника метод launch должен добавлять объект WorkerConfig (с инициализированными соответствующими полями) в launched

mutable struct WorkerConfig
    # Common fields relevant to all cluster managers
    io::Union{IO, Nothing}
    host::Union{AbstractString, Nothing}
    port::Union{Integer, Nothing}

    # Used when launching additional workers at a host
    count::Union{Int, Symbol, Nothing}
    exename::Union{AbstractString, Cmd, Nothing}
    exeflags::Union{Cmd, Nothing}

    # External cluster managers can use this to store information at a per-worker level
    # Can be a dict if multiple fields need to be stored.
    userdata::Any

    # SSHManager / SSH tunnel connections to workers
    tunnel::Union{Bool, Nothing}
    bind_addr::Union{AbstractString, Nothing}
    sshflags::Union{Cmd, Nothing}
    max_parallel::Union{Integer, Nothing}

    # Used by Local/SSH managers
    connect_at::Any

    [...]
end

Большинство полей в WorkerConfig используются встроенными менеджерами. Пользовательские менеджеры кластера обычно указывают только io или host / port:

  • Если указано io, оно используется для чтения информации о хосте/порту. Рабочий процесс Julia выводит свой адрес привязки и порт при запуске. Это позволяет рабочим процессам Julia слушать на любом свободном порту, доступном вместо того, чтобы требовать ручной настройки портов рабочих процессов.

  • Если io не указан, используются host и port для подключения.

  • count, exename и exeflags имеют значение для запуска дополнительных рабочих процессов из рабочего процесса. Например, менеджер кластера может запустить один рабочий процесс на узел и использовать его для запуска дополнительных рабочих процессов.

    • count с целочисленным значением n запустит в общей сложности n рабочих.
    • count со значением :auto запустит столько рабочих процессов, сколько потоков ЦП (логических ядер) на этом компьютере.
    • exename — это имя исполняемого файла julia, включая полный путь.
    • exeflags должны быть установлены на необходимые аргументы командной строки для новых рабочих.
  • tunnel, bind_addr, sshflags и max_parallel используются, когда требуется ssh-туннель для подключения к рабочим процессам из главного процесса.

  • userdata предоставляется для пользовательских менеджеров кластеров, чтобы хранить собственную информацию, специфичную для рабочих узлов.

manage(manager::FooManager, id::Integer, config::WorkerConfig, op::Symbol) вызывается в разное время в течение жизни рабочего с соответствующими значениями op:

  • с :register/:deregister, когда рабочий добавляется / удаляется из пула рабочих Julia.
  • с :interrupt, когда вызывается interrupt(workers). ClusterManager должен послать соответствующему рабочему сигнал прерывания.
  • с :finalize для целей очистки.

Cluster Managers with Custom Transports

Замена стандартных соединений сокетов TCP/IP на пользовательский транспортный уровень немного сложнее. Каждый процесс Julia имеет столько же задач по коммуникации, сколько рабочих, к которым он подключен. Например, рассмотрим кластер Julia из 32 процессов в сети "все ко всем":

  • Каждый процесс Julia имеет 31 задачу по коммуникации.
  • Каждая задача обрабатывает все входящие сообщения от одного удаленного работника в цикле обработки сообщений.
  • Цикл обработки сообщений ожидает объект IO (например, TCPSocket в стандартной реализации), считывает целое сообщение, обрабатывает его и ожидает следующего.
  • Отправка сообщений в процесс осуществляется напрямую из любой задачи Julia — не только из задач связи — снова через соответствующий объект IO.

Замена стандартного транспорта требует от новой реализации настройки соединений с удаленными рабочими и предоставления соответствующих объектов IO, на которые могут ожидать циклы обработки сообщений. Специфические для менеджера обратные вызовы, которые необходимо реализовать, это:

connect(manager::FooManager, pid::Integer, config::WorkerConfig)
kill(manager::FooManager, pid::Int, config::WorkerConfig)

Стандартная реализация (которая использует сокеты TCP/IP) реализована как connect(manager::ClusterManager, pid::Integer, config::WorkerConfig).

connect должен возвращать пару объектов IO, один для чтения данных, отправленных от рабочего pid, и другой для записи данных, которые необходимо отправить рабочему pid. Пользовательские менеджеры кластеров могут использовать поток BufferStream в памяти в качестве соединения для проксирования данных между пользовательским, возможно, не-IO транспортом и встроенной параллельной инфраструктурой Julia.

BufferStream — это поток в памяти IOBuffer, который ведет себя как IO — это поток, который можно обрабатывать асинхронно.

Папка clustermanager/0mq в Examples repository содержит пример использования ZeroMQ для подключения рабочих процессов Julia в звездной топологии с брокером 0MQ посередине. Примечание: Процессы Julia по-прежнему логически связаны друг с другом — любой рабочий процесс может отправить сообщение любому другому рабочему процессу напрямую, не осознавая, что 0MQ используется в качестве транспортного слоя.

При использовании пользовательских транспортов:

  • Рабочие процессы Julia не должны запускаться с --worker. Запуск с --worker приведет к тому, что вновь запущенные рабочие процессы будут по умолчанию использовать реализацию транспортного протокола TCP/IP.
  • Для каждого входящего логического соединения с рабочим Base.process_messages(rd::IO, wr::IO)() должен быть вызван. Это запускает новую задачу, которая обрабатывает чтение и запись сообщений от/к рабочему, представленному объектами IO.
  • init_worker(cookie, manager::FooManager) должен быть вызван в рамках инициализации рабочего процесса.
  • Поле connect_at::Any в WorkerConfig может быть установлено менеджером кластера, когда вызывается launch. Значение этого поля передается во всех connect обратных вызовах. Обычно оно содержит информацию о том, как подключиться к рабочему. Например, транспорт TCP/IP использует это поле для указания кортежа (host, port), по которому следует подключиться к рабочему.

kill(manager, pid, config) вызывается для удаления рабочего из кластера. В главном процессе соответствующие объекты IO должны быть закрыты реализацией для обеспечения правильной очистки. Стандартная реализация просто выполняет вызов exit() на указанном удаленном рабочем.

Папка Examples clustermanager/simple является примером, который демонстрирует простую реализацию с использованием сокетов домена UNIX для настройки кластера.

Network Requirements for LocalManager and SSHManager

Кластеры Julia предназначены для выполнения в уже защищенных средах на инфраструктуре, такой как локальные ноутбуки, департаментские кластеры или даже в облаке. Этот раздел охватывает требования к сетевой безопасности для встроенных LocalManager и SSHManager:

  • Мастер-процесс не слушает ни на одном порту. Он только подключается к рабочим процессам.

  • Каждый рабочий процесс привязывается только к одному из локальных интерфейсов и слушает на временном номере порта, назначенном операционной системой.

  • LocalManager, используемый в addprocs(N), по умолчанию связывается только с интерфейсом обратной связи. Это означает, что рабочие процессы, запущенные позже на удаленных хостах (или кем-либо с злонамеренными намерениями), не могут подключиться к кластеру. Вызов addprocs(4), за которым следует addprocs(["remote_host"]), завершится неудачей. Некоторые пользователи могут захотеть создать кластер, состоящий из их локальной системы и нескольких удаленных систем. Это можно сделать, явно запросив LocalManager связаться с внешним сетевым интерфейсом через аргумент ключевого слова restrict: addprocs(4; restrict=false).

  • SSHManager, используемый в addprocs(list_of_remote_hosts), запускает рабочие процессы на удаленных хостах через SSH. По умолчанию SSH используется только для запуска рабочих процессов Julia. Последующие соединения между мастером и рабочими процессами, а также между рабочими процессами используют обычные, незащищенные TCP/IP сокеты. Удаленные хосты должны иметь включенный вход без пароля. Дополнительные флаги SSH или учетные данные могут быть указаны через аргумент ключевого слова sshflags.

  • addprocs(list_of_remote_hosts; tunnel=true, sshflags=<ssh keys and other flags>) полезен, когда мы хотим использовать SSH-соединения для мастера-работника. Типичный сценарий для этого - локальный ноутбук, работающий с Julia REPL (т.е. мастер), с остальной частью кластера в облаке, скажем, на Amazon EC2. В этом случае необходимо открыть только порт 22 на удаленном кластере в сочетании с клиентом SSH, аутентифицированным через инфраструктуру открытых ключей (PKI). Учетные данные для аутентификации могут быть предоставлены через sshflags, например sshflags=`-i <keyfile>`.

    В топологии "все к всем" (по умолчанию) все рабочие узлы соединяются друг с другом через обычные TCP-сокеты. Политика безопасности на узлах кластера должна обеспечивать свободное соединение между рабочими узлами для диапазона временных портов (варьируется в зависимости от ОС).

    Защита и шифрование всего трафика между рабочими узлами (через SSH) или шифрование отдельных сообщений может быть выполнено с помощью пользовательского ClusterManager.

  • Если вы укажете multiplex=true в качестве параметра для addprocs, будет использовано SSH мультиплексирование для создания туннеля между мастером и рабочими. Если вы настроили SSH мультиплексирование самостоятельно и соединение уже установлено, SSH мультиплексирование будет использоваться независимо от параметра multiplex. Если мультиплексирование включено, пересылка устанавливается с использованием существующего соединения (параметр -O forward в ssh). Это полезно, если ваши серверы требуют аутентификации по паролю; вы можете избежать аутентификации в Julia, войдя на сервер заранее перед 4d61726b646f776e2e436f64652822222c202261646470726f63732229_40726566. Управляющий сокет будет находиться по адресу ~/.ssh/julia-%r@%h:%p в течение сессии, если не используется существующее соединение мультиплексирования. Обратите внимание, что пропускная способность может быть ограничена, если вы создаете несколько процессов на узле и включаете мультиплексирование, потому что в этом случае процессы делят одно TCP соединение мультиплексирования.

Все процессы в кластере используют одно и то же cookie, который по умолчанию является случайно сгенерированной строкой в главном процессе:

  • cluster_cookie() возвращает куки, в то время как cluster_cookie(cookie)() устанавливает их и возвращает новые куки.
  • Все соединения аутентифицированы с обеих сторон, чтобы гарантировать, что только рабочие процессы, запущенные мастером, могут подключаться друг к другу.
  • Куки могут быть переданы рабочим при запуске через аргумент --worker=<cookie>. Если аргумент --worker указан без куки, рабочий пытается прочитать куки из своего стандартного ввода (stdin). Стандартный ввод закрывается сразу после получения куки.
  • ClusterManagers могут получить куки на мастере, вызвав cluster_cookie(). Менеджеры кластеров, не использующие транспорт по умолчанию TCP/IP (и, следовательно, не указывающие --worker), должны вызвать init_worker(cookie, manager) с тем же куки, что и на мастере.

Обратите внимание, что среды, требующие более высокого уровня безопасности, могут реализовать это с помощью пользовательского ClusterManager. Например, куки могут быть предварительно поделены и, следовательно, не указаны в качестве аргумента запуска.

Specifying Network Topology (Experimental)

Ключевой аргумент topology, переданный в addprocs, используется для указания того, как работники должны быть связаны друг с другом:

  • :all_to_all, по умолчанию: все работники соединены друг с другом.
  • :master_worker: только процесс драйвера, т.е. pid 1, имеет соединения с рабочими.
  • :custom: метод launch менеджера кластера указывает топологию соединения через поля ident и connect_idents в WorkerConfig. Рабочий с идентичностью ident, предоставленной менеджером кластера, будет подключаться ко всем рабочим, указанным в connect_idents.

Ключевой аргумент lazy=true|false влияет только на опцию topology :all_to_all. Если true, кластер начинает с того, что мастер подключен ко всем рабочим. Конкретные соединения между рабочими устанавливаются при первом удаленном вызове между двумя рабочими. Это помогает сократить начальные ресурсы, выделяемые для внутрикластерной связи. Соединения настраиваются в зависимости от требований времени выполнения параллельной программы. Значение по умолчанию для lazytrue.

В настоящее время отправка сообщения между несвязанными рабочими приводит к ошибке. Это поведение, как и функциональность и интерфейс, следует считать экспериментальным по своей природе и оно может измениться в будущих релизах.

Noteworthy external packages

Вне параллелизма Julia существует множество внешних пакетов, которые следует упомянуть. Например, MPI.jl является оберткой Julia для протокола MPI, Dagger.jl предоставляет функциональность, аналогичную Dask на Python, а DistributedArrays.jl предоставляет операции с массивами, распределенные между рабочими, как outlined above.

Необходимо упомянуть экосистему программирования на GPU Julia, которая включает:

  1. CUDA.jl оборачивает различные библиотеки CUDA и поддерживает компиляцию ядер Julia для графических процессоров Nvidia.
  2. oneAPI.jl оборачивает унифицированную модель программирования oneAPI и поддерживает выполнение ядер Julia на поддерживаемых ускорителях. В настоящее время поддерживается только Linux.
  3. AMDGPU.jl оборачивает библиотеки AMD ROCm и поддерживает компиляцию ядер Julia для графических процессоров AMD. В настоящее время поддерживается только Linux.
  4. Библиотеки высокого уровня, такие как KernelAbstractions.jl, Tullio.jl и ArrayFire.jl.

В следующем примере мы будем использовать как DistributedArrays.jl, так и CUDA.jl, чтобы распределить массив между несколькими процессами, сначала преобразовав его с помощью distribute() и CuArray().

Помните, когда импортируете DistributedArrays.jl, чтобы импортировать его на всех процессах, используя @everywhere

$ ./julia -p 4

julia> addprocs()

julia> @everywhere using DistributedArrays

julia> using CUDA

julia> B = ones(10_000) ./ 2;

julia> A = ones(10_000) .* π;

julia> C = 2 .* A ./ B;

julia> all(C .≈ 4*π)
true

julia> typeof(C)
Array{Float64,1}

julia> dB = distribute(B);

julia> dA = distribute(A);

julia> dC = 2 .* dA ./ dB;

julia> all(dC .≈ 4*π)
true

julia> typeof(dC)
DistributedArrays.DArray{Float64,1,Array{Float64,1}}

julia> cuB = CuArray(B);

julia> cuA = CuArray(A);

julia> cuC = 2 .* cuA ./ cuB;

julia> all(cuC .≈ 4*π);
true

julia> typeof(cuC)
CuArray{Float64,1}

В следующем примере мы будем использовать как DistributedArrays.jl, так и CUDA.jl, чтобы распределить массив между несколькими процессами и вызвать на нем универсальную функцию.

function power_method(M, v)
    for i in 1:100
        v = M*v
        v /= norm(v)
    end

    return v, norm(M*v) / norm(v)  # or  (M*v) ./ v
end

power_method повторно создает новый вектор и нормализует его. Мы не указали никакой типовой сигнатуры в объявлении функции, давайте посмотрим, будет ли это работать с вышеупомянутыми типами данных:

julia> M = [2. 1; 1 1];

julia> v = rand(2)
2-element Array{Float64,1}:
0.40395
0.445877

julia> power_method(M,v)
([0.850651, 0.525731], 2.618033988749895)

julia> cuM = CuArray(M);

julia> cuv = CuArray(v);

julia> curesult = power_method(cuM, cuv);

julia> typeof(curesult)
CuArray{Float64,1}

julia> dM = distribute(M);

julia> dv = distribute(v);

julia> dC = power_method(dM, dv);

julia> typeof(dC)
Tuple{DistributedArrays.DArray{Float64,1,Array{Float64,1}},Float64}

Чтобы завершить это краткое знакомство с внешними пакетами, мы можем рассмотреть MPI.jl, обертку Julia для протокола MPI. Поскольку было бы слишком долго рассматривать каждую внутреннюю функцию, лучше просто оценить подход, использованный для реализации протокола.

Рассмотрим этот игрушечный скрипт, который просто вызывает каждый подпроцесс, создает его ранг, а когда достигается главный процесс, выполняет сумму рангов.

import MPI

MPI.Init()

comm = MPI.COMM_WORLD
MPI.Barrier(comm)

root = 0
r = MPI.Comm_rank(comm)

sr = MPI.Reduce(r, MPI.SUM, root, comm)

if(MPI.Comm_rank(comm) == root)
   @printf("sum of ranks: %s\n", sr)
end

MPI.Finalize()
mpirun -np 4 ./julia example.jl
  • 1In this context, MPI refers to the MPI-1 standard. Beginning with MPI-2, the MPI standards committee introduced a new set of communication mechanisms, collectively referred to as Remote Memory Access (RMA). The motivation for adding rma to the MPI standard was to facilitate one-sided communication patterns. For additional information on the latest MPI standard, see https://mpi-forum.org/docs.