Distributed Computing
Distributed
— Moduleأدوات المعالجة المتوازية الموزعة.
Distributed.addprocs
— Functionaddprocs(manager::ClusterManager; kwargs...) -> قائمة معرفات العمليات
يبدأ عمليات العمل عبر مدير الكتلة المحدد.
على سبيل المثال، يتم دعم مجموعات Beowulf عبر مدير كتلة مخصص تم تنفيذه في الحزمة ClusterManagers.jl
.
يمكن تحديد عدد الثواني التي تنتظر فيها عملية العمل الجديدة لإنشاء اتصال من الماستر عبر المتغير JULIA_WORKER_TIMEOUT
في بيئة عملية العمل. ذات صلة فقط عند استخدام TCP/IP كوسيلة نقل.
لإطلاق العمال دون حظر REPL، أو الوظيفة المحتوية إذا تم إطلاق العمال برمجيًا، نفذ addprocs
في مهمته الخاصة.
أمثلة
# في المجموعات المزدحمة، استدع `addprocs` بشكل غير متزامن
t = @async addprocs(...)
# استخدم العمال عند قدومهم عبر الإنترنت
if nprocs() > 1 # تأكد من توفر عامل جديد واحد على الأقل
.... # تنفيذ توزيع
end
# استرجاع معرفات العمال الجدد، أو أي رسائل خطأ
if istaskdone(t) # تحقق مما إذا كانت `addprocs` قد اكتملت لضمان عدم حظر `fetch`
if nworkers() == N
new_pids = fetch(t)
else
fetch(t)
end
end
addprocs(machines; tunnel=false, sshflags=``, max_parallel=10, kwargs...) -> قائمة معرفات العمليات
أضف عمليات عامل على آلات بعيدة عبر SSH. يتم تكوين ذلك باستخدام معلمات الكلمات الرئيسية (انظر أدناه). على وجه الخصوص، يمكن استخدام معلمة exename
لتحديد مسار ثنائي julia
على الآلة (الآلات) البعيدة.
machines
هو متجه من "مواصفات الآلات" التي تُعطى كسلاسل من الشكل [user@]host[:port] [bind_addr[:port]]
. المستخدم الافتراضي هو المستخدم الحالي وport
هو منفذ SSH القياسي. إذا تم تحديد [bind_addr[:port]]
، ستتصل العمال الآخرون بهذا العامل عند bind_addr
وport
المحددين.
من الممكن إطلاق عمليات متعددة على مضيف بعيد باستخدام مجموعة في متجه machines
أو الشكل (machine_spec, count)
، حيث count
هو عدد العمال الذين سيتم إطلاقهم على المضيف المحدد. تمرير :auto
كعدد العمال سيطلق عددًا من العمال يساوي عدد خيوط المعالجة على المضيف البعيد.
أمثلة:
addprocs([
"remote1", # عامل واحد على 'remote1' يسجل الدخول باستخدام اسم المستخدم الحالي
"user@remote2", # عامل واحد على 'remote2' يسجل الدخول باستخدام اسم المستخدم 'user'
"user@remote3:2222", # تحديد منفذ SSH إلى '2222' لـ 'remote3'
("user@remote4", 4), # إطلاق 4 عمال على 'remote4'
("user@remote5", :auto), # إطلاق عدد من العمال يساوي عدد خيوط المعالجة على 'remote5'
])
معلمات الكلمات الرئيسية:
tunnel
: إذا كانتtrue
فسيتم استخدام نفق SSH للاتصال بالعامل من عملية الماستر. الافتراضي هوfalse
.multiplex
: إذا كانتtrue
فسيتم استخدام تعدد الإرسال عبر SSH لنفق SSH. الافتراضي هوfalse
.ssh
: اسم أو مسار عميل SSH القابل للتنفيذ المستخدم لبدء العمال. الافتراضي هو"ssh"
.sshflags
: يحدد خيارات SSH إضافية، مثلsshflags=`-i /home/foo/bar.pem`
max_parallel
: يحدد الحد الأقصى لعدد العمال المتصلين في وقت واحد على مضيف. الافتراضي هو 10.shell
: يحدد نوع الصدفة التي يتصل بها SSH بالعمال.shell=:posix
: صدفة Unix/Linux متوافقة مع POSIX (sh، ksh، bash، dash، zsh، إلخ). الافتراضي.shell=:csh
: صدفة C Unix (csh، tcsh).shell=:wincmd
: Microsoft Windowscmd.exe
.
dir
: يحدد دليل العمل على العمال. الافتراضي هو الدليل الحالي للمضيف (كما هو موجود بواسطةpwd()
)enable_threaded_blas
: إذا كانتtrue
فسيتم تشغيل BLAS على خيوط متعددة في العمليات المضافة. الافتراضي هوfalse
.exename
: اسم القابل للتنفيذjulia
. الافتراضي هو"$(Sys.BINDIR)/julia"
أو"$(Sys.BINDIR)/julia-debug"
حسب الحالة. يُوصى باستخدام إصدار شائع من Julia على جميع الآلات البعيدة لأن التسلسل والتوزيع قد يفشل خلاف ذلك.exeflags
: علامات إضافية تمرر إلى عمليات العمال.topology
: يحدد كيفية اتصال العمال ببعضهم البعض. إرسال رسالة بين العمال غير المتصلين يؤدي إلى حدوث خطأ.topology=:all_to_all
: جميع العمليات متصلة ببعضها البعض. الافتراضي.topology=:master_worker
: فقط عملية السائق، أيpid
1 تتصل بالعمال. لا تتصل العمال ببعضها البعض.topology=:custom
: تحدد طريقةlaunch
لمدير الكتلة طوبولوجيا الاتصال عبر الحقولident
وconnect_idents
فيWorkerConfig
. سيتصل عامل له هوية مدير الكتلةident
بجميع العمال المحددين فيconnect_idents
.
lazy
: ينطبق فقط معtopology=:all_to_all
. إذا كانتtrue
، يتم إعداد اتصالات العامل-العامل بشكل كسول، أي يتم إعدادها عند أول حالة من استدعاء بعيد بين العمال. الافتراضي هو true.env
: قدم مصفوفة من أزواج السلاسل مثلenv=["JULIA_DEPOT_PATH"=>"/depot"]
لطلب تعيين متغيرات البيئة على الآلة البعيدة. بشكل افتراضي، يتم تمرير متغير البيئةJULIA_WORKER_TIMEOUT
تلقائيًا من البيئة المحلية إلى البيئة البعيدة.cmdline_cookie
: تمرير ملف تعريف المصادقة عبر خيار سطر الأوامر--worker
. قد يؤدي السلوك الافتراضي (الأكثر أمانًا) المتمثل في تمرير ملف تعريف عبر SSH stdio إلى تعليق مع العمال على Windows الذين يستخدمون إصدارات أقدم (قبل ConPTY) من Julia أو Windows، وفي هذه الحالة يقدمcmdline_cookie=true
حلاً بديلًا.
تمت إضافة معلمات الكلمات الرئيسية ssh
و shell
و env
و cmdline_cookie
في Julia 1.6.
متغيرات البيئة:
إذا فشلت عملية الماستر في إنشاء اتصال مع عامل تم إطلاقه حديثًا خلال 60.0 ثانية، فإن العامل يعتبر ذلك وضعًا مميتًا وينهي نفسه. يمكن التحكم في هذا المهلة عبر متغير البيئة JULIA_WORKER_TIMEOUT
. تحدد قيمة JULIA_WORKER_TIMEOUT
على عملية الماستر عدد الثواني التي ينتظرها العامل الذي تم إطلاقه حديثًا لإنشاء الاتصال.
addprocs(np::Integer=Sys.CPU_THREADS; restrict=true, kwargs...) -> قائمة معرفات العمليات
قم بإطلاق عمال np
على المضيف المحلي باستخدام LocalManager
المدمج.
ترث العمال المحليون بيئة الحزمة الحالية (أي المشروع النشط، LOAD_PATH
، وDEPOT_PATH
) من العملية الرئيسية.
!!! تحذير لاحظ أن العمال لا يقومون بتشغيل نص بدء التشغيل ~/.julia/config/startup.jl
، ولا يقومون بمزامنة حالتهم العالمية (مثل مفاتيح سطر الأوامر، المتغيرات العالمية، تعريفات الطرق الجديدة، والوحدات المحملة) مع أي من العمليات الأخرى التي تعمل.
وسائط الكلمات الرئيسية:
restrict::Bool
: إذا كانتtrue
(افتراضي) فإن الربط مقيد إلى127.0.0.1
.dir
،exename
،exeflags
،env
،topology
،lazy
،enable_threaded_blas
: نفس التأثير كما هو الحال فيSSHManager
، انظر الوثائق لـaddprocs(machines::AbstractVector)
.
!!! توافق "جوليا 1.9" تم إضافة وراثة بيئة الحزمة ووسيط الكلمة الرئيسية env
في جوليا 1.9.
Distributed.nprocs
— Functionnprocs()
احصل على عدد العمليات المتاحة.
أمثلة
julia> nprocs()
3
julia> workers()
2-element Array{Int64,1}:
2
3
Distributed.nworkers
— Functionnworkers()
احصل على عدد عمليات العمال المتاحة. هذا أقل بواحد من nprocs()
. يساوي nprocs()
إذا كان nprocs() == 1
.
أمثلة
$ julia -p 2
julia> nprocs()
3
julia> nworkers()
2
Distributed.procs
— Methodprocs()
إرجاع قائمة بجميع معرفات العمليات، بما في ذلك pid 1 (الذي لا يتم تضمينه بواسطة workers()
).
أمثلة
$ julia -p 2
julia> procs()
3-element Array{Int64,1}:
1
2
3
Distributed.procs
— Methodprocs(pid::Integer)
إرجاع قائمة بجميع معرفات العمليات على نفس العقدة الفيزيائية. على وجه التحديد، يتم إرجاع جميع العمال المرتبطين بنفس عنوان ip مثل pid
.
Distributed.workers
— Functionworkers()
إرجاع قائمة بجميع معرفات عمليات العمال.
أمثلة
$ julia -p 2
julia> workers()
2-element Array{Int64,1}:
2
3
Distributed.rmprocs
— Functionrmprocs(pids...; waitfor=typemax(Int))
قم بإزالة العمال المحددين. لاحظ أن العملية 1 فقط يمكنها إضافة أو إزالة العمال.
المعلمة waitfor
تحدد المدة التي يجب الانتظار فيها حتى يتم إيقاف تشغيل العمال:
- إذا لم يتم تحديدها، ستنتظر
rmprocs
حتى يتم إزالة جميعpids
المطلوبة. - يتم رفع
ErrorException
إذا لم يكن من الممكن إنهاء جميع العمال قبل انتهاء ثوانيwaitfor
المطلوبة. - مع قيمة
waitfor
تساوي 0، تعود المكالمة على الفور مع جدولة العمال للإزالة في مهمة مختلفة. يتم إرجاع كائنTask
المجدول. يجب على المستخدم استدعاءwait
على المهمة قبل استدعاء أي مكالمات متوازية أخرى.
أمثلة
$ julia -p 5
julia> t = rmprocs(2, 3, waitfor=0)
Task (runnable) @0x0000000107c718d0
julia> wait(t)
julia> workers()
3-element Array{Int64,1}:
4
5
6
Distributed.interrupt
— Functioninterrupt(pids::Integer...)
قم بإيقاف المهمة الحالية التي يتم تنفيذها على العمال المحددين. هذا يعادل الضغط على Ctrl-C على الجهاز المحلي. إذا لم يتم إعطاء أي وسائط، يتم إيقاف جميع العمال.
interrupt(pids::AbstractVector=workers())
قم بإيقاف المهمة الحالية التي يتم تنفيذها على العمال المحددين. هذا يعادل الضغط على Ctrl-C على الجهاز المحلي. إذا لم يتم إعطاء أي معطيات، يتم إيقاف جميع العمال.
Distributed.myid
— Functionmyid()
احصل على معرف العملية الحالية.
أمثلة
julia> myid()
1
julia> remotecall_fetch(() -> myid(), 4)
4
Distributed.pmap
— Functionpmap(f, [::AbstractWorkerPool], c...; distributed=true, batch_size=1, on_error=nothing, retry_delays=[], retry_check=nothing) -> collection
قم بتحويل المجموعة c
عن طريق تطبيق f
على كل عنصر باستخدام العمال والمهام المتاحة.
بالنسبة لعدة معطيات من المجموعات، يتم تطبيق f
عنصرًا بعنصر.
لاحظ أنه يجب جعل f
متاحًا لجميع عمليات العمال؛ انظر Code Availability and Loading Packages للحصول على التفاصيل.
إذا لم يتم تحديد مجموعة العمال، فسيتم استخدام جميع العمال المتاحين عبر CachingPool
.
بشكل افتراضي، يقوم pmap
بتوزيع الحساب على جميع العمال المحددين. لاستخدام العملية المحلية فقط وتوزيعها على المهام، حدد distributed=false
. هذا يعادل استخدام asyncmap
. على سبيل المثال، pmap(f, c; distributed=false)
يعادل asyncmap(f,c; ntasks=()->nworkers())
يمكن أيضًا استخدام pmap
مزيجًا من العمليات والمهام عبر معطى batch_size
. بالنسبة لأحجام الدفعات الأكبر من 1، تتم معالجة المجموعة في دفعات متعددة، كل منها بطول batch_size
أو أقل. يتم إرسال دفعة كطلب واحد إلى عامل مجاني، حيث تقوم asyncmap
المحلية بمعالجة العناصر من الدفعة باستخدام مهام متزامنة متعددة.
أي خطأ يوقف pmap
عن معالجة بقية المجموعة. لتجاوز هذا السلوك، يمكنك تحديد دالة معالجة الأخطاء عبر المعطى on_error
التي تأخذ في اعتبارها معطى واحد، أي الاستثناء. يمكن للدالة إيقاف المعالجة عن طريق إعادة طرح الخطأ، أو، للاستمرار، إرجاع أي قيمة يتم إرجاعها بعد ذلك مع النتائج إلى المتصل.
اعتبر المثالين التاليين. الأول يعيد كائن الاستثناء في السطر، والثاني 0 بدلاً من أي استثناء:
julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=identity)
4-element Array{Any,1}:
1
ErrorException("foo")
3
ErrorException("foo")
julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=ex->0)
4-element Array{Int64,1}:
1
0
3
0
يمكن أيضًا معالجة الأخطاء عن طريق إعادة محاولة الحسابات الفاشلة. يتم تمرير المعطيات الرئيسية retry_delays
و retry_check
إلى retry
كمعطيات رئيسية delays
و check
على التوالي. إذا تم تحديد التجميع، وإذا فشلت دفعة كاملة، يتم إعادة محاولة جميع العناصر في الدفعة.
لاحظ أنه إذا تم تحديد كل من on_error
و retry_delays
، يتم استدعاء دالة on_error
قبل إعادة المحاولة. إذا لم تقم on_error
بإلقاء (أو إعادة إلقاء) استثناء، فلن يتم إعادة محاولة العنصر.
مثال: عند حدوث الأخطاء، أعد محاولة f
على عنصر بحد أقصى 3 مرات دون أي تأخير بين المحاولات.
pmap(f, c; retry_delays = zeros(3))
مثال: أعد محاولة f
فقط إذا لم يكن الاستثناء من نوع InexactError
، مع تأخيرات متزايدة بشكل أسي تصل إلى 3 مرات. أعد قيمة NaN
في مكان جميع حالات InexactError
.
pmap(f, c; on_error = e->(isa(e, InexactError) ? NaN : rethrow()), retry_delays = ExponentialBackOff(n = 3))
Distributed.RemoteException
— TypeRemoteException(captured)
يتم التقاط الاستثناءات في الحسابات عن بُعد وإعادة طرحها محليًا. يقوم RemoteException
بلف pid
للعامل واستثناء تم التقاطه. يقوم CapturedException
بالتقاط الاستثناء عن بُعد وشكل قابل للتسلسل من مكدس الاستدعاءات عندما تم رفع الاستثناء.
Distributed.ProcessExitedException
— TypeProcessExitedException(worker_id::Int)
بعد أن تخرج عملية جوليا العميل، ستؤدي المحاولات الإضافية للإشارة إلى الطفل الميت إلى إلقاء هذا الاستثناء.
Distributed.Future
— TypeFuture(w::Int, rrid::RRID, v::Union{Some, Nothing}=nothing)
Future
هو عنصر نائب لحساب واحد بحالة إنهاء غير معروفة ووقت غير محدد. للحسابات المحتملة المتعددة، انظر RemoteChannel
. انظر remoteref_id
لتحديد AbstractRemoteRef
.
Distributed.RemoteChannel
— TypeRemoteChannel(pid::Integer=myid())
قم بإنشاء مرجع إلى Channel{Any}(1)
على العملية pid
. pid
الافتراضي هو العملية الحالية.
RemoteChannel(f::Function, pid::Integer=myid())
قم بإنشاء مراجع لقنوات بعيدة من حجم ونوع محددين. f
هو دالة عند تنفيذها على pid
يجب أن تعيد تنفيذًا لـ AbstractChannel
.
على سبيل المثال، RemoteChannel(()->Channel{Int}(10), pid)
، ستعيد مرجعًا إلى قناة من النوع Int
وحجم 10 على pid
.
pid
الافتراضي هو العملية الحالية.
Base.fetch
— Methodfetch(x::Future)
انتظر واحصل على قيمة Future
. القيمة المسترجعة مخزنة محليًا. المكالمات الإضافية إلى fetch
على نفس المرجع تعيد القيمة المخزنة. إذا كانت القيمة البعيدة استثناءً، يتم إلقاء RemoteException
التي تلتقط الاستثناء البعيد وتتبع الخطأ.
Base.fetch
— Methodfetch(c::RemoteChannel)
انتظر واحصل على قيمة من RemoteChannel
. الاستثناءات التي تثار هي نفسها كما في Future
. لا يزيل العنصر الذي تم جلبه.
fetch(x::Any)
إرجاع x
.
Distributed.remotecall
— Methodremotecall(f, id::Integer, args...; kwargs...) -> Future
استدعاء دالة f
بشكل غير متزامن مع المعطيات المعطاة على العملية المحددة. يُرجع Future
. يتم تمرير الوسائط الرئيسية، إن وجدت، إلى f
.
Distributed.remotecall_wait
— Methodremotecall_wait(f, id::Integer, args...; kwargs...)
قم بتنفيذ wait(remotecall(...))
بشكل أسرع في رسالة واحدة على Worker
المحدد بواسطة معرف العامل id
. يتم تمرير الوسائط الرئيسية، إن وجدت، إلى f
.
انظر أيضًا wait
و remotecall
.
Distributed.remotecall_fetch
— Methodremotecall_fetch(f, id::Integer, args...; kwargs...)
قم بتنفيذ fetch(remotecall(...))
في رسالة واحدة. يتم تمرير أي معلمات مفتاحية، إن وجدت، إلى f
. يتم التقاط أي استثناءات عن بُعد في RemoteException
وإلقاؤها.
انظر أيضًا fetch
و remotecall
.
أمثلة
$ julia -p 2
julia> remotecall_fetch(sqrt, 2, 4)
2.0
julia> remotecall_fetch(sqrt, 2, -4)
ERROR: On worker 2:
DomainError with -4.0:
sqrt was called with a negative real argument but will only return a complex result if called with a complex argument. Try sqrt(Complex(x)).
...
Distributed.remote_do
— Methodremote_do(f, id::Integer, args...; kwargs...) -> nothing
ينفذ f
على العامل id
بشكل غير متزامن. على عكس remotecall
، فإنه لا يخزن نتيجة الحساب، ولا توجد طريقة للانتظار حتى اكتماله.
تشير الدعوة الناجحة إلى أن الطلب قد تم قبوله للتنفيذ على العقدة البعيدة.
بينما يتم تسلسل remotecall
s المتتالية إلى نفس العامل بالترتيب الذي تم استدعاؤها به، فإن ترتيب التنفيذ على العامل البعيد غير محدد. على سبيل المثال، remote_do(f1, 2); remotecall(f2, 2); remote_do(f3, 2)
ستسلسل الاستدعاء إلى f1
، يليه f2
و f3
بهذا الترتيب. ومع ذلك، لا يضمن أن يتم تنفيذ f1
قبل f3
على العامل 2.
أي استثناءات يتم طرحها بواسطة f
يتم طباعتها إلى stderr
على العامل البعيد.
تُمرر الوسائط الرئيسية، إن وجدت، إلى f
.
Base.put!
— Methodput!(rr::RemoteChannel, args...)
قم بتخزين مجموعة من القيم في RemoteChannel
. إذا كانت القناة ممتلئة، يتم حظرها حتى يتوفر مساحة. ارجع الحجة الأولى.
Base.put!
— Methodput!(rr::Future, v)
قم بتخزين قيمة في Future
rr
. تعتبر Future
s مراجع بعيدة للكتابة مرة واحدة. يؤدي استخدام put!
على Future
تم تعيينه بالفعل إلى رمي Exception
. جميع المكالمات البعيدة غير المتزامنة تعيد Future
s وتقوم بتعيين القيمة إلى قيمة الإرجاع للمكالمة عند الانتهاء.
Base.take!
— Methodtake!(rr::RemoteChannel, args...)
استرجع القيمة (القيم) من RemoteChannel
rr
، مع إزالة القيمة (القيم) في هذه العملية.
Base.isready
— Methodisready(rr::RemoteChannel, args...)
تحديد ما إذا كان RemoteChannel
يحتوي على قيمة مخزنة فيه. لاحظ أن هذه الدالة يمكن أن تسبب حالات سباق، حيث أنه بحلول الوقت الذي تتلقى فيه نتيجتها قد لا يكون ذلك صحيحًا بعد الآن. ومع ذلك، يمكن استخدامها بأمان على Future
لأنها تُعين مرة واحدة فقط.
Base.isready
— Methodisready(rr::Future)
تحديد ما إذا كان Future
يحتوي على قيمة مخزنة فيه.
إذا كانت الحجة Future
مملوكة من قبل عقدة مختلفة، ستقوم هذه المكالمة بالانتظار للحصول على الإجابة. يُوصى بالانتظار لـ rr
في مهمة منفصلة بدلاً من ذلك أو استخدام Channel
محلي كوكيل:
p = 1
f = Future(p)
errormonitor(@async put!(f, remotecall_fetch(long_computation, p)))
isready(f) # لن يتم حظره
Distributed.AbstractWorkerPool
— TypeAbstractWorkerPool
نوع أساسي لمجموعات العمال مثل WorkerPool
و CachingPool
. يجب على AbstractWorkerPool
تنفيذ:
push!
- إضافة عامل جديد إلى المجموعة العامة (متاح + مشغول)put!
- إعادة عامل إلى المجموعة المتاحةtake!
- أخذ عامل من المجموعة المتاحة (لاستخدامه في تنفيذ الدوال عن بُعد)length
- عدد العمال المتاحين في المجموعة العامةisready
- إرجاع false إذا كانtake!
على المجموعة سيؤدي إلى حظر، وإلا true
تتطلب التنفيذات الافتراضية لما سبق (على AbstractWorkerPool
) حقول
channel::Channel{Int}
workers::Set{Int}
حيث يحتوي channel
على معرفات العمليات الحرة للعمال و workers
هو مجموعة جميع العمال المرتبطين بهذه المجموعة.
Distributed.WorkerPool
— TypeWorkerPool(workers::Union{Vector{Int},AbstractRange{Int}})
أنشئ WorkerPool
من متجه أو نطاق من معرفات العمال.
أمثلة
$ julia -p 3
julia> WorkerPool([2, 3])
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([2, 3]), RemoteChannel{Channel{Any}}(1, 1, 6))
julia> WorkerPool(2:4)
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 7))
Distributed.CachingPool
— TypeCachingPool(workers::Vector{Int})
تنفيذ لـ AbstractWorkerPool
. remote
، remotecall_fetch
، pmap
(وغيرها من المكالمات البعيدة التي تنفذ الدوال عن بُعد) تستفيد من تخزين الدوال المرسلة/المسترجعة على عقد العمال، خاصة الإغلاقات (التي قد تلتقط كميات كبيرة من البيانات).
يتم الحفاظ على ذاكرة التخزين المؤقت البعيدة طوال فترة حياة كائن CachingPool
المعاد. لمسح الذاكرة المؤقتة في وقت مبكر، استخدم clear!(pool)
.
بالنسبة للمتغيرات العالمية، يتم التقاط الروابط فقط في إغلاق، وليس البيانات. يمكن استخدام كتل let
لالتقاط البيانات العالمية.
أمثلة
const foo = rand(10^8);
wp = CachingPool(workers())
let foo = foo
pmap(i -> sum(foo) + i, wp, 1:100);
end
سيتم نقل foo
أعلاه مرة واحدة فقط إلى كل عامل.
Distributed.default_worker_pool
— Functiondefault_worker_pool()
AbstractWorkerPool
تحتوي على workers
غير المستخدمة - تُستخدم بواسطة remote(f)
و pmap
(بشكل افتراضي). ما لم يتم تعيين واحدة بشكل صريح عبر default_worker_pool!(pool)
, يتم تهيئة مجموعة العمال الافتراضية إلى WorkerPool
.
أمثلة
$ julia -p 3
julia> default_worker_pool()
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:3), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 4))
Distributed.clear!
— Functionclear!(syms, pids=workers(); mod=Main)
يمسح الربط العالمي في الوحدات عن طريق تهيئتها إلى nothing
. يجب أن يكون syms
من نوع Symbol
أو مجموعة من Symbol
s. تحدد pids
و mod
العمليات والوحدة التي يجب إعادة تهيئة المتغيرات العالمية فيها. يتم مسح الأسماء التي تم العثور عليها فقط والتي تم تعريفها تحت mod
.
يتم رفع استثناء إذا تم طلب مسح ثابت عالمي.
clear!(pool::CachingPool) -> pool
يُزيل جميع الدوال المخزنة من جميع العمال المشاركين.
Distributed.remote
— Functionremote([p::AbstractWorkerPool], f) -> Function
إرجاع دالة غير مسماة تنفذ الدالة f
على عامل متاح (مأخوذ من WorkerPool
p
إذا تم توفيره) باستخدام remotecall_fetch
.
Distributed.remotecall
— Methodremotecall(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future
WorkerPool
نسخة من remotecall(f, pid, ....)
. انتظر وخذ عاملًا مجانيًا من pool
وقم بتنفيذ remotecall
عليه.
أمثلة
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> f = remotecall(maximum, wp, A)
Future(2, 1, 6, nothing)
في هذا المثال، تم تنفيذ المهمة على pid 2، وتم استدعاؤها من pid 1.
Distributed.remotecall_wait
— Methodremotecall_wait(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future
WorkerPool
نسخة من remotecall_wait(f, pid, ....)
. انتظر وخذ عاملًا مجانيًا من pool
وقم بتنفيذ remotecall_wait
عليه.
أمثلة
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> f = remotecall_wait(maximum, wp, A)
Future(3, 1, 9, nothing)
julia> fetch(f)
0.9995177101692958
Distributed.remotecall_fetch
— Methodremotecall_fetch(f, pool::AbstractWorkerPool, args...; kwargs...) -> result
WorkerPool
نسخة من remotecall_fetch(f, pid, ....)
. تنتظر وتأخذ عاملاً مجانياً من pool
وتقوم بتنفيذ remotecall_fetch
عليه.
أمثلة
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> remotecall_fetch(maximum, wp, A)
0.9995177101692958
Distributed.remote_do
— Methodremote_do(f, pool::AbstractWorkerPool, args...; kwargs...) -> nothing
نسخة WorkerPool
من remote_do(f, pid, ....)
. انتظر واحصل على عامل مجاني من pool
وقم بتنفيذ remote_do
عليه.
Distributed.@spawn
— Macro@spawn expr
قم بإنشاء إغلاق حول تعبير وتشغيله على عملية يتم اختيارها تلقائيًا، مع إرجاع Future
للنتيجة. هذه الماكرو قديمة؛ يجب استخدام @spawnat :any expr
بدلاً من ذلك.
أمثلة
julia> addprocs(3);
julia> f = @spawn myid()
Future(2, 1, 5, nothing)
julia> fetch(f)
2
julia> f = @spawn myid()
Future(3, 1, 7, nothing)
julia> fetch(f)
3
اعتبارًا من Julia 1.3، هذه الماكرو قديمة. استخدم @spawnat :any
بدلاً من ذلك.
Distributed.@spawnat
— Macro@spawnat p expr
قم بإنشاء إغلاق حول تعبير وتشغيل الإغلاق بشكل غير متزامن على العملية p
. أعد Future
للنتيجة. إذا كانت p
هي الرمز الحرفي المقتبس :any
، فسيقوم النظام باختيار معالج للاستخدام تلقائيًا.
أمثلة
julia> addprocs(3);
julia> f = @spawnat 2 myid()
Future(2, 1, 3, nothing)
julia> fetch(f)
2
julia> f = @spawnat :any myid()
Future(3, 1, 7, nothing)
julia> fetch(f)
3
الوسيطة :any
متاحة اعتبارًا من Julia 1.3.
Distributed.@fetch
— Macro@fetch expr
يعادل fetch(@spawnat :any expr)
. انظر fetch
و @spawnat
.
أمثلة
julia> addprocs(3);
julia> @fetch myid()
2
julia> @fetch myid()
3
julia> @fetch myid()
4
julia> @fetch myid()
2
Distributed.@fetchfrom
— Macro@fetchfrom
يعادل fetch(@spawnat p expr)
. انظر fetch
و @spawnat
.
أمثلة
julia> addprocs(3);
julia> @fetchfrom 2 myid()
2
julia> @fetchfrom 4 myid()
4
Distributed.@distributed
— Macro@distributed
حلقة for متوازية في الذاكرة الموزعة بالشكل:
@distributed [reducer] for var = range
body
end
يتم تقسيم النطاق المحدد وتنفيذه محليًا عبر جميع العمال. في حالة تحديد دالة مختزلة اختيارية، يقوم @distributed
بإجراء اختزالات محلية على كل عامل مع اختزال نهائي على العملية المستدعية.
لاحظ أنه بدون دالة مختزلة، يقوم @distributed
بالتنفيذ بشكل غير متزامن، أي أنه يطلق مهام مستقلة على جميع العمال المتاحين ويعود على الفور دون الانتظار للاكتمال. للانتظار حتى الاكتمال، قم بإضافة بادئة للنداء بـ @sync
، مثل:
@sync @distributed for var = range
body
end
Distributed.@everywhere
— Macro@everywhere [procs()] expr
نفذ تعبيرًا تحت Main
على جميع procs
. يتم جمع الأخطاء في أي من العمليات في CompositeException
وإلقاؤها. على سبيل المثال:
@everywhere bar = 1
سيقوم بتعريف Main.bar
على جميع العمليات الحالية. أي عمليات تضاف لاحقًا (مثل addprocs()
) لن يكون لديها التعبير معرف.
على عكس @spawnat
، فإن @everywhere
لا تلتقط أي متغيرات محلية. بدلاً من ذلك، يمكن بث المتغيرات المحلية باستخدام الاستيفاء:
foo = 1
@everywhere bar = $foo
تسمح الحجة الاختيارية procs
بتحديد مجموعة فرعية من جميع العمليات لتنفيذ التعبير.
مماثل لاستدعاء remotecall_eval(Main, procs, expr)
، ولكن مع ميزتين إضافيتين:
- يتم تنفيذ عبارات `using` و `import` على عملية الاستدعاء أولاً، لضمان
تجميع الحزم مسبقًا.
- يتم تمرير مسار ملف المصدر الحالي المستخدم بواسطة `include` إلى العمليات الأخرى.
Distributed.remoteref_id
— Functionremoteref_id(r::AbstractRemoteRef) -> RRID
Future
s و RemoteChannel
s يتم التعرف عليها من خلال الحقول:
where
- تشير إلى العقدة التي يوجد فيها الكائن/التخزين الأساسي المشار إليه بواسطة المرجع.whence
- تشير إلى العقدة التي تم إنشاء المرجع البعيد منها. لاحظ أن هذا يختلف عن العقدة التي يوجد فيها الكائن الأساسي المشار إليه. على سبيل المثال، استدعاءRemoteChannel(2)
من عملية الماستر سيؤدي إلى قيمةwhere
تساوي 2 وقيمةwhence
تساوي 1.id
فريد عبر جميع المراجع التي تم إنشاؤها من العامل المحدد بواسطةwhence
.
معًا، whence
و id
يحددان مرجعًا بشكل فريد عبر جميع العمال.
remoteref_id
هو واجهة برمجة تطبيقات منخفضة المستوى تعيد كائن RRID
الذي يلتف حول قيم whence
و id
لمرجع بعيد.
Distributed.channel_from_id
— Functionchannel_from_id(id) -> c
واجهة برمجة تطبيقات منخفضة المستوى تعيد AbstractChannel
الداعمة لـ id
الذي تم إرجاعه بواسطة remoteref_id
. الاستدعاء صالح فقط على العقدة حيث توجد القناة الداعمة.
Distributed.worker_id_from_socket
— Functionworker_id_from_socket(s) -> pid
واجهة برمجة تطبيقات منخفضة المستوى، والتي، عند إعطائها اتصال IO
أو Worker
، تعيد pid
للعامل المتصل به. هذا مفيد عند كتابة طرق serialize
مخصصة لنوع ما، والتي تعمل على تحسين البيانات المكتوبة بناءً على معرف عملية الاستقبال.
Distributed.cluster_cookie
— Methodcluster_cookie() -> cookie
إرجاع ملف تعريف الارتباط العنقودي.
Distributed.cluster_cookie
— Methodcluster_cookie(cookie) -> cookie
قم بتعيين الكوكي الممرر ككوكي العنقود، ثم أعده.
Cluster Manager Interface
تقدم هذه الواجهة آلية لإطلاق وإدارة عمال جوليا في بيئات الكلاستر المختلفة. هناك نوعان من المديرين موجودين في Base: LocalManager
، لإطلاق عمال إضافيين على نفس المضيف، و SSHManager
، لإطلاقهم على مضيفين بعيدين عبر ssh
. يتم استخدام مآخذ TCP/IP للاتصال ونقل الرسائل بين العمليات. من الممكن أن توفر مدراء الكلاستر وسيلة نقل مختلفة.
Distributed.ClusterManager
— TypeClusterManager
نوع أعلى لمديري الكتل، الذين يتحكمون في عمليات العمال ككتلة. يقوم مدراء الكتل بتنفيذ كيفية إضافة العمال وإزالتهم والتواصل معهم. SSHManager
و LocalManager
هما نوعان فرعيان من هذا.
Distributed.WorkerConfig
— TypeWorkerConfig
نوع يستخدمه ClusterManager
s للتحكم في العمال المضافين إلى مجموعاتهم. يتم استخدام بعض الحقول من قبل جميع مديري المجموعات للوصول إلى مضيف:
io
– الاتصال المستخدم للوصول إلى العامل (نوع فرعي منIO
أوNothing
)host
– عنوان المضيف (إماString
أوNothing
)port
– المنفذ على المضيف المستخدم للاتصال بالعامل (إماInt
أوNothing
)
يتم استخدام بعضها من قبل مدير المجموعة لإضافة العمال إلى مضيف تم تهيئته بالفعل:
count
– عدد العمال الذين سيتم إطلاقهم على المضيفexename
– المسار إلى الملف التنفيذي لجوليا على المضيف، الافتراضي هو"$(Sys.BINDIR)/julia"
أو"$(Sys.BINDIR)/julia-debug"
exeflags
– العلامات التي يجب استخدامها عند إطلاق جوليا عن بُعد
يتم استخدام حقل userdata
لتخزين المعلومات لكل عامل من قبل المديرين الخارجيين.
يتم استخدام بعض الحقول من قبل SSHManager
ومديرين مشابهين:
tunnel
–true
(استخدام النفق)،false
(عدم استخدام النفق)، أوnothing
(استخدام الافتراضي للمدير)multiplex
–true
(استخدام تعدد الإرسال SSH للنفق) أوfalse
forward
– خيار التوجيه المستخدم لخيار-L
من sshbind_addr
– العنوان على المضيف البعيد الذي سيتم الربط بهsshflags
– العلامات التي يجب استخدامها في إنشاء اتصال SSHmax_parallel
– الحد الأقصى لعدد العمال للاتصال به بشكل متوازي على المضيف
يتم استخدام بعض الحقول من قبل كل من LocalManager
s و SSHManager
s:
connect_at
– يحدد ما إذا كانت هذه مكالمة إعداد من عامل إلى عامل أو من سائق إلى عاملprocess
– العملية التي سيتم الاتصال بها (عادةً ما يقوم المدير بتعيين هذا أثناءaddprocs
)ospid
– معرف العملية وفقًا لنظام تشغيل المضيف، يستخدم لقطع عمليات العمالenviron
– قاموس خاص يستخدم لتخزين المعلومات المؤقتة من قبل المديرين المحليين/SSHident
– العامل كما تم التعرف عليه بواسطةClusterManager
connect_idents
– قائمة بمعرفات العمال التي يجب على العامل الاتصال بها إذا كان يستخدم طوبولوجيا مخصصةenable_threaded_blas
–true
،false
، أوnothing
، سواء لاستخدام BLAS متعدد الخيوط أم لا على العمال
Distributed.launch
— Functionlaunch(manager::ClusterManager, params::Dict, launched::Array, launch_ntfy::Condition)
تم تنفيذها بواسطة مديري الكلاستر. يجب على كل عامل جوليا يتم إطلاقه بواسطة هذه الوظيفة أن يضيف إدخال WorkerConfig
إلى launched
ويقوم بإخطار launch_ntfy
. يجب أن تخرج الوظيفة بمجرد إطلاق جميع العمال، الذين طلبهم manager
. params
هو قاموس لجميع الوسائط الرئيسية التي تم استدعاء addprocs
بها.
Distributed.manage
— Functionmanage(manager::ClusterManager, id::Integer, config::WorkerConfig. op::Symbol)
تم تنفيذها بواسطة مديري الكلاستر. يتم استدعاؤها في عملية الماستر، خلال فترة حياة العامل، مع قيم op
المناسبة:
- مع
:register
/:deregister
عند إضافة عامل / إزالة عامل من مجموعة عمال جوليا. - مع
:interrupt
عند استدعاءinterrupt(workers)
. يجب علىClusterManager
إرسال إشارة مقاطعة للعامل المناسب. - مع
:finalize
لأغراض التنظيف.
Base.kill
— Methodkill(manager::ClusterManager, pid::Int, config::WorkerConfig)
يتم تنفيذه بواسطة مديري الكلاستر. يتم استدعاؤه على عملية الماستر، بواسطة rmprocs
. يجب أن يتسبب في خروج العامل البعيد المحدد بواسطة pid
. يقوم kill(manager::ClusterManager.....)
بتنفيذ exit()
عن بُعد على pid
.
Sockets.connect
— Methodconnect(manager::ClusterManager, pid::Int, config::WorkerConfig) -> (instrm::IO, outstrm::IO)
تم تنفيذها بواسطة مديري الكلاستر باستخدام وسائل النقل المخصصة. يجب أن تؤسس اتصالًا منطقيًا بالعمالة ذات المعرف pid
، المحدد بواسطة config
وتعيد زوجًا من كائنات IO
. سيتم قراءة الرسائل من pid
إلى العملية الحالية من instrm
، بينما سيتم كتابة الرسائل المرسلة إلى pid
إلى outstrm
. يجب على تنفيذ وسائل النقل المخصصة ضمان تسليم الرسائل واستلامها بالكامل وبالترتيب. connect(manager::ClusterManager.....)
يقوم بإعداد اتصالات مقبس TCP/IP بين العمال.
Distributed.init_worker
— Functioninit_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager())
يتم استدعاؤه بواسطة مديري الكلاستر الذين ينفذون وسائل النقل المخصصة. يقوم بتهيئة عملية جديدة تم إطلاقها كعامل. حجة سطر الأوامر --worker[=<cookie>]
لها تأثير تهيئة عملية كعامل باستخدام مآخذ TCP/IP للنقل. cookie
هو cluster_cookie
.
Distributed.start_worker
— Functionstart_worker([out::IO=stdout], cookie::AbstractString=readline(stdin); close_stdin::Bool=true, stderr_to_stdout::Bool=true)
start_worker
هي دالة داخلية وهي نقطة الدخول الافتراضية لعمليات العمال المتصلة عبر TCP/IP. تقوم بإعداد العملية كعامل في مجموعة جوليا.
معلومات المضيف:المنفذ تُكتب إلى التدفق out
(الذي يكون افتراضيًا stdout).
تقرأ الدالة الكوكي من stdin إذا لزم الأمر، وتستمع على منفذ مجاني (أو إذا تم تحديده، المنفذ في خيار سطر الأوامر --bind-to
) وتجدول المهام لمعالجة اتصالات TCP الواردة والطلبات. كما أنها (اختياريًا) تغلق stdin وتعيد توجيه stderr إلى stdout.
لا تُرجع أي قيمة.
Distributed.process_messages
— Functionprocess_messages(r_stream::IO, w_stream::IO, incoming::Bool=true)
يتم استدعاؤه بواسطة مديري الكلاستر باستخدام وسائل النقل المخصصة. يجب استدعاؤه عندما تتلقى تنفيذ وسيلة النقل المخصصة الرسالة الأولى من عامل بعيد. يجب على وسيلة النقل المخصصة إدارة اتصال منطقي بالعامل البعيد وتوفير كائنين من نوع IO
، واحد للرسائل الواردة والآخر للرسائل الموجهة إلى العامل البعيد. إذا كان incoming
هو true
، فإن النظير البعيد هو الذي بدأ الاتصال. أي من الزوجين يبدأ الاتصال يرسل ملف تعريف الكلاستر ورقم إصدار جوليا الخاص به لإجراء مصافحة المصادقة.
انظر أيضًا cluster_cookie
.
Distributed.default_addprocs_params
— Functiondefault_addprocs_params(mgr::ClusterManager) -> Dict{Symbol, Any}
تم تنفيذها بواسطة مديري المجموعات. معلمات الكلمات الرئيسية الافتراضية المرسلة عند استدعاء addprocs(mgr)
. تتوفر مجموعة الخيارات الدنيا عن طريق استدعاء default_addprocs_params()