Distributed Computing

DistributedModule

Инструменты для распределенной параллельной обработки.

source
Distributed.addprocsFunction
addprocs(manager::ClusterManager; kwargs...) -> Список идентификаторов процессов

Запускает рабочие процессы через указанный кластерный менеджер.

Например, кластеры Beowulf поддерживаются через пользовательский кластерный менеджер, реализованный в пакете ClusterManagers.jl.

Количество секунд, в течение которых вновь запущенный рабочий процесс ждет установления соединения с мастером, можно указать через переменную JULIA_WORKER_TIMEOUT в окружении рабочего процесса. Это имеет значение только при использовании TCP/IP в качестве транспорта.

Чтобы запустить рабочих процессов без блокировки REPL или содержащей функции, если рабочие процессы запускаются программно, выполните addprocs в своей задаче.

Примеры

# В загруженных кластерах вызывайте `addprocs` асинхронно
t = @async addprocs(...)
# Используйте рабочих, когда они выходят в онлайн
if nprocs() > 1   # Убедитесь, что доступен хотя бы один новый рабочий процесс
   ....   # выполните распределенное выполнение
end
# Получите идентификаторы вновь запущенных рабочих процессов или любые сообщения об ошибках
if istaskdone(t)   # Проверьте, завершился ли `addprocs`, чтобы убедиться, что `fetch` не блокирует
    if nworkers() == N
        new_pids = fetch(t)
    else
        fetch(t)
    end
end
source
addprocs(machines; tunnel=false, sshflags=``, max_parallel=10, kwargs...) -> Список идентификаторов процессов

Добавьте рабочие процессы на удаленных машинах через SSH. Конфигурация выполняется с помощью именованных аргументов (см. ниже). В частности, ключевое слово exename можно использовать для указания пути к бинарному файлу julia на удаленной машине(ах).

machines — это вектор "спецификаций машин", которые задаются в виде строк формата [user@]host[:port] [bind_addr[:port]]. user по умолчанию равен текущему пользователю, а port — стандартному порту SSH. Если указано [bind_addr[:port]], другие рабочие процессы будут подключаться к этому рабочему процессу по указанному bind_addr и port.

Возможно запустить несколько процессов на удаленном хосте, используя кортеж в векторе machines или форму (machine_spec, count), где count — это количество рабочих процессов, которые будут запущены на указанном хосте. Передача :auto в качестве количества рабочих процессов запустит столько рабочих процессов, сколько потоков ЦП на удаленном хосте.

Примеры:

addprocs([
    "remote1",               # один рабочий процесс на 'remote1', входящий с текущим именем пользователя
    "user@remote2",          # один рабочий процесс на 'remote2', входящий с именем пользователя 'user'
    "user@remote3:2222",     # указание порта SSH на '2222' для 'remote3'
    ("user@remote4", 4),     # запустить 4 рабочих процесса на 'remote4'
    ("user@remote5", :auto), # запустить столько рабочих процессов, сколько потоков ЦП на 'remote5'
])

Именованные аргументы:

  • tunnel: если true, то будет использоваться SSH-туннелирование для подключения к рабочему процессу из главного процесса. По умолчанию false.

  • multiplex: если true, то для SSH-туннелирования используется SSH-мультиплексирование. По умолчанию false.

  • ssh: имя или путь к исполняемому файлу SSH-клиента, используемого для запуска рабочих процессов. По умолчанию "ssh".

  • sshflags: указывает дополнительные параметры ssh, например sshflags=`-i /home/foo/bar.pem`

  • max_parallel: указывает максимальное количество рабочих процессов, подключенных параллельно на хосте. По умолчанию 10.

  • shell: указывает тип оболочки, к которой ssh подключается на рабочих процессах.

    • shell=:posix: совместимая с POSIX оболочка Unix/Linux (sh, ksh, bash, dash, zsh и т. д.). По умолчанию.
    • shell=:csh: оболочка C Unix (csh, tcsh).
    • shell=:wincmd: Microsoft Windows cmd.exe.
  • dir: указывает рабочий каталог на рабочих процессах. По умолчанию текущий каталог хоста (как найдено с помощью pwd())

  • enable_threaded_blas: если true, то BLAS будет работать на нескольких потоках в добавленных процессах. По умолчанию false.

  • exename: имя исполняемого файла julia. По умолчанию "$(Sys.BINDIR)/julia" или "$(Sys.BINDIR)/julia-debug" в зависимости от случая. Рекомендуется использовать одну и ту же версию Julia на всех удаленных машинах, так как сериализация и распределение кода могут не сработать в противном случае.

  • exeflags: дополнительные флаги, передаваемые рабочим процессам.

  • topology: указывает, как рабочие процессы подключаются друг к другу. Отправка сообщения между неподключенными рабочими процессами приводит к ошибке.

    • topology=:all_to_all: Все процессы подключены друг к другу. По умолчанию.
    • topology=:master_worker: Только главный процесс, т.е. pid 1, подключается к рабочим процессам. Рабочие процессы не подключаются друг к другу.
    • topology=:custom: Метод launch менеджера кластера указывает топологию подключения через поля ident и connect_idents в WorkerConfig. Рабочий процесс с идентичностью менеджера кластера ident подключится ко всем рабочим процессам, указанным в connect_idents.
  • lazy: Применимо только с topology=:all_to_all. Если true, соединения между рабочими процессами устанавливаются лениво, т.е. они устанавливаются при первом вызове удаленного вызова между рабочими процессами. По умолчанию true.

  • env: предоставьте массив пар строк, таких как env=["JULIA_DEPOT_PATH"=>"/depot"], чтобы запросить установку переменных окружения на удаленной машине. По умолчанию только переменная окружения JULIA_WORKER_TIMEOUT передается автоматически из локальной среды в удаленную.

  • cmdline_cookie: передайте аутентификационное cookie через параметр командной строки --worker. (более безопасное) поведение по умолчанию передачи cookie через ssh stdio может зависнуть с рабочими процессами Windows, которые используют более старые (предшествующие ConPTY) версии Julia или Windows, в этом случае cmdline_cookie=true предлагает обходной путь.

