Distributed Computing

DistributedModule

أدوات المعالجة المتوازية الموزعة.

source
Distributed.addprocsFunction
addprocs(manager::ClusterManager; kwargs...) -> قائمة معرفات العمليات

يبدأ عمليات العمل عبر مدير الكتلة المحدد.

على سبيل المثال، يتم دعم مجموعات Beowulf عبر مدير كتلة مخصص تم تنفيذه في الحزمة ClusterManagers.jl.

يمكن تحديد عدد الثواني التي تنتظر فيها عملية العمل الجديدة لإنشاء اتصال من الماستر عبر المتغير JULIA_WORKER_TIMEOUT في بيئة عملية العمل. ذات صلة فقط عند استخدام TCP/IP كوسيلة نقل.

لإطلاق العمال دون حظر REPL، أو الوظيفة المحتوية إذا تم إطلاق العمال برمجيًا، نفذ addprocs في مهمته الخاصة.

أمثلة

# في المجموعات المزدحمة، استدع `addprocs` بشكل غير متزامن
t = @async addprocs(...)
# استخدم العمال عند قدومهم عبر الإنترنت
if nprocs() > 1   # تأكد من توفر عامل جديد واحد على الأقل
   ....   # تنفيذ توزيع
end
# استرجاع معرفات العمال الجدد، أو أي رسائل خطأ
if istaskdone(t)   # تحقق مما إذا كانت `addprocs` قد اكتملت لضمان عدم حظر `fetch`
    if nworkers() == N
        new_pids = fetch(t)
    else
        fetch(t)
    end
end
source
addprocs(machines; tunnel=false, sshflags=``, max_parallel=10, kwargs...) -> قائمة معرفات العمليات

أضف عمليات عامل على آلات بعيدة عبر SSH. يتم تكوين ذلك باستخدام معلمات الكلمات الرئيسية (انظر أدناه). على وجه الخصوص، يمكن استخدام معلمة exename لتحديد مسار ثنائي julia على الآلة (الآلات) البعيدة.

machines هو متجه من "مواصفات الآلات" التي تُعطى كسلاسل من الشكل [user@]host[:port] [bind_addr[:port]]. المستخدم الافتراضي هو المستخدم الحالي وport هو منفذ SSH القياسي. إذا تم تحديد [bind_addr[:port]]، ستتصل العمال الآخرون بهذا العامل عند bind_addr وport المحددين.

من الممكن إطلاق عمليات متعددة على مضيف بعيد باستخدام مجموعة في متجه machines أو الشكل (machine_spec, count)، حيث count هو عدد العمال الذين سيتم إطلاقهم على المضيف المحدد. تمرير :auto كعدد العمال سيطلق عددًا من العمال يساوي عدد خيوط المعالجة على المضيف البعيد.

أمثلة:

addprocs([
    "remote1",               # عامل واحد على 'remote1' يسجل الدخول باستخدام اسم المستخدم الحالي
    "user@remote2",          # عامل واحد على 'remote2' يسجل الدخول باستخدام اسم المستخدم 'user'
    "user@remote3:2222",     # تحديد منفذ SSH إلى '2222' لـ 'remote3'
    ("user@remote4", 4),     # إطلاق 4 عمال على 'remote4'
    ("user@remote5", :auto), # إطلاق عدد من العمال يساوي عدد خيوط المعالجة على 'remote5'
])

معلمات الكلمات الرئيسية:

  • tunnel: إذا كانت true فسيتم استخدام نفق SSH للاتصال بالعامل من عملية الماستر. الافتراضي هو false.

  • multiplex: إذا كانت true فسيتم استخدام تعدد الإرسال عبر SSH لنفق SSH. الافتراضي هو false.

  • ssh: اسم أو مسار عميل SSH القابل للتنفيذ المستخدم لبدء العمال. الافتراضي هو "ssh".

  • sshflags: يحدد خيارات SSH إضافية، مثل sshflags=`-i /home/foo/bar.pem`

  • max_parallel: يحدد الحد الأقصى لعدد العمال المتصلين في وقت واحد على مضيف. الافتراضي هو 10.

  • shell: يحدد نوع الصدفة التي يتصل بها SSH بالعمال.

    • shell=:posix: صدفة Unix/Linux متوافقة مع POSIX (sh، ksh، bash، dash، zsh، إلخ). الافتراضي.
    • shell=:csh: صدفة C Unix (csh، tcsh).
    • shell=:wincmd: Microsoft Windows cmd.exe.
  • dir: يحدد دليل العمل على العمال. الافتراضي هو الدليل الحالي للمضيف (كما هو موجود بواسطة pwd())

  • enable_threaded_blas: إذا كانت true فسيتم تشغيل BLAS على خيوط متعددة في العمليات المضافة. الافتراضي هو false.

  • exename: اسم القابل للتنفيذ julia. الافتراضي هو "$(Sys.BINDIR)/julia" أو "$(Sys.BINDIR)/julia-debug" حسب الحالة. يُوصى باستخدام إصدار شائع من Julia على جميع الآلات البعيدة لأن التسلسل والتوزيع قد يفشل خلاف ذلك.

  • exeflags: علامات إضافية تمرر إلى عمليات العمال.

  • topology: يحدد كيفية اتصال العمال ببعضهم البعض. إرسال رسالة بين العمال غير المتصلين يؤدي إلى حدوث خطأ.

    • topology=:all_to_all: جميع العمليات متصلة ببعضها البعض. الافتراضي.
    • topology=:master_worker: فقط عملية السائق، أي pid 1 تتصل بالعمال. لا تتصل العمال ببعضها البعض.
    • topology=:custom: تحدد طريقة launch لمدير الكتلة طوبولوجيا الاتصال عبر الحقول ident و connect_idents في WorkerConfig. سيتصل عامل له هوية مدير الكتلة ident بجميع العمال المحددين في connect_idents.
  • lazy: ينطبق فقط مع topology=:all_to_all. إذا كانت true، يتم إعداد اتصالات العامل-العامل بشكل كسول، أي يتم إعدادها عند أول حالة من استدعاء بعيد بين العمال. الافتراضي هو true.

  • env: قدم مصفوفة من أزواج السلاسل مثل env=["JULIA_DEPOT_PATH"=>"/depot"] لطلب تعيين متغيرات البيئة على الآلة البعيدة. بشكل افتراضي، يتم تمرير متغير البيئة JULIA_WORKER_TIMEOUT تلقائيًا من البيئة المحلية إلى البيئة البعيدة.

  • cmdline_cookie: تمرير ملف تعريف المصادقة عبر خيار سطر الأوامر --worker. قد يؤدي السلوك الافتراضي (الأكثر أمانًا) المتمثل في تمرير ملف تعريف عبر SSH stdio إلى تعليق مع العمال على Windows الذين يستخدمون إصدارات أقدم (قبل ConPTY) من Julia أو Windows، وفي هذه الحالة يقدم cmdline_cookie=true حلاً بديلًا.

