Asynchronous Programming

عندما يحتاج البرنامج إلى التفاعل مع العالم الخارجي، على سبيل المثال التواصل مع آلة أخرى عبر الإنترنت، قد تحتاج العمليات في البرنامج إلى الحدوث بترتيب غير متوقع. لنفترض أن برنامجك يحتاج إلى تنزيل ملف. نود أن نبدأ عملية التنزيل، ونقوم بأداء عمليات أخرى أثناء انتظارنا لإكمالها، ثم نعود إلى الكود الذي يحتاج إلى الملف الذي تم تنزيله عندما يصبح متاحًا. هذا النوع من السيناريو يقع في نطاق البرمجة غير المتزامنة، والتي تُعرف أحيانًا أيضًا بالبرمجة المتزامنة (لأنه، من الناحية المفاهيمية، تحدث أشياء متعددة في وقت واحد).

لمعالجة هذه السيناريوهات، توفر جوليا Task (المعروفة أيضًا بعدة أسماء أخرى، مثل التعاونية المتناظرة، والخيوط الخفيفة، والمهام المتعددة التعاونية، أو الاستمراريات ذات الاستخدام الواحد). عندما يتم تعيين قطعة من العمل الحاسوبي (في الممارسة العملية، تنفيذ دالة معينة) كـ 4d61726b646f776e2e436f64652822222c20225461736b2229_40726566، يصبح من الممكن مقاطعتها عن طريق التبديل إلى 4d61726b646f776e2e436f64652822222c20225461736b2229_40726566 آخر. يمكن استئناف 4d61726b646f776e2e436f64652822222c20225461736b2229_40726566 الأصلي لاحقًا، في هذه النقطة ستستأنف من حيث توقفت. في البداية، قد يبدو هذا مشابهًا لاستدعاء دالة. ومع ذلك، هناك اختلافان رئيسيان. أولاً، لا يستخدم تبديل المهام أي مساحة، لذا يمكن أن تحدث أي عدد من تبديلات المهام دون استهلاك مكدس الاستدعاء. ثانيًا، يمكن أن يحدث التبديل بين المهام بأي ترتيب، على عكس استدعاءات الدوال، حيث يجب أن تنتهي الدالة المستدعاة من التنفيذ قبل أن يعود التحكم إلى الدالة المستدعية.

Basic Task operations

يمكنك التفكير في Task كمعالج لوحدة من العمل الحاسوبي الذي سيتم تنفيذه. لديها دورة حياة تتكون من إنشاء-بدء-تشغيل-إنهاء. يتم إنشاء المهام عن طريق استدعاء مُنشئ Task على دالة بدون وسائط لتشغيلها، أو باستخدام الماكرو @task:

julia> t = @task begin; sleep(5); println("done"); end
Task (runnable) @0x00007f13a40c0eb0

@task x يعادل Task(()->x).

ستنتظر هذه المهمة خمس ثوانٍ، ثم ستطبع done. ومع ذلك، لم تبدأ في التنفيذ بعد. يمكننا تشغيلها متى كنا مستعدين عن طريق استدعاء schedule:

julia> schedule(t);

إذا جربت هذا في REPL، سترى أن schedule يعود على الفور. ذلك لأنه ببساطة يضيف t إلى قائمة داخلية من المهام التي يجب تنفيذها. ثم، سيقوم REPL بطباعة الموجه التالي وينتظر المزيد من الإدخال. الانتظار لإدخال من لوحة المفاتيح يوفر فرصة لتشغيل مهام أخرى، لذا في تلك اللحظة ستبدأ t. تستدعي t sleep، الذي يضبط مؤقتًا ويوقف التنفيذ. إذا كانت هناك مهام أخرى قد تم جدولتها، يمكن أن تعمل حينها. بعد خمس ثوانٍ، ينطلق المؤقت ويعيد تشغيل t، وسترى done مطبوعة. ثم تنتهي t.

The wait function blocks the calling task until some other task finishes. So for example if you type

julia> schedule(t); wait(t)

بدلاً من استدعاء schedule فقط، سترى توقفًا لمدة خمس ثوانٍ قبل ظهور موجه الإدخال التالي. وذلك لأن REPL ينتظر انتهاء t قبل المتابعة.

من الشائع الرغبة في إنشاء مهمة وجدولتها على الفور، لذا فإن الماكرو @async متوفر لهذا الغرض –- @async x يعادل schedule(@task x).

Communicating with Channels