Julia 1.6

Именованные аргументы ssh, shell, env и cmdline_cookie были добавлены в Julia 1.6.

Переменные окружения:

Если главный процесс не может установить соединение с вновь запущенным рабочим процессом в течение 60.0 секунд, рабочий процесс рассматривает это как фатальную ситуацию и завершает работу. Этот тайм-аут можно контролировать с помощью переменной окружения JULIA_WORKER_TIMEOUT. Значение JULIA_WORKER_TIMEOUT на главном процессе указывает количество секунд, в течение которых вновь запущенный рабочий процесс ждет установления соединения.

source
addprocs(np::Integer=Sys.CPU_THREADS; restrict=true, kwargs...) -> Список идентификаторов процессов

Запустите np рабочих на локальном хосте, используя встроенный LocalManager.

Локальные рабочие наследуют текущее окружение пакета (т.е. активный проект, LOAD_PATH и DEPOT_PATH) от основного процесса.

Warning

Обратите внимание, что рабочие не выполняют скрипт инициализации ~/.julia/config/startup.jl, и они не синхронизируют свое глобальное состояние (такие как параметры командной строки, глобальные переменные, определения новых методов и загруженные модули) с любыми другими запущенными процессами.

Ключевые аргументы:

  • restrict::Bool: если true (по умолчанию) привязка ограничена 127.0.0.1.
  • dir, exename, exeflags, env, topology, lazy, enable_threaded_blas: тот же эффект, что и для SSHManager, см. документацию для addprocs(machines::AbstractVector).
Julia 1.9

Наследование окружения пакета и ключевой аргумент env были добавлены в Julia 1.9.

source
Distributed.nprocsFunction
nprocs()

Получите количество доступных процессов.

Примеры

julia> nprocs()
3

julia> workers()
2-element Array{Int64,1}:
 2
 3
source
Distributed.nworkersFunction
nworkers()

Получите количество доступных рабочих процессов. Это на единицу меньше, чем nprocs(). Равно nprocs(), если nprocs() == 1.

Примеры

$ julia -p 2

julia> nprocs()
3

julia> nworkers()
2
source
Distributed.procsMethod
procs()

Возвращает список всех идентификаторов процессов, включая pid 1 (который не включен в workers()).

Примеры

$ julia -p 2

julia> procs()
3-element Array{Int64,1}:
 1
 2
 3
source
Distributed.procsMethod
procs(pid::Integer)

Возвращает список всех идентификаторов процессов на том же физическом узле. В частности, возвращаются все рабочие процессы, привязанные к тому же IP-адресу, что и pid.

source
Distributed.workersFunction
workers()

Возвращает список всех идентификаторов рабочих процессов.

Примеры

$ julia -p 2

julia> workers()
2-element Array{Int64,1}:
 2
 3
source
Distributed.rmprocsFunction
rmprocs(pids...; waitfor=typemax(Int))

Удалите указанных работников. Обратите внимание, что только процесс 1 может добавлять или удалять работников.