Julia 1.6

تمت إضافة معلمات الكلمات الرئيسية ssh و shell و env و cmdline_cookie في Julia 1.6.

متغيرات البيئة:

إذا فشلت عملية الماستر في إنشاء اتصال مع عامل تم إطلاقه حديثًا خلال 60.0 ثانية، فإن العامل يعتبر ذلك وضعًا مميتًا وينهي نفسه. يمكن التحكم في هذا المهلة عبر متغير البيئة JULIA_WORKER_TIMEOUT. تحدد قيمة JULIA_WORKER_TIMEOUT على عملية الماستر عدد الثواني التي ينتظرها العامل الذي تم إطلاقه حديثًا لإنشاء الاتصال.

source
addprocs(np::Integer=Sys.CPU_THREADS; restrict=true, kwargs...) -> قائمة معرفات العمليات

قم بإطلاق عمال np على المضيف المحلي باستخدام LocalManager المدمج.

ترث العمال المحليون بيئة الحزمة الحالية (أي المشروع النشط، LOAD_PATH، وDEPOT_PATH) من العملية الرئيسية.

!!! تحذير لاحظ أن العمال لا يقومون بتشغيل نص بدء التشغيل ~/.julia/config/startup.jl، ولا يقومون بمزامنة حالتهم العالمية (مثل مفاتيح سطر الأوامر، المتغيرات العالمية، تعريفات الطرق الجديدة، والوحدات المحملة) مع أي من العمليات الأخرى التي تعمل.

وسائط الكلمات الرئيسية:

  • restrict::Bool: إذا كانت true (افتراضي) فإن الربط مقيد إلى 127.0.0.1.
  • dir، exename، exeflags، env، topology، lazy، enable_threaded_blas: نفس التأثير كما هو الحال في SSHManager، انظر الوثائق لـ addprocs(machines::AbstractVector).

!!! توافق "جوليا 1.9" تم إضافة وراثة بيئة الحزمة ووسيط الكلمة الرئيسية env في جوليا 1.9.

source
Distributed.nprocsFunction
nprocs()

احصل على عدد العمليات المتاحة.

أمثلة

julia> nprocs()
3

julia> workers()
2-element Array{Int64,1}:
 2
 3
source
Distributed.nworkersFunction
nworkers()

احصل على عدد عمليات العمال المتاحة. هذا أقل بواحد من nprocs(). يساوي nprocs() إذا كان nprocs() == 1.

أمثلة

$ julia -p 2

julia> nprocs()
3

julia> nworkers()
2
source
Distributed.procsMethod
procs()

إرجاع قائمة بجميع معرفات العمليات، بما في ذلك pid 1 (الذي لا يتم تضمينه بواسطة workers()).

أمثلة

$ julia -p 2

julia> procs()
3-element Array{Int64,1}:
 1
 2
 3
source
Distributed.procsMethod
procs(pid::Integer)

إرجاع قائمة بجميع معرفات العمليات على نفس العقدة الفيزيائية. على وجه التحديد، يتم إرجاع جميع العمال المرتبطين بنفس عنوان ip مثل pid.

source
Distributed.workersFunction
workers()

إرجاع قائمة بجميع معرفات عمليات العمال.

أمثلة

$ julia -p 2

julia> workers()
2-element Array{Int64,1}:
 2
 3
source
Distributed.rmprocsFunction
rmprocs(pids...; waitfor=typemax(Int))

قم بإزالة العمال المحددين. لاحظ أن العملية 1 فقط يمكنها إضافة أو إزالة العمال.

