Tasks
Core.Task
— TypeTask(func)
أنشئ Task
(أي coroutine) لتنفيذ الدالة المعطاة func
(التي يجب أن تكون قابلة للاستدعاء بدون وسائط). تنتهي المهمة عندما تعود هذه الدالة. ستعمل المهمة في "عمر العالم" من الوالد عند الإنشاء عندما يتم schedule
d.
!!! تحذير بشكل افتراضي، ستحتوي المهام على البت اللاصق مضبوطًا على true t.sticky
. هذا يمثل الافتراضي التاريخي لـ @async
. يمكن تشغيل المهام اللاصقة فقط على خيط العمل الذي تم جدولتها فيه أولاً، وعند جدولتها ستجعل المهمة التي تم جدولتها منها لاصقة. للحصول على سلوك Threads.@spawn
اضبط البت اللاصق يدويًا على false
.
أمثلة
julia> a() = sum(i for i in 1:1000);
julia> b = Task(a);
في هذا المثال، b
هي Task
قابلة للتشغيل لم تبدأ بعد. ```
Base.@task
— Macro@task
قم بتغليف تعبير في Task
دون تنفيذه، وأعد Task
. هذا ينشئ مهمة فقط، ولا يقوم بتشغيلها.
!!! تحذير بشكل افتراضي، ستحتوي المهام على بت لاصق مضبوط على true t.sticky
. هذا يمثل الافتراضي التاريخي لـ @async
. يمكن تشغيل المهام اللاصقة فقط على خيط العامل الذي تم جدولتها فيه أولاً، وعند جدولتها ستجعل المهمة التي تم جدولتها منها لاصقة. للحصول على سلوك Threads.@spawn
اضبط بت اللصق يدويًا على false
.
أمثلة
julia> a1() = sum(i for i in 1:1000);
julia> b = @task a1();
julia> istaskstarted(b)
false
julia> schedule(b);
julia> yield();
julia> istaskdone(b)
true
Base.@async
— Macro@async
قم بتغليف تعبير في Task
وأضفه إلى قائمة جدولة الآلة المحلية.
يمكن إدخال القيم في @async
عبر $
، الذي ينسخ القيمة مباشرة إلى الإغلاق الأساسي المُنشأ. هذا يسمح لك بإدراج قيمة متغير، مما يعزل الكود غير المتزامن عن التغييرات في قيمة المتغير في المهمة الحالية.
يُنصح بشدة بتفضيل Threads.@spawn
على @async
دائمًا حتى عندما لا تكون هناك حاجة للتوازي، خاصة في المكتبات الموزعة علنًا. وذلك لأن استخدام @async
يعطل هجرة المهمة الأب عبر خيوط العمل في التنفيذ الحالي لجوليا. وبالتالي، يمكن أن يكون للاستخدام الظاهر البريء لـ @async
في دالة مكتبة تأثير كبير على أداء أجزاء مختلفة جدًا من تطبيقات المستخدمين.
إدخال القيم عبر $
متاح اعتبارًا من جوليا 1.4.
Base.asyncmap
— Functionasyncmap(f, c...; ntasks=0, batch_size=nothing)
يستخدم مهام متزامنة متعددة لتطبيق f
على مجموعة (أو مجموعات متعددة ذات طول متساوي). بالنسبة لعدة معلمات مجموعة، يتم تطبيق f
عنصرًا بعنصر.
يحدد ntasks
عدد المهام التي سيتم تشغيلها بشكل متزامن. اعتمادًا على طول المجموعات، إذا لم يتم تحديد ntasks
، سيتم استخدام ما يصل إلى 100 مهمة للتطبيق المتزامن.
يمكن أيضًا تحديد ntasks
كدالة بدون معلمات. في هذه الحالة، يتم التحقق من عدد المهام التي سيتم تشغيلها بالتوازي قبل معالجة كل عنصر وتبدأ مهمة جديدة إذا كانت قيمة ntasks_func
أكبر من العدد الحالي للمهام.
إذا تم تحديد batch_size
، تتم معالجة المجموعة في وضع الدفعات. يجب أن تكون f
بعد ذلك دالة تقبل Vector
من مجموعات المعلمات ويجب أن تعيد مصفوفة من النتائج. سيكون طول المصفوفة المدخلة batch_size
أو أقل.
تسلط الأمثلة التالية الضوء على التنفيذ في مهام مختلفة من خلال إرجاع objectid
للمهام التي يتم فيها تنفيذ دالة التطبيق.
أولاً، مع عدم تحديد ntasks
، تتم معالجة كل عنصر في مهمة مختلفة.
julia> tskoid() = objectid(current_task());
julia> asyncmap(x->tskoid(), 1:5)
5-element Array{UInt64,1}:
0x6e15e66c75c75853
0x440f8819a1baa682
0x9fb3eeadd0c83985
0xebd3e35fe90d4050
0x29efc93edce2b961
julia> length(unique(asyncmap(x->tskoid(), 1:5)))
5
مع ntasks=2
، تتم معالجة جميع العناصر في مهمتين.
julia> asyncmap(x->tskoid(), 1:5; ntasks=2)
5-element Array{UInt64,1}:
0x027ab1680df7ae94
0xa23d2f80cd7cf157
0x027ab1680df7ae94
0xa23d2f80cd7cf157
0x027ab1680df7ae94
julia> length(unique(asyncmap(x->tskoid(), 1:5; ntasks=2)))
2
مع تعريف batch_size
، تحتاج دالة التطبيق إلى التغيير لقبول مصفوفة من مجموعات المعلمات وإرجاع مصفوفة من النتائج. يتم استخدام map
في دالة التطبيق المعدلة لتحقيق ذلك.
julia> batch_func(input) = map(x->string("args_tuple: ", x, ", element_val: ", x[1], ", task: ", tskoid()), input)
batch_func (generic function with 1 method)
julia> asyncmap(batch_func, 1:5; ntasks=2, batch_size=2)
5-element Array{String,1}:
"args_tuple: (1,), element_val: 1, task: 9118321258196414413"
"args_tuple: (2,), element_val: 2, task: 4904288162898683522"
"args_tuple: (3,), element_val: 3, task: 9118321258196414413"
"args_tuple: (4,), element_val: 4, task: 4904288162898683522"
"args_tuple: (5,), element_val: 5, task: 9118321258196414413"
Base.asyncmap!
— Functionasyncmap!(f, results, c...; ntasks=0, batch_size=nothing)
مثل asyncmap
، ولكن يخزن المخرجات في results
بدلاً من إرجاع مجموعة.
!!! تحذير قد يكون السلوك غير متوقع عندما يتشارك أي وسيط معدل الذاكرة مع أي وسيط آخر.
Base.current_task
— Functioncurrent_task()
احصل على Task
الجاري.
Base.istaskdone
— Functionistaskdone(t::Task) -> Bool
تحديد ما إذا كانت المهمة قد انتهت.
أمثلة
julia> a2() = sum(i for i in 1:1000);
julia> b = Task(a2);
julia> istaskdone(b)
false
julia> schedule(b);
julia> yield();
julia> istaskdone(b)
true
Base.istaskstarted
— Functionistaskstarted(t::Task) -> Bool
حدد ما إذا كانت المهمة قد بدأت في التنفيذ.
أمثلة
julia> a3() = sum(i for i in 1:1000);
julia> b = Task(a3);
julia> istaskstarted(b)
false
Base.istaskfailed
— Functionistaskfailed(t::Task) -> Bool
حدد ما إذا كانت المهمة قد انتهت بسبب حدوث استثناء.
أمثلة
julia> a4() = error("task failed");
julia> b = Task(a4);
julia> istaskfailed(b)
false
julia> schedule(b);
julia> yield();
julia> istaskfailed(b)
true
تتطلب هذه الوظيفة على الأقل جوليا 1.3.
Base.task_local_storage
— Methodtask_local_storage(key)
ابحث عن قيمة مفتاح في تخزين المهام المحلي للمهمة الحالية.
Base.task_local_storage
— Methodtask_local_storage(key, value)
تعيين قيمة لمفتاح في تخزين المهام المحلي للمهمة الحالية.
Base.task_local_storage
— Methodtask_local_storage(body, key, value)
استدعاء الدالة body
مع تخزين محلي للمهام معدّل، حيث يتم تعيين value
إلى key
؛ يتم استعادة القيمة السابقة لـ key
، أو عدم وجودها، بعد ذلك. مفيد لمحاكاة النطاق الديناميكي.
Scheduling
Base.yield
— Functionyield()
التبديل إلى الجدول الزمني للسماح لمهمة مجدولة أخرى بالتشغيل. المهمة التي تستدعي هذه الوظيفة لا تزال قابلة للتنفيذ، وسيتم إعادة تشغيلها على الفور إذا لم تكن هناك مهام قابلة للتنفيذ أخرى.
yield(t::Task, arg = nothing)
نسخة سريعة وغير عادلة من schedule(t, arg); yield()
التي تعطي الأولوية لـ t
قبل استدعاء المجدول.
Base.yieldto
— Functionyieldto(t::Task, arg = nothing)
انتقل إلى المهمة المعطاة. في المرة الأولى التي يتم فيها الانتقال إلى مهمة، يتم استدعاء دالة المهمة بدون أي وسائط. في الانتقالات اللاحقة، يتم إرجاع arg
من آخر استدعاء لـ yieldto
للمهمة. هذه مكالمة منخفضة المستوى تقوم فقط بتبديل المهام، دون النظر إلى الحالات أو الجدولة بأي شكل من الأشكال. يُنصح بتجنب استخدامها.
Base.sleep
— Functionsleep(seconds)
قم بحظر المهمة الحالية لعدد محدد من الثواني. الحد الأدنى لوقت النوم هو 1 مللي ثانية أو إدخال 0.001
.
Base.schedule
— Functionschedule(t::Task, [val]; error=false)
أضف Task
إلى قائمة الانتظار الخاصة بالجدولة. هذا يتسبب في تشغيل المهمة باستمرار عندما يكون النظام غير مشغول، ما لم تقم المهمة بإجراء عملية حظر مثل wait
.
إذا تم توفير وسيط ثانٍ val
، فسيتم تمريره إلى المهمة (عبر قيمة الإرجاع لـ yieldto
) عندما تعمل مرة أخرى. إذا كان error
هو true
، فسيتم رفع القيمة كاستثناء في المهمة المستيقظة.
من غير الصحيح استخدام schedule
على Task
عشوائي تم بدءه بالفعل. راجع مرجع واجهة برمجة التطبيقات لمزيد من المعلومات.
بشكل افتراضي، ستحتوي المهام على بت لاصق مضبوط على true
t.sticky
. هذا يمثل الافتراضي التاريخي لـ @async
. يمكن تشغيل المهام اللاصقة فقط على خيط العمل الذي تم جدولة المهمة عليه أولاً، وعند الجدولة ستجعل المهمة التي تم جدولة منها لاصقة. للحصول على سلوك Threads.@spawn
قم بضبط بت اللصق يدويًا على false
.
أمثلة
julia> a5() = sum(i for i in 1:1000);
julia> b = Task(a5);
julia> istaskstarted(b)
false
julia> schedule(b);
julia> yield();
julia> istaskstarted(b)
true
julia> istaskdone(b)
true
Synchronization
Base.errormonitor
— Functionerrormonitor(t::Task)
اطبع سجل الأخطاء إلى stderr
إذا فشل المهمة t
.
أمثلة
julia> Base._wait(errormonitor(Threads.@spawn error("task failed")))
Unhandled Task ERROR: task failed
Stacktrace:
[...]
Base.@sync
— Macro@sync
انتظر حتى تكتمل جميع الاستخدامات المغلقة lexically لـ @async
، @spawn
، Distributed.@spawnat
و Distributed.@distributed
. يتم جمع جميع الاستثناءات التي تم طرحها بواسطة العمليات غير المتزامنة المغلقة ورميها كـ CompositeException
.
أمثلة
julia> Threads.nthreads()
4
julia> @sync begin
Threads.@spawn println("Thread-id $(Threads.threadid()), task 1")
Threads.@spawn println("Thread-id $(Threads.threadid()), task 2")
end;
Thread-id 3, task 1
Thread-id 1, task 2
Base.wait
— Functionملاحظة خاصة لـ Threads.Condition
:
يجب على المتصل أن يكون ممسكًا بـ lock
الذي يمتلك Threads.Condition
قبل استدعاء هذه الطريقة. سيتم حظر المهمة المستدعية حتى يقوم بعض المهام الأخرى بإيقاظها، عادةً عن طريق استدعاء notify
على نفس كائن Threads.Condition
. سيتم تحرير القفل بشكل ذري عند الحظر (حتى لو كان مقفلاً بشكل متكرر)، وسيتم استعادته قبل العودة.
انتظر(r::Future)
انتظر حتى تصبح القيمة متاحة لـ Future
.
انتظر(r::RemoteChannel، args...)
انتظر حتى تصبح القيمة متاحة على RemoteChannel
المحدد.
wait([x])
قم بحظر المهمة الحالية حتى يحدث حدث ما، اعتمادًا على نوع الوسيطة:
Channel
: انتظر حتى يتم إضافة قيمة إلى القناة.Condition
: انتظر لـnotify
على شرط وارجع معلمةval
الممررة إلىnotify
. الانتظار على شرط يسمح أيضًا بتمريرfirst=true
مما يؤدي إلى وضع المنتظر أولاً في الصف للاستيقاظ علىnotify
بدلاً من السلوك المعتاد الأول في الأول.Process
: انتظر حتى تخرج عملية أو سلسلة عمليات. يمكن استخدام حقلexitcode
لعملية لتحديد النجاح أو الفشل.Task
: انتظر حتى تنتهيTask
. إذا فشلت المهمة مع استثناء، يتم رميTaskFailedException
(الذي يلف المهمة الفاشلة).RawFD
: انتظر للتغييرات على موصّل ملف (انظر حزمةFileWatching
).
إذا لم يتم تمرير أي وسيطة، فإن المهمة تحظر لفترة غير محددة. يمكن إعادة تشغيل المهمة فقط من خلال استدعاء صريح لـ schedule
أو yieldto
.
غالبًا ما يتم استدعاء wait
داخل حلقة while
لضمان تلبية شرط الانتظار قبل المتابعة.
انتظر(c::Channel)
يمنع حتى يصبح Channel
isready
.
julia> c = Channel(1);
julia> isready(c)
false
julia> task = Task(() -> wait(c));
julia> schedule(task);
julia> istaskdone(task) # المهمة محجوزة لأن القناة ليست جاهزة
false
julia> put!(c, 1);
julia> istaskdone(task) # المهمة الآن غير محجوزة
true
Base.fetch
— Methodfetch(t::Task)
انتظر حتى تنتهي Task
ثم أعد قيمة النتيجة الخاصة بها. إذا فشلت المهمة مع استثناء، يتم رمي TaskFailedException
(الذي يلف المهمة الفاشلة).
Base.fetch
— Methodfetch(x::Any)
إرجاع x
.
Base.timedwait
— Functiontimedwait(testcb, timeout::Real; pollint::Real=0.1)
انتظر حتى تعيد testcb()
القيمة true
أو تمر ثواني timeout
، أيهما يحدث أولاً. يتم استدعاء دالة الاختبار كل pollint
ثواني. القيمة الدنيا لـ pollint
هي 0.001 ثواني، أي 1 مللي ثانية.
ارجع :ok
أو :timed_out
.
أمثلة
julia> cb() = (sleep(5); return);
julia> t = @async cb();
julia> timedwait(()->istaskdone(t), 1)
:timed_out
julia> timedwait(()->istaskdone(t), 6.5)
:ok
Base.Condition
— TypeCondition()
أنشئ مصدر حدث مُفعّل بواسطة الحافة يمكن أن تنتظر المهام من أجله. المهام التي تستدعي wait
على Condition
يتم تعليقها وتسجيلها في قائمة الانتظار. يتم إيقاظ المهام عندما يتم استدعاء notify
لاحقًا على Condition
. يمكن أن تعيد الانتظار على شرط قيمة أو تثير خطأ إذا تم استخدام الوسائط الاختيارية لـ notify
. يعني التفعيل بواسطة الحافة أن المهام التي تنتظر في الوقت الذي يتم فيه استدعاء notify
فقط يمكن إيقاظها. بالنسبة للإشعارات المُفعّلة بواسطة المستوى، يجب عليك الاحتفاظ بحالة إضافية لتتبع ما إذا كان قد حدث إشعار. تقوم أنواع Channel
و Threads.Event
بذلك، ويمكن استخدامها للأحداث المُفعّلة بواسطة المستوى.
هذا الكائن ليس آمنًا للخيوط. انظر Threads.Condition
للحصول على نسخة آمنة للخيوط.
Base.Threads.Condition
— TypeThreads.Condition([lock])
نسخة آمنة من Base.Condition
.
لإجراء مكالمة إلى wait
أو notify
على Threads.Condition
، يجب أولاً إجراء مكالمة إلى lock
عليها. عند استدعاء wait
، يتم تحرير القفل بشكل ذري أثناء الحظر، وسيتم استعادته قبل أن تعود wait
. لذلك، يبدو الاستخدام العادي لـ Threads.Condition
c
كما يلي:
lock(c)
try
while !thing_we_are_waiting_for
wait(c)
end
finally
unlock(c)
end
تتطلب هذه الوظيفة على الأقل Julia 1.2.
Base.Event
— TypeEvent([autoreset=false])
إنشاء مصدر حدث مُفعّل بمستوى. المهام التي تستدعي wait
على Event
يتم تعليقها وتخزينها في قائمة انتظار حتى يتم استدعاء notify
على Event
. بعد استدعاء notify
، يبقى Event
في حالة مُفعّلة ولن تعيق المهام الانتظار له، حتى يتم استدعاء reset
.
إذا كانت autoreset
صحيحة، فسيتم إطلاق مهمة واحدة فقط من wait
لكل استدعاء لـ notify
.
هذا يوفر ترتيب ذاكرة اكتساب وإفراج على notify/wait.
تتطلب هذه الوظيفة على الأقل Julia 1.1.
تتطلب وظيفة autoreset
وضمان ترتيب الذاكرة على الأقل Julia 1.8.
Base.notify
— Functionnotify(condition, val=nothing; all=true, error=false)
أيقظ المهام التي تنتظر شرطًا، مع تمرير val
لها. إذا كان all
هو true
(القيمة الافتراضية)، يتم إيقاظ جميع المهام المنتظرة، وإلا يتم إيقاظ واحدة فقط. إذا كان error
هو true
، يتم رفع القيمة الممررة كاستثناء في المهام المستيقظة.
أعد عدد المهام التي تم إيقاظها. أعد 0 إذا لم تكن هناك مهام تنتظر على condition
.
Base.reset
— Methodreset(::Event)
إعادة تعيين Event
إلى حالة غير محددة. بعد ذلك، ستقوم أي استدعاءات مستقبلية لـ wait
بحظر التنفيذ حتى يتم استدعاء notify
مرة أخرى.
Base.Semaphore
— TypeSemaphore(sem_size)
إنشاء Semaphore عددي يسمح بحد أقصى sem_size
من عمليات الاستحواذ التي يمكن أن تكون قيد الاستخدام في أي وقت. يجب أن يتطابق كل استحواذ مع عملية إفراج.
يوفر هذا ترتيب ذاكرة للاستحواذ والإفراج على استدعاءات الاستحواذ/الإفراج.
Base.acquire
— Functionacquire(s::Semaphore)
انتظر حتى يتوفر أحد الأذونات sem_size
، مع حظر حتى يمكن الحصول على واحد.
acquire(f, s::Semaphore)
نفذ f
بعد الحصول على Semaphore s
، وrelease
عند الانتهاء أو حدوث خطأ.
على سبيل المثال، شكل كتلة do يضمن أن يكون هناك فقط 2 استدعاءات لـ foo
نشطة في نفس الوقت:
s = Base.Semaphore(2)
@sync for _ in 1:100
Threads.@spawn begin
Base.acquire(s) do
foo()
end
end
end
هذه الطريقة تتطلب على الأقل Julia 1.8.
Base.release
— Functionrelease(s::Semaphore)
إرجاع تصريح واحد إلى المجموعة، مما قد يسمح لمهمة أخرى بالحصول عليه واستئناف التنفيذ.
Base.AbstractLock
— TypeBase.lock
— Functionlock(lock)
احصل على lock
عندما يصبح متاحًا. إذا كان القفل مؤمنًا بالفعل بواسطة مهمة/خيط مختلف، انتظر حتى يصبح متاحًا.
يجب أن يتطابق كل lock
مع unlock
.
lock(f::Function, lock)
احصل على lock
، نفذ f
مع الاحتفاظ بـ lock
، وأطلق lock
عندما تعود f
. إذا كان القفل مؤمنًا بالفعل بواسطة مهمة/خيط مختلف، انتظر حتى يصبح متاحًا.
عندما تعود هذه الدالة، يكون lock
قد تم إطلاقه، لذا يجب على المتصل عدم محاولة unlock
له.
انظر أيضًا: @lock
.
استخدام Channel
كوسيط ثانٍ يتطلب Julia 1.7 أو أحدث.
اقفل(f::Function, l::Lockable)
احصل على القفل المرتبط بـ l
، نفذ f
مع القفل الممسوك، وأطلق القفل عندما تعود f
. ستتلقى f
حجة موضعية واحدة: القيمة المغلفة بواسطة l
. إذا كان القفل مقفلاً بالفعل بواسطة مهمة/خيط مختلف، انتظر حتى يصبح متاحًا. عندما تعود هذه الدالة، سيتم إطلاق lock
، لذا يجب على المتصل عدم محاولة unlock
له.
يتطلب على الأقل Julia 1.11.
Base.unlock
— Functionunlock(lock)
يحرر ملكية lock
.
إذا كان هذا قفلًا تكراريًا تم الحصول عليه مسبقًا، قم بتقليل العداد الداخلي وارجع على الفور.
Base.trylock
— Functiontrylock(lock) -> Success (Boolean)
احصل على القفل إذا كان متاحًا، وارجع true
إذا كان ناجحًا. إذا كان القفل مؤمنًا بالفعل بواسطة مهمة/خيط مختلف، ارجع false
.
يجب أن يتطابق كل trylock
ناجح مع unlock
.
يمكن استخدام دالة trylock
مع islocked
لكتابة خوارزميات الاختبار والاختبار والتعيين أو خوارزميات التراجع الأسي إذا كانت مدعومة بواسطة typeof(lock)
(اقرأ الوثائق الخاصة به).
Base.islocked
— Functionislocked(lock) -> Status (Boolean)
تحقق مما إذا كان lock
محجوزًا بواسطة أي مهمة/خيط. لا ينبغي استخدام هذه الوظيفة بمفردها للتزامن. ومع ذلك، يمكن استخدام islocked
مع trylock
لكتابة خوارزميات الاختبار والاختبار والتعيين أو التراجع الأسي إذا كان مدعومًا بواسطة typeof(lock)
(اقرأ الوثائق الخاصة به).
مساعدة موسعة
على سبيل المثال، يمكن تنفيذ التراجع الأسي كما يلي إذا كانت تنفيذية lock
تلبي الخصائص الموثقة أدناه.
nspins = 0
while true
while islocked(lock)
GC.safepoint()
nspins += 1
nspins > LIMIT && error("timeout")
end
trylock(lock) && break
backoff()
end
التنفيذ
يوصى بأن تعرف تنفيذية القفل islocked
بالخصائص التالية وأن تلاحظ ذلك في وثائقها.
islocked(lock)
خالية من سباقات البيانات.- إذا كانت
islocked(lock)
تعيدfalse
، يجب أن تنجح الاستدعاء الفوري لـtrylock(lock)
(تعيدtrue
) إذا لم يكن هناك تدخل من مهام أخرى.
Base.ReentrantLock
— TypeReentrantLock()
ينشئ قفلًا قابلًا لإعادة الدخول لمزامنة Task
s. يمكن لنفس المهمة الحصول على القفل عدة مرات حسب الحاجة (هذا هو ما يعنيه الجزء "القابل لإعادة الدخول" من الاسم). يجب أن يتطابق كل lock
مع unlock
.
ستمنع استدعاء lock
أيضًا تشغيل المُنهيات على تلك الخيط حتى يتم استدعاء unlock
المقابل. يجب أن يتم دعم استخدام نمط القفل القياسي الموضح أدناه بشكل طبيعي، ولكن احذر من عكس ترتيب المحاولة/القفل أو فقدان كتلة المحاولة تمامًا (على سبيل المثال، محاولة العودة مع الاحتفاظ بالقفل):
هذا يوفر ترتيب ذاكرة اكتساب/إفراج على استدعاءات القفل/الإفراج.
lock(l)
try
<عمل ذري>
finally
unlock(l)
end
إذا كانت !islocked(lck::ReentrantLock)
صحيحة، فإن trylock(lck)
ينجح ما لم تكن هناك مهام أخرى تحاول الاحتفاظ بالقفل "في نفس الوقت."
Base.@lock
— Macro@lock l expr
نسخة الماكرو من lock(f, l::AbstractLock)
ولكن مع expr
بدلاً من دالة f
. يتوسع إلى:
lock(l)
try
expr
finally
unlock(l)
end
هذا مشابه لاستخدام lock
مع كتلة do
، ولكنه يتجنب إنشاء إغلاق وبالتالي يمكن أن يحسن الأداء.
@lock
أضيف في جوليا 1.3، وتم تصديره في جوليا 1.10.
Base.Lockable
— TypeLockable(value, lock = ReentrantLock())
ينشئ كائن Lockable
الذي يلتف حول value
ويرتبط بالقفل المقدم lock
. يدعم هذا الكائن @lock
، lock
، trylock
، unlock
. للوصول إلى القيمة، قم بفهرسة كائن القفل أثناء حمل القفل.
يتطلب على الأقل Julia 1.11.
مثال
julia> locked_list = Base.Lockable(Int[]);
julia> @lock(locked_list, push!(locked_list[], 1)) # يجب أن تحمل القفل للوصول إلى القيمة
1-element Vector{Int64}:
1
julia> lock(summary, locked_list)
"1-element Vector{Int64}"
Channels
Base.AbstractChannel
— TypeAbstractChannel{T}
تمثيل قناة تمرر كائنات من النوع T
.
Base.Channel
— TypeChannel{T=Any}(size::Int=0)
يبني Channel
مع مخزن داخلي يمكنه استيعاب حد أقصى من size
كائنات من النوع T
. put!
يستدعي على قناة ممتلئة ويعلق حتى يتم إزالة كائن باستخدام take!
.
Channel(0)
يبني قناة غير مخزنة. put!
يعلق حتى يتم استدعاء take!
المطابق. والعكس صحيح.
بناة أخرى:
Channel()
: الباني الافتراضي، يعادلChannel{Any}(0)
Channel(Inf)
: يعادلChannel{Any}(typemax(Int))
Channel(sz)
: يعادلChannel{Any}(sz)
تم إضافة الباني الافتراضي Channel()
و size=0
الافتراضي في Julia 1.3.
Base.Channel
— MethodChannel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false, threadpool=nothing)
أنشئ مهمة جديدة من func
، اربطها بقناة جديدة من النوع T
وحجم size
، وجدول المهمة، كل ذلك في استدعاء واحد. يتم إغلاق القناة تلقائيًا عند انتهاء المهمة.
يجب أن يقبل func
القناة المرتبطة كحجته الوحيدة.
إذا كنت بحاجة إلى مرجع للمهمة التي تم إنشاؤها، مرر كائن Ref{Task}
عبر وسيط taskref
.
إذا كان spawn=true
، فقد يتم جدولة Task
التي تم إنشاؤها لـ func
على خيط آخر بالتوازي، وهو ما يعادل إنشاء مهمة عبر Threads.@spawn
.
إذا كان spawn=true
ولم يتم تعيين وسيط threadpool
، فإنه يت default إلى :default
.
إذا تم تعيين وسيط threadpool
(إلى :default
أو :interactive
)، فهذا يعني أن spawn=true
وأن المهمة الجديدة تم إنشاؤها في مجموعة الخيوط المحددة.
أعد قناة Channel
.
أمثلة
julia> chnl = Channel() do ch
foreach(i -> put!(ch, i), 1:4)
end;
julia> typeof(chnl)
Channel{Any}
julia> for i in chnl
@show i
end;
i = 1
i = 2
i = 3
i = 4
الإشارة إلى المهمة التي تم إنشاؤها:
julia> taskref = Ref{Task}();
julia> chnl = Channel(taskref=taskref) do ch
println(take!(ch))
end;
julia> istaskdone(taskref[])
false
julia> put!(chnl, "Hello");
Hello
julia> istaskdone(taskref[])
true
تم إضافة وسيط spawn=
في Julia 1.3. تم إضافة هذا المنشئ في Julia 1.3. في الإصدارات السابقة من Julia، استخدمت قناة الوسائط الرئيسية لتعيين size
و T
، ولكن تلك المنشئات لم تعد مدعومة.
تم إضافة وسيط threadpool=
في Julia 1.9.
julia> chnl = Channel{Char}(1, spawn=true) do ch
for c in "hello world"
put!(ch, c)
end
end
Channel{Char}(1) (2 items available)
julia> String(collect(chnl))
"hello world"
Base.put!
— Methodput!(c::Channel, v)
أضف عنصرًا v
إلى القناة c
. يتم حظره إذا كانت القناة ممتلئة.
بالنسبة للقنوات غير المعبأة، يتم الحظر حتى يتم تنفيذ take!
بواسطة مهمة مختلفة.
يتم الآن تحويل v
إلى نوع القناة باستخدام convert
عند استدعاء put!
.
Base.take!
— Methodtake!(c::Channel)
يُزيل ويُرجع قيمة من Channel
بالترتيب. يُعيق حتى تتوفر البيانات. بالنسبة للقنوات غير المعبأة، يُعيق حتى يتم تنفيذ put!
بواسطة مهمة مختلفة.
أمثلة
قناة مُعبأة:
julia> c = Channel(1);
julia> put!(c, 1);
julia> take!(c)
1
قناة غير مُعبأة:
julia> c = Channel(0);
julia> task = Task(() -> put!(c, 1));
julia> schedule(task);
julia> take!(c)
1
Base.isready
— Methodisready(c::Channel)
يحدد ما إذا كان هناك قيمة مخزنة في Channel
. يعود على الفور، لا يحجب.
بالنسبة للقنوات غير المعبأة، يعود true
إذا كانت هناك مهام تنتظر على put!
.
أمثلة
قناة معبأة:
julia> c = Channel(1);
julia> isready(c)
false
julia> put!(c, 1);
julia> isready(c)
true
قناة غير معبأة:
julia> c = Channel();
julia> isready(c) # لا توجد مهام تنتظر لوضع!
false
julia> task = Task(() -> put!(c, 1));
julia> schedule(task); # جدولة مهمة put!
julia> isready(c)
true
Base.fetch
— Methodfetch(c::Channel)
ينتظر ويعيد (دون إزالة) أول عنصر متاح من Channel
. ملاحظة: fetch
غير مدعوم على Channel
غير المخزن (0-size).
أمثلة
قناة مخزنة:
julia> c = Channel(3) do ch
foreach(i -> put!(ch, i), 1:3)
end;
julia> fetch(c)
1
julia> collect(c) # العنصر لم يُزال
3-element Vector{Any}:
1
2
3
Base.close
— Methodclose(c::Channel[, excp::Exception])
إغلاق قناة. يتم رمي استثناء (يتم إعطاؤه اختياريًا بواسطة excp
) بواسطة:
Base.bind
— Methodbind(chnl::Channel, task::Task)
ربط عمر chnl
بمهمة. يتم إغلاق Channel
chnl
تلقائيًا عند انتهاء المهمة. يتم تمرير أي استثناء غير معالج في المهمة إلى جميع المنتظرين على chnl
.
يمكن إغلاق كائن chnl
بشكل صريح بغض النظر عن إنهاء المهمة. لا تؤثر المهام المنتهية على كائنات Channel
المغلقة بالفعل.
عندما يتم ربط قناة بعدة مهام، ستغلق المهمة الأولى التي تنتهي القناة. عندما يتم ربط عدة قنوات بنفس المهمة، فإن إنهاء المهمة سيغلق جميع القنوات المرتبطة.
أمثلة
julia> c = Channel(0);
julia> task = @async foreach(i->put!(c, i), 1:4);
julia> bind(c,task);
julia> for i in c
@show i
end;
i = 1
i = 2
i = 3
i = 4
julia> isopen(c)
false
julia> c = Channel(0);
julia> task = @async (put!(c, 1); error("foo"));
julia> bind(c, task);
julia> take!(c)
1
julia> put!(c, 1);
ERROR: TaskFailedException
Stacktrace:
[...]
nested task error: foo
[...]
Low-level synchronization using schedule
and wait
الاستخدام الصحيح الأسهل لـ schedule
هو على Task
لم يبدأ (مجدول) بعد. ومع ذلك، من الممكن استخدام 4d61726b646f776e2e436f64652822222c20227363686564756c652229_40726566
و wait
ككتلة بناء منخفضة المستوى جدًا لبناء واجهات التزامن. شرط مسبق حاسم لاستدعاء schedule(task)
هو أن المتصل يجب أن "يمتلك" task
؛ أي، يجب أن يعرف أن الاستدعاء لـ wait
في task
المعطاة يحدث في المواقع المعروفة للكود الذي يستدعي schedule(task)
. واحدة من الاستراتيجيات لضمان مثل هذا الشرط المسبق هي استخدام الذرات، كما هو موضح في المثال التالي:
@enum OWEState begin
OWE_EMPTY
OWE_WAITING
OWE_NOTIFYING
end
mutable struct OneWayEvent
@atomic state::OWEState
task::Task
OneWayEvent() = new(OWE_EMPTY)
end
function Base.notify(ev::OneWayEvent)
state = @atomic ev.state
while state !== OWE_NOTIFYING
# Spin until we successfully update the state to OWE_NOTIFYING:
state, ok = @atomicreplace(ev.state, state => OWE_NOTIFYING)
if ok
if state == OWE_WAITING
# OWE_WAITING -> OWE_NOTIFYING transition means that the waiter task is
# already waiting or about to call `wait`. The notifier task must wake up
# the waiter task.
schedule(ev.task)
else
@assert state == OWE_EMPTY
# Since we are assuming that there is only one notifier task (for
# simplicity), we know that the other possible case here is OWE_EMPTY.
# We do not need to do anything because we know that the waiter task has
# not called `wait(ev::OneWayEvent)` yet.
end
break
end
end
return
end
function Base.wait(ev::OneWayEvent)
ev.task = current_task()
state, ok = @atomicreplace(ev.state, OWE_EMPTY => OWE_WAITING)
if ok
# OWE_EMPTY -> OWE_WAITING transition means that the notifier task is guaranteed to
# invoke OWE_WAITING -> OWE_NOTIFYING transition. The waiter task must call
# `wait()` immediately. In particular, it MUST NOT invoke any function that may
# yield to the scheduler at this point in code.
wait()
else
@assert state == OWE_NOTIFYING
# Otherwise, the `state` must have already been moved to OWE_NOTIFYING by the
# notifier task.
end
return
end
ev = OneWayEvent()
@sync begin
@async begin
wait(ev)
println("done")
end
println("notifying...")
notify(ev)
end
# output
notifying...
done
OneWayEvent
يسمح لمهمة واحدة بـ الانتظار
لإشعار مهمة أخرى. إنها واجهة تواصل محدودة حيث يمكن استخدام الانتظار
مرة واحدة فقط من مهمة واحدة (لاحظ التعيين غير الذري لـ ev.task
)
في هذا المثال، يُسمح لـ notify(ev::OneWayEvent)
باستدعاء schedule(ev.task)
إذا وفقط إذا كانت تعدل الحالة من OWE_WAITING
إلى OWE_NOTIFYING
. هذا يُعلمنا أن المهمة التي تنفذ wait(ev::OneWayEvent)
أصبحت الآن في الفرع ok
وأنه لا يمكن أن تكون هناك مهام أخرى تحاول schedule(ev.task)
لأن @atomicreplace(ev.state, state => OWE_NOTIFYING)
ستفشل.