Аргумент waitfor указывает, как долго ждать завершения работы работников:

  • Если не указано, rmprocs будет ждать, пока все запрашиваемые pids не будут удалены.
  • Исключение ErrorException будет вызвано, если все работники не могут быть завершены до истечения запрашиваемых секунд waitfor.
  • При значении waitfor равном 0, вызов возвращается немедленно с работниками, запланированными на удаление в другой задаче. Запланированный объект Task возвращается. Пользователь должен вызвать wait на задаче перед вызовом любых других параллельных вызовов.

Примеры

$ julia -p 5

julia> t = rmprocs(2, 3, waitfor=0)
Task (runnable) @0x0000000107c718d0

julia> wait(t)

julia> workers()
3-element Array{Int64,1}:
 4
 5
 6
source
Distributed.interruptFunction
interrupt(pids::Integer...)

Прерывает текущую выполняемую задачу на указанных рабочих узлах. Это эквивалентно нажатию Ctrl-C на локальной машине. Если аргументы не указаны, все рабочие узлы прерываются.

source
interrupt(pids::AbstractVector=workers())

Прерывает текущую выполняемую задачу на указанных рабочих. Это эквивалентно нажатию Ctrl-C на локальной машине. Если аргументы не указаны, все рабочие прерываются.

source
Distributed.myidFunction
myid()

Получите идентификатор текущего процесса.

Примеры

julia> myid()
1

julia> remotecall_fetch(() -> myid(), 4)
4
source
Distributed.pmapFunction
pmap(f, [::AbstractWorkerPool], c...; distributed=true, batch_size=1, on_error=nothing, retry_delays=[], retry_check=nothing) -> collection

Преобразуйте коллекцию c, применяя f к каждому элементу с использованием доступных рабочих процессов и задач.

Для нескольких аргументов коллекции применяйте f поэлементно.

Обратите внимание, что f должен быть доступен всем рабочим процессам; смотрите Доступность кода и загрузка пакетов для получения подробной информации.

Если пул рабочих процессов не указан, будут использованы все доступные рабочие процессы через CachingPool.

По умолчанию pmap распределяет вычисления между всеми указанными рабочими процессами. Чтобы использовать только локальный процесс и распределить вычисления по задачам, укажите distributed=false. Это эквивалентно использованию asyncmap. Например, pmap(f, c; distributed=false) эквивалентно asyncmap(f,c; ntasks=()->nworkers())

pmap также может использовать смесь процессов и задач через аргумент batch_size. Для размеров пакетов больше 1 коллекция обрабатывается в нескольких пакетах, каждый из которых имеет длину batch_size или меньше. Пакет отправляется как единый запрос к свободному рабочему процессу, где локальный asyncmap обрабатывает элементы из пакета, используя несколько параллельных задач.

Любая ошибка останавливает pmap от обработки оставшейся части коллекции. Чтобы переопределить это поведение, вы можете указать функцию обработки ошибок через аргумент on_error, которая принимает один аргумент, т.е. исключение. Функция может остановить обработку, повторно выбросив ошибку, или, чтобы продолжить, вернуть любое значение, которое затем возвращается вместе с результатами вызывающему.

Рассмотрим следующие два примера. Первый возвращает объект исключения на месте, второй - 0 вместо любого исключения:

julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=identity)
4-element Array{Any,1}:
 1
  ErrorException("foo")
 3
  ErrorException("foo")

julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=ex->0)
4-element Array{Int64,1}:
 1
 0
 3
 0

Ошибки также могут обрабатываться путем повторной попытки неудачных вычислений. Ключевые аргументы retry_delays и retry_check передаются в retry как ключевые аргументы delays и check соответственно. Если указано пакетирование, и весь пакет не удался, все элементы в пакете будут повторно попытаны.

Обратите внимание, что если указаны как on_error, так и retry_delays, хук on_error вызывается перед повторной попыткой. Если on_error не выбрасывает (или не повторно выбрасывает) исключение, элемент не будет повторно попытан.

Пример: При ошибках повторно пытайтесь f на элементе максимум 3 раза без задержки между попытками.

pmap(f, c; retry_delays = zeros(3))

Пример: Повторяйте f только если исключение не является типом InexactError, с экспоненциально увеличивающимися задержками до 3 раз. Верните NaN вместо всех случаев InexactError.

pmap(f, c; on_error = e->(isa(e, InexactError) ? NaN : rethrow()), retry_delays = ExponentialBackOff(n = 3))
source
Distributed.RemoteExceptionType
RemoteException(захвачено)