في بعض المشاكل، لا ترتبط الأجزاء المختلفة من العمل المطلوب بشكل طبيعي من خلال استدعاءات الوظائف؛ لا يوجد "مستدعي" أو "مستدعى" واضح بين المهام التي يجب إنجازها. مثال على ذلك هو مشكلة المنتج-المستهلك، حيث تقوم إجراء معقد واحد بتوليد القيم وإجراء معقد آخر باستهلاكها. لا يمكن للمستهلك ببساطة استدعاء دالة المنتج للحصول على قيمة، لأن المنتج قد يكون لديه المزيد من القيم لتوليدها وبالتالي قد لا يكون جاهزًا بعد للعودة. مع المهام، يمكن للمنتج والمستهلك العمل معًا طالما يحتاجان إلى ذلك، مع تمرير القيم ذهابًا وإيابًا حسب الحاجة.

جوليا توفر آلية Channel لحل هذه المشكلة. 4d61726b646f776e2e436f64652822222c20224368616e6e656c2229_40726566 هو طابور قابل للانتظار من نوع الأول في الدخول والأول في الخروج يمكن أن يكون لديه مهام متعددة تقرأ منه وتكتب إليه.

لنحدد مهمة المنتج، التي تنتج القيم عبر استدعاء put!. لاستهلاك القيم، نحتاج إلى جدولة المنتج ليعمل في مهمة جديدة. يمكن استخدام مُنشئ خاص Channel الذي يقبل دالة ذات وسيط واحد كوسيط لتشغيل مهمة مرتبطة بقناة. يمكننا بعد ذلك take! القيم بشكل متكرر من كائن القناة:

julia> function producer(c::Channel)
           put!(c, "start")
           for n=1:4
               put!(c, 2n)
           end
           put!(c, "stop")
       end;

julia> chnl = Channel(producer);

julia> take!(chnl)
"start"

julia> take!(chnl)
2

julia> take!(chnl)
4

julia> take!(chnl)
6

julia> take!(chnl)
8

julia> take!(chnl)
"stop"

طريقة واحدة للتفكير في هذا السلوك هي أن producer كان قادرًا على العودة عدة مرات. بين استدعاءات put!، يتم تعليق تنفيذ المنتج ويكون للمستهلك السيطرة.

يمكن استخدام Channel ككائن قابل للتكرار في حلقة for، وفي هذه الحالة تأخذ متغير الحلقة جميع القيم الناتجة. يتم إنهاء الحلقة عندما يتم إغلاق القناة.

julia> for x in Channel(producer)
           println(x)
       end
start
2
4
6
8
stop

لاحظ أننا لم نضطر إلى إغلاق القناة بشكل صريح في المنتج. وذلك لأن فعل ربط Channel بـ Task يربط عمر القناة المفتوحة بعمر المهمة المرتبطة. يتم إغلاق كائن القناة تلقائيًا عند إنهاء المهمة. يمكن ربط قنوات متعددة بمهمة، والعكس صحيح.

بينما يتوقع المُنشئ Task دالة بدون وسائط، فإن الطريقة Channel التي تنشئ قناة مرتبطة بالمهمة تتوقع دالة تقبل وسيطًا واحدًا من نوع 4d61726b646f776e2e436f64652822222c20224368616e6e656c2229_40726566. نمط شائع هو أن يكون المنتج مُعَلمًا، وفي هذه الحالة تكون هناك حاجة لتطبيق دالة جزئية لإنشاء دالة بوسيطين أو وسائط واحدة anonymous function.

بالنسبة لكائنات Task يمكن القيام بذلك إما مباشرة أو باستخدام ماكرو ملائم:

function mytask(myarg)
    ...
end

taskHdl = Task(() -> mytask(7))
# or, equivalently
taskHdl = @task mytask(7)

لإدارة أنماط توزيع العمل الأكثر تقدمًا، يمكن استخدام bind و schedule بالتزامن مع Task و Channel لإنشاء رابط صريح بين مجموعة من القنوات ومجموعة من مهام المنتج/المستهلك.

More on Channels

