Tasks
Core.Task
— TypeЗадача(func)
Создайте Task
(т.е. корутину) для выполнения данной функции func
(которая должна быть вызываемой без аргументов). Задача завершится, когда эта функция вернёт значение. Задача будет выполняться в "возрасте мира" от родителя при создании, когда schedule
вызван.
!!! предупреждение По умолчанию задачи будут иметь установленный бит "липкости" в значение true t.sticky
. Это моделирует исторический стандарт для @async
. Липкие задачи могут выполняться только на рабочем потоке, на котором они были впервые запланированы, и при планировании сделают задачу, из которой они были запланированы, липкой. Чтобы получить поведение Threads.@spawn
, установите бит "липкости" вручную в значение false
.
Примеры
julia> a() = sum(i for i in 1:1000);
julia> b = Task(a);
В этом примере b
является выполняемой Task
, которая ещё не началась.
Base.@task
— Macro@task
Оберните выражение в Task
, не выполняя его, и верните Task
. Это только создает задачу и не запускает ее.
По умолчанию задачи будут иметь установленный бит "липкости" в значение true t.sticky
. Это моделирует исторический стандарт для @async
. Липкие задачи могут выполняться только на рабочем потоке, на котором они были впервые запланированы, и при планировании сделают задачу, из которой они были запланированы, липкой. Чтобы получить поведение Threads.@spawn
, установите бит "липкости" вручную в значение false
.
Примеры
julia> a1() = sum(i for i in 1:1000);
julia> b = @task a1();
julia> istaskstarted(b)
false
julia> schedule(b);
julia> yield();
julia> istaskdone(b)
true
Base.@async
— Macro@async
Оборачивает выражение в Task
и добавляет его в очередь планировщика локальной машины.
Значения могут быть интерполированы в @async
с помощью $
, который копирует значение непосредственно в создаваемую замыкание. Это позволяет вставлять значение переменной, изолируя асинхронный код от изменений значения переменной в текущей задаче.
Настоятельно рекомендуется всегда предпочитать Threads.@spawn
вместо @async
даже когда параллелизм не требуется, особенно в публично распределенных библиотеках. Это связано с тем, что использование @async
отключает миграцию родительской задачи между рабочими потоками в текущей реализации Julia. Таким образом, на первый взгляд безобидное использование @async
в функции библиотеки может значительно повлиять на производительность очень разных частей пользовательских приложений.
Интерполяция значений с помощью $
доступна начиная с Julia 1.4.
Base.asyncmap
— Functionasyncmap(f, c...; ntasks=0, batch_size=nothing)
Использует несколько параллельных задач для применения f
к коллекции (или нескольким коллекциям одинаковой длины). Для нескольких аргументов коллекции f
применяется поэлементно.
ntasks
указывает количество задач, которые будут выполняться параллельно. В зависимости от длины коллекций, если ntasks
не указан, будет использовано до 100 задач для параллельного отображения.
ntasks
также может быть указан как функция без аргументов. В этом случае количество задач, которые будут выполняться параллельно, проверяется перед обработкой каждого элемента, и новая задача запускается, если значение ntasks_func
больше текущего количества задач.
Если указано batch_size
, коллекция обрабатывается в пакетном режиме. f
должна быть функцией, которая принимает Vector
кортежей аргументов и возвращает вектор результатов. Входной вектор будет иметь длину batch_size
или меньше.
Следующие примеры подчеркивают выполнение в разных задачах, возвращая objectid
задач, в которых выполняется функция отображения.
Сначала, когда ntasks
не определен, каждый элемент обрабатывается в другой задаче.
julia> tskoid() = objectid(current_task());
julia> asyncmap(x->tskoid(), 1:5)
5-element Array{UInt64,1}:
0x6e15e66c75c75853
0x440f8819a1baa682
0x9fb3eeadd0c83985
0xebd3e35fe90d4050
0x29efc93edce2b961
julia> length(unique(asyncmap(x->tskoid(), 1:5)))
5
С ntasks=2
все элементы обрабатываются в 2 задачах.
julia> asyncmap(x->tskoid(), 1:5; ntasks=2)
5-element Array{UInt64,1}:
0x027ab1680df7ae94
0xa23d2f80cd7cf157
0x027ab1680df7ae94
0xa23d2f80cd7cf157
0x027ab1680df7ae94
julia> length(unique(asyncmap(x->tskoid(), 1:5; ntasks=2)))
2
С определенным batch_size
функция отображения должна быть изменена, чтобы принимать массив кортежей аргументов и возвращать массив результатов. map
используется в измененной функции отображения для достижения этого.
julia> batch_func(input) = map(x->string("args_tuple: ", x, ", element_val: ", x[1], ", task: ", tskoid()), input)
batch_func (generic function with 1 method)
julia> asyncmap(batch_func, 1:5; ntasks=2, batch_size=2)
5-element Array{String,1}:
"args_tuple: (1,), element_val: 1, task: 9118321258196414413"
"args_tuple: (2,), element_val: 2, task: 4904288162898683522"
"args_tuple: (3,), element_val: 3, task: 9118321258196414413"
"args_tuple: (4,), element_val: 4, task: 4904288162898683522"
"args_tuple: (5,), element_val: 5, task: 9118321258196414413"
Base.asyncmap!
— Functionasyncmap!(f, results, c...; ntasks=0, batch_size=nothing)
Как asyncmap
, но сохраняет вывод в results
, а не возвращает коллекцию.
Поведение может быть неожиданным, если какой-либо изменяемый аргумент разделяет память с любым другим аргументом.
Base.current_task
— Functioncurrent_task()
Получите текущую выполняемую Task
.
Base.istaskdone
— Functionistaskdone(t::Task) -> Bool
Определите, завершилась ли задача.
Примеры
julia> a2() = sum(i for i in 1:1000);
julia> b = Task(a2);
julia> istaskdone(b)
false
julia> schedule(b);
julia> yield();
julia> istaskdone(b)
true
Base.istaskstarted
— Functionistaskstarted(t::Task) -> Bool
Определяет, началось ли выполнение задачи.
Примеры
julia> a3() = sum(i for i in 1:1000);
julia> b = Task(a3);
julia> istaskstarted(b)
false
Base.istaskfailed
— Functionistaskfailed(t::Task) -> Bool
Определяет, завершилась ли задача из-за выброшенного исключения.
Примеры
julia> a4() = error("задача завершилась с ошибкой");
julia> b = Task(a4);
julia> istaskfailed(b)
false
julia> schedule(b);
julia> yield();
julia> istaskfailed(b)
true
Эта функция требует как минимум Julia 1.3.
Base.task_local_storage
— Methodtask_local_storage(key)
Посмотрите значение ключа в локальном хранилище задач текущей задачи.
Base.task_local_storage
— Methodtask_local_storage(key, value)
Назначить значение ключу в локальном хранилище задач текущей задачи.
Base.task_local_storage
— Methodtask_local_storage(body, key, value)
Вызовите функцию body
с измененным локальным хранилищем задач, в котором value
присваивается key
; предыдущее значение key
, или его отсутствие, восстанавливается после этого. Полезно для эмуляции динамической области видимости.
Scheduling
Base.yield
— Functionyield()
Переключиться на планировщик, чтобы позволить запуститься другой запланированной задаче. Задача, которая вызывает эту функцию, все еще может быть выполнена и будет перезапущена немедленно, если нет других выполняемых задач.
yield(t::Task, arg = nothing)
Быстрая, несправедливая версия schedule(t, arg); yield()
, которая немедленно передает управление t
перед вызовом планировщика.
Base.yieldto
— Functionyieldto(t::Task, arg = nothing)
Переключиться на заданную задачу. В первый раз, когда происходит переключение на задачу, функция задачи вызывается без аргументов. При последующих переключениях arg
возвращается из последнего вызова задачи к yieldto
. Это низкоуровневый вызов, который только переключает задачи, не учитывая состояния или планирование каким-либо образом. Его использование не рекомендуется.
Base.sleep
— Functionsleep(seconds)
Блокирует текущую задачу на указанное количество секунд. Минимальное время сна составляет 1 миллисекунду или ввод 0.001
.
Base.schedule
— Functionschedule(t::Task, [val]; error=false)
Добавляет Task
в очередь планировщика. Это приводит к тому, что задача выполняется постоянно, когда система в остальном простаивает, если только задача не выполняет блокирующую операцию, такую как wait
.
Если предоставлен второй аргумент val
, он будет передан задаче (через возвращаемое значение yieldto
), когда она снова запустится. Если error
равно true
, значение будет вызвано как исключение в пробуждённой задаче.
Неправильно использовать schedule
для произвольной Task
, которая уже была запущена. См. справочник API для получения дополнительной информации.
По умолчанию задачи будут иметь установленный бит "липкости" в значение true t.sticky
. Это моделирует исторический стандарт для @async
. Липкие задачи могут выполняться только на рабочем потоке, на котором они были впервые запланированы, и при планировании сделают задачу, из которой они были запланированы, липкой. Чтобы получить поведение Threads.@spawn
, установите бит "липкости" вручную в значение false
.
Примеры
julia> a5() = sum(i for i in 1:1000);
julia> b = Task(a5);
julia> istaskstarted(b)
false
julia> schedule(b);
julia> yield();
julia> istaskstarted(b)
true
julia> istaskdone(b)
true
Synchronization
Base.errormonitor
— Functionerrormonitor(t::Task)
Выводит журнал ошибок в stderr
, если задача t
завершилась неудачей.
Примеры
julia> Base._wait(errormonitor(Threads.@spawn error("задача завершилась неудачей")))
Unhandled Task ERROR: задача завершилась неудачей
Stacktrace:
[...]
Base.@sync
— Macro@sync
Ждите, пока все лексически заключенные использования @async
, @spawn
, Distributed.@spawnat
и Distributed.@distributed
не завершатся. Все исключения, выброшенные заключенными асинхронными операциями, собираются и выбрасываются как CompositeException
.
Примеры
julia> Threads.nthreads()
4
julia> @sync begin
Threads.@spawn println("Thread-id $(Threads.threadid()), task 1")
Threads.@spawn println("Thread-id $(Threads.threadid()), task 2")
end;
Thread-id 3, task 1
Thread-id 1, task 2
Base.wait
— FunctionСпециальное примечание для Threads.Condition
:
Вызывающий должен удерживать lock
, который владеет Threads.Condition
, перед вызовом этого метода. Вызывающая задача будет заблокирована до тех пор, пока какая-либо другая задача не разбудит её, обычно вызывая notify
на том же объекте Threads.Condition
. Замок будет атомарно освобождён при блокировке (даже если он был заблокирован рекурсивно) и будет повторно захвачен перед возвратом.
wait(r::Future)
Ожидание, пока значение не станет доступным для указанного Future
.
wait(r::RemoteChannel, args...)
Ожидание, пока значение не станет доступным на указанном RemoteChannel
.
wait([x])
Блокирует текущую задачу до тех пор, пока не произойдет какое-то событие, в зависимости от типа аргумента:
Channel
: Ждет, пока значение не будет добавлено в канал.Condition
: Ждет вызоваnotify
на условии и возвращает параметрval
, переданный вnotify
. Ожидание на условии дополнительно позволяет передатьfirst=true
, что приводит к тому, что ожидающий помещается первым в очередь на пробуждение при вызовеnotify
, вместо обычного поведения "первый пришел - первый вышел".Process
: Ждет, пока процесс или цепочка процессов не завершится. Полеexitcode
процесса можно использовать для определения успеха или неудачи.Task
: Ждет завершенияTask
. Если задача завершается с исключением, выбрасываетсяTaskFailedException
(который оборачивает неудавшуюся задачу).RawFD
: Ждет изменений на файловом дескрипторе (см. пакетFileWatching
).
Если аргумент не передан, задача блокируется на неопределенный срок. Задача может быть перезапущена только явным вызовом schedule
или yieldto
.
Часто wait
вызывается внутри цикла while
, чтобы убедиться, что ожидаемое условие выполнено перед продолжением.
wait(c::Channel)
Блокирует выполнение до тех пор, пока Channel
isready
.
julia> c = Channel(1);
julia> isready(c)
false
julia> task = Task(() -> wait(c));
julia> schedule(task);
julia> istaskdone(task) # задача заблокирована, потому что канал не готов
false
julia> put!(c, 1);
julia> istaskdone(task) # задача теперь разблокирована
true
Base.fetch
— Methodfetch(t::Task)
Ждите, пока Task
не завершится, затем верните его значение результата. Если задача завершится с ошибкой, будет выброшено исключение TaskFailedException
(которое оборачивает неудавшуюся задачу).
Base.fetch
— Methodfetch(x::Any)
Возвращает x
.
Base.timedwait
— Functiontimedwait(testcb, timeout::Real; pollint::Real=0.1)
Ждите, пока testcb()
не вернет true
или пока не истечет timeout
секунд, в зависимости от того, что произойдет раньше. Тестовая функция опрашивается каждые pollint
секунд. Минимальное значение для pollint
составляет 0.001 секунды, то есть 1 миллисекунда.
Возвращает :ok
или :timed_out
.
Примеры
julia> cb() = (sleep(5); return);
julia> t = @async cb();
julia> timedwait(()->istaskdone(t), 1)
:timed_out
julia> timedwait(()->istaskdone(t), 6.5)
:ok
Base.Condition
— TypeCondition()
Создайте источник событий с срабатыванием по фронту, на который могут ожидать задачи. Задачи, которые вызывают wait
на Condition
, приостанавливаются и ставятся в очередь. Задачи пробуждаются, когда позже вызывается notify
на Condition
. Ожидание на условии может вернуть значение или вызвать ошибку, если используются необязательные аргументы notify
. Срабатывание по фронту означает, что только задачи, ожидающие в момент вызова notify
, могут быть пробуждены. Для уведомлений с уровнями вам нужно поддерживать дополнительное состояние, чтобы отслеживать, произошло ли уведомление. Типы Channel
и Threads.Event
делают это и могут использоваться для событий с уровнями.
Этот объект НЕ является потокобезопасным. См. Threads.Condition
для потокобезопасной версии.
Base.Threads.Condition
— TypeThreads.Condition([lock])
Потоко-безопасная версия Base.Condition
.
Чтобы вызвать wait
или notify
на Threads.Condition
, вы сначала должны вызвать lock
на нем. Когда вызывается wait
, блокировка атомарно освобождается во время блокировки и будет повторно захвачена перед тем, как wait
вернется. Поэтому идиоматическое использование Threads.Condition
c
выглядит следующим образом:
lock(c)
try
while !thing_we_are_waiting_for
wait(c)
end
finally
unlock(c)
end
Эта функциональность требует как минимум Julia 1.2.
Base.Event
— TypeEvent([autoreset=false])
Создайте источник событий с уровневым триггером. Задачи, которые вызывают wait
на Event
, приостанавливаются и ставятся в очередь до тех пор, пока не будет вызван notify
на Event
. После вызова notify
Event
остается в состоянии сигнала, и задачи больше не будут блокироваться при ожидании его, пока не будет вызван reset
.
Если autoreset
равно true, то не более одной задачи будет освобождено из wait
за каждый вызов notify
.
Это обеспечивает порядок памяти захвата и освобождения для notify/wait.
Эта функциональность требует как минимум Julia 1.1.
Функциональность autoreset
и гарантия порядка памяти требуют как минимум Julia 1.8.
Base.notify
— Functionnotify(condition, val=nothing; all=true, error=false)
Разбудите задачи, ожидающие условия, передавая им val
. Если all
равно true
(по умолчанию), все ожидающие задачи будут разбудены, в противном случае только одна. Если error
равно true
, переданное значение будет вызвано как исключение в разбуденных задачах.
Верните количество разбуденных задач. Верните 0, если ни одна задача не ожидает condition
.
Base.reset
— Methodreset(::Event)
Сбрасывает Event
в неустановленное состояние. Затем любые будущие вызовы wait
будут блокироваться до тех пор, пока не будет снова вызван notify
.
Base.Semaphore
— TypeSemaphore(sem_size)
Создайте счетный семафор, который позволяет в любой момент времени использовать не более sem_size
захватов. Каждый захват должен соответствовать освобождению.
Это обеспечивает упорядочение памяти захвата и освобождения при вызовах захвата/освобождения.
Base.acquire
— Functionacquire(s::Semaphore)
Ожидание, пока один из разрешений sem_size
не станет доступным, блокируя выполнение до тех пор, пока одно из них не будет получено.
acquire(f, s::Semaphore)
Выполните f
после получения семафора s
и release
по завершении или ошибке.
Например, форма do-блока, которая гарантирует, что только 2 вызова foo
будут активны одновременно:
s = Base.Semaphore(2)
@sync for _ in 1:100
Threads.@spawn begin
Base.acquire(s) do
foo()
end
end
end
Этот метод требует как минимум Julia 1.8.
Base.release
— Functionrelease(s::Semaphore)
Вернуть один разрешение в пул, возможно позволяя другой задаче получить его и продолжить выполнение.
Base.AbstractLock
— TypeAbstractLock
Абстрактный суперкласс, описывающий типы, которые реализуют примитивы синхронизации: lock
, trylock
, unlock
и islocked
.
Base.lock
— Functionlock(lock)
Получите lock
, когда он станет доступен. Если замок уже заблокирован другой задачей/потоком, подождите, пока он не станет доступен.
Каждый lock
должен соответствовать unlock
.
lock(f::Function, lock)
Получите lock
, выполните f
, удерживая lock
, и освободите lock
, когда f
вернется. Если блокировка уже занята другой задачей/потоком, дождитесь, пока она станет доступной.
Когда эта функция возвращает, lock
был освобожден, поэтому вызывающий не должен пытаться его unlock
.
См. также: @lock
.
Использование Channel
в качестве второго аргумента требует Julia 1.7 или более поздней версии.
lock(f::Function, l::Lockable)
Приобретите блокировку, связанную с l
, выполните f
, удерживая блокировку, и освободите блокировку, когда f
вернется. f
получит один позиционный аргумент: значение, обернутое в l
. Если блокировка уже занята другой задачей/потоком, подождите, пока она не станет доступной. Когда эта функция вернется, lock
будет освобожден, поэтому вызывающий не должен пытаться unlock
его.
Требуется как минимум Julia 1.11.
Base.unlock
— Functionunlock(lock)
Освобождает право собственности на lock
.
Если это рекурсивный замок, который был захвачен ранее, уменьшите внутренний счетчик и немедленно вернитесь.
Base.trylock
— Functiontrylock(lock) -> Успех (Булевый)
Получите блокировку, если она доступна, и верните true
, если успешно. Если блокировка уже занята другой задачей/потоком, верните false
.
Каждый успешный trylock
должен соответствовать unlock
.
Функция trylock
, в сочетании с islocked
, может быть использована для написания алгоритмов тестирования и установки или экспоненциального отката если это поддерживается typeof(lock)
(читайте его документацию).
Base.islocked
— Functionislocked(lock) -> Status (Boolean)
Проверьте, удерживается ли lock
какой-либо задачей/потоком. Эта функция сама по себе не должна использоваться для синхронизации. Однако islocked
в сочетании с trylock
может быть использован для написания алгоритмов тестирования и установки или экспоненциального отката если это поддерживается typeof(lock)
(читайте его документацию).
Расширенная помощь
Например, экспоненциальный откат может быть реализован следующим образом, если реализация lock
удовлетворяет свойствам, описанным ниже.
nspins = 0
while true
while islocked(lock)
GC.safepoint()
nspins += 1
nspins > LIMIT && error("timeout")
end
trylock(lock) && break
backoff()
end
Реализация
Реализация блокировки должна определить islocked
с следующими свойствами и отметить это в своей строке документации.
islocked(lock)
не имеет гонок данных.- Если
islocked(lock)
возвращаетfalse
, немедленный вызовtrylock(lock)
должен завершиться успешно (возвращаетtrue
), если нет вмешательства со стороны других задач.
Base.ReentrantLock
— TypeReentrantLock()
Создает повторно используемый замок для синхронизации Task
s. Одна и та же задача может захватывать замок столько раз, сколько необходимо (это и означает "повторно используемый" в названии). Каждый lock
должен соответствовать unlock
.
Вызов lock
также будет препятствовать выполнению финализаторов в этом потоке до соответствующего unlock
. Использование стандартного шаблона блокировки, показанного ниже, должно естественным образом поддерживаться, но будьте осторожны с инверсией порядка try/lock или пропуском блока try полностью (например, попытка вернуть значение, когда замок все еще удерживается):
Это обеспечивает порядок памяти acquire/release для вызовов lock/unlock.
lock(l)
try
<атомарная работа>
finally
unlock(l)
end
Если !islocked(lck::ReentrantLock)
верно, то trylock(lck)
будет успешным, если нет других задач, пытающихся удерживать замок "в одно и то же время."
Base.@lock
— Macro@lock l expr
Макросная версия lock(f, l::AbstractLock)
, но с expr
вместо функции f
. Расширяется в:
lock(l)
try
expr
finally
unlock(l)
end
Это похоже на использование lock
с блоком do
, но избегает создания замыкания и, таким образом, может улучшить производительность.
@lock
был добавлен в Julia 1.3 и экспортирован в Julia 1.10.
Base.Lockable
— TypeLockable(value, lock = ReentrantLock())
Создает объект Lockable
, который оборачивает value
и ассоциирует его с предоставленным lock
. Этот объект поддерживает @lock
, lock
, trylock
, unlock
. Чтобы получить доступ к значению, индексируйте объект lockable, удерживая замок.
Требуется как минимум Julia 1.11.
Пример
julia> locked_list = Base.Lockable(Int[]);
julia> @lock(locked_list, push!(locked_list[], 1)) # необходимо удерживать замок для доступа к значению
1-element Vector{Int64}:
1
julia> lock(summary, locked_list)
"1-element Vector{Int64}"
Channels
Base.AbstractChannel
— TypeAbstractChannel{T}
Представление канала, передающего объекты типа T
.
Base.Channel
— TypeChannel{T=Any}(size::Int=0)
Создает Channel
с внутренним буфером, который может содержать максимум size
объектов типа T
. put!
вызовы на полном канале блокируют выполнение до тех пор, пока объект не будет удален с помощью take!
.
Channel(0)
создает небухгалтерский канал. put!
блокирует выполнение до тех пор, пока не будет вызван соответствующий take!
. И наоборот.
Другие конструкторы:
Channel()
: конструктор по умолчанию, эквивалентныйChannel{Any}(0)
Channel(Inf)
: эквивалентноChannel{Any}(typemax(Int))
Channel(sz)
: эквивалентноChannel{Any}(sz)
Конструктор по умолчанию Channel()
и значение по умолчанию size=0
были добавлены в Julia 1.3.
Base.Channel
— MethodChannel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false, threadpool=nothing)
Создайте новую задачу из func
, привяжите ее к новому каналу типа T
и размера size
, и запланируйте задачу, все в одном вызове. Канал автоматически закрывается, когда задача завершается.
func
должен принимать связанный канал в качестве единственного аргумента.
Если вам нужна ссылка на созданную задачу, передайте объект Ref{Task}
через именованный аргумент taskref
.
Если spawn=true
, задача, созданная для func
, может быть запланирована на другом потоке параллельно, что эквивалентно созданию задачи через Threads.@spawn
.
Если spawn=true
и аргумент threadpool
не установлен, по умолчанию используется :default
.
Если аргумент threadpool
установлен (на :default
или :interactive
), это подразумевает, что spawn=true
, и новая задача создается в указанном пуле потоков.
Возвращает Channel
.
Примеры
julia> chnl = Channel() do ch
foreach(i -> put!(ch, i), 1:4)
end;
julia> typeof(chnl)
Channel{Any}
julia> for i in chnl
@show i
end;
i = 1
i = 2
i = 3
i = 4
Ссылка на созданную задачу:
julia> taskref = Ref{Task}();
julia> chnl = Channel(taskref=taskref) do ch
println(take!(ch))
end;
julia> istaskdone(taskref[])
false
julia> put!(chnl, "Hello");
Hello
julia> istaskdone(taskref[])
true
Параметр spawn=
был добавлен в Julia 1.3. Этот конструктор был добавлен в Julia 1.3. В более ранних версиях Julia канал использовал именованные аргументы для установки size
и T
, но эти конструкторы устарели.
Аргумент threadpool=
был добавлен в Julia 1.9.
julia> chnl = Channel{Char}(1, spawn=true) do ch
for c in "hello world"
put!(ch, c)
end
end
Channel{Char}(1) (2 items available)
julia> String(collect(chnl))
"hello world"
Base.put!
— Methodput!(c::Channel, v)
Добавляет элемент v
в канал c
. Блокирует, если канал полон.
Для небухгалтерских каналов блокирует до тех пор, пока не будет выполнен take!
другой задачей.
v
теперь преобразуется в тип канала с помощью convert
при вызове put!
.
Base.take!
— Methodtake!(c::Channel)
Удаляет и возвращает значение из Channel
в порядке. Блокирует выполнение до тех пор, пока данные не станут доступны. Для небухгалтерских каналов блокирует выполнение до тех пор, пока не будет выполнен put!
другой задачей.
Примеры
Буферизированный канал:
julia> c = Channel(1);
julia> put!(c, 1);
julia> take!(c)
1
Небуферизированный канал:
julia> c = Channel(0);
julia> task = Task(() -> put!(c, 1));
julia> schedule(task);
julia> take!(c)
1
Base.isready
— Methodisready(c::Channel)
Определяет, есть ли значение, хранящееся в Channel
. Возвращает немедленно, не блокирует.
Для небухгалтерских каналов возвращает true
, если есть задачи, ожидающие выполнения put!
.
Примеры
Буферизированный канал:
julia> c = Channel(1);
julia> isready(c)
false
julia> put!(c, 1);
julia> isready(c)
true
Небуферизированный канал:
julia> c = Channel();
julia> isready(c) # нет задач, ожидающих выполнения put!
false
julia> task = Task(() -> put!(c, 1));
julia> schedule(task); # запланировать задачу put!
julia> isready(c)
true
Base.fetch
— Methodfetch(c::Channel)
Ожидает и возвращает (без удаления) первый доступный элемент из Channel
. Примечание: fetch
не поддерживается для небухгалтерского (0-размерного) Channel
.
Примеры
Буферизированный канал:
julia> c = Channel(3) do ch
foreach(i -> put!(ch, i), 1:3)
end;
julia> fetch(c)
1
julia> collect(c) # элемент не удален
3-element Vector{Any}:
1
2
3
Base.close
— Methodclose(c::Channel[, excp::Exception])
Закрыть канал. Исключение (опционально заданное excp
) выбрасывается при:
Base.bind
— Methodbind(chnl::Channel, task::Task)
Ассоциирует время жизни chnl
с задачей. Channel
chnl
автоматически закрывается, когда задача завершается. Любое необработанное исключение в задаче передается всем ожидающим на chnl
.
Объект chnl
можно явно закрыть независимо от завершения задачи. Завершающие задачи не влияют на уже закрытые объекты Channel
.
Когда канал связан с несколькими задачами, первая завершившаяся задача закроет канал. Когда несколько каналов связаны с одной задачей, завершение задачи закроет все связанные каналы.
Примеры
julia> c = Channel(0);
julia> task = @async foreach(i->put!(c, i), 1:4);
julia> bind(c,task);
julia> for i in c
@show i
end;
i = 1
i = 2
i = 3
i = 4
julia> isopen(c)
false
julia> c = Channel(0);
julia> task = @async (put!(c, 1); error("foo"));
julia> bind(c, task);
julia> take!(c)
1
julia> put!(c, 1);
ERROR: TaskFailedException
Stacktrace:
[...]
nested task error: foo
[...]
Low-level synchronization using schedule
and wait
Самое простое правильное использование schedule
происходит в Task
, который еще не начат (запланирован). Однако возможно использовать 4d61726b646f776e2e436f64652822222c20227363686564756c652229_40726566
и wait
в качестве очень низкоуровневого строительного блока для создания интерфейсов синхронизации. Ключевым предварительным условием для вызова schedule(task)
является то, что вызывающий должен "владеть" task
; т.е. он должен знать, что вызов wait
в данном task
происходит в местах, известных коду, вызывающему schedule(task)
. Одна из стратегий для обеспечения такого предварительного условия — использовать атомарные операции, как показано в следующем примере:
@enum OWEState begin
OWE_EMPTY
OWE_WAITING
OWE_NOTIFYING
end
mutable struct OneWayEvent
@atomic state::OWEState
task::Task
OneWayEvent() = new(OWE_EMPTY)
end
function Base.notify(ev::OneWayEvent)
state = @atomic ev.state
while state !== OWE_NOTIFYING
# Spin until we successfully update the state to OWE_NOTIFYING:
state, ok = @atomicreplace(ev.state, state => OWE_NOTIFYING)
if ok
if state == OWE_WAITING
# OWE_WAITING -> OWE_NOTIFYING transition means that the waiter task is
# already waiting or about to call `wait`. The notifier task must wake up
# the waiter task.
schedule(ev.task)
else
@assert state == OWE_EMPTY
# Since we are assuming that there is only one notifier task (for
# simplicity), we know that the other possible case here is OWE_EMPTY.
# We do not need to do anything because we know that the waiter task has
# not called `wait(ev::OneWayEvent)` yet.
end
break
end
end
return
end
function Base.wait(ev::OneWayEvent)
ev.task = current_task()
state, ok = @atomicreplace(ev.state, OWE_EMPTY => OWE_WAITING)
if ok
# OWE_EMPTY -> OWE_WAITING transition means that the notifier task is guaranteed to
# invoke OWE_WAITING -> OWE_NOTIFYING transition. The waiter task must call
# `wait()` immediately. In particular, it MUST NOT invoke any function that may
# yield to the scheduler at this point in code.
wait()
else
@assert state == OWE_NOTIFYING
# Otherwise, the `state` must have already been moved to OWE_NOTIFYING by the
# notifier task.
end
return
end
ev = OneWayEvent()
@sync begin
@async begin
wait(ev)
println("done")
end
println("notifying...")
notify(ev)
end
# output
notifying...
done
OneWayEvent
позволяет одной задаче ожидать
уведомления от другой задачи. Это ограниченный интерфейс связи, так как ожидание
может быть использовано только один раз из одной задачи (обратите внимание на неатомарное присваивание ev.task
)
В этом примере notify(ev::OneWayEvent)
разрешено вызывать schedule(ev.task)
, если и только если он изменяет состояние с OWE_WAITING
на OWE_NOTIFYING
. Это позволяет нам знать, что задача, выполняющая wait(ev::OneWayEvent)
, теперь находится в ветке ok
, и не может быть других задач, которые пытаются вызвать schedule(ev.task)
, поскольку их @atomicreplace(ev.state, state => OWE_NOTIFYING)
потерпит неудачу.