Исключения при удаленных вычислениях захватываются и повторно выбрасываются локально. RemoteException оборачивает pid рабочего и захваченное исключение. CapturedException захватывает удаленное исключение и сериализуемую форму стека вызовов, когда было вызвано исключение.

source
Distributed.ProcessExitedExceptionType
ProcessExitedException(worker_id::Int)

После выхода клиентского процесса Julia дальнейшие попытки сослаться на мертвый дочерний процесс вызовут это исключение.

source
Distributed.FutureType
Future(w::Int, rrid::RRID, v::Union{Some, Nothing}=nothing)

Future — это заполнитель для одной вычислительной операции с неизвестным статусом завершения и временем. Для нескольких потенциальных вычислений см. RemoteChannel. См. remoteref_id для идентификации AbstractRemoteRef.

source
Distributed.RemoteChannelType
RemoteChannel(pid::Integer=myid())

Создает ссылку на Channel{Any}(1) в процессе pid. Значение по умолчанию для pid — текущий процесс.

RemoteChannel(f::Function, pid::Integer=myid())

Создает ссылки на удаленные каналы определенного размера и типа. f — это функция, которая при выполнении на pid должна вернуть реализацию AbstractChannel.

Например, RemoteChannel(()->Channel{Int}(10), pid), вернет ссылку на канал типа Int и размера 10 на pid.

Значение по умолчанию для pid — текущий процесс.

source
Base.fetchMethod
fetch(x::Future)

Ожидает и получает значение Future. Полученное значение кэшируется локально. Повторные вызовы fetch с той же ссылкой возвращают кэшированное значение. Если удаленное значение является исключением, выбрасывает RemoteException, который захватывает удаленное исключение и трассировку стека.

source
Base.fetchMethod
fetch(c::RemoteChannel)

Ожидает и получает значение из RemoteChannel. Исключения, которые возникают, такие же, как для Future. Не удаляет извлеченный элемент.

source
fetch(x::Any)

Возвращает x.

source
Distributed.remotecallMethod
remotecall(f, id::Integer, args...; kwargs...) -> Future

Вызовите функцию f асинхронно с заданными аргументами на указанном процессе. Верните Future. Ключевые аргументы, если таковые имеются, передаются в f.

source
Distributed.remotecall_waitMethod
remotecall_wait(f, id::Integer, args...; kwargs...)

Выполняет более быстрый wait(remotecall(...)) в одном сообщении на Worker, указанном идентификатором рабочего id. Ключевые аргументы, если таковые имеются, передаются в f.

Смотрите также wait и remotecall.

source
Distributed.remotecall_fetchMethod
remotecall_fetch(f, id::Integer, args...; kwargs...)

Выполняет fetch(remotecall(...)) в одном сообщении. Ключевые аргументы, если они есть, передаются в f. Все удаленные исключения захватываются в RemoteException и выбрасываются.

Смотрите также fetch и remotecall.

Примеры

$ julia -p 2

julia> remotecall_fetch(sqrt, 2, 4)
2.0

julia> remotecall_fetch(sqrt, 2, -4)
ERROR: На рабочем узле 2:
DomainError с -4.0:
sqrt был вызван с отрицательным действительным аргументом, но вернет комплексный результат только если будет вызван с комплексным аргументом. Попробуйте sqrt(Complex(x)).
...
source
Distributed.remote_doMethod
remote_do(f, id::Integer, args...; kwargs...) -> nothing

Выполняет f на рабочем узле id асинхронно. В отличие от remotecall, он не сохраняет результат вычисления и нет возможности дождаться его завершения.

Успешный вызов указывает на то, что запрос был принят для выполнения на удаленном узле.

Хотя последовательные remotecall к одному и тому же рабочему узлу сериализуются в порядке их вызова, порядок выполнения на удаленном рабочем узле не определен. Например, remote_do(f1, 2); remotecall(f2, 2); remote_do(f3, 2) сериализует вызов f1, за которым следуют f2 и f3 в этом порядке. Однако не гарантируется, что f1 будет выполнен до f3 на рабочем узле 2.

Любые исключения, выбрасываемые f, выводятся в stderr на удаленном рабочем узле.

Аргументы ключевых слов, если таковые имеются, передаются в f.

source
Base.put!Method
put!(rr::RemoteChannel, args...)

Сохраните набор значений в RemoteChannel. Если канал полон, блокируется до тех пор, пока не появится место. Возвращает первый аргумент.

source
Base.put!Method
put!(rr::Future, v)

Сохраните значение в Future rr. Future — это ссылки на удаленные объекты, которые можно записать только один раз. Вызов put! на уже установленном Future вызывает Exception. Все асинхронные удаленные вызовы возвращают Future и устанавливают значение в возвращаемое значение вызова по завершении.