المعلمة waitfor تحدد المدة التي يجب الانتظار فيها حتى يتم إيقاف تشغيل العمال:

  • إذا لم يتم تحديدها، ستنتظر rmprocs حتى يتم إزالة جميع pids المطلوبة.
  • يتم رفع ErrorException إذا لم يكن من الممكن إنهاء جميع العمال قبل انتهاء ثواني waitfor المطلوبة.
  • مع قيمة waitfor تساوي 0، تعود المكالمة على الفور مع جدولة العمال للإزالة في مهمة مختلفة. يتم إرجاع كائن Task المجدول. يجب على المستخدم استدعاء wait على المهمة قبل استدعاء أي مكالمات متوازية أخرى.

أمثلة

$ julia -p 5

julia> t = rmprocs(2, 3, waitfor=0)
Task (runnable) @0x0000000107c718d0

julia> wait(t)

julia> workers()
3-element Array{Int64,1}:
 4
 5
 6
source
Distributed.interruptFunction
interrupt(pids::Integer...)

قم بإيقاف المهمة الحالية التي يتم تنفيذها على العمال المحددين. هذا يعادل الضغط على Ctrl-C على الجهاز المحلي. إذا لم يتم إعطاء أي وسائط، يتم إيقاف جميع العمال.

source
interrupt(pids::AbstractVector=workers())

قم بإيقاف المهمة الحالية التي يتم تنفيذها على العمال المحددين. هذا يعادل الضغط على Ctrl-C على الجهاز المحلي. إذا لم يتم إعطاء أي معطيات، يتم إيقاف جميع العمال.

source
Distributed.myidFunction
myid()

احصل على معرف العملية الحالية.

أمثلة

julia> myid()
1

julia> remotecall_fetch(() -> myid(), 4)
4
source
Distributed.pmapFunction
pmap(f, [::AbstractWorkerPool], c...; distributed=true, batch_size=1, on_error=nothing, retry_delays=[], retry_check=nothing) -> collection

قم بتحويل المجموعة c عن طريق تطبيق f على كل عنصر باستخدام العمال والمهام المتاحة.

بالنسبة لعدة معطيات من المجموعات، يتم تطبيق f عنصرًا بعنصر.

لاحظ أنه يجب جعل f متاحًا لجميع عمليات العمال؛ انظر Code Availability and Loading Packages للحصول على التفاصيل.

إذا لم يتم تحديد مجموعة العمال، فسيتم استخدام جميع العمال المتاحين عبر CachingPool.

بشكل افتراضي، يقوم pmap بتوزيع الحساب على جميع العمال المحددين. لاستخدام العملية المحلية فقط وتوزيعها على المهام، حدد distributed=false. هذا يعادل استخدام asyncmap. على سبيل المثال، pmap(f, c; distributed=false) يعادل asyncmap(f,c; ntasks=()->nworkers())

يمكن أيضًا استخدام pmap مزيجًا من العمليات والمهام عبر معطى batch_size. بالنسبة لأحجام الدفعات الأكبر من 1، تتم معالجة المجموعة في دفعات متعددة، كل منها بطول batch_size أو أقل. يتم إرسال دفعة كطلب واحد إلى عامل مجاني، حيث تقوم asyncmap المحلية بمعالجة العناصر من الدفعة باستخدام مهام متزامنة متعددة.

أي خطأ يوقف pmap عن معالجة بقية المجموعة. لتجاوز هذا السلوك، يمكنك تحديد دالة معالجة الأخطاء عبر المعطى on_error التي تأخذ في اعتبارها معطى واحد، أي الاستثناء. يمكن للدالة إيقاف المعالجة عن طريق إعادة طرح الخطأ، أو، للاستمرار، إرجاع أي قيمة يتم إرجاعها بعد ذلك مع النتائج إلى المتصل.

اعتبر المثالين التاليين. الأول يعيد كائن الاستثناء في السطر، والثاني 0 بدلاً من أي استثناء:

julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=identity)
4-element Array{Any,1}:
 1
  ErrorException("foo")
 3
  ErrorException("foo")

julia> pmap(x->iseven(x) ? error("foo") : x, 1:4; on_error=ex->0)
4-element Array{Int64,1}:
 1
 0
 3
 0

يمكن أيضًا معالجة الأخطاء عن طريق إعادة محاولة الحسابات الفاشلة. يتم تمرير المعطيات الرئيسية retry_delays و retry_check إلى retry كمعطيات رئيسية delays و check على التوالي. إذا تم تحديد التجميع، وإذا فشلت دفعة كاملة، يتم إعادة محاولة جميع العناصر في الدفعة.

لاحظ أنه إذا تم تحديد كل من on_error و retry_delays، يتم استدعاء دالة on_error قبل إعادة المحاولة. إذا لم تقم on_error بإلقاء (أو إعادة إلقاء) استثناء، فلن يتم إعادة محاولة العنصر.

مثال: عند حدوث الأخطاء، أعد محاولة f على عنصر بحد أقصى 3 مرات دون أي تأخير بين المحاولات.

pmap(f, c; retry_delays = zeros(3))

مثال: أعد محاولة f فقط إذا لم يكن الاستثناء من نوع InexactError، مع تأخيرات متزايدة بشكل أسي تصل إلى 3 مرات. أعد قيمة NaN في مكان جميع حالات InexactError.

pmap(f, c; on_error = e->(isa(e, InexactError) ? NaN : rethrow()), retry_delays = ExponentialBackOff(n = 3))
source
Distributed.RemoteExceptionType
RemoteException(captured)

يتم التقاط الاستثناءات في الحسابات عن بُعد وإعادة طرحها محليًا. يقوم RemoteException بلف pid للعامل واستثناء تم التقاطه. يقوم CapturedException بالتقاط الاستثناء عن بُعد وشكل قابل للتسلسل من مكدس الاستدعاءات عندما تم رفع الاستثناء.

