Distributed Computing
Distributed — Moduleأدوات المعالجة المتوازية الموزعة.
Distributed.addprocs — Functionaddprocs(manager::ClusterManager; kwargs...) -> قائمة معرفات العملياتيبدأ عمليات العمل عبر مدير الكتلة المحدد.
على سبيل المثال، يتم دعم مجموعات Beowulf عبر مدير كتلة مخصص تم تنفيذه في الحزمة ClusterManagers.jl.
يمكن تحديد عدد الثواني التي تنتظر فيها عملية العمل الجديدة لإنشاء اتصال من الماستر عبر المتغير JULIA_WORKER_TIMEOUT في بيئة عملية العمل. ذات صلة فقط عند استخدام TCP/IP كوسيلة نقل.
لإطلاق العمال دون حظر REPL، أو الوظيفة المحتوية إذا تم إطلاق العمال برمجيًا، نفذ addprocs في مهمته الخاصة.
أمثلة
# في المجموعات المزدحمة، استدع `addprocs` بشكل غير متزامن
t = @async addprocs(...)# استخدم العمال عند قدومهم عبر الإنترنت
if nprocs() > 1 # تأكد من توفر عامل جديد واحد على الأقل
.... # تنفيذ توزيع
end# استرجاع معرفات العمال الجدد، أو أي رسائل خطأ
if istaskdone(t) # تحقق مما إذا كانت `addprocs` قد اكتملت لضمان عدم حظر `fetch`
if nworkers() == N
new_pids = fetch(t)
else
fetch(t)
end
endaddprocs(machines; tunnel=false, sshflags=``, max_parallel=10, kwargs...) -> قائمة معرفات العملياتأضف عمليات عامل على آلات بعيدة عبر SSH. يتم تكوين ذلك باستخدام معلمات الكلمات الرئيسية (انظر أدناه). على وجه الخصوص، يمكن استخدام معلمة exename لتحديد مسار ثنائي julia على الآلة (الآلات) البعيدة.
machines هو متجه من "مواصفات الآلات" التي تُعطى كسلاسل من الشكل [user@]host[:port] [bind_addr[:port]]. المستخدم الافتراضي هو المستخدم الحالي وport هو منفذ SSH القياسي. إذا تم تحديد [bind_addr[:port]]، ستتصل العمال الآخرون بهذا العامل عند bind_addr وport المحددين.
من الممكن إطلاق عمليات متعددة على مضيف بعيد باستخدام مجموعة في متجه machines أو الشكل (machine_spec, count)، حيث count هو عدد العمال الذين سيتم إطلاقهم على المضيف المحدد. تمرير :auto كعدد العمال سيطلق عددًا من العمال يساوي عدد خيوط المعالجة على المضيف البعيد.
أمثلة:
addprocs([
"remote1", # عامل واحد على 'remote1' يسجل الدخول باستخدام اسم المستخدم الحالي
"user@remote2", # عامل واحد على 'remote2' يسجل الدخول باستخدام اسم المستخدم 'user'
"user@remote3:2222", # تحديد منفذ SSH إلى '2222' لـ 'remote3'
("user@remote4", 4), # إطلاق 4 عمال على 'remote4'
("user@remote5", :auto), # إطلاق عدد من العمال يساوي عدد خيوط المعالجة على 'remote5'
])معلمات الكلمات الرئيسية:
tunnel: إذا كانتtrueفسيتم استخدام نفق SSH للاتصال بالعامل من عملية الماستر. الافتراضي هوfalse.multiplex: إذا كانتtrueفسيتم استخدام تعدد الإرسال عبر SSH لنفق SSH. الافتراضي هوfalse.ssh: اسم أو مسار عميل SSH القابل للتنفيذ المستخدم لبدء العمال. الافتراضي هو"ssh".sshflags: يحدد خيارات SSH إضافية، مثلsshflags=`-i /home/foo/bar.pem`max_parallel: يحدد الحد الأقصى لعدد العمال المتصلين في وقت واحد على مضيف. الافتراضي هو 10.shell: يحدد نوع الصدفة التي يتصل بها SSH بالعمال.shell=:posix: صدفة Unix/Linux متوافقة مع POSIX (sh، ksh، bash، dash، zsh، إلخ). الافتراضي.shell=:csh: صدفة C Unix (csh، tcsh).shell=:wincmd: Microsoft Windowscmd.exe.
dir: يحدد دليل العمل على العمال. الافتراضي هو الدليل الحالي للمضيف (كما هو موجود بواسطةpwd())enable_threaded_blas: إذا كانتtrueفسيتم تشغيل BLAS على خيوط متعددة في العمليات المضافة. الافتراضي هوfalse.exename: اسم القابل للتنفيذjulia. الافتراضي هو"$(Sys.BINDIR)/julia"أو"$(Sys.BINDIR)/julia-debug"حسب الحالة. يُوصى باستخدام إصدار شائع من Julia على جميع الآلات البعيدة لأن التسلسل والتوزيع قد يفشل خلاف ذلك.exeflags: علامات إضافية تمرر إلى عمليات العمال.topology: يحدد كيفية اتصال العمال ببعضهم البعض. إرسال رسالة بين العمال غير المتصلين يؤدي إلى حدوث خطأ.topology=:all_to_all: جميع العمليات متصلة ببعضها البعض. الافتراضي.topology=:master_worker: فقط عملية السائق، أيpid1 تتصل بالعمال. لا تتصل العمال ببعضها البعض.topology=:custom: تحدد طريقةlaunchلمدير الكتلة طوبولوجيا الاتصال عبر الحقولidentوconnect_identsفيWorkerConfig. سيتصل عامل له هوية مدير الكتلةidentبجميع العمال المحددين فيconnect_idents.
lazy: ينطبق فقط معtopology=:all_to_all. إذا كانتtrue، يتم إعداد اتصالات العامل-العامل بشكل كسول، أي يتم إعدادها عند أول حالة من استدعاء بعيد بين العمال. الافتراضي هو true.env: قدم مصفوفة من أزواج السلاسل مثلenv=["JULIA_DEPOT_PATH"=>"/depot"]لطلب تعيين متغيرات البيئة على الآلة البعيدة. بشكل افتراضي، يتم تمرير متغير البيئةJULIA_WORKER_TIMEOUTتلقائيًا من البيئة المحلية إلى البيئة البعيدة.cmdline_cookie: تمرير ملف تعريف المصادقة عبر خيار سطر الأوامر--worker. قد يؤدي السلوك الافتراضي (الأكثر أمانًا) المتمثل في تمرير ملف تعريف عبر SSH stdio إلى تعليق مع العمال على Windows الذين يستخدمون إصدارات أقدم (قبل ConPTY) من Julia أو Windows، وفي هذه الحالة يقدمcmdline_cookie=trueحلاً بديلًا.
تمت إضافة معلمات الكلمات الرئيسية ssh و shell و env و cmdline_cookie في Julia 1.6.
متغيرات البيئة:
إذا فشلت عملية الماستر في إنشاء اتصال مع عامل تم إطلاقه حديثًا خلال 60.0 ثانية، فإن العامل يعتبر ذلك وضعًا مميتًا وينهي نفسه. يمكن التحكم في هذا المهلة عبر متغير البيئة JULIA_WORKER_TIMEOUT. تحدد قيمة JULIA_WORKER_TIMEOUT على عملية الماستر عدد الثواني التي ينتظرها العامل الذي تم إطلاقه حديثًا لإنشاء الاتصال.
addprocs(np::Integer=Sys.CPU_THREADS; restrict=true, kwargs...) -> قائمة معرفات العملياتقم بإطلاق عمال np على المضيف المحلي باستخدام LocalManager المدمج.
ترث العمال المحليون بيئة الحزمة الحالية (أي المشروع النشط، LOAD_PATH، وDEPOT_PATH) من العملية الرئيسية.
!!! تحذير لاحظ أن العمال لا يقومون بتشغيل نص بدء التشغيل ~/.julia/config/startup.jl، ولا يقومون بمزامنة حالتهم العالمية (مثل مفاتيح سطر الأوامر، المتغيرات العالمية، تعريفات الطرق الجديدة، والوحدات المحملة) مع أي من العمليات الأخرى التي تعمل.
وسائط الكلمات الرئيسية:
restrict::Bool: إذا كانتtrue(افتراضي) فإن الربط مقيد إلى127.0.0.1.dir،exename،exeflags،env،topology،lazy،enable_threaded_blas: نفس التأثير كما هو الحال فيSSHManager، انظر الوثائق لـaddprocs(machines::AbstractVector).
!!! توافق "جوليا 1.9" تم إضافة وراثة بيئة الحزمة ووسيط الكلمة الرئيسية env في جوليا 1.9.
Distributed.nprocs — Functionnprocs()احصل على عدد العمليات المتاحة.
أمثلة
julia> nprocs()
3
julia> workers()
2-element Array{Int64,1}:
2
3Distributed.nworkers — Functionnworkers()احصل على عدد عمليات العمال المتاحة. هذا أقل بواحد من nprocs(). يساوي nprocs() إذا كان nprocs() == 1.
أمثلة
$ julia -p 2
julia> nprocs()
3
julia> nworkers()
2Distributed.procs — Methodprocs()إرجاع قائمة بجميع معرفات العمليات، بما في ذلك pid 1 (الذي لا يتم تضمينه بواسطة workers()).
أمثلة
$ julia -p 2
julia> procs()
3-element Array{Int64,1}:
1
2
3Distributed.procs — Methodprocs(pid::Integer)إرجاع قائمة بجميع معرفات العمليات على نفس العقدة الفيزيائية. على وجه التحديد، يتم إرجاع جميع العمال المرتبطين بنفس عنوان ip مثل pid.
Distributed.workers — Functionworkers()إرجاع قائمة بجميع معرفات عمليات العمال.
أمثلة
$ julia -p 2
julia> workers()
2-element Array{Int64,1}:
2
3Distributed.rmprocs — Functionrmprocs(pids...; waitfor=typemax(Int))قم بإزالة العمال المحددين. لاحظ أن العملية 1 فقط يمكنها إضافة أو إزالة العمال.
المعلمة waitfor تحدد المدة التي يجب الانتظار فيها حتى يتم إيقاف تشغيل العمال:
- إذا لم يتم تحديدها، ستنتظر
rmprocsحتى يتم إزالة جميعpidsالمطلوبة. - يتم رفع
ErrorExceptionإذا لم يكن من الممكن إنهاء جميع العمال قبل انتهاء ثوانيwaitforالمطلوبة. - مع قيمة
waitforتساوي 0، تعود المكالمة على الفور مع جدولة العمال للإزالة في مهمة مختلفة. يتم إرجاع كائنTaskالمجدول. يجب على المستخدم استدعاءwaitعلى المهمة قبل استدعاء أي مكالمات متوازية أخرى.
أمثلة
$ julia -p 5
julia> t = rmprocs(2, 3, waitfor=0)
Task (runnable) @0x0000000107c718d0
julia> wait(t)
julia> workers()
3-element Array{Int64,1}:
4
5
6Distributed.interrupt — Functioninterrupt(pids::Integer...)قم بإيقاف المهمة الحالية التي يتم تنفيذها على العمال المحددين. هذا يعادل الضغط على Ctrl-C على الجهاز المحلي. إذا لم يتم إعطاء أي وسائط، يتم إيقاف جميع العمال.
interrupt(pids::AbstractVector=workers())قم بإيقاف المهمة الحالية التي يتم تنفيذها على العمال المحددين. هذا يعادل الضغط على Ctrl-C على الجهاز المحلي. إذا لم يتم إعطاء أي معطيات، يتم إيقاف جميع العمال.
Distributed.myid — Functionmyid()احصل على معرف العملية الحالية.
أمثلة
julia> myid()
1
julia> remotecall_fetch(() -> myid(), 4)
4Distributed.pmap — Functionpmap(f, [::AbstractWorkerPool], c...; distributed=true, batch_size=1, on_error=nothing, retry_delays=[], retry_check=nothing) -> collectionقم بتحويل المجموعة c عن طريق تطبيق f على كل عنصر باستخدام العمال والمهام المتاحة.
بالنسبة لعدة معطيات من المجموعات، يتم تطبيق f عنصرًا بعنصر.
لاحظ أنه يجب جعل f متاحًا لجميع عمليات العمال؛ انظر Code Availability and Loading Packages للحصول على التفاصيل.
إذا لم يتم تحديد مجموعة العمال، فسيتم استخدام جميع العمال المتاحين عبر CachingPool.
بشكل افتراضي، يقوم pmap بتوزيع الحساب على جميع العمال المحددين. لاستخدام العملية المحلية فقط وتوزيعها على المهام، حدد distributed=false. هذا يعادل استخدام asyncmap. على سبيل المثال، pmap(f, c; distributed=false) يعادل asyncmap(f,c; ntasks=()->nworkers())
يمكن أيضًا استخدام pmap مزيجًا من العمليات والمهام عبر معطى batch_size. بالنسبة لأحجام الدفعات الأكبر من 1، تتم معالجة المجموعة في دفعات متعددة، كل منها بطول batch_size أو أقل. يتم إرسال دفعة كطلب واحد إلى عامل مجاني، حيث تقوم asyncmap المحلية بمعالجة العناصر من الدفعة باستخدام مهام متزامنة متعددة.
أي خطأ يوقف pmap عن معالجة بقية المجموعة. لتجاوز هذا السلوك، يمكنك تحديد دالة معالجة الأخطاء عبر المعطى on_error التي تأخذ في اعتبارها معطى واحد، أي الاستثناء. يمكن للدالة إيقاف المعالجة عن طريق إعادة طرح الخطأ، أو، للاستمرار، إرجاع أي قيمة يتم إرجاعها بعد ذلك مع النتائج إلى المتصل.
اعتبر المثالين التاليين. الأول يعيد كائن الاستثناء في السطر، والثاني 0 بدلاً من أي استثناء:
julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=identity)
4-element Array{Any,1}:
1
ErrorException("foo")
3
ErrorException("foo")
julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=ex->0)
4-element Array{Int64,1}:
1
0
3
0يمكن أيضًا معالجة الأخطاء عن طريق إعادة محاولة الحسابات الفاشلة. يتم تمرير المعطيات الرئيسية retry_delays و retry_check إلى retry كمعطيات رئيسية delays و check على التوالي. إذا تم تحديد التجميع، وإذا فشلت دفعة كاملة، يتم إعادة محاولة جميع العناصر في الدفعة.
لاحظ أنه إذا تم تحديد كل من on_error و retry_delays، يتم استدعاء دالة on_error قبل إعادة المحاولة. إذا لم تقم on_error بإلقاء (أو إعادة إلقاء) استثناء، فلن يتم إعادة محاولة العنصر.
مثال: عند حدوث الأخطاء، أعد محاولة f على عنصر بحد أقصى 3 مرات دون أي تأخير بين المحاولات.
pmap(f, c; retry_delays = zeros(3))مثال: أعد محاولة f فقط إذا لم يكن الاستثناء من نوع InexactError، مع تأخيرات متزايدة بشكل أسي تصل إلى 3 مرات. أعد قيمة NaN في مكان جميع حالات InexactError.
pmap(f, c; on_error = e->(isa(e, InexactError) ? NaN : rethrow()), retry_delays = ExponentialBackOff(n = 3))Distributed.RemoteException — TypeRemoteException(captured)يتم التقاط الاستثناءات في الحسابات عن بُعد وإعادة طرحها محليًا. يقوم RemoteException بلف pid للعامل واستثناء تم التقاطه. يقوم CapturedException بالتقاط الاستثناء عن بُعد وشكل قابل للتسلسل من مكدس الاستدعاءات عندما تم رفع الاستثناء.
Distributed.ProcessExitedException — TypeProcessExitedException(worker_id::Int)بعد أن تخرج عملية جوليا العميل، ستؤدي المحاولات الإضافية للإشارة إلى الطفل الميت إلى إلقاء هذا الاستثناء.
Distributed.Future — TypeFuture(w::Int, rrid::RRID, v::Union{Some, Nothing}=nothing)Future هو عنصر نائب لحساب واحد بحالة إنهاء غير معروفة ووقت غير محدد. للحسابات المحتملة المتعددة، انظر RemoteChannel. انظر remoteref_id لتحديد AbstractRemoteRef.
Distributed.RemoteChannel — TypeRemoteChannel(pid::Integer=myid())قم بإنشاء مرجع إلى Channel{Any}(1) على العملية pid. pid الافتراضي هو العملية الحالية.
RemoteChannel(f::Function, pid::Integer=myid())قم بإنشاء مراجع لقنوات بعيدة من حجم ونوع محددين. f هو دالة عند تنفيذها على pid يجب أن تعيد تنفيذًا لـ AbstractChannel.
على سبيل المثال، RemoteChannel(()->Channel{Int}(10), pid)، ستعيد مرجعًا إلى قناة من النوع Int وحجم 10 على pid.
pid الافتراضي هو العملية الحالية.
Base.fetch — Methodfetch(x::Future)انتظر واحصل على قيمة Future. القيمة المسترجعة مخزنة محليًا. المكالمات الإضافية إلى fetch على نفس المرجع تعيد القيمة المخزنة. إذا كانت القيمة البعيدة استثناءً، يتم إلقاء RemoteException التي تلتقط الاستثناء البعيد وتتبع الخطأ.
Base.fetch — Methodfetch(c::RemoteChannel)انتظر واحصل على قيمة من RemoteChannel. الاستثناءات التي تثار هي نفسها كما في Future. لا يزيل العنصر الذي تم جلبه.
fetch(x::Any)إرجاع x.
Distributed.remotecall — Methodremotecall(f, id::Integer, args...; kwargs...) -> Futureاستدعاء دالة f بشكل غير متزامن مع المعطيات المعطاة على العملية المحددة. يُرجع Future. يتم تمرير الوسائط الرئيسية، إن وجدت، إلى f.
Distributed.remotecall_wait — Methodremotecall_wait(f, id::Integer, args...; kwargs...)قم بتنفيذ wait(remotecall(...)) بشكل أسرع في رسالة واحدة على Worker المحدد بواسطة معرف العامل id. يتم تمرير الوسائط الرئيسية، إن وجدت، إلى f.
انظر أيضًا wait و remotecall.
Distributed.remotecall_fetch — Methodremotecall_fetch(f, id::Integer, args...; kwargs...)قم بتنفيذ fetch(remotecall(...)) في رسالة واحدة. يتم تمرير أي معلمات مفتاحية، إن وجدت، إلى f. يتم التقاط أي استثناءات عن بُعد في RemoteException وإلقاؤها.
انظر أيضًا fetch و remotecall.
أمثلة
$ julia -p 2
julia> remotecall_fetch(sqrt, 2, 4)
2.0
julia> remotecall_fetch(sqrt, 2, -4)
ERROR: On worker 2:
DomainError with -4.0:
sqrt was called with a negative real argument but will only return a complex result if called with a complex argument. Try sqrt(Complex(x)).
...Distributed.remote_do — Methodremote_do(f, id::Integer, args...; kwargs...) -> nothingينفذ f على العامل id بشكل غير متزامن. على عكس remotecall، فإنه لا يخزن نتيجة الحساب، ولا توجد طريقة للانتظار حتى اكتماله.
تشير الدعوة الناجحة إلى أن الطلب قد تم قبوله للتنفيذ على العقدة البعيدة.
بينما يتم تسلسل remotecalls المتتالية إلى نفس العامل بالترتيب الذي تم استدعاؤها به، فإن ترتيب التنفيذ على العامل البعيد غير محدد. على سبيل المثال، remote_do(f1, 2); remotecall(f2, 2); remote_do(f3, 2) ستسلسل الاستدعاء إلى f1، يليه f2 و f3 بهذا الترتيب. ومع ذلك، لا يضمن أن يتم تنفيذ f1 قبل f3 على العامل 2.
أي استثناءات يتم طرحها بواسطة f يتم طباعتها إلى stderr على العامل البعيد.
تُمرر الوسائط الرئيسية، إن وجدت، إلى f.
Base.put! — Methodput!(rr::RemoteChannel, args...)قم بتخزين مجموعة من القيم في RemoteChannel. إذا كانت القناة ممتلئة، يتم حظرها حتى يتوفر مساحة. ارجع الحجة الأولى.
Base.put! — Methodput!(rr::Future, v)قم بتخزين قيمة في Future rr. تعتبر Futures مراجع بعيدة للكتابة مرة واحدة. يؤدي استخدام put! على Future تم تعيينه بالفعل إلى رمي Exception. جميع المكالمات البعيدة غير المتزامنة تعيد Futures وتقوم بتعيين القيمة إلى قيمة الإرجاع للمكالمة عند الانتهاء.
Base.take! — Methodtake!(rr::RemoteChannel, args...)استرجع القيمة (القيم) من RemoteChannel rr، مع إزالة القيمة (القيم) في هذه العملية.
Base.isready — Methodisready(rr::RemoteChannel, args...)تحديد ما إذا كان RemoteChannel يحتوي على قيمة مخزنة فيه. لاحظ أن هذه الدالة يمكن أن تسبب حالات سباق، حيث أنه بحلول الوقت الذي تتلقى فيه نتيجتها قد لا يكون ذلك صحيحًا بعد الآن. ومع ذلك، يمكن استخدامها بأمان على Future لأنها تُعين مرة واحدة فقط.
Base.isready — Methodisready(rr::Future)تحديد ما إذا كان Future يحتوي على قيمة مخزنة فيه.
إذا كانت الحجة Future مملوكة من قبل عقدة مختلفة، ستقوم هذه المكالمة بالانتظار للحصول على الإجابة. يُوصى بالانتظار لـ rr في مهمة منفصلة بدلاً من ذلك أو استخدام Channel محلي كوكيل:
p = 1
f = Future(p)
errormonitor(@async put!(f, remotecall_fetch(long_computation, p)))
isready(f) # لن يتم حظرهDistributed.AbstractWorkerPool — TypeAbstractWorkerPoolنوع أساسي لمجموعات العمال مثل WorkerPool و CachingPool. يجب على AbstractWorkerPool تنفيذ:
push!- إضافة عامل جديد إلى المجموعة العامة (متاح + مشغول)put!- إعادة عامل إلى المجموعة المتاحةtake!- أخذ عامل من المجموعة المتاحة (لاستخدامه في تنفيذ الدوال عن بُعد)length- عدد العمال المتاحين في المجموعة العامةisready- إرجاع false إذا كانtake!على المجموعة سيؤدي إلى حظر، وإلا true
تتطلب التنفيذات الافتراضية لما سبق (على AbstractWorkerPool) حقول
channel::Channel{Int}workers::Set{Int}
حيث يحتوي channel على معرفات العمليات الحرة للعمال و workers هو مجموعة جميع العمال المرتبطين بهذه المجموعة.
Distributed.WorkerPool — TypeWorkerPool(workers::Union{Vector{Int},AbstractRange{Int}})أنشئ WorkerPool من متجه أو نطاق من معرفات العمال.
أمثلة
$ julia -p 3
julia> WorkerPool([2, 3])
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([2, 3]), RemoteChannel{Channel{Any}}(1, 1, 6))
julia> WorkerPool(2:4)
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 7))Distributed.CachingPool — TypeCachingPool(workers::Vector{Int})تنفيذ لـ AbstractWorkerPool. remote، remotecall_fetch، pmap (وغيرها من المكالمات البعيدة التي تنفذ الدوال عن بُعد) تستفيد من تخزين الدوال المرسلة/المسترجعة على عقد العمال، خاصة الإغلاقات (التي قد تلتقط كميات كبيرة من البيانات).
يتم الحفاظ على ذاكرة التخزين المؤقت البعيدة طوال فترة حياة كائن CachingPool المعاد. لمسح الذاكرة المؤقتة في وقت مبكر، استخدم clear!(pool).
بالنسبة للمتغيرات العالمية، يتم التقاط الروابط فقط في إغلاق، وليس البيانات. يمكن استخدام كتل let لالتقاط البيانات العالمية.
أمثلة
const foo = rand(10^8);
wp = CachingPool(workers())
let foo = foo
pmap(i -> sum(foo) + i, wp, 1:100);
endسيتم نقل foo أعلاه مرة واحدة فقط إلى كل عامل.
Distributed.default_worker_pool — Functiondefault_worker_pool()AbstractWorkerPool تحتوي على workers غير المستخدمة - تُستخدم بواسطة remote(f) و pmap (بشكل افتراضي). ما لم يتم تعيين واحدة بشكل صريح عبر default_worker_pool!(pool), يتم تهيئة مجموعة العمال الافتراضية إلى WorkerPool.
أمثلة
$ julia -p 3
julia> default_worker_pool()
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:3), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 4))Distributed.clear! — Functionclear!(syms, pids=workers(); mod=Main)يمسح الربط العالمي في الوحدات عن طريق تهيئتها إلى nothing. يجب أن يكون syms من نوع Symbol أو مجموعة من Symbols. تحدد pids و mod العمليات والوحدة التي يجب إعادة تهيئة المتغيرات العالمية فيها. يتم مسح الأسماء التي تم العثور عليها فقط والتي تم تعريفها تحت mod.
يتم رفع استثناء إذا تم طلب مسح ثابت عالمي.
clear!(pool::CachingPool) -> poolيُزيل جميع الدوال المخزنة من جميع العمال المشاركين.
Distributed.remote — Functionremote([p::AbstractWorkerPool], f) -> Functionإرجاع دالة غير مسماة تنفذ الدالة f على عامل متاح (مأخوذ من WorkerPool p إذا تم توفيره) باستخدام remotecall_fetch.
Distributed.remotecall — Methodremotecall(f, pool::AbstractWorkerPool, args...; kwargs...) -> FutureWorkerPool نسخة من remotecall(f, pid, ....). انتظر وخذ عاملًا مجانيًا من pool وقم بتنفيذ remotecall عليه.
أمثلة
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> f = remotecall(maximum, wp, A)
Future(2, 1, 6, nothing)في هذا المثال، تم تنفيذ المهمة على pid 2، وتم استدعاؤها من pid 1.
Distributed.remotecall_wait — Methodremotecall_wait(f, pool::AbstractWorkerPool, args...; kwargs...) -> FutureWorkerPool نسخة من remotecall_wait(f, pid, ....). انتظر وخذ عاملًا مجانيًا من pool وقم بتنفيذ remotecall_wait عليه.
أمثلة
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> f = remotecall_wait(maximum, wp, A)
Future(3, 1, 9, nothing)
julia> fetch(f)
0.9995177101692958Distributed.remotecall_fetch — Methodremotecall_fetch(f, pool::AbstractWorkerPool, args...; kwargs...) -> resultWorkerPool نسخة من remotecall_fetch(f, pid, ....). تنتظر وتأخذ عاملاً مجانياً من pool وتقوم بتنفيذ remotecall_fetch عليه.
أمثلة
$ julia -p 3
julia> wp = WorkerPool([2, 3]);
julia> A = rand(3000);
julia> remotecall_fetch(maximum, wp, A)
0.9995177101692958Distributed.remote_do — Methodremote_do(f, pool::AbstractWorkerPool, args...; kwargs...) -> nothingنسخة WorkerPool من remote_do(f, pid, ....). انتظر واحصل على عامل مجاني من pool وقم بتنفيذ remote_do عليه.
Distributed.@spawn — Macro@spawn exprقم بإنشاء إغلاق حول تعبير وتشغيله على عملية يتم اختيارها تلقائيًا، مع إرجاع Future للنتيجة. هذه الماكرو قديمة؛ يجب استخدام @spawnat :any expr بدلاً من ذلك.
أمثلة
julia> addprocs(3);
julia> f = @spawn myid()
Future(2, 1, 5, nothing)
julia> fetch(f)
2
julia> f = @spawn myid()
Future(3, 1, 7, nothing)
julia> fetch(f)
3اعتبارًا من Julia 1.3، هذه الماكرو قديمة. استخدم @spawnat :any بدلاً من ذلك.
Distributed.@spawnat — Macro@spawnat p exprقم بإنشاء إغلاق حول تعبير وتشغيل الإغلاق بشكل غير متزامن على العملية p. أعد Future للنتيجة. إذا كانت p هي الرمز الحرفي المقتبس :any، فسيقوم النظام باختيار معالج للاستخدام تلقائيًا.
أمثلة
julia> addprocs(3);
julia> f = @spawnat 2 myid()
Future(2, 1, 3, nothing)
julia> fetch(f)
2
julia> f = @spawnat :any myid()
Future(3, 1, 7, nothing)
julia> fetch(f)
3الوسيطة :any متاحة اعتبارًا من Julia 1.3.
Distributed.@fetch — Macro@fetch exprيعادل fetch(@spawnat :any expr). انظر fetch و @spawnat.
أمثلة
julia> addprocs(3);
julia> @fetch myid()
2
julia> @fetch myid()
3
julia> @fetch myid()
4
julia> @fetch myid()
2Distributed.@fetchfrom — Macro@fetchfromيعادل fetch(@spawnat p expr). انظر fetch و @spawnat.
أمثلة
julia> addprocs(3);
julia> @fetchfrom 2 myid()
2
julia> @fetchfrom 4 myid()
4Distributed.@distributed — Macro@distributedحلقة for متوازية في الذاكرة الموزعة بالشكل:
@distributed [reducer] for var = range
body
endيتم تقسيم النطاق المحدد وتنفيذه محليًا عبر جميع العمال. في حالة تحديد دالة مختزلة اختيارية، يقوم @distributed بإجراء اختزالات محلية على كل عامل مع اختزال نهائي على العملية المستدعية.
لاحظ أنه بدون دالة مختزلة، يقوم @distributed بالتنفيذ بشكل غير متزامن، أي أنه يطلق مهام مستقلة على جميع العمال المتاحين ويعود على الفور دون الانتظار للاكتمال. للانتظار حتى الاكتمال، قم بإضافة بادئة للنداء بـ @sync، مثل:
@sync @distributed for var = range
body
endDistributed.@everywhere — Macro@everywhere [procs()] exprنفذ تعبيرًا تحت Main على جميع procs. يتم جمع الأخطاء في أي من العمليات في CompositeException وإلقاؤها. على سبيل المثال:
@everywhere bar = 1سيقوم بتعريف Main.bar على جميع العمليات الحالية. أي عمليات تضاف لاحقًا (مثل addprocs()) لن يكون لديها التعبير معرف.
على عكس @spawnat، فإن @everywhere لا تلتقط أي متغيرات محلية. بدلاً من ذلك، يمكن بث المتغيرات المحلية باستخدام الاستيفاء:
foo = 1
@everywhere bar = $fooتسمح الحجة الاختيارية procs بتحديد مجموعة فرعية من جميع العمليات لتنفيذ التعبير.
مماثل لاستدعاء remotecall_eval(Main, procs, expr)، ولكن مع ميزتين إضافيتين:
- يتم تنفيذ عبارات `using` و `import` على عملية الاستدعاء أولاً، لضمان
تجميع الحزم مسبقًا.
- يتم تمرير مسار ملف المصدر الحالي المستخدم بواسطة `include` إلى العمليات الأخرى.Distributed.remoteref_id — Functionremoteref_id(r::AbstractRemoteRef) -> RRIDFutures و RemoteChannels يتم التعرف عليها من خلال الحقول:
where- تشير إلى العقدة التي يوجد فيها الكائن/التخزين الأساسي المشار إليه بواسطة المرجع.whence- تشير إلى العقدة التي تم إنشاء المرجع البعيد منها. لاحظ أن هذا يختلف عن العقدة التي يوجد فيها الكائن الأساسي المشار إليه. على سبيل المثال، استدعاءRemoteChannel(2)من عملية الماستر سيؤدي إلى قيمةwhereتساوي 2 وقيمةwhenceتساوي 1.idفريد عبر جميع المراجع التي تم إنشاؤها من العامل المحدد بواسطةwhence.
معًا، whence و id يحددان مرجعًا بشكل فريد عبر جميع العمال.
remoteref_id هو واجهة برمجة تطبيقات منخفضة المستوى تعيد كائن RRID الذي يلتف حول قيم whence و id لمرجع بعيد.
Distributed.channel_from_id — Functionchannel_from_id(id) -> cواجهة برمجة تطبيقات منخفضة المستوى تعيد AbstractChannel الداعمة لـ id الذي تم إرجاعه بواسطة remoteref_id. الاستدعاء صالح فقط على العقدة حيث توجد القناة الداعمة.
Distributed.worker_id_from_socket — Functionworker_id_from_socket(s) -> pidواجهة برمجة تطبيقات منخفضة المستوى، والتي، عند إعطائها اتصال IO أو Worker، تعيد pid للعامل المتصل به. هذا مفيد عند كتابة طرق serialize مخصصة لنوع ما، والتي تعمل على تحسين البيانات المكتوبة بناءً على معرف عملية الاستقبال.
Distributed.cluster_cookie — Methodcluster_cookie() -> cookieإرجاع ملف تعريف الارتباط العنقودي.
Distributed.cluster_cookie — Methodcluster_cookie(cookie) -> cookieقم بتعيين الكوكي الممرر ككوكي العنقود، ثم أعده.
Cluster Manager Interface
تقدم هذه الواجهة آلية لإطلاق وإدارة عمال جوليا في بيئات الكلاستر المختلفة. هناك نوعان من المديرين موجودين في Base: LocalManager، لإطلاق عمال إضافيين على نفس المضيف، و SSHManager، لإطلاقهم على مضيفين بعيدين عبر ssh. يتم استخدام مآخذ TCP/IP للاتصال ونقل الرسائل بين العمليات. من الممكن أن توفر مدراء الكلاستر وسيلة نقل مختلفة.
Distributed.ClusterManager — TypeClusterManagerنوع أعلى لمديري الكتل، الذين يتحكمون في عمليات العمال ككتلة. يقوم مدراء الكتل بتنفيذ كيفية إضافة العمال وإزالتهم والتواصل معهم. SSHManager و LocalManager هما نوعان فرعيان من هذا.
Distributed.WorkerConfig — TypeWorkerConfigنوع يستخدمه ClusterManagers للتحكم في العمال المضافين إلى مجموعاتهم. يتم استخدام بعض الحقول من قبل جميع مديري المجموعات للوصول إلى مضيف:
io– الاتصال المستخدم للوصول إلى العامل (نوع فرعي منIOأوNothing)host– عنوان المضيف (إماStringأوNothing)port– المنفذ على المضيف المستخدم للاتصال بالعامل (إماIntأوNothing)
يتم استخدام بعضها من قبل مدير المجموعة لإضافة العمال إلى مضيف تم تهيئته بالفعل:
count– عدد العمال الذين سيتم إطلاقهم على المضيفexename– المسار إلى الملف التنفيذي لجوليا على المضيف، الافتراضي هو"$(Sys.BINDIR)/julia"أو"$(Sys.BINDIR)/julia-debug"exeflags– العلامات التي يجب استخدامها عند إطلاق جوليا عن بُعد
يتم استخدام حقل userdata لتخزين المعلومات لكل عامل من قبل المديرين الخارجيين.
يتم استخدام بعض الحقول من قبل SSHManager ومديرين مشابهين:
tunnel–true(استخدام النفق)،false(عدم استخدام النفق)، أوnothing(استخدام الافتراضي للمدير)multiplex–true(استخدام تعدد الإرسال SSH للنفق) أوfalseforward– خيار التوجيه المستخدم لخيار-Lمن sshbind_addr– العنوان على المضيف البعيد الذي سيتم الربط بهsshflags– العلامات التي يجب استخدامها في إنشاء اتصال SSHmax_parallel– الحد الأقصى لعدد العمال للاتصال به بشكل متوازي على المضيف
يتم استخدام بعض الحقول من قبل كل من LocalManagers و SSHManagers:
connect_at– يحدد ما إذا كانت هذه مكالمة إعداد من عامل إلى عامل أو من سائق إلى عاملprocess– العملية التي سيتم الاتصال بها (عادةً ما يقوم المدير بتعيين هذا أثناءaddprocs)ospid– معرف العملية وفقًا لنظام تشغيل المضيف، يستخدم لقطع عمليات العمالenviron– قاموس خاص يستخدم لتخزين المعلومات المؤقتة من قبل المديرين المحليين/SSHident– العامل كما تم التعرف عليه بواسطةClusterManagerconnect_idents– قائمة بمعرفات العمال التي يجب على العامل الاتصال بها إذا كان يستخدم طوبولوجيا مخصصةenable_threaded_blas–true،false، أوnothing، سواء لاستخدام BLAS متعدد الخيوط أم لا على العمال
Distributed.launch — Functionlaunch(manager::ClusterManager, params::Dict, launched::Array, launch_ntfy::Condition)تم تنفيذها بواسطة مديري الكلاستر. يجب على كل عامل جوليا يتم إطلاقه بواسطة هذه الوظيفة أن يضيف إدخال WorkerConfig إلى launched ويقوم بإخطار launch_ntfy. يجب أن تخرج الوظيفة بمجرد إطلاق جميع العمال، الذين طلبهم manager. params هو قاموس لجميع الوسائط الرئيسية التي تم استدعاء addprocs بها.
Distributed.manage — Functionmanage(manager::ClusterManager, id::Integer, config::WorkerConfig. op::Symbol)تم تنفيذها بواسطة مديري الكلاستر. يتم استدعاؤها في عملية الماستر، خلال فترة حياة العامل، مع قيم op المناسبة:
- مع
:register/:deregisterعند إضافة عامل / إزالة عامل من مجموعة عمال جوليا. - مع
:interruptعند استدعاءinterrupt(workers). يجب علىClusterManagerإرسال إشارة مقاطعة للعامل المناسب. - مع
:finalizeلأغراض التنظيف.
Base.kill — Methodkill(manager::ClusterManager, pid::Int, config::WorkerConfig)يتم تنفيذه بواسطة مديري الكلاستر. يتم استدعاؤه على عملية الماستر، بواسطة rmprocs. يجب أن يتسبب في خروج العامل البعيد المحدد بواسطة pid. يقوم kill(manager::ClusterManager.....) بتنفيذ exit() عن بُعد على pid.
Sockets.connect — Methodconnect(manager::ClusterManager, pid::Int, config::WorkerConfig) -> (instrm::IO, outstrm::IO)تم تنفيذها بواسطة مديري الكلاستر باستخدام وسائل النقل المخصصة. يجب أن تؤسس اتصالًا منطقيًا بالعمالة ذات المعرف pid، المحدد بواسطة config وتعيد زوجًا من كائنات IO. سيتم قراءة الرسائل من pid إلى العملية الحالية من instrm، بينما سيتم كتابة الرسائل المرسلة إلى pid إلى outstrm. يجب على تنفيذ وسائل النقل المخصصة ضمان تسليم الرسائل واستلامها بالكامل وبالترتيب. connect(manager::ClusterManager.....) يقوم بإعداد اتصالات مقبس TCP/IP بين العمال.
Distributed.init_worker — Functioninit_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager())يتم استدعاؤه بواسطة مديري الكلاستر الذين ينفذون وسائل النقل المخصصة. يقوم بتهيئة عملية جديدة تم إطلاقها كعامل. حجة سطر الأوامر --worker[=<cookie>] لها تأثير تهيئة عملية كعامل باستخدام مآخذ TCP/IP للنقل. cookie هو cluster_cookie.
Distributed.start_worker — Functionstart_worker([out::IO=stdout], cookie::AbstractString=readline(stdin); close_stdin::Bool=true, stderr_to_stdout::Bool=true)start_worker هي دالة داخلية وهي نقطة الدخول الافتراضية لعمليات العمال المتصلة عبر TCP/IP. تقوم بإعداد العملية كعامل في مجموعة جوليا.
معلومات المضيف:المنفذ تُكتب إلى التدفق out (الذي يكون افتراضيًا stdout).
تقرأ الدالة الكوكي من stdin إذا لزم الأمر، وتستمع على منفذ مجاني (أو إذا تم تحديده، المنفذ في خيار سطر الأوامر --bind-to) وتجدول المهام لمعالجة اتصالات TCP الواردة والطلبات. كما أنها (اختياريًا) تغلق stdin وتعيد توجيه stderr إلى stdout.
لا تُرجع أي قيمة.
Distributed.process_messages — Functionprocess_messages(r_stream::IO, w_stream::IO, incoming::Bool=true)يتم استدعاؤه بواسطة مديري الكلاستر باستخدام وسائل النقل المخصصة. يجب استدعاؤه عندما تتلقى تنفيذ وسيلة النقل المخصصة الرسالة الأولى من عامل بعيد. يجب على وسيلة النقل المخصصة إدارة اتصال منطقي بالعامل البعيد وتوفير كائنين من نوع IO، واحد للرسائل الواردة والآخر للرسائل الموجهة إلى العامل البعيد. إذا كان incoming هو true، فإن النظير البعيد هو الذي بدأ الاتصال. أي من الزوجين يبدأ الاتصال يرسل ملف تعريف الكلاستر ورقم إصدار جوليا الخاص به لإجراء مصافحة المصادقة.
انظر أيضًا cluster_cookie.
Distributed.default_addprocs_params — Functiondefault_addprocs_params(mgr::ClusterManager) -> Dict{Symbol, Any}تم تنفيذها بواسطة مديري المجموعات. معلمات الكلمات الرئيسية الافتراضية المرسلة عند استدعاء addprocs(mgr). تتوفر مجموعة الخيارات الدنيا عن طريق استدعاء default_addprocs_params()