source
Base.take!Method
take!(rr::RemoteChannel, args...)

Извлекает значение(я) из RemoteChannel rr, удаляя значение(я) в процессе.

source
Base.isreadyMethod
isready(rr::RemoteChannel, args...)

Определите, имеет ли RemoteChannel сохраненное значение. Обратите внимание, что эта функция может вызвать гонки, так как к моменту получения результата это может уже не быть верным. Тем не менее, ее можно безопасно использовать с Future, так как они назначаются только один раз.

source
Base.isreadyMethod
isready(rr::Future)

Определяет, имеет ли Future сохраненное значение.

Если аргумент Future принадлежит другой ноде, этот вызов будет блокировать выполнение до получения ответа. Рекомендуется ожидать rr в отдельной задаче или использовать локальный Channel в качестве прокси:

p = 1
f = Future(p)
errormonitor(@async put!(f, remotecall_fetch(long_computation, p)))
isready(f)  # не будет блокировать
source
Distributed.AbstractWorkerPoolType
AbstractWorkerPool

Супертип для пулов рабочих, таких как WorkerPool и CachingPool. AbstractWorkerPool должен реализовывать:

  • push! - добавить нового рабочего в общий пул (доступные + занятые)
  • put! - вернуть рабочего в доступный пул
  • take! - взять рабочего из доступного пула (для использования в удаленном выполнении функции)
  • length - количество рабочих, доступных в общем пуле
  • isready - вернуть false, если take! в пуле заблокирует, иначе true

Стандартные реализации вышеуказанных методов (в AbstractWorkerPool) требуют поля

  • channel::Channel{Int}
  • workers::Set{Int}

где channel содержит свободные pid рабочих, а workers - это множество всех рабочих, связанных с этим пулом.

source
Distributed.WorkerPoolType
WorkerPool(workers::Union{Vector{Int},AbstractRange{Int}})

Создайте WorkerPool из вектора или диапазона идентификаторов рабочих.

Примеры

$ julia -p 3

julia> WorkerPool([2, 3])
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([2, 3]), RemoteChannel{Channel{Any}}(1, 1, 6))

julia> WorkerPool(2:4)
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 7))
source
Distributed.CachingPoolType
CachingPool(workers::Vector{Int})

Реализация AbstractWorkerPool. remote, remotecall_fetch, pmap (и другие удаленные вызовы, которые выполняют функции удаленно) получают выгоду от кэширования сериализованных/десериализованных функций на узлах-работниках, особенно замыканий (которые могут захватывать большие объемы данных).

Удаленный кэш поддерживается на протяжении всего времени жизни возвращаемого объекта CachingPool. Чтобы очистить кэш раньше, используйте clear!(pool).

Для глобальных переменных в замыкании захватываются только привязки, а не данные. Блоки let могут быть использованы для захвата глобальных данных.

Примеры

const foo = rand(10^8);
wp = CachingPool(workers())
let foo = foo
    pmap(i -> sum(foo) + i, wp, 1:100);
end

Вышеуказанное передаст foo только один раз каждому работнику.

source
Distributed.default_worker_poolFunction
default_worker_pool()

AbstractWorkerPool, содержащий бездействующих workers - используется в remote(f) и pmap (по умолчанию). Если явно не задано через default_worker_pool!(pool), пул рабочих по умолчанию инициализируется как WorkerPool.

Примеры

$ julia -p 3

julia> default_worker_pool()
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:3), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 4))
source
Distributed.clear!Function
clear!(syms, pids=workers(); mod=Main)

Очищает глобальные привязки в модулях, инициализируя их значением nothing. syms должен быть типа Symbol или коллекцией Symbols. pids и mod определяют процессы и модуль, в котором глобальные переменные должны быть переинициализированы. Очищаются только те имена, которые были определены в mod.

Исключение возникает, если запрашивается очистка глобальной константы.

source
clear!(pool::CachingPool) -> pool

Удаляет все кэшированные функции у всех участвующих рабочих.

source
Distributed.remoteFunction
remote([p::AbstractWorkerPool], f) -> Function

Возвращает анонимную функцию, которая выполняет функцию f на доступном рабочем узле (взятым из WorkerPool p, если он предоставлен) с использованием remotecall_fetch.