source
Distributed.ProcessExitedExceptionType
ProcessExitedException(worker_id::Int)

بعد أن تخرج عملية جوليا العميل، ستؤدي المحاولات الإضافية للإشارة إلى الطفل الميت إلى إلقاء هذا الاستثناء.

source
Distributed.FutureType
Future(w::Int, rrid::RRID, v::Union{Some, Nothing}=nothing)

Future هو عنصر نائب لحساب واحد بحالة إنهاء غير معروفة ووقت غير محدد. للحسابات المحتملة المتعددة، انظر RemoteChannel. انظر remoteref_id لتحديد AbstractRemoteRef.

source
Distributed.RemoteChannelType
RemoteChannel(pid::Integer=myid())

قم بإنشاء مرجع إلى Channel{Any}(1) على العملية pid. pid الافتراضي هو العملية الحالية.

RemoteChannel(f::Function, pid::Integer=myid())

قم بإنشاء مراجع لقنوات بعيدة من حجم ونوع محددين. f هو دالة عند تنفيذها على pid يجب أن تعيد تنفيذًا لـ AbstractChannel.

على سبيل المثال، RemoteChannel(()->Channel{Int}(10), pid)، ستعيد مرجعًا إلى قناة من النوع Int وحجم 10 على pid.

pid الافتراضي هو العملية الحالية.

source
Base.fetchMethod
fetch(x::Future)

انتظر واحصل على قيمة Future. القيمة المسترجعة مخزنة محليًا. المكالمات الإضافية إلى fetch على نفس المرجع تعيد القيمة المخزنة. إذا كانت القيمة البعيدة استثناءً، يتم إلقاء RemoteException التي تلتقط الاستثناء البعيد وتتبع الخطأ.

source
Base.fetchMethod
fetch(c::RemoteChannel)

انتظر واحصل على قيمة من RemoteChannel. الاستثناءات التي تثار هي نفسها كما في Future. لا يزيل العنصر الذي تم جلبه.

source
fetch(x::Any)

إرجاع x.

source
Distributed.remotecallMethod
remotecall(f, id::Integer, args...; kwargs...) -> Future

استدعاء دالة f بشكل غير متزامن مع المعطيات المعطاة على العملية المحددة. يُرجع Future. يتم تمرير الوسائط الرئيسية، إن وجدت، إلى f.

source
Distributed.remotecall_waitMethod
remotecall_wait(f, id::Integer, args...; kwargs...)

قم بتنفيذ wait(remotecall(...)) بشكل أسرع في رسالة واحدة على Worker المحدد بواسطة معرف العامل id. يتم تمرير الوسائط الرئيسية، إن وجدت، إلى f.

انظر أيضًا wait و remotecall.

source
Distributed.remotecall_fetchMethod
remotecall_fetch(f, id::Integer, args...; kwargs...)

قم بتنفيذ fetch(remotecall(...)) في رسالة واحدة. يتم تمرير أي معلمات مفتاحية، إن وجدت، إلى f. يتم التقاط أي استثناءات عن بُعد في RemoteException وإلقاؤها.

انظر أيضًا fetch و remotecall.

أمثلة

$ julia -p 2

julia> remotecall_fetch(sqrt, 2, 4)
2.0

julia> remotecall_fetch(sqrt, 2, -4)
ERROR: On worker 2:
DomainError with -4.0:
sqrt was called with a negative real argument but will only return a complex result if called with a complex argument. Try sqrt(Complex(x)).
...
source
Distributed.remote_doMethod
remote_do(f, id::Integer, args...; kwargs...) -> nothing

ينفذ f على العامل id بشكل غير متزامن. على عكس remotecall، فإنه لا يخزن نتيجة الحساب، ولا توجد طريقة للانتظار حتى اكتماله.

تشير الدعوة الناجحة إلى أن الطلب قد تم قبوله للتنفيذ على العقدة البعيدة.

بينما يتم تسلسل remotecalls المتتالية إلى نفس العامل بالترتيب الذي تم استدعاؤها به، فإن ترتيب التنفيذ على العامل البعيد غير محدد. على سبيل المثال، remote_do(f1, 2); remotecall(f2, 2); remote_do(f3, 2) ستسلسل الاستدعاء إلى f1، يليه f2 و f3 بهذا الترتيب. ومع ذلك، لا يضمن أن يتم تنفيذ f1 قبل f3 على العامل 2.

أي استثناءات يتم طرحها بواسطة f يتم طباعتها إلى stderr على العامل البعيد.

تُمرر الوسائط الرئيسية، إن وجدت، إلى f.

source
Base.put!Method
put!(rr::RemoteChannel, args...)

قم بتخزين مجموعة من القيم في RemoteChannel. إذا كانت القناة ممتلئة، يتم حظرها حتى يتوفر مساحة. ارجع الحجة الأولى.

source
Base.put!Method
put!(rr::Future, v)

قم بتخزين قيمة في Future rr. تعتبر Futures مراجع بعيدة للكتابة مرة واحدة. يؤدي استخدام put! على Future تم تعيينه بالفعل إلى رمي Exception. جميع المكالمات البعيدة غير المتزامنة تعيد Futures وتقوم بتعيين القيمة إلى قيمة الإرجاع للمكالمة عند الانتهاء.

