Asynchronous Programming
عندما يحتاج البرنامج إلى التفاعل مع العالم الخارجي، على سبيل المثال التواصل مع آلة أخرى عبر الإنترنت، قد تحتاج العمليات في البرنامج إلى الحدوث بترتيب غير متوقع. لنفترض أن برنامجك يحتاج إلى تنزيل ملف. نود أن نبدأ عملية التنزيل، ونقوم بأداء عمليات أخرى أثناء انتظارنا لإكمالها، ثم نعود إلى الكود الذي يحتاج إلى الملف الذي تم تنزيله عندما يصبح متاحًا. هذا النوع من السيناريو يقع في نطاق البرمجة غير المتزامنة، والتي تُعرف أحيانًا أيضًا بالبرمجة المتزامنة (لأنه، من الناحية المفاهيمية، تحدث أشياء متعددة في وقت واحد).
لمعالجة هذه السيناريوهات، توفر جوليا Task
(المعروفة أيضًا بعدة أسماء أخرى، مثل التعاونية المتناظرة، والخيوط الخفيفة، والمهام المتعددة التعاونية، أو الاستمراريات ذات الاستخدام الواحد). عندما يتم تعيين قطعة من العمل الحاسوبي (في الممارسة العملية، تنفيذ دالة معينة) كـ 4d61726b646f776e2e436f64652822222c20225461736b2229_40726566
، يصبح من الممكن مقاطعتها عن طريق التبديل إلى 4d61726b646f776e2e436f64652822222c20225461736b2229_40726566
آخر. يمكن استئناف 4d61726b646f776e2e436f64652822222c20225461736b2229_40726566
الأصلي لاحقًا، في هذه النقطة ستستأنف من حيث توقفت. في البداية، قد يبدو هذا مشابهًا لاستدعاء دالة. ومع ذلك، هناك اختلافان رئيسيان. أولاً، لا يستخدم تبديل المهام أي مساحة، لذا يمكن أن تحدث أي عدد من تبديلات المهام دون استهلاك مكدس الاستدعاء. ثانيًا، يمكن أن يحدث التبديل بين المهام بأي ترتيب، على عكس استدعاءات الدوال، حيث يجب أن تنتهي الدالة المستدعاة من التنفيذ قبل أن يعود التحكم إلى الدالة المستدعية.
Basic Task
operations
يمكنك التفكير في Task
كمعالج لوحدة من العمل الحاسوبي الذي سيتم تنفيذه. لديها دورة حياة تتكون من إنشاء-بدء-تشغيل-إنهاء. يتم إنشاء المهام عن طريق استدعاء مُنشئ Task
على دالة بدون وسائط لتشغيلها، أو باستخدام الماكرو @task
:
julia> t = @task begin; sleep(5); println("done"); end
Task (runnable) @0x00007f13a40c0eb0
@task x
يعادل Task(()->x)
.
ستنتظر هذه المهمة خمس ثوانٍ، ثم ستطبع done
. ومع ذلك، لم تبدأ في التنفيذ بعد. يمكننا تشغيلها متى كنا مستعدين عن طريق استدعاء schedule
:
julia> schedule(t);
إذا جربت هذا في REPL، سترى أن schedule
يعود على الفور. ذلك لأنه ببساطة يضيف t
إلى قائمة داخلية من المهام التي يجب تنفيذها. ثم، سيقوم REPL بطباعة الموجه التالي وينتظر المزيد من الإدخال. الانتظار لإدخال من لوحة المفاتيح يوفر فرصة لتشغيل مهام أخرى، لذا في تلك اللحظة ستبدأ t
. تستدعي t
sleep
، الذي يضبط مؤقتًا ويوقف التنفيذ. إذا كانت هناك مهام أخرى قد تم جدولتها، يمكن أن تعمل حينها. بعد خمس ثوانٍ، ينطلق المؤقت ويعيد تشغيل t
، وسترى done
مطبوعة. ثم تنتهي t
.
The wait
function blocks the calling task until some other task finishes. So for example if you type
julia> schedule(t); wait(t)
بدلاً من استدعاء schedule
فقط، سترى توقفًا لمدة خمس ثوانٍ قبل ظهور موجه الإدخال التالي. وذلك لأن REPL ينتظر انتهاء t
قبل المتابعة.
من الشائع الرغبة في إنشاء مهمة وجدولتها على الفور، لذا فإن الماكرو @async
متوفر لهذا الغرض –- @async x
يعادل schedule(@task x)
.
Communicating with Channels
في بعض المشاكل، لا ترتبط الأجزاء المختلفة من العمل المطلوب بشكل طبيعي من خلال استدعاءات الوظائف؛ لا يوجد "مستدعي" أو "مستدعى" واضح بين المهام التي يجب إنجازها. مثال على ذلك هو مشكلة المنتج-المستهلك، حيث تقوم إجراء معقد واحد بتوليد القيم وإجراء معقد آخر باستهلاكها. لا يمكن للمستهلك ببساطة استدعاء دالة المنتج للحصول على قيمة، لأن المنتج قد يكون لديه المزيد من القيم لتوليدها وبالتالي قد لا يكون جاهزًا بعد للعودة. مع المهام، يمكن للمنتج والمستهلك العمل معًا طالما يحتاجان إلى ذلك، مع تمرير القيم ذهابًا وإيابًا حسب الحاجة.
جوليا توفر آلية Channel
لحل هذه المشكلة. 4d61726b646f776e2e436f64652822222c20224368616e6e656c2229_40726566
هو طابور قابل للانتظار من نوع الأول في الدخول والأول في الخروج يمكن أن يكون لديه مهام متعددة تقرأ منه وتكتب إليه.
لنحدد مهمة المنتج، التي تنتج القيم عبر استدعاء put!
. لاستهلاك القيم، نحتاج إلى جدولة المنتج ليعمل في مهمة جديدة. يمكن استخدام مُنشئ خاص Channel
الذي يقبل دالة ذات وسيط واحد كوسيط لتشغيل مهمة مرتبطة بقناة. يمكننا بعد ذلك take!
القيم بشكل متكرر من كائن القناة:
julia> function producer(c::Channel)
put!(c, "start")
for n=1:4
put!(c, 2n)
end
put!(c, "stop")
end;
julia> chnl = Channel(producer);
julia> take!(chnl)
"start"
julia> take!(chnl)
2
julia> take!(chnl)
4
julia> take!(chnl)
6
julia> take!(chnl)
8
julia> take!(chnl)
"stop"
طريقة واحدة للتفكير في هذا السلوك هي أن producer
كان قادرًا على العودة عدة مرات. بين استدعاءات put!
، يتم تعليق تنفيذ المنتج ويكون للمستهلك السيطرة.
يمكن استخدام Channel
ككائن قابل للتكرار في حلقة for
، وفي هذه الحالة تأخذ متغير الحلقة جميع القيم الناتجة. يتم إنهاء الحلقة عندما يتم إغلاق القناة.
julia> for x in Channel(producer)
println(x)
end
start
2
4
6
8
stop
لاحظ أننا لم نضطر إلى إغلاق القناة بشكل صريح في المنتج. وذلك لأن فعل ربط Channel
بـ Task
يربط عمر القناة المفتوحة بعمر المهمة المرتبطة. يتم إغلاق كائن القناة تلقائيًا عند إنهاء المهمة. يمكن ربط قنوات متعددة بمهمة، والعكس صحيح.
بينما يتوقع المُنشئ Task
دالة بدون وسائط، فإن الطريقة Channel
التي تنشئ قناة مرتبطة بالمهمة تتوقع دالة تقبل وسيطًا واحدًا من نوع 4d61726b646f776e2e436f64652822222c20224368616e6e656c2229_40726566
. نمط شائع هو أن يكون المنتج مُعَلمًا، وفي هذه الحالة تكون هناك حاجة لتطبيق دالة جزئية لإنشاء دالة بوسيطين أو وسائط واحدة anonymous function.
بالنسبة لكائنات Task
يمكن القيام بذلك إما مباشرة أو باستخدام ماكرو ملائم:
function mytask(myarg)
...
end
taskHdl = Task(() -> mytask(7))
# or, equivalently
taskHdl = @task mytask(7)
لإدارة أنماط توزيع العمل الأكثر تقدمًا، يمكن استخدام bind
و schedule
بالتزامن مع Task
و Channel
لإنشاء رابط صريح بين مجموعة من القنوات ومجموعة من مهام المنتج/المستهلك.
More on Channels
يمكن تصور القناة على أنها أنبوب، أي أن لها طرف كتابة وطرف قراءة:
يمكن لكتّاب متعددين في مهام مختلفة الكتابة إلى نفس القناة في وقت واحد عبر
put!
استدعاءات.يمكن لعدة قراء في مهام مختلفة قراءة البيانات بشكل متزامن عبر
take!
استدعاءات.كمثال:
# Given Channels c1 and c2, c1 = Channel(32) c2 = Channel(32) # and a function `foo` which reads items from c1, processes the item read # and writes a result to c2, function foo() while true data = take!(c1) [...] # process data put!(c2, result) # write out result end end # we can schedule `n` instances of `foo` to be active concurrently. for _ in 1:n errormonitor(@async foo()) end
تُنشأ القنوات عبر المُنشئ
Channel{T}(sz)
. ستحتفظ القناة فقط بالكائنات من النوعT
. إذا لم يتم تحديد النوع، يمكن للقناة أن تحتفظ بكائنات من أي نوع. يُشيرsz
إلى الحد الأقصى لعدد العناصر التي يمكن الاحتفاظ بها في القناة في أي وقت. على سبيل المثال،Channel(32)
ينشئ قناة يمكن أن تحتفظ بحد أقصى قدره 32 كائنًا من أي نوع. يمكن لـChannel{MyType}(64)
الاحتفاظ بما يصل إلى 64 كائنًا منMyType
في أي وقت.إذا كان
Channel
فارغًا، فإن القراء (على استدعاءtake!
) سيحجبون حتى تتوفر البيانات.إذا كانت
Channel
ممتلئة، فإن الكتاب (على اتصالput!
) سيحجبون حتى تصبح المساحة متاحة.isready
تختبر وجود أي كائن في القناة، بينماwait
تنتظر كائنًا ليصبح متاحًا.Channel
في حالة مفتوحة في البداية. هذا يعني أنه يمكن القراءة منه والكتابة إليه بحرية عبرtake!
وput!
استدعاءات.close
يغلق4d61726b646f776e2e436f64652822222c20224368616e6e656c2229_40726566
. على4d61726b646f776e2e436f64652822222c20224368616e6e656c2229_40726566
مغلقة،4d61726b646f776e2e436f64652822222c2022707574212229_40726566
سيفشل. على سبيل المثال:julia> c = Channel(2); julia> put!(c, 1) # `put!` on an open channel succeeds 1 julia> close(c); julia> put!(c, 2) # `put!` on a closed channel throws an exception. ERROR: InvalidStateException: Channel is closed. Stacktrace: [...]
take!
وfetch
(الذي يسترجع ولكن لا يزيل القيمة) على قناة مغلقة يعود بنجاح بأي قيم موجودة حتى يتم إفراغها. متابعة المثال أعلاه:julia> fetch(c) # Any number of `fetch` calls succeed. 1 julia> fetch(c) 1 julia> take!(c) # The first `take!` removes the value. 1 julia> take!(c) # No more data available on a closed channel. ERROR: InvalidStateException: Channel is closed. Stacktrace: [...]
اعتبر مثالًا بسيطًا يستخدم القنوات للتواصل بين المهام. نبدأ 4 مهام لمعالجة البيانات من قناة jobs
واحدة. يتم كتابة الوظائف، التي يتم التعرف عليها بواسطة معرف (job_id
)، إلى القناة. كل مهمة في هذه المحاكاة تقرأ job_id
، تنتظر لفترة زمنية عشوائية وتكتب مرة أخرى مجموعة من job_id
والوقت المحاكى إلى قناة النتائج. أخيرًا، يتم طباعة جميع results
.
julia> const jobs = Channel{Int}(32);
julia> const results = Channel{Tuple}(32);
julia> function do_work()
for job_id in jobs
exec_time = rand()
sleep(exec_time) # simulates elapsed time doing actual work
# typically performed externally.
put!(results, (job_id, exec_time))
end
end;
julia> function make_jobs(n)
for i in 1:n
put!(jobs, i)
end
end;
julia> n = 12;
julia> errormonitor(@async make_jobs(n)); # feed the jobs channel with "n" jobs
julia> for i in 1:4 # start 4 tasks to process requests in parallel
errormonitor(@async do_work())
end
julia> @elapsed while n > 0 # print out results
job_id, exec_time = take!(results)
println("$job_id finished in $(round(exec_time; digits=2)) seconds")
global n = n - 1
end
4 finished in 0.22 seconds
3 finished in 0.45 seconds
1 finished in 0.5 seconds
7 finished in 0.14 seconds
2 finished in 0.78 seconds
5 finished in 0.9 seconds
9 finished in 0.36 seconds
6 finished in 0.87 seconds
8 finished in 0.79 seconds
10 finished in 0.64 seconds
12 finished in 0.5 seconds
11 finished in 0.97 seconds
0.029772311
بدلاً من errormonitor(t)
، قد تكون الحل الأكثر قوة هو استخدام bind(results, t)
، حيث أنه لن يقوم فقط بتسجيل أي فشل غير متوقع، بل سيجبر أيضًا الموارد المرتبطة على الإغلاق ونشر الاستثناء في كل مكان.
More task operations
تُبنى عمليات المهام على بدائية منخفضة المستوى تُسمى yieldto
. تقوم yieldto(task, value)
بتعليق المهمة الحالية، والتبديل إلى المهمة المحددة، وتسبب في أن تعود آخر مكالمة لـ 4d61726b646f776e2e436f64652822222c20227969656c64746f2229_40726566
لتلك المهمة بالقيمة المحددة. لاحظ أن 4d61726b646f776e2e436f64652822222c20227969656c64746f2229_40726566
هي العملية الوحيدة المطلوبة لاستخدام تدفق التحكم بأسلوب المهام؛ بدلاً من الاستدعاء والعودة، نحن دائماً نتبدل إلى مهمة مختلفة. لهذا السبب تُسمى هذه الميزة أيضًا "التعاون المتناظر"; يتم التبديل بين كل مهمة باستخدام نفس الآلية.
yieldto
قوية، لكن معظم استخدامات المهام لا تستدعيها مباشرة. اعتبر لماذا قد يكون هذا. إذا قمت بالتبديل بعيدًا عن المهمة الحالية، فمن المحتمل أنك سترغب في العودة إليها في مرحلة ما، لكن معرفة متى تعود، ومعرفة أي مهمة تتحمل مسؤولية العودة، يمكن أن يتطلب تنسيقًا كبيرًا. على سبيل المثال، put!
و take!
هما عمليات حظر، والتي، عند استخدامها في سياق القنوات، تحافظ على الحالة لتتذكر من هم المستهلكون. عدم الحاجة إلى تتبع المهمة المستهلكة يدويًا هو ما يجعل 4d61726b646f776e2e436f64652822222c2022707574212229_40726566
أسهل في الاستخدام من 4d61726b646f776e2e436f64652822222c20227969656c64746f2229_40726566
المنخفضة المستوى.
بالإضافة إلى yieldto
، هناك بعض الوظائف الأساسية الأخرى اللازمة لاستخدام المهام بفعالية.
current_task
يحصل على مرجع للمهمة الجارية حاليًا.istaskdone
يستفسر عما إذا كانت المهمة قد انتهت.istaskstarted
يستفسر عما إذا كانت المهمة قد تم تنفيذها بعد.task_local_storage
يتلاعب بمخزن القيم المفتاحية المحدد للمهمة الحالية.
Tasks and events
تحدث معظم تبديلات المهام نتيجة الانتظار للأحداث مثل طلبات الإدخال/الإخراج، ويتم تنفيذها بواسطة جدولة مدمجة في قاعدة جوليا. يحتفظ المجدول بقائمة من المهام القابلة للتنفيذ، وينفذ حلقة أحداث تعيد تشغيل المهام بناءً على الأحداث الخارجية مثل وصول الرسائل.
The basic function for waiting for an event is wait
. Several objects implement wait
; for example, given a Process
object, wait
will wait for it to exit. wait
is often implicit; for example, a wait
can happen inside a call to read
to wait for data to be available.
في جميع هذه الحالات، wait
تعمل في النهاية على كائن Condition
، الذي يتولى مسؤولية تنظيم وإعادة تشغيل المهام. عندما تستدعي مهمة 4d61726b646f776e2e436f64652822222c2022776169742229_40726566
على 4d61726b646f776e2e436f64652822222c2022436f6e646974696f6e2229_40726566
، يتم وضع علامة على المهمة بأنها غير قابلة للتنفيذ، وتضاف إلى قائمة الانتظار الخاصة بالشرط، وتتحول إلى المجدول. بعد ذلك، سيختار المجدول مهمة أخرى للتنفيذ، أو سيبقى محجوزًا في انتظار أحداث خارجية. إذا سارت الأمور على ما يرام، في النهاية سيقوم معالج الأحداث باستدعاء notify
على الشرط، مما يتسبب في أن تصبح المهام التي تنتظر ذلك الشرط قابلة للتنفيذ مرة أخرى.
تم إنشاء مهمة بشكل صريح عن طريق استدعاء Task
والتي لا تكون معروفة في البداية للجدول الزمني. هذا يتيح لك إدارة المهام يدويًا باستخدام yieldto
إذا كنت ترغب في ذلك. ومع ذلك، عندما تنتظر مثل هذه المهمة حدثًا، فإنها لا تزال تعاد تشغيلها تلقائيًا عندما يحدث الحدث، كما كنت تتوقع.