يمكن تصور القناة على أنها أنبوب، أي أن لها طرف كتابة وطرف قراءة:

  • يمكن لكتّاب متعددين في مهام مختلفة الكتابة إلى نفس القناة في وقت واحد عبر put! استدعاءات.

  • يمكن لعدة قراء في مهام مختلفة قراءة البيانات بشكل متزامن عبر take! استدعاءات.

  • كمثال:

    # Given Channels c1 and c2,
    c1 = Channel(32)
    c2 = Channel(32)
    
    # and a function `foo` which reads items from c1, processes the item read
    # and writes a result to c2,
    function foo()
        while true
            data = take!(c1)
            [...]               # process data
            put!(c2, result)    # write out result
        end
    end
    
    # we can schedule `n` instances of `foo` to be active concurrently.
    for _ in 1:n
        errormonitor(@async foo())
    end
  • تُنشأ القنوات عبر المُنشئ Channel{T}(sz). ستحتفظ القناة فقط بالكائنات من النوع T. إذا لم يتم تحديد النوع، يمكن للقناة أن تحتفظ بكائنات من أي نوع. يُشير sz إلى الحد الأقصى لعدد العناصر التي يمكن الاحتفاظ بها في القناة في أي وقت. على سبيل المثال، Channel(32) ينشئ قناة يمكن أن تحتفظ بحد أقصى قدره 32 كائنًا من أي نوع. يمكن لـ Channel{MyType}(64) الاحتفاظ بما يصل إلى 64 كائنًا من MyType في أي وقت.

  • إذا كان Channel فارغًا، فإن القراء (على استدعاء take!) سيحجبون حتى تتوفر البيانات.

  • إذا كانت Channel ممتلئة، فإن الكتاب (على اتصال put!) سيحجبون حتى تصبح المساحة متاحة.

  • isready تختبر وجود أي كائن في القناة، بينما wait تنتظر كائنًا ليصبح متاحًا.

  • Channel في حالة مفتوحة في البداية. هذا يعني أنه يمكن القراءة منه والكتابة إليه بحرية عبر take! و put! استدعاءات. close يغلق 4d61726b646f776e2e436f64652822222c20224368616e6e656c2229_40726566. على 4d61726b646f776e2e436f64652822222c20224368616e6e656c2229_40726566 مغلقة، 4d61726b646f776e2e436f64652822222c2022707574212229_40726566 سيفشل. على سبيل المثال:

    julia> c = Channel(2);
    
    julia> put!(c, 1) # `put!` on an open channel succeeds
    1
    
    julia> close(c);
    
    julia> put!(c, 2) # `put!` on a closed channel throws an exception.
    ERROR: InvalidStateException: Channel is closed.
    Stacktrace:
    [...]
  • take! و fetch (الذي يسترجع ولكن لا يزيل القيمة) على قناة مغلقة يعود بنجاح بأي قيم موجودة حتى يتم إفراغها. متابعة المثال أعلاه:

    julia> fetch(c) # Any number of `fetch` calls succeed.
    1
    
    julia> fetch(c)
    1
    
    julia> take!(c) # The first `take!` removes the value.
    1
    
    julia> take!(c) # No more data available on a closed channel.
    ERROR: InvalidStateException: Channel is closed.
    Stacktrace:
    [...]

اعتبر مثالًا بسيطًا يستخدم القنوات للتواصل بين المهام. نبدأ 4 مهام لمعالجة البيانات من قناة jobs واحدة. يتم كتابة الوظائف، التي يتم التعرف عليها بواسطة معرف (job_id)، إلى القناة. كل مهمة في هذه المحاكاة تقرأ job_id، تنتظر لفترة زمنية عشوائية وتكتب مرة أخرى مجموعة من job_id والوقت المحاكى إلى قناة النتائج. أخيرًا، يتم طباعة جميع results.

julia> const jobs = Channel{Int}(32);

julia> const results = Channel{Tuple}(32);

julia> function do_work()
           for job_id in jobs
               exec_time = rand()
               sleep(exec_time)                # simulates elapsed time doing actual work
                                               # typically performed externally.
               put!(results, (job_id, exec_time))
           end
       end;

julia> function make_jobs(n)
           for i in 1:n
               put!(jobs, i)
           end
       end;

julia> n = 12;

julia> errormonitor(@async make_jobs(n)); # feed the jobs channel with "n" jobs

julia> for i in 1:4 # start 4 tasks to process requests in parallel
           errormonitor(@async do_work())
       end

julia> @elapsed while n > 0 # print out results
           job_id, exec_time = take!(results)
           println("$job_id finished in $(round(exec_time; digits=2)) seconds")
           global n = n - 1
       end
4 finished in 0.22 seconds
3 finished in 0.45 seconds
1 finished in 0.5 seconds
7 finished in 0.14 seconds
2 finished in 0.78 seconds
5 finished in 0.9 seconds
9 finished in 0.36 seconds
6 finished in 0.87 seconds
8 finished in 0.79 seconds
10 finished in 0.64 seconds
12 finished in 0.5 seconds
11 finished in 0.97 seconds
0.029772311