source
Base.take!Method
take!(rr::RemoteChannel, args...)

استرجع القيمة (القيم) من RemoteChannel rr، مع إزالة القيمة (القيم) في هذه العملية.

source
Base.isreadyMethod
isready(rr::RemoteChannel, args...)

تحديد ما إذا كان RemoteChannel يحتوي على قيمة مخزنة فيه. لاحظ أن هذه الدالة يمكن أن تسبب حالات سباق، حيث أنه بحلول الوقت الذي تتلقى فيه نتيجتها قد لا يكون ذلك صحيحًا بعد الآن. ومع ذلك، يمكن استخدامها بأمان على Future لأنها تُعين مرة واحدة فقط.

source
Base.isreadyMethod
isready(rr::Future)

تحديد ما إذا كان Future يحتوي على قيمة مخزنة فيه.

إذا كانت الحجة Future مملوكة من قبل عقدة مختلفة، ستقوم هذه المكالمة بالانتظار للحصول على الإجابة. يُوصى بالانتظار لـ rr في مهمة منفصلة بدلاً من ذلك أو استخدام Channel محلي كوكيل:

p = 1
f = Future(p)
errormonitor(@async put!(f, remotecall_fetch(long_computation, p)))
isready(f)  # لن يتم حظره
source
Distributed.AbstractWorkerPoolType
AbstractWorkerPool

نوع أساسي لمجموعات العمال مثل WorkerPool و CachingPool. يجب على AbstractWorkerPool تنفيذ:

  • push! - إضافة عامل جديد إلى المجموعة العامة (متاح + مشغول)
  • put! - إعادة عامل إلى المجموعة المتاحة
  • take! - أخذ عامل من المجموعة المتاحة (لاستخدامه في تنفيذ الدوال عن بُعد)
  • length - عدد العمال المتاحين في المجموعة العامة
  • isready - إرجاع false إذا كان take! على المجموعة سيؤدي إلى حظر، وإلا true

تتطلب التنفيذات الافتراضية لما سبق (على AbstractWorkerPool) حقول

  • channel::Channel{Int}
  • workers::Set{Int}

حيث يحتوي channel على معرفات العمليات الحرة للعمال و workers هو مجموعة جميع العمال المرتبطين بهذه المجموعة.

source
Distributed.WorkerPoolType
WorkerPool(workers::Union{Vector{Int},AbstractRange{Int}})

أنشئ WorkerPool من متجه أو نطاق من معرفات العمال.

أمثلة

$ julia -p 3

julia> WorkerPool([2, 3])
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([2, 3]), RemoteChannel{Channel{Any}}(1, 1, 6))

julia> WorkerPool(2:4)
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:2), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 7))
source
Distributed.CachingPoolType
CachingPool(workers::Vector{Int})

تنفيذ لـ AbstractWorkerPool. remote، remotecall_fetch، pmap (وغيرها من المكالمات البعيدة التي تنفذ الدوال عن بُعد) تستفيد من تخزين الدوال المرسلة/المسترجعة على عقد العمال، خاصة الإغلاقات (التي قد تلتقط كميات كبيرة من البيانات).

يتم الحفاظ على ذاكرة التخزين المؤقت البعيدة طوال فترة حياة كائن CachingPool المعاد. لمسح الذاكرة المؤقتة في وقت مبكر، استخدم clear!(pool).

بالنسبة للمتغيرات العالمية، يتم التقاط الروابط فقط في إغلاق، وليس البيانات. يمكن استخدام كتل let لالتقاط البيانات العالمية.

أمثلة

const foo = rand(10^8);
wp = CachingPool(workers())
let foo = foo
    pmap(i -> sum(foo) + i, wp, 1:100);
end

سيتم نقل foo أعلاه مرة واحدة فقط إلى كل عامل.

source
Distributed.default_worker_poolFunction
default_worker_pool()

AbstractWorkerPool تحتوي على workers غير المستخدمة - تُستخدم بواسطة remote(f) و pmap (بشكل افتراضي). ما لم يتم تعيين واحدة بشكل صريح عبر default_worker_pool!(pool), يتم تهيئة مجموعة العمال الافتراضية إلى WorkerPool.

أمثلة

$ julia -p 3

julia> default_worker_pool()
WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:3), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 4))
source
Distributed.clear!Function
clear!(syms, pids=workers(); mod=Main)

يمسح الربط العالمي في الوحدات عن طريق تهيئتها إلى nothing. يجب أن يكون syms من نوع Symbol أو مجموعة من Symbols. تحدد pids و mod العمليات والوحدة التي يجب إعادة تهيئة المتغيرات العالمية فيها. يتم مسح الأسماء التي تم العثور عليها فقط والتي تم تعريفها تحت mod.

يتم رفع استثناء إذا تم طلب مسح ثابت عالمي.

source
clear!(pool::CachingPool) -> pool

يُزيل جميع الدوال المخزنة من جميع العمال المشاركين.

source
Distributed.remoteFunction
remote([p::AbstractWorkerPool], f) -> Function

إرجاع دالة غير مسماة تنفذ الدالة f على عامل متاح (مأخوذ من WorkerPool p إذا تم توفيره) باستخدام remotecall_fetch.