source
Distributed.remotecallMethod
remotecall(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future

WorkerPool вариант remotecall(f, pid, ....). Ожидает и берет свободного работника из pool и выполняет remotecall на нем.

Примеры

$ julia -p 3

julia> wp = WorkerPool([2, 3]);

julia> A = rand(3000);

julia> f = remotecall(maximum, wp, A)
Future(2, 1, 6, nothing)

В этом примере задача выполнялась на pid 2, вызванная из pid 1.

source
Distributed.remotecall_waitMethod
remotecall_wait(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future

WorkerPool вариант remotecall_wait(f, pid, ....). Ожидает и берет свободного работника из pool и выполняет remotecall_wait на нем.

Примеры

$ julia -p 3

julia> wp = WorkerPool([2, 3]);

julia> A = rand(3000);

julia> f = remotecall_wait(maximum, wp, A)
Future(3, 1, 9, nothing)

julia> fetch(f)
0.9995177101692958
source
Distributed.remotecall_fetchMethod
remotecall_fetch(f, pool::AbstractWorkerPool, args...; kwargs...) -> результат

WorkerPool вариант remotecall_fetch(f, pid, ....). Ожидает и берет свободного работника из pool и выполняет remotecall_fetch на нем.

Примеры

$ julia -p 3

julia> wp = WorkerPool([2, 3]);

julia> A = rand(3000);

julia> remotecall_fetch(maximum, wp, A)
0.9995177101692958
source
Distributed.remote_doMethod
remote_do(f, pool::AbstractWorkerPool, args...; kwargs...) -> ничего

WorkerPool вариант remote_do(f, pid, ....). Ждите и возьмите свободного работника из pool и выполните remote_do на нем.

source
Distributed.@spawnMacro
@spawn expr

Создает замыкание вокруг выражения и выполняет его на автоматически выбранном процессе, возвращая Future к результату. Этот макрос устарел; вместо него следует использовать @spawnat :any expr.

Примеры

julia> addprocs(3);

julia> f = @spawn myid()
Future(2, 1, 5, nothing)

julia> fetch(f)
2

julia> f = @spawn myid()
Future(3, 1, 7, nothing)

julia> fetch(f)
3
Julia 1.3

Начиная с Julia 1.3 этот макрос устарел. Используйте @spawnat :any вместо.

source
Distributed.@spawnatMacro
@spawnat p expr

Создайте замыкание вокруг выражения и выполните замыкание асинхронно на процессе p. Верните Future к результату. Если p является цитируемым литеральным символом :any, то система автоматически выберет процессор для использования.

Примеры

julia> addprocs(3);

julia> f = @spawnat 2 myid()
Future(2, 1, 3, nothing)

julia> fetch(f)
2

julia> f = @spawnat :any myid()
Future(3, 1, 7, nothing)

julia> fetch(f)
3
Julia 1.3

Аргумент :any доступен начиная с Julia 1.3.

source
Distributed.@fetchMacro
@fetch expr

Эквивалентно fetch(@spawnat :any expr). См. fetch и @spawnat.

Примеры

julia> addprocs(3);

julia> @fetch myid()
2

julia> @fetch myid()
3

julia> @fetch myid()
4

julia> @fetch myid()
2
source
Distributed.@fetchfromMacro
@fetchfrom

Эквивалентно fetch(@spawnat p expr). См. fetch и @spawnat.

Примеры

julia> addprocs(3);

julia> @fetchfrom 2 myid()
2

julia> @fetchfrom 4 myid()
4
source
Distributed.@distributedMacro
@distributed

Распределенный параллельный цикл for следующего вида:

@distributed [reducer] for var = range
    body
end

Указанный диапазон разбивается на части и локально выполняется на всех рабочих узлах. В случае, если указана необязательная функция редуктора, @distributed выполняет локальные редукции на каждом рабочем узле с финальной редукцией на вызывающем процессе.

Обратите внимание, что без функции редуктора @distributed выполняется асинхронно, т.е. он запускает независимые задачи на всех доступных рабочих узлах и сразу возвращает, не дожидаясь завершения. Чтобы дождаться завершения, добавьте префикс к вызову @sync, например:

@sync @distributed for var = range
    body
end
source
Distributed.@everywhereMacro
@everywhere [procs()] expr

Выполните выражение в Main на всех procs. Ошибки на любом из процессов собираются в CompositeException и выбрасываются. Например:

@everywhere bar = 1

определит Main.bar на всех текущих процессах. Любые процессы, добавленные позже (например, с помощью addprocs()), не будут иметь определенное выражение.

В отличие от @spawnat, @everywhere не захватывает локальные переменные. Вместо этого локальные переменные могут быть переданы с помощью интерполяции:

foo = 1
@everywhere bar = $foo

Необязательный аргумент procs позволяет указать подмножество всех процессов для выполнения выражения.

Похоже на вызов remotecall_eval(Main, procs, expr), но с двумя дополнительными функциями:

- Операторы `using` и `import` выполняются сначала на вызывающем процессе, чтобы гарантировать,
  что пакеты предварительно скомпилированы.
- Текущий путь к исходному файлу, используемому `include`, передается другим процессам.
source
Distributed.remoteref_idFunction
remoteref_id(r::AbstractRemoteRef) -> RRID

Futures и RemoteChannels идентифицируются по полям:

  • where - ссылается на узел, где на самом деле существует основной объект/хранилище, на которое ссылается ссылка.
  • whence - ссылается на узел, из которого была создана удаленная ссылка. Обратите внимание, что это отличается от узла, где на самом деле существует основной объект, на который ссылается ссылка. Например, вызов RemoteChannel(2) из главного процесса приведет к значению where равному 2 и значению whence равному 1.
  • id уникален для всех ссылок, созданных из рабочего процесса, указанного в whence.

Вместе whence и id уникально идентифицируют ссылку среди всех рабочих процессов.

remoteref_id - это низкоуровневый API, который возвращает объект RRID, оборачивающий значения whence и id удаленной ссылки.

source
Distributed.channel_from_idFunction
channel_from_id(id) -> c

Низкоуровневый API, который возвращает базовый AbstractChannel для id, возвращенного функцией remoteref_id. Вызов действителен только на узле, где существует базовый канал.

source
Distributed.worker_id_from_socketFunction
worker_id_from_socket(s) -> pid

Низкоуровневый API, который, принимая IO соединение или Worker, возвращает pid рабочего, к которому он подключен. Это полезно при написании пользовательских serialize методов для типа, которые оптимизируют записываемые данные в зависимости от идентификатора процесса получателя.

source
Distributed.cluster_cookieMethod
cluster_cookie(cookie) -> cookie

Установите переданный cookie как кластерный cookie, затем верните его.

source

Cluster Manager Interface

Этот интерфейс предоставляет механизм для запуска и управления рабочими процессами Julia в различных кластерных средах. В Base присутствуют два типа менеджеров: LocalManager, для запуска дополнительных рабочих процессов на том же хосте, и SSHManager, для запуска на удаленных хостах через ssh. Для соединения и передачи сообщений между процессами используются сокеты TCP/IP. Менеджеры кластеров могут предоставлять другой транспорт.

Distributed.ClusterManagerType
ClusterManager

Супертип для менеджеров кластеров, которые контролируют рабочие процессы как кластер. Менеджеры кластеров реализуют, как рабочие процессы могут быть добавлены, удалены и с ними можно общаться. SSHManager и LocalManager являются подтипами этого.

source
Distributed.WorkerConfigType
WorkerConfig

Тип, используемый ClusterManager для управления рабочими процессами, добавленными в их кластеры. Некоторые поля используются всеми менеджерами кластера для доступа к хосту:

  • io – соединение, используемое для доступа к рабочему процессу (подтип IO или Nothing)
  • host – адрес хоста (либо String, либо Nothing)
  • port – порт на хосте, используемый для подключения к рабочему процессу (либо Int, либо Nothing)

Некоторые используются менеджером кластера для добавления рабочих процессов на уже инициализированный хост:

  • count – количество рабочих процессов, которые будут запущены на хосте
  • exename – путь к исполняемому файлу Julia на хосте, по умолчанию "$(Sys.BINDIR)/julia" или "$(Sys.BINDIR)/julia-debug"
  • exeflags – флаги, которые следует использовать при удаленном запуске Julia

Поле userdata используется для хранения информации для каждого рабочего процесса внешними менеджерами.

Некоторые поля используются SSHManager и аналогичными менеджерами:

  • tunneltrue (использовать туннелирование), false (не использовать туннелирование) или nothing (использовать значение по умолчанию для менеджера)
  • multiplextrue (использовать SSH мультиплексирование для туннелирования) или false
  • forward – опция пересылки, используемая для опции -L ssh
  • bind_addr – адрес на удаленном хосте, к которому нужно привязаться
  • sshflags – флаги, которые следует использовать при установлении SSH-соединения
  • max_parallel – максимальное количество рабочих процессов, к которым можно подключиться параллельно на хосте

Некоторые поля используются как LocalManager, так и SSHManager:

  • connect_at – определяет, является ли это вызовом настройки от рабочего процесса к рабочему процессу или от драйвера к рабочему процессу
  • process – процесс, к которому будет подключено (обычно менеджер назначает это во время addprocs)
  • ospid – идентификатор процесса в соответствии с ОС хоста, используемый для прерывания рабочих процессов
  • environ – частный словарь, используемый для хранения временной информации менеджерами Local/SSH
  • ident – рабочий процесс, идентифицированный ClusterManager
  • connect_idents – список идентификаторов рабочих процессов, к которым рабочий процесс должен подключиться, если используется пользовательская топология
  • enable_threaded_blastrue, false или nothing, использовать ли многопоточную BLAS на рабочих процессах или нет
source
Distributed.launchFunction
launch(manager::ClusterManager, params::Dict, launched::Array, launch_ntfy::Condition)

Реализовано менеджерами кластеров. Для каждого рабочего процесса Julia, запущенного этой функцией, он должен добавить запись WorkerConfig в launched и уведомить launch_ntfy. Функция ДОЛЖНА завершиться, как только все рабочие процессы, запрошенные manager, будут запущены. params — это словарь всех аргументов ключевых слов, с которыми был вызван addprocs.

source
Distributed.manageFunction
manage(manager::ClusterManager, id::Integer, config::WorkerConfig. op::Symbol)

Реализовано менеджерами кластеров. Вызывается в основном процессе, в течение жизни рабочего, с соответствующими значениями op:

  • с :register/:deregister, когда рабочий добавляется / удаляется из пула рабочих Julia.
  • с :interrupt, когда вызывается interrupt(workers). ClusterManager должен сигнализировать соответствующему рабочему с помощью сигнала прерывания.
  • с :finalize для целей очистки.
source
Base.killMethod
kill(manager::ClusterManager, pid::Int, config::WorkerConfig)

Реализовано менеджерами кластеров. Вызывается в главном процессе с помощью rmprocs. Это должно привести к выходу удаленного рабочего, указанного pid. kill(manager::ClusterManager.....) выполняет удаленный exit() на pid.

source
Sockets.connectMethod
connect(manager::ClusterManager, pid::Int, config::WorkerConfig) -> (instrm::IO, outstrm::IO)

Реализовано менеджерами кластеров с использованием пользовательских транспортов. Он должен установить логическое соединение с рабочим с идентификатором pid, указанным в config, и вернуть пару объектов IO. Сообщения от pid к текущему процессу будут считываться из instrm, в то время как сообщения, которые необходимо отправить pid, будут записываться в outstrm. Реализация пользовательского транспорта должна гарантировать, что сообщения доставляются и принимаются полностью и в порядке. connect(manager::ClusterManager.....) настраивает соединения TCP/IP между рабочими.

source
Distributed.init_workerFunction
init_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager())

Вызывается менеджерами кластеров, реализующими пользовательские транспорты. Он инициализирует вновь запущенный процесс как рабочий. Аргумент командной строки --worker[=<cookie>] имеет эффект инициализации процесса как рабочего с использованием TCP/IP сокетов для транспорта. cookie является cluster_cookie.

source
Distributed.start_workerFunction
start_worker([out::IO=stdout], cookie::AbstractString=readline(stdin); close_stdin::Bool=true, stderr_to_stdout::Bool=true)

start_worker — это внутренняя функция, которая является точкой входа по умолчанию для рабочих процессов, подключающихся через TCP/IP. Она настраивает процесс как рабочего в кластере Julia.

Информация о хосте:порту записывается в поток out (по умолчанию stdout).

Функция считывает cookie из stdin, если это необходимо, и слушает на свободном порту (или, если указано, на порту в параметре командной строки --bind-to) и планирует задачи для обработки входящих TCP-соединений и запросов. Она также (по желанию) закрывает stdin и перенаправляет stderr в stdout.

Она не возвращает значения.

source
Distributed.process_messagesFunction
process_messages(r_stream::IO, w_stream::IO, incoming::Bool=true)

Вызывается менеджерами кластера с использованием пользовательских транспортов. Он должен вызываться, когда реализация пользовательского транспорта получает первое сообщение от удаленного рабочего. Пользовательский транспорт должен управлять логическим соединением с удаленным рабочим и предоставлять два объекта IO, один для входящих сообщений и другой для сообщений, адресованных удаленному рабочему. Если incoming равно true, удаленный партнер инициировал соединение. Тот, кто из пары инициирует соединение, отправляет кластерный куки и номер своей версии Julia для выполнения аутентификационного рукопожатия.

Смотрите также cluster_cookie.

source
Distributed.default_addprocs_paramsFunction
default_addprocs_params(mgr::ClusterManager) -> Dict{Symbol, Any}

Реализовано менеджерами кластеров. Параметры по умолчанию, передаваемые при вызове addprocs(mgr). Минимальный набор опций доступен при вызове default_addprocs_params().

source