Distributed Computing
Distributed — ModuleИнструменты для распределенной параллельной обработки.
Distributed.addprocs — Functionaddprocs(manager::ClusterManager; kwargs...) -> Список идентификаторов процессовЗапускает рабочие процессы через указанный кластерный менеджер.
Например, кластеры Beowulf поддерживаются через пользовательский кластерный менеджер, реализованный в пакете ClusterManagers.jl.
Количество секунд, в течение которых вновь запущенный рабочий процесс ждет установления соединения с мастером, можно указать через переменную JULIA_WORKER_TIMEOUT в окружении рабочего процесса. Это имеет значение только при использовании TCP/IP в качестве транспорта.
Чтобы запустить рабочих процессов без блокировки REPL или содержащей функции, если рабочие процессы запускаются программно, выполните addprocs в своей задаче.
Примеры
# В загруженных кластерах вызывайте `addprocs` асинхронно
t = @async addprocs(...)# Используйте рабочих, когда они выходят в онлайн
if nprocs() > 1 # Убедитесь, что доступен хотя бы один новый рабочий процесс
.... # выполните распределенное выполнение
end# Получите идентификаторы вновь запущенных рабочих процессов или любые сообщения об ошибках
if istaskdone(t) # Проверьте, завершился ли `addprocs`, чтобы убедиться, что `fetch` не блокирует
if nworkers() == N
new_pids = fetch(t)
else
fetch(t)
end
endaddprocs(machines; tunnel=false, sshflags=``, max_parallel=10, kwargs...) -> Список идентификаторов процессовДобавьте рабочие процессы на удаленных машинах через SSH. Конфигурация выполняется с помощью именованных аргументов (см. ниже). В частности, ключевое слово exename можно использовать для указания пути к бинарному файлу julia на удаленной машине(ах).
machines — это вектор "спецификаций машин", которые задаются в виде строк формата [user@]host[:port] [bind_addr[:port]]. user по умолчанию равен текущему пользователю, а port — стандартному порту SSH. Если указано [bind_addr[:port]], другие рабочие процессы будут подключаться к этому рабочему процессу по указанному bind_addr и port.
Возможно запустить несколько процессов на удаленном хосте, используя кортеж в векторе machines или форму (machine_spec, count), где count — это количество рабочих процессов, которые будут запущены на указанном хосте. Передача :auto в качестве количества рабочих процессов запустит столько рабочих процессов, сколько потоков ЦП на удаленном хосте.
Примеры:
addprocs([
"remote1", # один рабочий процесс на 'remote1', входящий с текущим именем пользователя
"user@remote2", # один рабочий процесс на 'remote2', входящий с именем пользователя 'user'
"user@remote3:2222", # указание порта SSH на '2222' для 'remote3'
("user@remote4", 4), # запустить 4 рабочих процесса на 'remote4'
("user@remote5", :auto), # запустить столько рабочих процессов, сколько потоков ЦП на 'remote5'
])Именованные аргументы:
tunnel: еслиtrue, то будет использоваться SSH-туннелирование для подключения к рабочему процессу из главного процесса. По умолчаниюfalse.multiplex: еслиtrue, то для SSH-туннелирования используется SSH-мультиплексирование. По умолчаниюfalse.ssh: имя или путь к исполняемому файлу SSH-клиента, используемого для запуска рабочих процессов. По умолчанию"ssh".sshflags: указывает дополнительные параметры ssh, напримерsshflags=`-i /home/foo/bar.pem`max_parallel: указывает максимальное количество рабочих процессов, подключенных параллельно на хосте. По умолчанию 10.shell: указывает тип оболочки, к которой ssh подключается на рабочих процессах.shell=:posix: совместимая с POSIX оболочка Unix/Linux (sh, ksh, bash, dash, zsh и т. д.). По умолчанию.shell=:csh: оболочка C Unix (csh, tcsh).shell=:wincmd: Microsoft Windowscmd.exe.
dir: указывает рабочий каталог на рабочих процессах. По умолчанию текущий каталог хоста (как найдено с помощьюpwd())enable_threaded_blas: еслиtrue, то BLAS будет работать на нескольких потоках в добавленных процессах. По умолчаниюfalse.exename: имя исполняемого файлаjulia. По умолчанию"$(Sys.BINDIR)/julia"или"$(Sys.BINDIR)/julia-debug"в зависимости от случая. Рекомендуется использовать одну и ту же версию Julia на всех удаленных машинах, так как сериализация и распределение кода могут не сработать в противном случае.exeflags: дополнительные флаги, передаваемые рабочим процессам.topology: указывает, как рабочие процессы подключаются друг к другу. Отправка сообщения между неподключенными рабочими процессами приводит к ошибке.topology=:all_to_all: Все процессы подключены друг к другу. По умолчанию.topology=:master_worker: Только главный процесс, т.е.pid1, подключается к рабочим процессам. Рабочие процессы не подключаются друг к другу.topology=:custom: Методlaunchменеджера кластера указывает топологию подключения через поляidentиconnect_identsвWorkerConfig. Рабочий процесс с идентичностью менеджера кластераidentподключится ко всем рабочим процессам, указанным вconnect_idents.
lazy: Применимо только сtopology=:all_to_all. Еслиtrue, соединения между рабочими процессами устанавливаются лениво, т.е. они устанавливаются при первом вызове удаленного вызова между рабочими процессами. По умолчанию true.env: предоставьте массив пар строк, таких какenv=["JULIA_DEPOT_PATH"=>"/depot"], чтобы запросить установку переменных окружения на удаленной машине. По умолчанию только переменная окруженияJULIA_WORKER_TIMEOUTпередается автоматически из локальной среды в удаленную.cmdline_cookie: передайте аутентификационное cookie через параметр командной строки--worker. (более безопасное) поведение по умолчанию передачи cookie через ssh stdio может зависнуть с рабочими процессами Windows, которые используют более старые (предшествующие ConPTY) версии Julia или Windows, в этом случаеcmdline_cookie=trueпредлагает обходной путь.
Именованные аргументы ssh, shell, env и cmdline_cookie были добавлены в Julia 1.6.
Переменные окружения:
Если главный процесс не может установить соединение с вновь запущенным рабочим процессом в течение 60.0 секунд, рабочий процесс рассматривает это как фатальную ситуацию и завершает работу. Этот тайм-аут можно контролировать с помощью переменной окружения JULIA_WORKER_TIMEOUT. Значение JULIA_WORKER_TIMEOUT на главном процессе указывает количество секунд, в течение которых вновь запущенный рабочий процесс ждет установления соединения.
addprocs(np::Integer=Sys.CPU_THREADS; restrict=true, kwargs...) -> Список идентификаторов процессовЗапустите np рабочих на локальном хосте, используя встроенный LocalManager.
Локальные рабочие наследуют текущее окружение пакета (т.е. активный проект, LOAD_PATH и DEPOT_PATH) от основного процесса.
Обратите внимание, что рабочие не выполняют скрипт инициализации ~/.julia/config/startup.jl, и они не синхронизируют свое глобальное состояние (такие как параметры командной строки, глобальные переменные, определения новых методов и загруженные модули) с любыми другими запущенными процессами.
Ключевые аргументы:
restrict::Bool: еслиtrue(по умолчанию) привязка ограничена127.0.0.1.dir,exename,exeflags,env,topology,lazy,enable_threaded_blas: тот же эффект, что и дляSSHManager, см. документацию дляaddprocs(machines::AbstractVector).
Наследование окружения пакета и ключевой аргумент env были добавлены в Julia 1.9.
Distributed.nprocs — Functionnprocs()Получите количество доступных процессов.
Примеры
julia> nprocs()
3
julia> workers()
2-element Array{Int64,1}:
2
3Distributed.nworkers — Functionnworkers()Получите количество доступных рабочих процессов. Это на единицу меньше, чем nprocs(). Равно nprocs(), если nprocs() == 1.
Примеры
$ julia -p 2
julia> nprocs()
3
julia> nworkers()
2Distributed.procs — Methodprocs()Возвращает список всех идентификаторов процессов, включая pid 1 (который не включен в workers()).
Примеры
$ julia -p 2
julia> procs()
3-element Array{Int64,1}:
1
2
3Distributed.procs — Methodprocs(pid::Integer)Возвращает список всех идентификаторов процессов на том же физическом узле. В частности, возвращаются все рабочие процессы, привязанные к тому же IP-адресу, что и pid.
Distributed.workers — Functionworkers()Возвращает список всех идентификаторов рабочих процессов.
Примеры
$ julia -p 2
julia> workers()
2-element Array{Int64,1}:
2
3Distributed.rmprocs — Functionrmprocs(pids...; waitfor=typemax(Int))Удалите указанных работников. Обратите внимание, что только процесс 1 может добавлять или удалять работников.
Аргумент waitfor указывает, как долго ждать завершения работы работников:
- Если не указано,
rmprocsбудет ждать, пока все запрашиваемыеpidsне будут удалены. - Исключение
ErrorExceptionбудет вызвано, если все работники не могут быть завершены до истечения запрашиваемых секундwaitfor. - При значении
waitforравном 0, вызов возвращается немедленно с работниками, запланированными на удаление в другой задаче. Запланированный объектTaskвозвращается. Пользователь должен вызватьwaitна задаче перед вызовом любых других параллельных вызовов.
Примеры
$ julia -p 5
julia> t = rmprocs(2, 3, waitfor=0)
Task (runnable) @0x0000000107c718d0
julia> wait(t)
julia> workers()
3-element Array{Int64,1}:
4
5
6Distributed.interrupt — Functioninterrupt(pids::Integer...)Прерывает текущую выполняемую задачу на указанных рабочих узлах. Это эквивалентно нажатию Ctrl-C на локальной машине. Если аргументы не указаны, все рабочие узлы прерываются.
interrupt(pids::AbstractVector=workers())Прерывает текущую выполняемую задачу на указанных рабочих. Это эквивалентно нажатию Ctrl-C на локальной машине. Если аргументы не указаны, все рабочие прерываются.
Distributed.myid — Functionmyid()Получите идентификатор текущего процесса.
Примеры
julia> myid()
1
julia> remotecall_fetch(() -> myid(), 4)
4Distributed.pmap — Functionpmap(f, [::AbstractWorkerPool], c...; distributed=true, batch_size=1, on_error=nothing, retry_delays=[], retry_check=nothing) -> collectionПреобразуйте коллекцию c, применяя f к каждому элементу с использованием доступных рабочих процессов и задач.
Для нескольких аргументов коллекции применяйте f поэлементно.
Обратите внимание, что f должен быть доступен всем рабочим процессам; смотрите Доступность кода и загрузка пакетов для получения подробной информации.
Если пул рабочих процессов не указан, будут использованы все доступные рабочие процессы через CachingPool.
По умолчанию pmap распределяет вычисления между всеми указанными рабочими процессами. Чтобы использовать только локальный процесс и распределить вычисления по задачам, укажите distributed=false. Это эквивалентно использованию asyncmap. Например, pmap(f, c; distributed=false) эквивалентно asyncmap(f,c; ntasks=()->nworkers())
pmap также может использовать смесь процессов и задач через аргумент batch_size. Для размеров пакетов больше 1 коллекция обрабатывается в нескольких пакетах, каждый из которых имеет длину batch_size или меньше. Пакет отправляется как единый запрос к свободному рабочему процессу, где локальный asyncmap обрабатывает элементы из пакета, используя несколько параллельных задач.
Любая ошибка останавливает pmap от обработки оставшейся части коллекции. Чтобы переопределить это поведение, вы можете указать функцию обработки ошибок через аргумент on_error, которая принимает один аргумент, т.е. исключение. Функция может остановить обработку, повторно выбросив ошибку, или, чтобы продолжить, вернуть любое значение, которое затем возвращается вместе с результатами вызывающему.
Рассмотрим следующие два примера. Первый возвращает объект исключения на месте, второй - 0 вместо любого исключения:
julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=identity)
4-element Array{Any,1}:
1
ErrorException("foo")
3
ErrorException("foo")
julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=ex->0)
4-element Array{Int64,1}:
1
0
3
0Ошибки также могут обрабатываться путем повторной попытки неудачных вычислений. Ключевые аргументы retry_delays и retry_check передаются в retry как ключевые аргументы delays и check соответственно. Если указано пакетирование, и весь пакет не удался, все элементы в пакете будут повторно попытаны.
Обратите внимание, что если указаны как on_error, так и retry_delays, хук on_error вызывается перед повторной попыткой. Если on_error не выбрасывает (или не повторно выбрасывает) исключение, элемент не будет повторно попытан.
Пример: При ошибках повторно пытайтесь f на элементе максимум 3 раза без задержки между попытками.
pmap(f, c; retry_delays = zeros(3))Пример: Повторяйте f только если исключение не является типом InexactError, с экспоненциально увеличивающимися задержками до 3 раз. Верните NaN вместо всех случаев InexactError.
pmap(f, c; on_error = e->(isa(e, InexactError) ? NaN : rethrow()), retry_delays = ExponentialBackOff(n = 3))Distributed.RemoteException — TypeRemoteException(захвачено)Исключения при удаленных вычислениях захватываются и повторно выбрасываются локально. RemoteException оборачивает pid рабочего и захваченное исключение. CapturedException захватывает удаленное исключение и сериализуемую форму стека вызовов, когда было вызвано исключение.
Distributed.ProcessExitedException — TypeProcessExitedException(worker_id::Int)После выхода клиентского процесса Julia дальнейшие попытки сослаться на мертвый дочерний процесс вызовут это исключение.
Distributed.Future — TypeFuture(w::Int, rrid::RRID, v::Union{Some, Nothing}=nothing)Future — это заполнитель для одной вычислительной операции с неизвестным статусом завершения и временем. Для нескольких потенциальных вычислений см. RemoteChannel. См. remoteref_id для идентификации AbstractRemoteRef.
Distributed.RemoteChannel — TypeRemoteChannel(pid::Integer=myid())Создает ссылку на Channel{Any}(1) в процессе pid. Значение по умолчанию для pid — текущий процесс.
RemoteChannel(f::Function, pid::Integer=myid())Создает ссылки на удаленные каналы определенного размера и типа. f — это функция, которая при выполнении на pid должна вернуть реализацию AbstractChannel.
Например, RemoteChannel(()->Channel{Int}(10), pid), вернет ссылку на канал типа Int и размера 10 на pid.
Значение по умолчанию для pid — текущий процесс.
Base.fetch — Methodfetch(x::Future)Ожидает и получает значение Future. Полученное значение кэшируется локально. Повторные вызовы fetch с той же ссылкой возвращают кэшированное значение. Если удаленное значение является исключением, выбрасывает RemoteException, который захватывает удаленное исключение и трассировку стека.
Base.fetch — Methodfetch(c::RemoteChannel)Ожидает и получает значение из RemoteChannel. Исключения, которые возникают, такие же, как для Future. Не удаляет извлеченный элемент.
fetch(x::Any)Возвращает x.
Distributed.remotecall — Methodremotecall(f, id::Integer, args...; kwargs...) -> FutureВызовите функцию f асинхронно с заданными аргументами на указанном процессе. Верните Future. Ключевые аргументы, если таковые имеются, передаются в f.
Distributed.remotecall_wait — Methodremotecall_wait(f, id::Integer, args...; kwargs...)Выполняет более быстрый wait(remotecall(...)) в одном сообщении на Worker, указанном идентификатором рабочего id. Ключевые аргументы, если таковые имеются, передаются в f.
Смотрите также wait и remotecall.
Distributed.remotecall_fetch — Methodremotecall_fetch(f, id::Integer, args...; kwargs...)Выполняет fetch(remotecall(...)) в одном сообщении. Ключевые аргументы, если они есть, передаются в f. Все удаленные исключения захватываются в RemoteException и выбрасываются.
Смотрите также fetch и remotecall.
Примеры
$ julia -p 2
julia> remotecall_fetch(sqrt, 2, 4)
2.0
julia> remotecall_fetch(sqrt, 2, -4)
ERROR: На рабочем узле 2:
DomainError с -4.0:
sqrt был вызван с отрицательным действительным аргументом, но вернет комплексный результат только если будет вызван с комплексным аргументом. Попробуйте sqrt(Complex(x)).
...Distributed.remote_do — Methodremote_do(f, id::Integer, args...; kwargs...) -> nothingВыполняет f на рабочем узле id асинхронно. В отличие от remotecall, он не сохраняет результат вычисления и нет возможности дождаться его завершения.
Успешный вызов указывает на то, что запрос был принят для выполнения на удаленном узле.
Хотя последовательные remotecall к одному и тому же рабочему узлу сериализуются в порядке их вызова, порядок выполнения на удаленном рабочем узле не определен. Например, remote_do(f1, 2); remotecall(f2, 2); remote_do(f3, 2) сериализует вызов f1, за которым следуют f2 и f3 в этом порядке. Однако не гарантируется, что f1 будет выполнен до f3 на рабочем узле 2.
Любые исключения, выбрасываемые f, выводятся в stderr на удаленном рабочем узле.
Аргументы ключевых слов, если таковые имеются, передаются в f.
Base.put! — Methodput!(rr::RemoteChannel, args...)Сохраните набор значений в RemoteChannel. Если канал полон, блокируется до тех пор, пока не появится место. Возвращает первый аргумент.
Base.put! — Methodput!(rr::Future, v)Сохраните значение в Future rr. Future — это ссылки на удаленные объекты, которые можно записать только один раз. Вызов put! на уже установленном Future вызывает Exception. Все асинхронные удаленные вызовы возвращают Future и устанавливают значение в возвращаемое значение вызова по завершении.
Base.take! — Methodtake!(rr::RemoteChannel, args...)Извлекает значение(я) из RemoteChannel rr, удаляя значение(я) в процессе.
Base.isready — Methodisready(rr::RemoteChannel, args...)Определите, имеет ли RemoteChannel сохраненное значение. Обратите внимание, что эта функция может вызвать гонки, так как к моменту получения результата это может уже не быть верным. Тем не менее, ее можно безопасно использовать с Future, так как они назначаются только один раз.
Base.isready — Methodisready(rr::Future)Определяет, имеет ли Future сохраненное значение.
Если аргумент Future принадлежит другой ноде, этот вызов будет блокировать выполнение до получения ответа. Рекомендуется ожидать rr в отдельной задаче или использовать локальный Channel в качестве прокси:
p = 1
f = Future(p)
errormonitor(@async put!(f, remotecall_fetch(long_computation, p)))
isready(f) # не будет блокироватьDistributed.AbstractWorkerPool — TypeAbstractWorkerPoolСупертип для пулов рабочих, таких как WorkerPool и CachingPool. AbstractWorkerPool должен реализовывать:
push!- добавить нового рабочего в общий пул (доступные + занятые)put!- вернуть рабочего в доступный пулtake!- взять рабочего из доступного пула (для использования в удаленном выполнении функции)length- количество рабочих, доступных в общем пулеisready- вернуть false, еслиtake!в пуле заблокирует, иначе true
Стандартные реализации вышеуказанных методов (в AbstractWorkerPool) требуют поля
channel::Channel{Int}workers::Set{Int}
где channel содержит свободные pid рабочих, а workers - это множество всех рабочих, связанных с этим пулом.
Distributed.WorkerPool — TypeWorkerPool(workers::Union{Vector{Int},AbstractRange{Int}})Создайте WorkerPool из вектора или диапазона идентификаторов рабочих.
Примеры
$ julia -p 3
julia> WorkerPool([2, 3])
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([2, 3]), RemoteChannel{Channel{Any}}(1, 1, 6))
julia> WorkerPool(2:4)
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 7))Distributed.CachingPool — TypeCachingPool(workers::Vector{Int})Реализация AbstractWorkerPool. remote, remotecall_fetch, pmap (и другие удаленные вызовы, которые выполняют функции удаленно) получают выгоду от кэширования сериализованных/десериализованных функций на узлах-работниках, особенно замыканий (которые могут захватывать большие объемы данных).
Удаленный кэш поддерживается на протяжении всего времени жизни возвращаемого объекта CachingPool. Чтобы очистить кэш раньше, используйте clear!(pool).
Для глобальных переменных в замыкании захватываются только привязки, а не данные. Блоки let могут быть использованы для захвата глобальных данных.
Примеры
const foo = rand(10^8);
wp = CachingPool(workers())
let foo = foo
pmap(i -> sum(foo) + i, wp, 1:100);
endВышеуказанное передаст foo только один раз каждому работнику.
Distributed.default_worker_pool — Functiondefault_worker_pool()AbstractWorkerPool, содержащий бездействующих workers - используется в remote(f) и pmap (по умолчанию). Если явно не задано через default_worker_pool!(pool), пул рабочих по умолчанию инициализируется как WorkerPool.
Примеры
$ julia -p 3
julia> default_worker_pool()
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:3), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 4))Distributed.clear! — Functionclear!(syms, pids=workers(); mod=Main)Очищает глобальные привязки в модулях, инициализируя их значением nothing. syms должен быть типа Symbol или коллекцией Symbols. pids и mod определяют процессы и модуль, в котором глобальные переменные должны быть переинициализированы. Очищаются только те имена, которые были определены в mod.
Исключение возникает, если запрашивается очистка глобальной константы.
clear!(pool::CachingPool) -> poolУдаляет все кэшированные функции у всех участвующих рабочих.
Distributed.remote — Functionremote([p::AbstractWorkerPool], f) -> FunctionВозвращает анонимную функцию, которая выполняет функцию f на доступном рабочем узле (взятым из WorkerPool p, если он предоставлен) с использованием remotecall_fetch.
Distributed.remotecall — Methodremotecall(f, pool::AbstractWorkerPool, args...; kwargs...) -> FutureWorkerPool вариант remotecall(f, pid, ....). Ожидает и берет свободного работника из pool и выполняет remotecall на нем.
Примеры
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> f = remotecall(maximum, wp, A)
Future(2, 1, 6, nothing)В этом примере задача выполнялась на pid 2, вызванная из pid 1.
Distributed.remotecall_wait — Methodremotecall_wait(f, pool::AbstractWorkerPool, args...; kwargs...) -> FutureWorkerPool вариант remotecall_wait(f, pid, ....). Ожидает и берет свободного работника из pool и выполняет remotecall_wait на нем.
Примеры
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> f = remotecall_wait(maximum, wp, A)
Future(3, 1, 9, nothing)
julia> fetch(f)
0.9995177101692958Distributed.remotecall_fetch — Methodremotecall_fetch(f, pool::AbstractWorkerPool, args...; kwargs...) -> результатWorkerPool вариант remotecall_fetch(f, pid, ....). Ожидает и берет свободного работника из pool и выполняет remotecall_fetch на нем.
Примеры
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> remotecall_fetch(maximum, wp, A)
0.9995177101692958Distributed.remote_do — Methodremote_do(f, pool::AbstractWorkerPool, args...; kwargs...) -> ничегоWorkerPool вариант remote_do(f, pid, ....). Ждите и возьмите свободного работника из pool и выполните remote_do на нем.
Distributed.@spawn — Macro@spawn exprСоздает замыкание вокруг выражения и выполняет его на автоматически выбранном процессе, возвращая Future к результату. Этот макрос устарел; вместо него следует использовать @spawnat :any expr.
Примеры
julia> addprocs(3);
julia> f = @spawn myid()
Future(2, 1, 5, nothing)
julia> fetch(f)
2
julia> f = @spawn myid()
Future(3, 1, 7, nothing)
julia> fetch(f)
3Начиная с Julia 1.3 этот макрос устарел. Используйте @spawnat :any вместо.
Distributed.@spawnat — Macro@spawnat p exprСоздайте замыкание вокруг выражения и выполните замыкание асинхронно на процессе p. Верните Future к результату. Если p является цитируемым литеральным символом :any, то система автоматически выберет процессор для использования.
Примеры
julia> addprocs(3);
julia> f = @spawnat 2 myid()
Future(2, 1, 3, nothing)
julia> fetch(f)
2
julia> f = @spawnat :any myid()
Future(3, 1, 7, nothing)
julia> fetch(f)
3Аргумент :any доступен начиная с Julia 1.3.
Distributed.@fetch — Macro@fetch exprЭквивалентно fetch(@spawnat :any expr). См. fetch и @spawnat.
Примеры
julia> addprocs(3);
julia> @fetch myid()
2
julia> @fetch myid()
3
julia> @fetch myid()
4
julia> @fetch myid()
2Distributed.@fetchfrom — Macro@fetchfromЭквивалентно fetch(@spawnat p expr). См. fetch и @spawnat.
Примеры
julia> addprocs(3);
julia> @fetchfrom 2 myid()
2
julia> @fetchfrom 4 myid()
4Distributed.@distributed — Macro@distributedРаспределенный параллельный цикл for следующего вида:
@distributed [reducer] for var = range
body
endУказанный диапазон разбивается на части и локально выполняется на всех рабочих узлах. В случае, если указана необязательная функция редуктора, @distributed выполняет локальные редукции на каждом рабочем узле с финальной редукцией на вызывающем процессе.
Обратите внимание, что без функции редуктора @distributed выполняется асинхронно, т.е. он запускает независимые задачи на всех доступных рабочих узлах и сразу возвращает, не дожидаясь завершения. Чтобы дождаться завершения, добавьте префикс к вызову @sync, например:
@sync @distributed for var = range
body
endDistributed.@everywhere — Macro@everywhere [procs()] exprВыполните выражение в Main на всех procs. Ошибки на любом из процессов собираются в CompositeException и выбрасываются. Например:
@everywhere bar = 1определит Main.bar на всех текущих процессах. Любые процессы, добавленные позже (например, с помощью addprocs()), не будут иметь определенное выражение.
В отличие от @spawnat, @everywhere не захватывает локальные переменные. Вместо этого локальные переменные могут быть переданы с помощью интерполяции:
foo = 1
@everywhere bar = $fooНеобязательный аргумент procs позволяет указать подмножество всех процессов для выполнения выражения.
Похоже на вызов remotecall_eval(Main, procs, expr), но с двумя дополнительными функциями:
- Операторы `using` и `import` выполняются сначала на вызывающем процессе, чтобы гарантировать,
что пакеты предварительно скомпилированы.
- Текущий путь к исходному файлу, используемому `include`, передается другим процессам.Distributed.remoteref_id — Functionremoteref_id(r::AbstractRemoteRef) -> RRIDFutures и RemoteChannels идентифицируются по полям:
where- ссылается на узел, где на самом деле существует основной объект/хранилище, на которое ссылается ссылка.whence- ссылается на узел, из которого была создана удаленная ссылка. Обратите внимание, что это отличается от узла, где на самом деле существует основной объект, на который ссылается ссылка. Например, вызовRemoteChannel(2)из главного процесса приведет к значениюwhereравному 2 и значениюwhenceравному 1.idуникален для всех ссылок, созданных из рабочего процесса, указанного вwhence.
Вместе whence и id уникально идентифицируют ссылку среди всех рабочих процессов.
remoteref_id - это низкоуровневый API, который возвращает объект RRID, оборачивающий значения whence и id удаленной ссылки.
Distributed.channel_from_id — Functionchannel_from_id(id) -> cНизкоуровневый API, который возвращает базовый AbstractChannel для id, возвращенного функцией remoteref_id. Вызов действителен только на узле, где существует базовый канал.
Distributed.worker_id_from_socket — Functionworker_id_from_socket(s) -> pidНизкоуровневый API, который, принимая IO соединение или Worker, возвращает pid рабочего, к которому он подключен. Это полезно при написании пользовательских serialize методов для типа, которые оптимизируют записываемые данные в зависимости от идентификатора процесса получателя.
Distributed.cluster_cookie — Methodcluster_cookie() -> cookieВернуть кластерный куки.
Distributed.cluster_cookie — Methodcluster_cookie(cookie) -> cookieУстановите переданный cookie как кластерный cookie, затем верните его.
Cluster Manager Interface
Этот интерфейс предоставляет механизм для запуска и управления рабочими процессами Julia в различных кластерных средах. В Base присутствуют два типа менеджеров: LocalManager, для запуска дополнительных рабочих процессов на том же хосте, и SSHManager, для запуска на удаленных хостах через ssh. Для соединения и передачи сообщений между процессами используются сокеты TCP/IP. Менеджеры кластеров могут предоставлять другой транспорт.
Distributed.ClusterManager — TypeClusterManagerСупертип для менеджеров кластеров, которые контролируют рабочие процессы как кластер. Менеджеры кластеров реализуют, как рабочие процессы могут быть добавлены, удалены и с ними можно общаться. SSHManager и LocalManager являются подтипами этого.
Distributed.WorkerConfig — TypeWorkerConfigТип, используемый ClusterManager для управления рабочими процессами, добавленными в их кластеры. Некоторые поля используются всеми менеджерами кластера для доступа к хосту:
io– соединение, используемое для доступа к рабочему процессу (подтипIOилиNothing)host– адрес хоста (либоString, либоNothing)port– порт на хосте, используемый для подключения к рабочему процессу (либоInt, либоNothing)
Некоторые используются менеджером кластера для добавления рабочих процессов на уже инициализированный хост:
count– количество рабочих процессов, которые будут запущены на хостеexename– путь к исполняемому файлу Julia на хосте, по умолчанию"$(Sys.BINDIR)/julia"или"$(Sys.BINDIR)/julia-debug"exeflags– флаги, которые следует использовать при удаленном запуске Julia
Поле userdata используется для хранения информации для каждого рабочего процесса внешними менеджерами.
Некоторые поля используются SSHManager и аналогичными менеджерами:
tunnel–true(использовать туннелирование),false(не использовать туннелирование) илиnothing(использовать значение по умолчанию для менеджера)multiplex–true(использовать SSH мультиплексирование для туннелирования) илиfalseforward– опция пересылки, используемая для опции-Lsshbind_addr– адрес на удаленном хосте, к которому нужно привязатьсяsshflags– флаги, которые следует использовать при установлении SSH-соединенияmax_parallel– максимальное количество рабочих процессов, к которым можно подключиться параллельно на хосте
Некоторые поля используются как LocalManager, так и SSHManager:
connect_at– определяет, является ли это вызовом настройки от рабочего процесса к рабочему процессу или от драйвера к рабочему процессуprocess– процесс, к которому будет подключено (обычно менеджер назначает это во времяaddprocs)ospid– идентификатор процесса в соответствии с ОС хоста, используемый для прерывания рабочих процессовenviron– частный словарь, используемый для хранения временной информации менеджерами Local/SSHident– рабочий процесс, идентифицированныйClusterManagerconnect_idents– список идентификаторов рабочих процессов, к которым рабочий процесс должен подключиться, если используется пользовательская топологияenable_threaded_blas–true,falseилиnothing, использовать ли многопоточную BLAS на рабочих процессах или нет
Distributed.launch — Functionlaunch(manager::ClusterManager, params::Dict, launched::Array, launch_ntfy::Condition)Реализовано менеджерами кластеров. Для каждого рабочего процесса Julia, запущенного этой функцией, он должен добавить запись WorkerConfig в launched и уведомить launch_ntfy. Функция ДОЛЖНА завершиться, как только все рабочие процессы, запрошенные manager, будут запущены. params — это словарь всех аргументов ключевых слов, с которыми был вызван addprocs.
Distributed.manage — Functionmanage(manager::ClusterManager, id::Integer, config::WorkerConfig. op::Symbol)Реализовано менеджерами кластеров. Вызывается в основном процессе, в течение жизни рабочего, с соответствующими значениями op:
- с
:register/:deregister, когда рабочий добавляется / удаляется из пула рабочих Julia. - с
:interrupt, когда вызываетсяinterrupt(workers).ClusterManagerдолжен сигнализировать соответствующему рабочему с помощью сигнала прерывания. - с
:finalizeдля целей очистки.
Base.kill — Methodkill(manager::ClusterManager, pid::Int, config::WorkerConfig)Реализовано менеджерами кластеров. Вызывается в главном процессе с помощью rmprocs. Это должно привести к выходу удаленного рабочего, указанного pid. kill(manager::ClusterManager.....) выполняет удаленный exit() на pid.
Sockets.connect — Methodconnect(manager::ClusterManager, pid::Int, config::WorkerConfig) -> (instrm::IO, outstrm::IO)Реализовано менеджерами кластеров с использованием пользовательских транспортов. Он должен установить логическое соединение с рабочим с идентификатором pid, указанным в config, и вернуть пару объектов IO. Сообщения от pid к текущему процессу будут считываться из instrm, в то время как сообщения, которые необходимо отправить pid, будут записываться в outstrm. Реализация пользовательского транспорта должна гарантировать, что сообщения доставляются и принимаются полностью и в порядке. connect(manager::ClusterManager.....) настраивает соединения TCP/IP между рабочими.
Distributed.init_worker — Functioninit_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager())Вызывается менеджерами кластеров, реализующими пользовательские транспорты. Он инициализирует вновь запущенный процесс как рабочий. Аргумент командной строки --worker[=<cookie>] имеет эффект инициализации процесса как рабочего с использованием TCP/IP сокетов для транспорта. cookie является cluster_cookie.
Distributed.start_worker — Functionstart_worker([out::IO=stdout], cookie::AbstractString=readline(stdin); close_stdin::Bool=true, stderr_to_stdout::Bool=true)start_worker — это внутренняя функция, которая является точкой входа по умолчанию для рабочих процессов, подключающихся через TCP/IP. Она настраивает процесс как рабочего в кластере Julia.
Информация о хосте:порту записывается в поток out (по умолчанию stdout).
Функция считывает cookie из stdin, если это необходимо, и слушает на свободном порту (или, если указано, на порту в параметре командной строки --bind-to) и планирует задачи для обработки входящих TCP-соединений и запросов. Она также (по желанию) закрывает stdin и перенаправляет stderr в stdout.
Она не возвращает значения.
Distributed.process_messages — Functionprocess_messages(r_stream::IO, w_stream::IO, incoming::Bool=true)Вызывается менеджерами кластера с использованием пользовательских транспортов. Он должен вызываться, когда реализация пользовательского транспорта получает первое сообщение от удаленного рабочего. Пользовательский транспорт должен управлять логическим соединением с удаленным рабочим и предоставлять два объекта IO, один для входящих сообщений и другой для сообщений, адресованных удаленному рабочему. Если incoming равно true, удаленный партнер инициировал соединение. Тот, кто из пары инициирует соединение, отправляет кластерный куки и номер своей версии Julia для выполнения аутентификационного рукопожатия.
Смотрите также cluster_cookie.
Distributed.default_addprocs_params — Functiondefault_addprocs_params(mgr::ClusterManager) -> Dict{Symbol, Any}Реализовано менеджерами кластеров. Параметры по умолчанию, передаваемые при вызове addprocs(mgr). Минимальный набор опций доступен при вызове default_addprocs_params().