source
Distributed.remotecallMethod
remotecall(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future

WorkerPool نسخة من remotecall(f, pid, ....). انتظر وخذ عاملًا مجانيًا من pool وقم بتنفيذ remotecall عليه.

أمثلة

$ julia -p 3

julia> wp = WorkerPool([2, 3]);

julia> A = rand(3000);

julia> f = remotecall(maximum, wp, A)
Future(2, 1, 6, nothing)

في هذا المثال، تم تنفيذ المهمة على pid 2، وتم استدعاؤها من pid 1.

source
Distributed.remotecall_waitMethod
remotecall_wait(f, pool::AbstractWorkerPool, args...; kwargs...) -> Future

WorkerPool نسخة من remotecall_wait(f, pid, ....). انتظر وخذ عاملًا مجانيًا من pool وقم بتنفيذ remotecall_wait عليه.

أمثلة

$ julia -p 3

julia> wp = WorkerPool([2, 3]);

julia> A = rand(3000);

julia> f = remotecall_wait(maximum, wp, A)
Future(3, 1, 9, nothing)

julia> fetch(f)
0.9995177101692958
source
Distributed.remotecall_fetchMethod
remotecall_fetch(f, pool::AbstractWorkerPool, args...; kwargs...) -> result

WorkerPool نسخة من remotecall_fetch(f, pid, ....). تنتظر وتأخذ عاملاً مجانياً من pool وتقوم بتنفيذ remotecall_fetch عليه.

أمثلة

$ julia -p 3

julia> wp = WorkerPool([2, 3]);

julia> A = rand(3000);

julia> remotecall_fetch(maximum, wp, A)
0.9995177101692958
source
Distributed.remote_doMethod
remote_do(f, pool::AbstractWorkerPool, args...; kwargs...) -> nothing

نسخة WorkerPool من remote_do(f, pid, ....). انتظر واحصل على عامل مجاني من pool وقم بتنفيذ remote_do عليه.

source
Distributed.@spawnMacro
@spawn expr

قم بإنشاء إغلاق حول تعبير وتشغيله على عملية يتم اختيارها تلقائيًا، مع إرجاع Future للنتيجة. هذه الماكرو قديمة؛ يجب استخدام @spawnat :any expr بدلاً من ذلك.

أمثلة

julia> addprocs(3);

julia> f = @spawn myid()
Future(2, 1, 5, nothing)

julia> fetch(f)
2

julia> f = @spawn myid()
Future(3, 1, 7, nothing)

julia> fetch(f)
3
Julia 1.3

اعتبارًا من Julia 1.3، هذه الماكرو قديمة. استخدم @spawnat :any بدلاً من ذلك.

source
Distributed.@spawnatMacro
@spawnat p expr

قم بإنشاء إغلاق حول تعبير وتشغيل الإغلاق بشكل غير متزامن على العملية p. أعد Future للنتيجة. إذا كانت p هي الرمز الحرفي المقتبس :any، فسيقوم النظام باختيار معالج للاستخدام تلقائيًا.

أمثلة

julia> addprocs(3);

julia> f = @spawnat 2 myid()
Future(2, 1, 3, nothing)

julia> fetch(f)
2

julia> f = @spawnat :any myid()
Future(3, 1, 7, nothing)

julia> fetch(f)
3
Julia 1.3

الوسيطة :any متاحة اعتبارًا من Julia 1.3.

source
Distributed.@fetchMacro
@fetch expr

يعادل fetch(@spawnat :any expr). انظر fetch و @spawnat.

أمثلة

julia> addprocs(3);

julia> @fetch myid()
2

julia> @fetch myid()
3

julia> @fetch myid()
4

julia> @fetch myid()
2
source
Distributed.@distributedMacro
@distributed

حلقة for متوازية في الذاكرة الموزعة بالشكل:

@distributed [reducer] for var = range
    body
end

يتم تقسيم النطاق المحدد وتنفيذه محليًا عبر جميع العمال. في حالة تحديد دالة مختزلة اختيارية، يقوم @distributed بإجراء اختزالات محلية على كل عامل مع اختزال نهائي على العملية المستدعية.

لاحظ أنه بدون دالة مختزلة، يقوم @distributed بالتنفيذ بشكل غير متزامن، أي أنه يطلق مهام مستقلة على جميع العمال المتاحين ويعود على الفور دون الانتظار للاكتمال. للانتظار حتى الاكتمال، قم بإضافة بادئة للنداء بـ @sync، مثل:

@sync @distributed for var = range
    body
end
source
Distributed.@everywhereMacro
@everywhere [procs()] expr

نفذ تعبيرًا تحت Main على جميع procs. يتم جمع الأخطاء في أي من العمليات في CompositeException وإلقاؤها. على سبيل المثال:

@everywhere bar = 1

سيقوم بتعريف Main.bar على جميع العمليات الحالية. أي عمليات تضاف لاحقًا (مثل addprocs()) لن يكون لديها التعبير معرف.

على عكس @spawnat، فإن @everywhere لا تلتقط أي متغيرات محلية. بدلاً من ذلك، يمكن بث المتغيرات المحلية باستخدام الاستيفاء:

foo = 1
@everywhere bar = $foo

تسمح الحجة الاختيارية procs بتحديد مجموعة فرعية من جميع العمليات لتنفيذ التعبير.

مماثل لاستدعاء remotecall_eval(Main, procs, expr)، ولكن مع ميزتين إضافيتين:

- يتم تنفيذ عبارات `using` و `import` على عملية الاستدعاء أولاً، لضمان
  تجميع الحزم مسبقًا.
- يتم تمرير مسار ملف المصدر الحالي المستخدم بواسطة `include` إلى العمليات الأخرى.
source
Distributed.remoteref_idFunction
remoteref_id(r::AbstractRemoteRef) -> RRID

Futures و RemoteChannels يتم التعرف عليها من خلال الحقول:

  • where - تشير إلى العقدة التي يوجد فيها الكائن/التخزين الأساسي المشار إليه بواسطة المرجع.
  • whence - تشير إلى العقدة التي تم إنشاء المرجع البعيد منها. لاحظ أن هذا يختلف عن العقدة التي يوجد فيها الكائن الأساسي المشار إليه. على سبيل المثال، استدعاء RemoteChannel(2) من عملية الماستر سيؤدي إلى قيمة where تساوي 2 وقيمة whence تساوي 1.
  • id فريد عبر جميع المراجع التي تم إنشاؤها من العامل المحدد بواسطة whence.

معًا، whence و id يحددان مرجعًا بشكل فريد عبر جميع العمال.

remoteref_id هو واجهة برمجة تطبيقات منخفضة المستوى تعيد كائن RRID الذي يلتف حول قيم whence و id لمرجع بعيد.

source
Distributed.channel_from_idFunction
channel_from_id(id) -> c

واجهة برمجة تطبيقات منخفضة المستوى تعيد AbstractChannel الداعمة لـ id الذي تم إرجاعه بواسطة remoteref_id. الاستدعاء صالح فقط على العقدة حيث توجد القناة الداعمة.

source
Distributed.worker_id_from_socketFunction
worker_id_from_socket(s) -> pid

واجهة برمجة تطبيقات منخفضة المستوى، والتي، عند إعطائها اتصال IO أو Worker، تعيد pid للعامل المتصل به. هذا مفيد عند كتابة طرق serialize مخصصة لنوع ما، والتي تعمل على تحسين البيانات المكتوبة بناءً على معرف عملية الاستقبال.

source
Distributed.cluster_cookieMethod
cluster_cookie(cookie) -> cookie

قم بتعيين الكوكي الممرر ككوكي العنقود، ثم أعده.

source

Cluster Manager Interface

تقدم هذه الواجهة آلية لإطلاق وإدارة عمال جوليا في بيئات الكلاستر المختلفة. هناك نوعان من المديرين موجودين في Base: LocalManager، لإطلاق عمال إضافيين على نفس المضيف، و SSHManager، لإطلاقهم على مضيفين بعيدين عبر ssh. يتم استخدام مآخذ TCP/IP للاتصال ونقل الرسائل بين العمليات. من الممكن أن توفر مدراء الكلاستر وسيلة نقل مختلفة.

Distributed.ClusterManagerType
ClusterManager

نوع أعلى لمديري الكتل، الذين يتحكمون في عمليات العمال ككتلة. يقوم مدراء الكتل بتنفيذ كيفية إضافة العمال وإزالتهم والتواصل معهم. SSHManager و LocalManager هما نوعان فرعيان من هذا.

source
Distributed.WorkerConfigType
WorkerConfig

نوع يستخدمه ClusterManagers للتحكم في العمال المضافين إلى مجموعاتهم. يتم استخدام بعض الحقول من قبل جميع مديري المجموعات للوصول إلى مضيف:

  • io – الاتصال المستخدم للوصول إلى العامل (نوع فرعي من IO أو Nothing)
  • host – عنوان المضيف (إما String أو Nothing)
  • port – المنفذ على المضيف المستخدم للاتصال بالعامل (إما Int أو Nothing)

يتم استخدام بعضها من قبل مدير المجموعة لإضافة العمال إلى مضيف تم تهيئته بالفعل:

  • count – عدد العمال الذين سيتم إطلاقهم على المضيف
  • exename – المسار إلى الملف التنفيذي لجوليا على المضيف، الافتراضي هو "$(Sys.BINDIR)/julia" أو "$(Sys.BINDIR)/julia-debug"
  • exeflags – العلامات التي يجب استخدامها عند إطلاق جوليا عن بُعد

يتم استخدام حقل userdata لتخزين المعلومات لكل عامل من قبل المديرين الخارجيين.

يتم استخدام بعض الحقول من قبل SSHManager ومديرين مشابهين:

  • tunneltrue (استخدام النفق)، false (عدم استخدام النفق)، أو nothing (استخدام الافتراضي للمدير)
  • multiplextrue (استخدام تعدد الإرسال SSH للنفق) أو false
  • forward – خيار التوجيه المستخدم لخيار -L من ssh
  • bind_addr – العنوان على المضيف البعيد الذي سيتم الربط به
  • sshflags – العلامات التي يجب استخدامها في إنشاء اتصال SSH
  • max_parallel – الحد الأقصى لعدد العمال للاتصال به بشكل متوازي على المضيف

يتم استخدام بعض الحقول من قبل كل من LocalManagers و SSHManagers:

  • connect_at – يحدد ما إذا كانت هذه مكالمة إعداد من عامل إلى عامل أو من سائق إلى عامل
  • process – العملية التي سيتم الاتصال بها (عادةً ما يقوم المدير بتعيين هذا أثناء addprocs)
  • ospid – معرف العملية وفقًا لنظام تشغيل المضيف، يستخدم لقطع عمليات العمال
  • environ – قاموس خاص يستخدم لتخزين المعلومات المؤقتة من قبل المديرين المحليين/SSH
  • ident – العامل كما تم التعرف عليه بواسطة ClusterManager
  • connect_idents – قائمة بمعرفات العمال التي يجب على العامل الاتصال بها إذا كان يستخدم طوبولوجيا مخصصة
  • enable_threaded_blastrue، false، أو nothing، سواء لاستخدام BLAS متعدد الخيوط أم لا على العمال
source
Distributed.launchFunction
launch(manager::ClusterManager, params::Dict, launched::Array, launch_ntfy::Condition)

تم تنفيذها بواسطة مديري الكلاستر. يجب على كل عامل جوليا يتم إطلاقه بواسطة هذه الوظيفة أن يضيف إدخال WorkerConfig إلى launched ويقوم بإخطار launch_ntfy. يجب أن تخرج الوظيفة بمجرد إطلاق جميع العمال، الذين طلبهم manager. params هو قاموس لجميع الوسائط الرئيسية التي تم استدعاء addprocs بها.

source
Distributed.manageFunction
manage(manager::ClusterManager, id::Integer, config::WorkerConfig. op::Symbol)

تم تنفيذها بواسطة مديري الكلاستر. يتم استدعاؤها في عملية الماستر، خلال فترة حياة العامل، مع قيم op المناسبة:

  • مع :register/:deregister عند إضافة عامل / إزالة عامل من مجموعة عمال جوليا.
  • مع :interrupt عند استدعاء interrupt(workers). يجب على ClusterManager إرسال إشارة مقاطعة للعامل المناسب.
  • مع :finalize لأغراض التنظيف.
source
Base.killMethod
kill(manager::ClusterManager, pid::Int, config::WorkerConfig)

يتم تنفيذه بواسطة مديري الكلاستر. يتم استدعاؤه على عملية الماستر، بواسطة rmprocs. يجب أن يتسبب في خروج العامل البعيد المحدد بواسطة pid. يقوم kill(manager::ClusterManager.....) بتنفيذ exit() عن بُعد على pid.

source
Sockets.connectMethod
connect(manager::ClusterManager, pid::Int, config::WorkerConfig) -> (instrm::IO, outstrm::IO)

تم تنفيذها بواسطة مديري الكلاستر باستخدام وسائل النقل المخصصة. يجب أن تؤسس اتصالًا منطقيًا بالعمالة ذات المعرف pid، المحدد بواسطة config وتعيد زوجًا من كائنات IO. سيتم قراءة الرسائل من pid إلى العملية الحالية من instrm، بينما سيتم كتابة الرسائل المرسلة إلى pid إلى outstrm. يجب على تنفيذ وسائل النقل المخصصة ضمان تسليم الرسائل واستلامها بالكامل وبالترتيب. connect(manager::ClusterManager.....) يقوم بإعداد اتصالات مقبس TCP/IP بين العمال.

source
Distributed.init_workerFunction
init_worker(cookie::AbstractString, manager::ClusterManager=DefaultClusterManager())

يتم استدعاؤه بواسطة مديري الكلاستر الذين ينفذون وسائل النقل المخصصة. يقوم بتهيئة عملية جديدة تم إطلاقها كعامل. حجة سطر الأوامر --worker[=<cookie>] لها تأثير تهيئة عملية كعامل باستخدام مآخذ TCP/IP للنقل. cookie هو cluster_cookie.

source
Distributed.start_workerFunction
start_worker([out::IO=stdout], cookie::AbstractString=readline(stdin); close_stdin::Bool=true, stderr_to_stdout::Bool=true)

start_worker هي دالة داخلية وهي نقطة الدخول الافتراضية لعمليات العمال المتصلة عبر TCP/IP. تقوم بإعداد العملية كعامل في مجموعة جوليا.

معلومات المضيف:المنفذ تُكتب إلى التدفق out (الذي يكون افتراضيًا stdout).

تقرأ الدالة الكوكي من stdin إذا لزم الأمر، وتستمع على منفذ مجاني (أو إذا تم تحديده، المنفذ في خيار سطر الأوامر --bind-to) وتجدول المهام لمعالجة اتصالات TCP الواردة والطلبات. كما أنها (اختياريًا) تغلق stdin وتعيد توجيه stderr إلى stdout.

لا تُرجع أي قيمة.

source
Distributed.process_messagesFunction
process_messages(r_stream::IO, w_stream::IO, incoming::Bool=true)

يتم استدعاؤه بواسطة مديري الكلاستر باستخدام وسائل النقل المخصصة. يجب استدعاؤه عندما تتلقى تنفيذ وسيلة النقل المخصصة الرسالة الأولى من عامل بعيد. يجب على وسيلة النقل المخصصة إدارة اتصال منطقي بالعامل البعيد وتوفير كائنين من نوع IO، واحد للرسائل الواردة والآخر للرسائل الموجهة إلى العامل البعيد. إذا كان incoming هو true، فإن النظير البعيد هو الذي بدأ الاتصال. أي من الزوجين يبدأ الاتصال يرسل ملف تعريف الكلاستر ورقم إصدار جوليا الخاص به لإجراء مصافحة المصادقة.

انظر أيضًا cluster_cookie.

source
Distributed.default_addprocs_paramsFunction
default_addprocs_params(mgr::ClusterManager) -> Dict{Symbol, Any}

تم تنفيذها بواسطة مديري المجموعات. معلمات الكلمات الرئيسية الافتراضية المرسلة عند استدعاء addprocs(mgr). تتوفر مجموعة الخيارات الدنيا عن طريق استدعاء default_addprocs_params()

source