بدلاً من errormonitor(t)، قد تكون الحل الأكثر قوة هو استخدام bind(results, t)، حيث أنه لن يقوم فقط بتسجيل أي فشل غير متوقع، بل سيجبر أيضًا الموارد المرتبطة على الإغلاق ونشر الاستثناء في كل مكان.

More task operations

تُبنى عمليات المهام على بدائية منخفضة المستوى تُسمى yieldto. تقوم yieldto(task, value) بتعليق المهمة الحالية، والتبديل إلى المهمة المحددة، وتسبب في أن تعود آخر مكالمة لـ 4d61726b646f776e2e436f64652822222c20227969656c64746f2229_40726566 لتلك المهمة بالقيمة المحددة. لاحظ أن 4d61726b646f776e2e436f64652822222c20227969656c64746f2229_40726566 هي العملية الوحيدة المطلوبة لاستخدام تدفق التحكم بأسلوب المهام؛ بدلاً من الاستدعاء والعودة، نحن دائماً نتبدل إلى مهمة مختلفة. لهذا السبب تُسمى هذه الميزة أيضًا "التعاون المتناظر"; يتم التبديل بين كل مهمة باستخدام نفس الآلية.

yieldto قوية، لكن معظم استخدامات المهام لا تستدعيها مباشرة. اعتبر لماذا قد يكون هذا. إذا قمت بالتبديل بعيدًا عن المهمة الحالية، فمن المحتمل أنك سترغب في العودة إليها في مرحلة ما، لكن معرفة متى تعود، ومعرفة أي مهمة تتحمل مسؤولية العودة، يمكن أن يتطلب تنسيقًا كبيرًا. على سبيل المثال، put! و take! هما عمليات حظر، والتي، عند استخدامها في سياق القنوات، تحافظ على الحالة لتتذكر من هم المستهلكون. عدم الحاجة إلى تتبع المهمة المستهلكة يدويًا هو ما يجعل 4d61726b646f776e2e436f64652822222c2022707574212229_40726566 أسهل في الاستخدام من 4d61726b646f776e2e436f64652822222c20227969656c64746f2229_40726566 المنخفضة المستوى.

بالإضافة إلى yieldto، هناك بعض الوظائف الأساسية الأخرى اللازمة لاستخدام المهام بفعالية.

  • current_task يحصل على مرجع للمهمة الجارية حاليًا.
  • istaskdone يستفسر عما إذا كانت المهمة قد انتهت.
  • istaskstarted يستفسر عما إذا كانت المهمة قد تم تنفيذها بعد.
  • task_local_storage يتلاعب بمخزن القيم المفتاحية المحدد للمهمة الحالية.

Tasks and events

تحدث معظم تبديلات المهام نتيجة الانتظار للأحداث مثل طلبات الإدخال/الإخراج، ويتم تنفيذها بواسطة جدولة مدمجة في قاعدة جوليا. يحتفظ المجدول بقائمة من المهام القابلة للتنفيذ، وينفذ حلقة أحداث تعيد تشغيل المهام بناءً على الأحداث الخارجية مثل وصول الرسائل.

The basic function for waiting for an event is wait. Several objects implement wait; for example, given a Process object, wait will wait for it to exit. wait is often implicit; for example, a wait can happen inside a call to read to wait for data to be available.

في جميع هذه الحالات، wait تعمل في النهاية على كائن Condition، الذي يتولى مسؤولية تنظيم وإعادة تشغيل المهام. عندما تستدعي مهمة 4d61726b646f776e2e436f64652822222c2022776169742229_40726566 على 4d61726b646f776e2e436f64652822222c2022436f6e646974696f6e2229_40726566، يتم وضع علامة على المهمة بأنها غير قابلة للتنفيذ، وتضاف إلى قائمة الانتظار الخاصة بالشرط، وتتحول إلى المجدول. بعد ذلك، سيختار المجدول مهمة أخرى للتنفيذ، أو سيبقى محجوزًا في انتظار أحداث خارجية. إذا سارت الأمور على ما يرام، في النهاية سيقوم معالج الأحداث باستدعاء notify على الشرط، مما يتسبب في أن تصبح المهام التي تنتظر ذلك الشرط قابلة للتنفيذ مرة أخرى.

تم إنشاء مهمة بشكل صريح عن طريق استدعاء Task والتي لا تكون معروفة في البداية للجدول الزمني. هذا يتيح لك إدارة المهام يدويًا باستخدام yieldto إذا كنت ترغب في ذلك. ومع ذلك، عندما تنتظر مثل هذه المهمة حدثًا، فإنها لا تزال تعاد تشغيلها تلقائيًا عندما يحدث الحدث، كما كنت تتوقع.