Distributed Computing
Distributed
— ModuleИнструменты для распределенной параллельной обработки.
Distributed.addprocs
— Functionaddprocs(manager::ClusterManager; kwargs...) -> Список идентификаторов процессов
Запускает рабочие процессы через указанный кластерный менеджер.
Например, кластеры Beowulf поддерживаются через пользовательский кластерный менеджер, реализованный в пакете ClusterManagers.jl
.
Количество секунд, в течение которых вновь запущенный рабочий процесс ждет установления соединения с мастером, можно указать через переменную JULIA_WORKER_TIMEOUT
в окружении рабочего процесса. Это имеет значение только при использовании TCP/IP в качестве транспорта.
Чтобы запустить рабочих процессов без блокировки REPL или содержащей функции, если рабочие процессы запускаются программно, выполните addprocs
в своей задаче.
Примеры
# В загруженных кластерах вызывайте `addprocs` асинхронно
t = @async addprocs(...)
# Используйте рабочих, когда они выходят в онлайн
if nprocs() > 1 # Убедитесь, что доступен хотя бы один новый рабочий процесс
.... # выполните распределенное выполнение
end
# Получите идентификаторы вновь запущенных рабочих процессов или любые сообщения об ошибках
if istaskdone(t) # Проверьте, завершился ли `addprocs`, чтобы убедиться, что `fetch` не блокирует
if nworkers() == N
new_pids = fetch(t)
else
fetch(t)
end
end
addprocs(machines; tunnel=false, sshflags=``, max_parallel=10, kwargs...) -> Список идентификаторов процессов
Добавьте рабочие процессы на удаленных машинах через SSH. Конфигурация выполняется с помощью именованных аргументов (см. ниже). В частности, ключевое слово exename
можно использовать для указания пути к бинарному файлу julia
на удаленной машине(ах).
machines
— это вектор "спецификаций машин", которые задаются в виде строк формата [user@]host[:port] [bind_addr[:port]]
. user
по умолчанию равен текущему пользователю, а port
— стандартному порту SSH. Если указано [bind_addr[:port]]
, другие рабочие процессы будут подключаться к этому рабочему процессу по указанному bind_addr
и port
.
Возможно запустить несколько процессов на удаленном хосте, используя кортеж в векторе machines
или форму (machine_spec, count)
, где count
— это количество рабочих процессов, которые будут запущены на указанном хосте. Передача :auto
в качестве количества рабочих процессов запустит столько рабочих процессов, сколько потоков ЦП на удаленном хосте.
Примеры:
addprocs([
"remote1", # один рабочий процесс на 'remote1', входящий с текущим именем пользователя
"user@remote2", # один рабочий процесс на 'remote2', входящий с именем пользователя 'user'
"user@remote3:2222", # указание порта SSH на '2222' для 'remote3'
("user@remote4", 4), # запустить 4 рабочих процесса на 'remote4'
("user@remote5", :auto), # запустить столько рабочих процессов, сколько потоков ЦП на 'remote5'
])
Именованные аргументы:
tunnel
: еслиtrue
, то будет использоваться SSH-туннелирование для подключения к рабочему процессу из главного процесса. По умолчаниюfalse
.multiplex
: еслиtrue
, то для SSH-туннелирования используется SSH-мультиплексирование. По умолчаниюfalse
.ssh
: имя или путь к исполняемому файлу SSH-клиента, используемого для запуска рабочих процессов. По умолчанию"ssh"
.sshflags
: указывает дополнительные параметры ssh, напримерsshflags=`-i /home/foo/bar.pem`
max_parallel
: указывает максимальное количество рабочих процессов, подключенных параллельно на хосте. По умолчанию 10.shell
: указывает тип оболочки, к которой ssh подключается на рабочих процессах.shell=:posix
: совместимая с POSIX оболочка Unix/Linux (sh, ksh, bash, dash, zsh и т. д.). По умолчанию.shell=:csh
: оболочка C Unix (csh, tcsh).shell=:wincmd
: Microsoft Windowscmd.exe
.
dir
: указывает рабочий каталог на рабочих процессах. По умолчанию текущий каталог хоста (как найдено с помощьюpwd()
)enable_threaded_blas
: еслиtrue
, то BLAS будет работать на нескольких потоках в добавленных процессах. По умолчаниюfalse
.exename
: имя исполняемого файлаjulia
. По умолчанию"$(Sys.BINDIR)/julia"
или"$(Sys.BINDIR)/julia-debug"
в зависимости от случая. Рекомендуется использовать одну и ту же версию Julia на всех удаленных машинах, так как сериализация и распределение кода могут не сработать в противном случае.exeflags
: дополнительные флаги, передаваемые рабочим процессам.topology
: указывает, как рабочие процессы подключаются друг к другу. Отправка сообщения между неподключенными рабочими процессами приводит к ошибке.topology=:all_to_all
: Все процессы подключены друг к другу. По умолчанию.topology=:master_worker
: Только главный процесс, т.е.pid
1, подключается к рабочим процессам. Рабочие процессы не подключаются друг к другу.topology=:custom
: Методlaunch
менеджера кластера указывает топологию подключения через поляident
иconnect_idents
вWorkerConfig
. Рабочий процесс с идентичностью менеджера кластераident
подключится ко всем рабочим процессам, указанным вconnect_idents
.
lazy
: Применимо только сtopology=:all_to_all
. Еслиtrue
, соединения между рабочими процессами устанавливаются лениво, т.е. они устанавливаются при первом вызове удаленного вызова между рабочими процессами. По умолчанию true.env
: предоставьте массив пар строк, таких какenv=["JULIA_DEPOT_PATH"=>"/depot"]
, чтобы запросить установку переменных окружения на удаленной машине. По умолчанию только переменная окруженияJULIA_WORKER_TIMEOUT
передается автоматически из локальной среды в удаленную.cmdline_cookie
: передайте аутентификационное cookie через параметр командной строки--worker
. (более безопасное) поведение по умолчанию передачи cookie через ssh stdio может зависнуть с рабочими процессами Windows, которые используют более старые (предшествующие ConPTY) версии Julia или Windows, в этом случаеcmdline_cookie=true
предлагает обходной путь.
Именованные аргументы ssh
, shell
, env
и cmdline_cookie
были добавлены в Julia 1.6.
Переменные окружения:
Если главный процесс не может установить соединение с вновь запущенным рабочим процессом в течение 60.0 секунд, рабочий процесс рассматривает это как фатальную ситуацию и завершает работу. Этот тайм-аут можно контролировать с помощью переменной окружения JULIA_WORKER_TIMEOUT
. Значение JULIA_WORKER_TIMEOUT
на главном процессе указывает количество секунд, в течение которых вновь запущенный рабочий процесс ждет установления соединения.
addprocs(np::Integer=Sys.CPU_THREADS; restrict=true, kwargs...) -> Список идентификаторов процессов
Запустите np
рабочих на локальном хосте, используя встроенный LocalManager
.
Локальные рабочие наследуют текущее окружение пакета (т.е. активный проект, LOAD_PATH
и DEPOT_PATH
) от основного процесса.
Обратите внимание, что рабочие не выполняют скрипт инициализации ~/.julia/config/startup.jl
, и они не синхронизируют свое глобальное состояние (такие как параметры командной строки, глобальные переменные, определения новых методов и загруженные модули) с любыми другими запущенными процессами.
Ключевые аргументы:
restrict::Bool
: еслиtrue
(по умолчанию) привязка ограничена127.0.0.1
.dir
,exename
,exeflags
,env
,topology
,lazy
,enable_threaded_blas
: тот же эффект, что и дляSSHManager
, см. документацию дляaddprocs(machines::AbstractVector)
.
Наследование окружения пакета и ключевой аргумент env
были добавлены в Julia 1.9.
Distributed.nprocs
— Functionnprocs()
Получите количество доступных процессов.
Примеры
julia> nprocs()
3
julia> workers()
2-element Array{Int64,1}:
2
3
Distributed.nworkers
— Functionnworkers()
Получите количество доступных рабочих процессов. Это на единицу меньше, чем nprocs()
. Равно nprocs()
, если nprocs() == 1
.
Примеры
$ julia -p 2
julia> nprocs()
3
julia> nworkers()
2
Distributed.procs
— Methodprocs()
Возвращает список всех идентификаторов процессов, включая pid 1 (который не включен в workers()
).
Примеры
$ julia -p 2
julia> procs()
3-element Array{Int64,1}:
1
2
3
Distributed.procs
— Methodprocs(pid::Integer)
Возвращает список всех идентификаторов процессов на том же физическом узле. В частности, возвращаются все рабочие процессы, привязанные к тому же IP-адресу, что и pid
.
Distributed.workers
— Functionworkers()
Возвращает список всех идентификаторов рабочих процессов.
Примеры
$ julia -p 2
julia> workers()
2-element Array{Int64,1}:
2
3
Distributed.rmprocs
— Functionrmprocs(pids...; waitfor=typemax(Int))
Удалите указанных работников. Обратите внимание, что только процесс 1 может добавлять или удалять работников.
Аргумент waitfor
указывает, как долго ждать завершения работы работников:
- Если не указано,
rmprocs
будет ждать, пока все запрашиваемыеpids
не будут удалены. - Исключение
ErrorException
будет вызвано, если все работники не могут быть завершены до истечения запрашиваемых секундwaitfor
. - При значении
waitfor
равном 0, вызов возвращается немедленно с работниками, запланированными на удаление в другой задаче. Запланированный объектTask
возвращается. Пользователь должен вызватьwait
на задаче перед вызовом любых других параллельных вызовов.
Примеры
$ julia -p 5
julia> t = rmprocs(2, 3, waitfor=0)
Task (runnable) @0x0000000107c718d0
julia> wait(t)
julia> workers()
3-element Array{Int64,1}:
4
5
6
Distributed.interrupt
— Functioninterrupt(pids::Integer...)
Прерывает текущую выполняемую задачу на указанных рабочих узлах. Это эквивалентно нажатию Ctrl-C на локальной машине. Если аргументы не указаны, все рабочие узлы прерываются.
interrupt(pids::AbstractVector=workers())
Прерывает текущую выполняемую задачу на указанных рабочих. Это эквивалентно нажатию Ctrl-C на локальной машине. Если аргументы не указаны, все рабочие прерываются.
Distributed.myid
— Functionmyid()
Получите идентификатор текущего процесса.
Примеры
julia> myid()
1
julia> remotecall_fetch(() -> myid(), 4)
4
Distributed.pmap
— Functionpmap(f, [::AbstractWorkerPool], c...; distributed=true, batch_size=1, on_error=nothing, retry_delays=[], retry_check=nothing) -> collection
Преобразуйте коллекцию c
, применяя f
к каждому элементу с использованием доступных рабочих процессов и задач.
Для нескольких аргументов коллекции применяйте f
поэлементно.
Обратите внимание, что f
должен быть доступен всем рабочим процессам; смотрите Доступность кода и загрузка пакетов для получения подробной информации.
Если пул рабочих процессов не указан, будут использованы все доступные рабочие процессы через CachingPool
.
По умолчанию pmap
распределяет вычисления между всеми указанными рабочими процессами. Чтобы использовать только локальный процесс и распределить вычисления по задачам, укажите distributed=false
. Это эквивалентно использованию asyncmap
. Например, pmap(f, c; distributed=false)
эквивалентно asyncmap(f,c; ntasks=()->nworkers())
pmap
также может использовать смесь процессов и задач через аргумент batch_size
. Для размеров пакетов больше 1 коллекция обрабатывается в нескольких пакетах, каждый из которых имеет длину batch_size
или меньше. Пакет отправляется как единый запрос к свободному рабочему процессу, где локальный asyncmap
обрабатывает элементы из пакета, используя несколько параллельных задач.
Любая ошибка останавливает pmap
от обработки оставшейся части коллекции. Чтобы переопределить это поведение, вы можете указать функцию обработки ошибок через аргумент on_error
, которая принимает один аргумент, т.е. исключение. Функция может остановить обработку, повторно выбросив ошибку, или, чтобы продолжить, вернуть любое значение, которое затем возвращается вместе с результатами вызывающему.
Рассмотрим следующие два примера. Первый возвращает объект исключения на месте, второй - 0 вместо любого исключения:
julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=identity)
4-element Array{Any,1}:
1
ErrorException("foo")
3
ErrorException("foo")
julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=ex->0)
4-element Array{Int64,1}:
1
0
3
0
Ошибки также могут обрабатываться путем повторной попытки неудачных вычислений. Ключевые аргументы retry_delays
и retry_check
передаются в retry
как ключевые аргументы delays
и check
соответственно. Если указано пакетирование, и весь пакет не удался, все элементы в пакете будут повторно попытаны.
Обратите внимание, что если указаны как on_error
, так и retry_delays
, хук on_error
вызывается перед повторной попыткой. Если on_error
не выбрасывает (или не повторно выбрасывает) исключение, элемент не будет повторно попытан.
Пример: При ошибках повторно пытайтесь f
на элементе максимум 3 раза без задержки между попытками.
pmap(f, c; retry_delays = zeros(3))
Пример: Повторяйте f
только если исключение не является типом InexactError
, с экспоненциально увеличивающимися задержками до 3 раз. Верните NaN
вместо всех случаев InexactError
.
pmap(f, c; on_error = e->(isa(e, InexactError) ? NaN : rethrow()), retry_delays = ExponentialBackOff(n = 3))
Distributed.RemoteException
— TypeRemoteException(захвачено)
Исключения при удаленных вычислениях захватываются и повторно выбрасываются локально. RemoteException
оборачивает pid
рабочего и захваченное исключение. CapturedException
захватывает удаленное исключение и сериализуемую форму стека вызовов, когда было вызвано исключение.
Distributed.ProcessExitedException
— TypeProcessExitedException(worker_id::Int)
После выхода клиентского процесса Julia дальнейшие попытки сослаться на мертвый дочерний процесс вызовут это исключение.
Distributed.Future
— TypeFuture(w::Int, rrid::RRID, v::Union{Some, Nothing}=nothing)
Future
— это заполнитель для одной вычислительной операции с неизвестным статусом завершения и временем. Для нескольких потенциальных вычислений см. RemoteChannel
. См. remoteref_id
для идентификации AbstractRemoteRef
.
Distributed.RemoteChannel
— TypeRemoteChannel(pid::Integer=myid())
Создает ссылку на Channel{Any}(1)
в процессе pid
. Значение по умолчанию для pid
— текущий процесс.
RemoteChannel(f::Function, pid::Integer=myid())
Создает ссылки на удаленные каналы определенного размера и типа. f
— это функция, которая при выполнении на pid
должна вернуть реализацию AbstractChannel
.
Например, RemoteChannel(()->Channel{Int}(10), pid)
, вернет ссылку на канал типа Int
и размера 10 на pid
.
Значение по умолчанию для pid
— текущий процесс.
Base.fetch
— Methodfetch(x::Future)
Ожидает и получает значение Future
. Полученное значение кэшируется локально. Повторные вызовы fetch
с той же ссылкой возвращают кэшированное значение. Если удаленное значение является исключением, выбрасывает RemoteException
, который захватывает удаленное исключение и трассировку стека.
Base.fetch
— Methodfetch(c::RemoteChannel)
Ожидает и получает значение из RemoteChannel
. Исключения, которые возникают, такие же, как для Future
. Не удаляет извлеченный элемент.
fetch(x::Any)
Возвращает x
.
Distributed.remotecall
— Methodremotecall(f, id::Integer, args...; kwargs...) -> Future
Вызовите функцию f
асинхронно с заданными аргументами на указанном процессе. Верните Future
. Ключевые аргументы, если таковые имеются, передаются в f
.
Distributed.remotecall_wait
— Methodremotecall_wait(f, id::Integer, args...; kwargs...)
Выполняет более быстрый wait(remotecall(...))
в одном сообщении на Worker
, указанном идентификатором рабочего id
. Ключевые аргументы, если таковые имеются, передаются в f
.
Смотрите также wait
и remotecall
.
Distributed.remotecall_fetch
— Methodremotecall_fetch(f, id::Integer, args...; kwargs...)
Выполняет fetch(remotecall(...))
в одном сообщении. Ключевые аргументы, если они есть, передаются в f
. Все удаленные исключения захватываются в RemoteException
и выбрасываются.
Смотрите также fetch
и remotecall
.
Примеры
$ julia -p 2
julia> remotecall_fetch(sqrt, 2, 4)
2.0
julia> remotecall_fetch(sqrt, 2, -4)
ERROR: На рабочем узле 2:
DomainError с -4.0:
sqrt был вызван с отрицательным действительным аргументом, но вернет комплексный результат только если будет вызван с комплексным аргументом. Попробуйте sqrt(Complex(x)).
...
Distributed.remote_do
— Methodremote_do(f, id::Integer, args...; kwargs...) -> nothing
Выполняет f
на рабочем узле id
асинхронно. В отличие от remotecall
, он не сохраняет результат вычисления и нет возможности дождаться его завершения.
Успешный вызов указывает на то, что запрос был принят для выполнения на удаленном узле.
Хотя последовательные remotecall
к одному и тому же рабочему узлу сериализуются в порядке их вызова, порядок выполнения на удаленном рабочем узле не определен. Например, remote_do(f1, 2); remotecall(f2, 2); remote_do(f3, 2)
сериализует вызов f1
, за которым следуют f2
и f3
в этом порядке. Однако не гарантируется, что f1
будет выполнен до f3
на рабочем узле 2.
Любые исключения, выбрасываемые f
, выводятся в stderr
на удаленном рабочем узле.
Аргументы ключевых слов, если таковые имеются, передаются в f
.
Base.put!
— Methodput!(rr::RemoteChannel, args...)
Сохраните набор значений в RemoteChannel
. Если канал полон, блокируется до тех пор, пока не появится место. Возвращает первый аргумент.
Base.put!
— Methodput!(rr::Future, v)
Сохраните значение в Future
rr
. Future
— это ссылки на удаленные объекты, которые можно записать только один раз. Вызов put!
на уже установленном Future
вызывает Exception
. Все асинхронные удаленные вызовы возвращают Future
и устанавливают значение в возвращаемое значение вызова по завершении.
Base.take!
— Methodtake!(rr::RemoteChannel, args...)
Извлекает значение(я) из RemoteChannel
rr
, удаляя значение(я) в процессе.
Base.isready
— Methodisready(rr::RemoteChannel, args...)
Определите, имеет ли RemoteChannel
сохраненное значение. Обратите внимание, что эта функция может вызвать гонки, так как к моменту получения результата это может уже не быть верным. Тем не менее, ее можно безопасно использовать с Future
, так как они назначаются только один раз.
Base.isready
— Methodisready(rr::Future)
Определяет, имеет ли Future
сохраненное значение.
Если аргумент Future
принадлежит другой ноде, этот вызов будет блокировать выполнение до получения ответа. Рекомендуется ожидать rr
в отдельной задаче или использовать локальный Channel
в качестве прокси:
p = 1
f = Future(p)
errormonitor(@async put!(f, remotecall_fetch(long_computation, p)))
isready(f) # не будет блокировать
Distributed.AbstractWorkerPool
— TypeAbstractWorkerPool
Супертип для пулов рабочих, таких как WorkerPool
и CachingPool
. AbstractWorkerPool
должен реализовывать:
push!
- добавить нового рабочего в общий пул (доступные + занятые)put!
- вернуть рабочего в доступный пулtake!
- взять рабочего из доступного пула (для использования в удаленном выполнении функции)length
- количество рабочих, доступных в общем пулеisready
- вернуть false, еслиtake!
в пуле заблокирует, иначе true
Стандартные реализации вышеуказанных методов (в AbstractWorkerPool
) требуют поля
channel::Channel{Int}
workers::Set{Int}
где channel
содержит свободные pid рабочих, а workers
- это множество всех рабочих, связанных с этим пулом.
Distributed.WorkerPool
— TypeWorkerPool(workers::Union{Vector{Int},AbstractRange{Int}})
Создайте WorkerPool
из вектора или диапазона идентификаторов рабочих.
Примеры
$ julia -p 3
julia> WorkerPool([2, 3])
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([2, 3]), RemoteChannel{Channel{Any}}(1, 1, 6))
julia> WorkerPool(2:4)
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 7))
Distributed.CachingPool
— TypeCachingPool(workers::Vector{Int})
Реализация AbstractWorkerPool
. remote
, remotecall_fetch
, pmap
(и другие удаленные вызовы, которые выполняют функции удаленно) получают выгоду от кэширования сериализованных/десериализованных функций на узлах-работниках, особенно замыканий (которые могут захватывать большие объемы данных).
Удаленный кэш поддерживается на протяжении всего времени жизни возвращаемого объекта CachingPool
. Чтобы очистить кэш раньше, используйте clear!(pool)
.
Для глобальных переменных в замыкании захватываются только привязки, а не данные. Блоки let
могут быть использованы для захвата глобальных данных.
Примеры
const foo = rand(10^8);
wp = CachingPool(workers())
let foo = foo
pmap(i -> sum(foo) + i, wp, 1:100);
end
Вышеуказанное передаст foo
только один раз каждому работнику.
Distributed.default_worker_pool
— Functiondefault_worker_pool()
AbstractWorkerPool
, содержащий бездействующих workers
- используется в remote(f)
и pmap
(по умолчанию). Если явно не задано через default_worker_pool!(pool)
, пул рабочих по умолчанию инициализируется как WorkerPool
.
Примеры
$ julia -p 3
julia> default_worker_pool()
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:3), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 4))
Distributed.clear!
— Functionclear!(syms, pids=workers(); mod=Main)
Очищает глобальные привязки в модулях, инициализируя их значением nothing
. syms
должен быть типа Symbol
или коллекцией Symbol
s. pids
и mod
определяют процессы и модуль, в котором глобальные переменные должны быть переинициализированы. Очищаются только те имена, которые были определены в mod
.
Исключение возникает, если запрашивается очистка глобальной константы.
clear!(pool::CachingPool) -> pool
Удаляет все кэшированные функции у всех участвующих рабочих.
Distributed.remote
— Functionremote([p::AbstractWorkerPool], f) -> Function
Возвращает анонимную функцию, которая выполняет функцию f
на доступном рабочем узле (взятым из WorkerPool
p
, если он предоставлен) с использованием remotecall_fetch
.
Distributed.remotecall
— Methodremotecall(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future
WorkerPool
вариант remotecall(f, pid, ....)
. Ожидает и берет свободного работника из pool
и выполняет remotecall
на нем.
Примеры
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> f = remotecall(maximum, wp, A)
Future(2, 1, 6, nothing)
В этом примере задача выполнялась на pid 2, вызванная из pid 1.
Distributed.remotecall_wait
— Methodremotecall_wait(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future
WorkerPool
вариант remotecall_wait(f, pid, ....)
. Ожидает и берет свободного работника из pool
и выполняет remotecall_wait
на нем.
Примеры
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> f = remotecall_wait(maximum, wp, A)
Future(3, 1, 9, nothing)
julia> fetch(f)
0.9995177101692958
Distributed.remotecall_fetch
— Methodremotecall_fetch(f, pool::AbstractWorkerPool, args...; kwargs...) -> результат
WorkerPool
вариант remotecall_fetch(f, pid, ....)
. Ожидает и берет свободного работника из pool
и выполняет remotecall_fetch
на нем.
Примеры
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> remotecall_fetch(maximum, wp, A)
0.9995177101692958
Distributed.remote_do
— Methodremote_do(f, pool::AbstractWorkerPool, args...; kwargs...) -> ничего
WorkerPool
вариант remote_do(f, pid, ....)
. Ждите и возьмите свободного работника из pool
и выполните remote_do
на нем.
Distributed.@spawn
— Macro@spawn expr
Создает замыкание вокруг выражения и выполняет его на автоматически выбранном процессе, возвращая Future
к результату. Этот макрос устарел; вместо него следует использовать @spawnat :any expr
.
Примеры
julia> addprocs(3);
julia> f = @spawn myid()
Future(2, 1, 5, nothing)
julia> fetch(f)
2
julia> f = @spawn myid()
Future(3, 1, 7, nothing)
julia> fetch(f)
3
Начиная с Julia 1.3 этот макрос устарел. Используйте @spawnat :any
вместо.
Distributed.@spawnat
— Macro@spawnat p expr
Создайте замыкание вокруг выражения и выполните замыкание асинхронно на процессе p
. Верните Future
к результату. Если p
является цитируемым литеральным символом :any
, то система автоматически выберет процессор для использования.
Примеры
julia> addprocs(3);
julia> f = @spawnat 2 myid()
Future(2, 1, 3, nothing)
julia> fetch(f)
2
julia> f = @spawnat :any myid()
Future(3, 1, 7, nothing)
julia> fetch(f)
3
Аргумент :any
доступен начиная с Julia 1.3.
Distributed.@fetch
— Macro@fetch expr
Эквивалентно fetch(@spawnat :any expr)
. См. fetch
и @spawnat
.
Примеры
julia> addprocs(3);
julia> @fetch myid()
2
julia> @fetch myid()
3
julia> @fetch myid()
4
julia> @fetch myid()
2
Distributed.@fetchfrom
— Macro@fetchfrom
Эквивалентно fetch(@spawnat p expr)
. См. fetch
и @spawnat
.
Примеры
julia> addprocs(3);
julia> @fetchfrom 2 myid()
2
julia> @fetchfrom 4 myid()
4
Distributed.@distributed
— Macro@distributed
Распределенный параллельный цикл for следующего вида:
@distributed [reducer] for var = range
body
end
Указанный диапазон разбивается на части и локально выполняется на всех рабочих узлах. В случае, если указана необязательная функция редуктора, @distributed
выполняет локальные редукции на каждом рабочем узле с финальной редукцией на вызывающем процессе.
Обратите внимание, что без функции редуктора @distributed
выполняется асинхронно, т.е. он запускает независимые задачи на всех доступных рабочих узлах и сразу возвращает, не дожидаясь завершения. Чтобы дождаться завершения, добавьте префикс к вызову @sync
, например:
@sync @distributed for var = range
body
end
Distributed.@everywhere
— Macro@everywhere [procs()] expr
Выполните выражение в Main
на всех procs
. Ошибки на любом из процессов собираются в CompositeException
и выбрасываются. Например:
@everywhere bar = 1
определит Main.bar
на всех текущих процессах. Любые процессы, добавленные позже (например, с помощью addprocs()
), не будут иметь определенное выражение.
В отличие от @spawnat
, @everywhere
не захватывает локальные переменные. Вместо этого локальные переменные могут быть переданы с помощью интерполяции:
foo = 1
@everywhere bar = $foo
Необязательный аргумент procs
позволяет указать подмножество всех процессов для выполнения выражения.
Похоже на вызов remotecall_eval(Main, procs, expr)
, но с двумя дополнительными функциями:
- Операторы `using` и `import` выполняются сначала на вызывающем процессе, чтобы гарантировать,
что пакеты предварительно скомпилированы.
- Текущий путь к исходному файлу, используемому `include`, передается другим процессам.
Distributed.remoteref_id
— Functionremoteref_id(r::AbstractRemoteRef) -> RRID
Future
s и RemoteChannel
s идентифицируются по полям:
where
- ссылается на узел, где на самом деле существует основной объект/хранилище, на которое ссылается ссылка.whence
- ссылается на узел, из которого была создана удаленная ссылка. Обратите внимание, что это отличается от узла, где на самом деле существует основной объект, на который ссылается ссылка. Например, вызовRemoteChannel(2)
из главного процесса приведет к значениюwhere
равному 2 и значениюwhence
равному 1.id
уникален для всех ссылок, созданных из рабочего процесса, указанного вwhence
.
Вместе whence
и id
уникально идентифицируют ссылку среди всех рабочих процессов.
remoteref_id
- это низкоуровневый API, который возвращает объект RRID
, оборачивающий значения whence
и id
удаленной ссылки.
Distributed.channel_from_id
— Functionchannel_from_id(id) -> c
Низкоуровневый API, который возвращает базовый AbstractChannel
для id
, возвращенного функцией remoteref_id
. Вызов действителен только на узле, где существует базовый канал.
Distributed.worker_id_from_socket
— Functionworker_id_from_socket(s) -> pid
Низкоуровневый API, который, принимая IO
соединение или Worker
, возвращает pid
рабочего, к которому он подключен. Это полезно при написании пользовательских serialize
методов для типа, которые оптимизируют записываемые данные в зависимости от идентификатора процесса получателя.
Distributed.cluster_cookie
— Methodcluster_cookie() -> cookie
Вернуть кластерный куки.
Distributed.cluster_cookie
— Methodcluster_cookie(cookie) -> cookie
Установите переданный cookie как кластерный cookie, затем верните его.
Cluster Manager Interface
Этот интерфейс предоставляет механизм для запуска и управления рабочими процессами Julia в различных кластерных средах. В Base присутствуют два типа менеджеров: LocalManager
, для запуска дополнительных рабочих процессов на том же хосте, и SSHManager
, для запуска на удаленных хостах через ssh
. Для соединения и передачи сообщений между процессами используются сокеты TCP/IP. Менеджеры кластеров могут предоставлять другой транспорт.
Distributed.ClusterManager
— TypeClusterManager
Супертип для менеджеров кластеров, которые контролируют рабочие процессы как кластер. Менеджеры кластеров реализуют, как рабочие процессы могут быть добавлены, удалены и с ними можно общаться. SSHManager
и LocalManager
являются подтипами этого.
Distributed.WorkerConfig
— TypeWorkerConfig
Тип, используемый ClusterManager
для управления рабочими процессами, добавленными в их кластеры. Некоторые поля используются всеми менеджерами кластера для доступа к хосту:
io
– соединение, используемое для доступа к рабочему процессу (подтипIO
илиNothing
)host
– адрес хоста (либоString
, либоNothing
)port
– порт на хосте, используемый для подключения к рабочему процессу (либоInt
, либоNothing
)
Некоторые используются менеджером кластера для добавления рабочих процессов на уже инициализированный хост:
count
– количество рабочих процессов, которые будут запущены на хостеexename
– путь к исполняемому файлу Julia на хосте, по умолчанию"$(Sys.BINDIR)/julia"
или"$(Sys.BINDIR)/julia-debug"
exeflags
– флаги, которые следует использовать при удаленном запуске Julia
Поле userdata
используется для хранения информации для каждого рабочего процесса внешними менеджерами.
Некоторые поля используются SSHManager
и аналогичными менеджерами:
tunnel
–true
(использовать туннелирование),false
(не использовать туннелирование) илиnothing
(использовать значение по умолчанию для менеджера)multiplex
–true
(использовать SSH мультиплексирование для туннелирования) илиfalse
forward
– опция пересылки, используемая для опции-L
sshbind_addr
– адрес на удаленном хосте, к которому нужно привязатьсяsshflags
– флаги, которые следует использовать при установлении SSH-соединенияmax_parallel
– максимальное количество рабочих процессов, к которым можно подключиться параллельно на хосте
Некоторые поля используются как LocalManager
, так и SSHManager
:
connect_at
– определяет, является ли это вызовом настройки от рабочего процесса к рабочему процессу или от драйвера к рабочему процессуprocess
– процесс, к которому будет подключено (обычно менеджер назначает это во времяaddprocs
)ospid
– идентификатор процесса в соответствии с ОС хоста, используемый для прерывания рабочих процессовenviron
– частный словарь, используемый для хранения временной информации менеджерами Local/SSHident
– рабочий процесс, идентифицированныйClusterManager
connect_idents
– список идентификаторов рабочих процессов, к которым рабочий процесс должен подключиться, если используется пользовательская топологияenable_threaded_blas
–true
,false
илиnothing
, использовать ли многопоточную BLAS на рабочих процессах или нет
Distributed.launch
— Functionlaunch(manager::ClusterManager, params::Dict, launched::Array, launch_ntfy::Condition)
Реализовано менеджерами кластеров. Для каждого рабочего процесса Julia, запущенного этой функцией, он должен добавить запись WorkerConfig
в launched
и уведомить launch_ntfy
. Функция ДОЛЖНА завершиться, как только все рабочие процессы, запрошенные manager
, будут запущены. params
— это словарь всех аргументов ключевых слов, с которыми был вызван addprocs
.
Distributed.manage
— Functionmanage(manager::ClusterManager, id::Integer, config::WorkerConfig. op::Symbol)
Реализовано менеджерами кластеров. Вызывается в основном процессе, в течение жизни рабочего, с соответствующими значениями op
:
- с
:register
/:deregister
, когда рабочий добавляется / удаляется из пула рабочих Julia. - с
:interrupt
, когда вызываетсяinterrupt(workers)
.ClusterManager
должен сигнализировать соответствующему рабочему с помощью сигнала прерывания. - с
:finalize
для целей очистки.
Base.kill
— Methodkill(manager::ClusterManager, pid::Int, config::WorkerConfig)
Реализовано менеджерами кластеров. Вызывается в главном процессе с помощью rmprocs
. Это должно привести к выходу удаленного рабочего, указанного pid
. kill(manager::ClusterManager.....)
выполняет удаленный exit()
на pid
.
Sockets.connect
— Methodconnect(manager::ClusterManager, pid::Int, config::WorkerConfig) -> (instrm::IO, outstrm::IO)
Реализовано менеджерами кластеров с использованием пользовательских транспортов. Он должен установить логическое соединение с рабочим с идентификатором pid
, указанным в config
, и вернуть пару объектов IO
. Сообщения от pid
к текущему процессу будут считываться из instrm
, в то время как сообщения, которые необходимо отправить pid
, будут записываться в outstrm
. Реализация пользовательского транспорта должна гарантировать, что сообщения доставляются и принимаются полностью и в порядке. connect(manager::ClusterManager.....)
настраивает соединения TCP/IP между рабочими.
Distributed.init_worker
— Functioninit_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager())
Вызывается менеджерами кластеров, реализующими пользовательские транспорты. Он инициализирует вновь запущенный процесс как рабочий. Аргумент командной строки --worker[=<cookie>]
имеет эффект инициализации процесса как рабочего с использованием TCP/IP сокетов для транспорта. cookie
является cluster_cookie
.
Distributed.start_worker
— Functionstart_worker([out::IO=stdout], cookie::AbstractString=readline(stdin); close_stdin::Bool=true, stderr_to_stdout::Bool=true)
start_worker
— это внутренняя функция, которая является точкой входа по умолчанию для рабочих процессов, подключающихся через TCP/IP. Она настраивает процесс как рабочего в кластере Julia.
Информация о хосте:порту записывается в поток out
(по умолчанию stdout).
Функция считывает cookie из stdin, если это необходимо, и слушает на свободном порту (или, если указано, на порту в параметре командной строки --bind-to
) и планирует задачи для обработки входящих TCP-соединений и запросов. Она также (по желанию) закрывает stdin и перенаправляет stderr в stdout.
Она не возвращает значения.
Distributed.process_messages
— Functionprocess_messages(r_stream::IO, w_stream::IO, incoming::Bool=true)
Вызывается менеджерами кластера с использованием пользовательских транспортов. Он должен вызываться, когда реализация пользовательского транспорта получает первое сообщение от удаленного рабочего. Пользовательский транспорт должен управлять логическим соединением с удаленным рабочим и предоставлять два объекта IO
, один для входящих сообщений и другой для сообщений, адресованных удаленному рабочему. Если incoming
равно true
, удаленный партнер инициировал соединение. Тот, кто из пары инициирует соединение, отправляет кластерный куки и номер своей версии Julia для выполнения аутентификационного рукопожатия.
Смотрите также cluster_cookie
.
Distributed.default_addprocs_params
— Functiondefault_addprocs_params(mgr::ClusterManager) -> Dict{Symbol, Any}
Реализовано менеджерами кластеров. Параметры по умолчанию, передаваемые при вызове addprocs(mgr)
. Минимальный набор опций доступен при вызове default_addprocs_params()
.