Tasks

Core.TaskType
Задача(func)

Создайте Task (т.е. корутину) для выполнения данной функции func (которая должна быть вызываемой без аргументов). Задача завершится, когда эта функция вернёт значение. Задача будет выполняться в "возрасте мира" от родителя при создании, когда schedule вызван.

!!! предупреждение По умолчанию задачи будут иметь установленный бит "липкости" в значение true t.sticky. Это моделирует исторический стандарт для @async. Липкие задачи могут выполняться только на рабочем потоке, на котором они были впервые запланированы, и при планировании сделают задачу, из которой они были запланированы, липкой. Чтобы получить поведение Threads.@spawn, установите бит "липкости" вручную в значение false.

Примеры

julia> a() = sum(i for i in 1:1000);

julia> b = Task(a);

В этом примере b является выполняемой Task, которая ещё не началась.

source
Base.@taskMacro
@task

Оберните выражение в Task, не выполняя его, и верните Task. Это только создает задачу и не запускает ее.

Warning

По умолчанию задачи будут иметь установленный бит "липкости" в значение true t.sticky. Это моделирует исторический стандарт для @async. Липкие задачи могут выполняться только на рабочем потоке, на котором они были впервые запланированы, и при планировании сделают задачу, из которой они были запланированы, липкой. Чтобы получить поведение Threads.@spawn, установите бит "липкости" вручную в значение false.

Примеры

julia> a1() = sum(i for i in 1:1000);

julia> b = @task a1();

julia> istaskstarted(b)
false

julia> schedule(b);

julia> yield();

julia> istaskdone(b)
true
source
Base.@asyncMacro
@async

Оборачивает выражение в Task и добавляет его в очередь планировщика локальной машины.

Значения могут быть интерполированы в @async с помощью $, который копирует значение непосредственно в создаваемую замыкание. Это позволяет вставлять значение переменной, изолируя асинхронный код от изменений значения переменной в текущей задаче.

Warning

Настоятельно рекомендуется всегда предпочитать Threads.@spawn вместо @async даже когда параллелизм не требуется, особенно в публично распределенных библиотеках. Это связано с тем, что использование @async отключает миграцию родительской задачи между рабочими потоками в текущей реализации Julia. Таким образом, на первый взгляд безобидное использование @async в функции библиотеки может значительно повлиять на производительность очень разных частей пользовательских приложений.

Julia 1.4

Интерполяция значений с помощью $ доступна начиная с Julia 1.4.

source
Base.asyncmapFunction
asyncmap(f, c...; ntasks=0, batch_size=nothing)

Использует несколько параллельных задач для применения f к коллекции (или нескольким коллекциям одинаковой длины). Для нескольких аргументов коллекции f применяется поэлементно.

ntasks указывает количество задач, которые будут выполняться параллельно. В зависимости от длины коллекций, если ntasks не указан, будет использовано до 100 задач для параллельного отображения.

ntasks также может быть указан как функция без аргументов. В этом случае количество задач, которые будут выполняться параллельно, проверяется перед обработкой каждого элемента, и новая задача запускается, если значение ntasks_func больше текущего количества задач.

Если указано batch_size, коллекция обрабатывается в пакетном режиме. f должна быть функцией, которая принимает Vector кортежей аргументов и возвращает вектор результатов. Входной вектор будет иметь длину batch_size или меньше.

Следующие примеры подчеркивают выполнение в разных задачах, возвращая objectid задач, в которых выполняется функция отображения.

Сначала, когда ntasks не определен, каждый элемент обрабатывается в другой задаче.

julia> tskoid() = objectid(current_task());

julia> asyncmap(x->tskoid(), 1:5)
5-element Array{UInt64,1}:
 0x6e15e66c75c75853
 0x440f8819a1baa682
 0x9fb3eeadd0c83985
 0xebd3e35fe90d4050
 0x29efc93edce2b961

julia> length(unique(asyncmap(x->tskoid(), 1:5)))
5

С ntasks=2 все элементы обрабатываются в 2 задачах.

julia> asyncmap(x->tskoid(), 1:5; ntasks=2)
5-element Array{UInt64,1}:
 0x027ab1680df7ae94
 0xa23d2f80cd7cf157
 0x027ab1680df7ae94
 0xa23d2f80cd7cf157
 0x027ab1680df7ae94

julia> length(unique(asyncmap(x->tskoid(), 1:5; ntasks=2)))
2

С определенным batch_size функция отображения должна быть изменена, чтобы принимать массив кортежей аргументов и возвращать массив результатов. map используется в измененной функции отображения для достижения этого.

julia> batch_func(input) = map(x->string("args_tuple: ", x, ", element_val: ", x[1], ", task: ", tskoid()), input)
batch_func (generic function with 1 method)

julia> asyncmap(batch_func, 1:5; ntasks=2, batch_size=2)
5-element Array{String,1}:
 "args_tuple: (1,), element_val: 1, task: 9118321258196414413"
 "args_tuple: (2,), element_val: 2, task: 4904288162898683522"
 "args_tuple: (3,), element_val: 3, task: 9118321258196414413"
 "args_tuple: (4,), element_val: 4, task: 4904288162898683522"
 "args_tuple: (5,), element_val: 5, task: 9118321258196414413"
source
Base.asyncmap!Function
asyncmap!(f, results, c...; ntasks=0, batch_size=nothing)

Как asyncmap, но сохраняет вывод в results, а не возвращает коллекцию.

Warning

Поведение может быть неожиданным, если какой-либо изменяемый аргумент разделяет память с любым другим аргументом.

source
Base.istaskdoneFunction
istaskdone(t::Task) -> Bool

Определите, завершилась ли задача.

Примеры

julia> a2() = sum(i for i in 1:1000);

julia> b = Task(a2);

julia> istaskdone(b)
false

julia> schedule(b);

julia> yield();

julia> istaskdone(b)
true
source
Base.istaskstartedFunction
istaskstarted(t::Task) -> Bool

Определяет, началось ли выполнение задачи.

Примеры

julia> a3() = sum(i for i in 1:1000);

julia> b = Task(a3);

julia> istaskstarted(b)
false
source
Base.istaskfailedFunction
istaskfailed(t::Task) -> Bool

Определяет, завершилась ли задача из-за выброшенного исключения.

Примеры

julia> a4() = error("задача завершилась с ошибкой");

julia> b = Task(a4);

julia> istaskfailed(b)
false

julia> schedule(b);

julia> yield();

julia> istaskfailed(b)
true
Julia 1.3

Эта функция требует как минимум Julia 1.3.

source
Base.task_local_storageMethod
task_local_storage(key)

Посмотрите значение ключа в локальном хранилище задач текущей задачи.

source
Base.task_local_storageMethod
task_local_storage(key, value)

Назначить значение ключу в локальном хранилище задач текущей задачи.

source
Base.task_local_storageMethod
task_local_storage(body, key, value)

Вызовите функцию body с измененным локальным хранилищем задач, в котором value присваивается key; предыдущее значение key, или его отсутствие, восстанавливается после этого. Полезно для эмуляции динамической области видимости.

source

Scheduling

Base.yieldFunction
yield()

Переключиться на планировщик, чтобы позволить запуститься другой запланированной задаче. Задача, которая вызывает эту функцию, все еще может быть выполнена и будет перезапущена немедленно, если нет других выполняемых задач.

source
yield(t::Task, arg = nothing)

Быстрая, несправедливая версия schedule(t, arg); yield(), которая немедленно передает управление t перед вызовом планировщика.

source
Base.yieldtoFunction
yieldto(t::Task, arg = nothing)

Переключиться на заданную задачу. В первый раз, когда происходит переключение на задачу, функция задачи вызывается без аргументов. При последующих переключениях arg возвращается из последнего вызова задачи к yieldto. Это низкоуровневый вызов, который только переключает задачи, не учитывая состояния или планирование каким-либо образом. Его использование не рекомендуется.

source
Base.sleepFunction
sleep(seconds)

Блокирует текущую задачу на указанное количество секунд. Минимальное время сна составляет 1 миллисекунду или ввод 0.001.

source
Base.scheduleFunction
schedule(t::Task, [val]; error=false)

Добавляет Task в очередь планировщика. Это приводит к тому, что задача выполняется постоянно, когда система в остальном простаивает, если только задача не выполняет блокирующую операцию, такую как wait.

Если предоставлен второй аргумент val, он будет передан задаче (через возвращаемое значение yieldto), когда она снова запустится. Если error равно true, значение будет вызвано как исключение в пробуждённой задаче.

Warning

Неправильно использовать schedule для произвольной Task, которая уже была запущена. См. справочник API для получения дополнительной информации.

Warning

По умолчанию задачи будут иметь установленный бит "липкости" в значение true t.sticky. Это моделирует исторический стандарт для @async. Липкие задачи могут выполняться только на рабочем потоке, на котором они были впервые запланированы, и при планировании сделают задачу, из которой они были запланированы, липкой. Чтобы получить поведение Threads.@spawn, установите бит "липкости" вручную в значение false.

Примеры

julia> a5() = sum(i for i in 1:1000);

julia> b = Task(a5);

julia> istaskstarted(b)
false

julia> schedule(b);

julia> yield();

julia> istaskstarted(b)
true

julia> istaskdone(b)
true
source

Synchronization

Base.errormonitorFunction
errormonitor(t::Task)

Выводит журнал ошибок в stderr, если задача t завершилась неудачей.

Примеры

julia> Base._wait(errormonitor(Threads.@spawn error("задача завершилась неудачей")))
Unhandled Task ERROR: задача завершилась неудачей
Stacktrace:
[...]
source
Base.@syncMacro
@sync

Ждите, пока все лексически заключенные использования @async, @spawn, Distributed.@spawnat и Distributed.@distributed не завершатся. Все исключения, выброшенные заключенными асинхронными операциями, собираются и выбрасываются как CompositeException.

Примеры

julia> Threads.nthreads()
4

julia> @sync begin
           Threads.@spawn println("Thread-id $(Threads.threadid()), task 1")
           Threads.@spawn println("Thread-id $(Threads.threadid()), task 2")
       end;
Thread-id 3, task 1
Thread-id 1, task 2
source
Base.waitFunction

Специальное примечание для Threads.Condition:

Вызывающий должен удерживать lock, который владеет Threads.Condition, перед вызовом этого метода. Вызывающая задача будет заблокирована до тех пор, пока какая-либо другая задача не разбудит её, обычно вызывая notify на том же объекте Threads.Condition. Замок будет атомарно освобождён при блокировке (даже если он был заблокирован рекурсивно) и будет повторно захвачен перед возвратом.

source
wait(r::Future)

Ожидание, пока значение не станет доступным для указанного Future.

source
wait(r::RemoteChannel, args...)

Ожидание, пока значение не станет доступным на указанном RemoteChannel.

source
wait([x])

Блокирует текущую задачу до тех пор, пока не произойдет какое-то событие, в зависимости от типа аргумента:

  • Channel: Ждет, пока значение не будет добавлено в канал.
  • Condition: Ждет вызова notify на условии и возвращает параметр val, переданный в notify. Ожидание на условии дополнительно позволяет передать first=true, что приводит к тому, что ожидающий помещается первым в очередь на пробуждение при вызове notify, вместо обычного поведения "первый пришел - первый вышел".
  • Process: Ждет, пока процесс или цепочка процессов не завершится. Поле exitcode процесса можно использовать для определения успеха или неудачи.
  • Task: Ждет завершения Task. Если задача завершается с исключением, выбрасывается TaskFailedException (который оборачивает неудавшуюся задачу).
  • RawFD: Ждет изменений на файловом дескрипторе (см. пакет FileWatching).

Если аргумент не передан, задача блокируется на неопределенный срок. Задача может быть перезапущена только явным вызовом schedule или yieldto.

Часто wait вызывается внутри цикла while, чтобы убедиться, что ожидаемое условие выполнено перед продолжением.

source
wait(c::Channel)

Блокирует выполнение до тех пор, пока Channel isready.

julia> c = Channel(1);

julia> isready(c)
false

julia> task = Task(() -> wait(c));

julia> schedule(task);

julia> istaskdone(task)  # задача заблокирована, потому что канал не готов
false

julia> put!(c, 1);

julia> istaskdone(task)  # задача теперь разблокирована
true
source
Base.fetchMethod
fetch(t::Task)

Ждите, пока Task не завершится, затем верните его значение результата. Если задача завершится с ошибкой, будет выброшено исключение TaskFailedException (которое оборачивает неудавшуюся задачу).

source
Base.timedwaitFunction
timedwait(testcb, timeout::Real; pollint::Real=0.1)

Ждите, пока testcb() не вернет true или пока не истечет timeout секунд, в зависимости от того, что произойдет раньше. Тестовая функция опрашивается каждые pollint секунд. Минимальное значение для pollint составляет 0.001 секунды, то есть 1 миллисекунда.

Возвращает :ok или :timed_out.

Примеры

julia> cb() = (sleep(5); return);

julia> t = @async cb();

julia> timedwait(()->istaskdone(t), 1)
:timed_out

julia> timedwait(()->istaskdone(t), 6.5)
:ok
source
Base.ConditionType
Condition()

Создайте источник событий с срабатыванием по фронту, на который могут ожидать задачи. Задачи, которые вызывают wait на Condition, приостанавливаются и ставятся в очередь. Задачи пробуждаются, когда позже вызывается notify на Condition. Ожидание на условии может вернуть значение или вызвать ошибку, если используются необязательные аргументы notify. Срабатывание по фронту означает, что только задачи, ожидающие в момент вызова notify, могут быть пробуждены. Для уведомлений с уровнями вам нужно поддерживать дополнительное состояние, чтобы отслеживать, произошло ли уведомление. Типы Channel и Threads.Event делают это и могут использоваться для событий с уровнями.

Этот объект НЕ является потокобезопасным. См. Threads.Condition для потокобезопасной версии.

source
Base.Threads.ConditionType
Threads.Condition([lock])

Потоко-безопасная версия Base.Condition.

Чтобы вызвать wait или notify на Threads.Condition, вы сначала должны вызвать lock на нем. Когда вызывается wait, блокировка атомарно освобождается во время блокировки и будет повторно захвачена перед тем, как wait вернется. Поэтому идиоматическое использование Threads.Condition c выглядит следующим образом:

lock(c)
try
    while !thing_we_are_waiting_for
        wait(c)
    end
finally
    unlock(c)
end
Julia 1.2

Эта функциональность требует как минимум Julia 1.2.

source
Base.EventType
Event([autoreset=false])

Создайте источник событий с уровневым триггером. Задачи, которые вызывают wait на Event, приостанавливаются и ставятся в очередь до тех пор, пока не будет вызван notify на Event. После вызова notify Event остается в состоянии сигнала, и задачи больше не будут блокироваться при ожидании его, пока не будет вызван reset.

Если autoreset равно true, то не более одной задачи будет освобождено из wait за каждый вызов notify.

Это обеспечивает порядок памяти захвата и освобождения для notify/wait.

Julia 1.1

Эта функциональность требует как минимум Julia 1.1.

Julia 1.8

Функциональность autoreset и гарантия порядка памяти требуют как минимум Julia 1.8.

source
Base.notifyFunction
notify(condition, val=nothing; all=true, error=false)

Разбудите задачи, ожидающие условия, передавая им val. Если all равно true (по умолчанию), все ожидающие задачи будут разбудены, в противном случае только одна. Если error равно true, переданное значение будет вызвано как исключение в разбуденных задачах.

Верните количество разбуденных задач. Верните 0, если ни одна задача не ожидает condition.

source
Base.resetMethod
reset(::Event)

Сбрасывает Event в неустановленное состояние. Затем любые будущие вызовы wait будут блокироваться до тех пор, пока не будет снова вызван notify.

source
Base.SemaphoreType
Semaphore(sem_size)

Создайте счетный семафор, который позволяет в любой момент времени использовать не более sem_size захватов. Каждый захват должен соответствовать освобождению.

Это обеспечивает упорядочение памяти захвата и освобождения при вызовах захвата/освобождения.

source
Base.acquireFunction
acquire(s::Semaphore)

Ожидание, пока один из разрешений sem_size не станет доступным, блокируя выполнение до тех пор, пока одно из них не будет получено.

source
acquire(f, s::Semaphore)

Выполните f после получения семафора s и release по завершении или ошибке.

Например, форма do-блока, которая гарантирует, что только 2 вызова foo будут активны одновременно:

s = Base.Semaphore(2)
@sync for _ in 1:100
    Threads.@spawn begin
        Base.acquire(s) do
            foo()
        end
    end
end
Julia 1.8

Этот метод требует как минимум Julia 1.8.

source
Base.releaseFunction
release(s::Semaphore)

Вернуть один разрешение в пул, возможно позволяя другой задаче получить его и продолжить выполнение.

source
Base.lockFunction
lock(lock)

Получите lock, когда он станет доступен. Если замок уже заблокирован другой задачей/потоком, подождите, пока он не станет доступен.

Каждый lock должен соответствовать unlock.

source
lock(f::Function, lock)

Получите lock, выполните f, удерживая lock, и освободите lock, когда f вернется. Если блокировка уже занята другой задачей/потоком, дождитесь, пока она станет доступной.

Когда эта функция возвращает, lock был освобожден, поэтому вызывающий не должен пытаться его unlock.

См. также: @lock.

Julia 1.7

Использование Channel в качестве второго аргумента требует Julia 1.7 или более поздней версии.

source

lock(f::Function, l::Lockable)

Приобретите блокировку, связанную с l, выполните f, удерживая блокировку, и освободите блокировку, когда f вернется. f получит один позиционный аргумент: значение, обернутое в l. Если блокировка уже занята другой задачей/потоком, подождите, пока она не станет доступной. Когда эта функция вернется, lock будет освобожден, поэтому вызывающий не должен пытаться unlock его.

Julia 1.11

Требуется как минимум Julia 1.11.

source
Base.unlockFunction
unlock(lock)

Освобождает право собственности на lock.

Если это рекурсивный замок, который был захвачен ранее, уменьшите внутренний счетчик и немедленно вернитесь.

source
Base.trylockFunction
trylock(lock) -> Успех (Булевый)

Получите блокировку, если она доступна, и верните true, если успешно. Если блокировка уже занята другой задачей/потоком, верните false.

Каждый успешный trylock должен соответствовать unlock.

Функция trylock, в сочетании с islocked, может быть использована для написания алгоритмов тестирования и установки или экспоненциального отката если это поддерживается typeof(lock) (читайте его документацию).

source
Base.islockedFunction
islocked(lock) -> Status (Boolean)

Проверьте, удерживается ли lock какой-либо задачей/потоком. Эта функция сама по себе не должна использоваться для синхронизации. Однако islocked в сочетании с trylock может быть использован для написания алгоритмов тестирования и установки или экспоненциального отката если это поддерживается typeof(lock) (читайте его документацию).

Расширенная помощь

Например, экспоненциальный откат может быть реализован следующим образом, если реализация lock удовлетворяет свойствам, описанным ниже.

nspins = 0
while true
    while islocked(lock)
        GC.safepoint()
        nspins += 1
        nspins > LIMIT && error("timeout")
    end
    trylock(lock) && break
    backoff()
end

Реализация

Реализация блокировки должна определить islocked с следующими свойствами и отметить это в своей строке документации.

  • islocked(lock) не имеет гонок данных.
  • Если islocked(lock) возвращает false, немедленный вызов trylock(lock) должен завершиться успешно (возвращает true), если нет вмешательства со стороны других задач.
source
Base.ReentrantLockType
ReentrantLock()

Создает повторно используемый замок для синхронизации Tasks. Одна и та же задача может захватывать замок столько раз, сколько необходимо (это и означает "повторно используемый" в названии). Каждый lock должен соответствовать unlock.

Вызов lock также будет препятствовать выполнению финализаторов в этом потоке до соответствующего unlock. Использование стандартного шаблона блокировки, показанного ниже, должно естественным образом поддерживаться, но будьте осторожны с инверсией порядка try/lock или пропуском блока try полностью (например, попытка вернуть значение, когда замок все еще удерживается):

Это обеспечивает порядок памяти acquire/release для вызовов lock/unlock.

lock(l)
try
    <атомарная работа>
finally
    unlock(l)
end

Если !islocked(lck::ReentrantLock) верно, то trylock(lck) будет успешным, если нет других задач, пытающихся удерживать замок "в одно и то же время."

source
Base.@lockMacro
@lock l expr

Макросная версия lock(f, l::AbstractLock), но с expr вместо функции f. Расширяется в:

lock(l)
try
    expr
finally
    unlock(l)
end

Это похоже на использование lock с блоком do, но избегает создания замыкания и, таким образом, может улучшить производительность.

Compat

@lock был добавлен в Julia 1.3 и экспортирован в Julia 1.10.

source
Base.LockableType

Lockable(value, lock = ReentrantLock())

Создает объект Lockable, который оборачивает value и ассоциирует его с предоставленным lock. Этот объект поддерживает @lock, lock, trylock, unlock. Чтобы получить доступ к значению, индексируйте объект lockable, удерживая замок.

Julia 1.11

Требуется как минимум Julia 1.11.

Пример

julia> locked_list = Base.Lockable(Int[]);

julia> @lock(locked_list, push!(locked_list[], 1)) # необходимо удерживать замок для доступа к значению
1-element Vector{Int64}:
 1

julia> lock(summary, locked_list)
"1-element Vector{Int64}"
source

Channels

Base.AbstractChannelType
AbstractChannel{T}

Представление канала, передающего объекты типа T.

source
Base.ChannelType
Channel{T=Any}(size::Int=0)

Создает Channel с внутренним буфером, который может содержать максимум size объектов типа T. put! вызовы на полном канале блокируют выполнение до тех пор, пока объект не будет удален с помощью take!.

Channel(0) создает небухгалтерский канал. put! блокирует выполнение до тех пор, пока не будет вызван соответствующий take!. И наоборот.

Другие конструкторы:

  • Channel(): конструктор по умолчанию, эквивалентный Channel{Any}(0)
  • Channel(Inf): эквивалентно Channel{Any}(typemax(Int))
  • Channel(sz): эквивалентно Channel{Any}(sz)
Julia 1.3

Конструктор по умолчанию Channel() и значение по умолчанию size=0 были добавлены в Julia 1.3.

source
Base.ChannelMethod
Channel{T=Any}(func::Function, size=0; taskref=nothing, spawn=false, threadpool=nothing)

Создайте новую задачу из func, привяжите ее к новому каналу типа T и размера size, и запланируйте задачу, все в одном вызове. Канал автоматически закрывается, когда задача завершается.

func должен принимать связанный канал в качестве единственного аргумента.

Если вам нужна ссылка на созданную задачу, передайте объект Ref{Task} через именованный аргумент taskref.

Если spawn=true, задача, созданная для func, может быть запланирована на другом потоке параллельно, что эквивалентно созданию задачи через Threads.@spawn.

Если spawn=true и аргумент threadpool не установлен, по умолчанию используется :default.

Если аргумент threadpool установлен (на :default или :interactive), это подразумевает, что spawn=true, и новая задача создается в указанном пуле потоков.

Возвращает Channel.

Примеры

julia> chnl = Channel() do ch
           foreach(i -> put!(ch, i), 1:4)
       end;

julia> typeof(chnl)
Channel{Any}

julia> for i in chnl
           @show i
       end;
i = 1
i = 2
i = 3
i = 4

Ссылка на созданную задачу:

julia> taskref = Ref{Task}();

julia> chnl = Channel(taskref=taskref) do ch
           println(take!(ch))
       end;

julia> istaskdone(taskref[])
false

julia> put!(chnl, "Hello");
Hello

julia> istaskdone(taskref[])
true
Julia 1.3

Параметр spawn= был добавлен в Julia 1.3. Этот конструктор был добавлен в Julia 1.3. В более ранних версиях Julia канал использовал именованные аргументы для установки size и T, но эти конструкторы устарели.

Julia 1.9

Аргумент threadpool= был добавлен в Julia 1.9.

julia> chnl = Channel{Char}(1, spawn=true) do ch
           for c in "hello world"
               put!(ch, c)
           end
       end
Channel{Char}(1) (2 items available)

julia> String(collect(chnl))
"hello world"
source
Base.put!Method
put!(c::Channel, v)

Добавляет элемент v в канал c. Блокирует, если канал полон.

Для небухгалтерских каналов блокирует до тех пор, пока не будет выполнен take! другой задачей.

Julia 1.1

v теперь преобразуется в тип канала с помощью convert при вызове put!.

source
Base.take!Method
take!(c::Channel)

Удаляет и возвращает значение из Channel в порядке. Блокирует выполнение до тех пор, пока данные не станут доступны. Для небухгалтерских каналов блокирует выполнение до тех пор, пока не будет выполнен put! другой задачей.

Примеры

Буферизированный канал:

julia> c = Channel(1);

julia> put!(c, 1);

julia> take!(c)
1

Небуферизированный канал:

julia> c = Channel(0);

julia> task = Task(() -> put!(c, 1));

julia> schedule(task);

julia> take!(c)
1
source
Base.isreadyMethod
isready(c::Channel)

Определяет, есть ли значение, хранящееся в Channel. Возвращает немедленно, не блокирует.

Для небухгалтерских каналов возвращает true, если есть задачи, ожидающие выполнения put!.

Примеры

Буферизированный канал:

julia> c = Channel(1);

julia> isready(c)
false

julia> put!(c, 1);

julia> isready(c)
true

Небуферизированный канал:

julia> c = Channel();

julia> isready(c)  # нет задач, ожидающих выполнения put!
false

julia> task = Task(() -> put!(c, 1));

julia> schedule(task);  # запланировать задачу put!

julia> isready(c)
true
source
Base.fetchMethod
fetch(c::Channel)

Ожидает и возвращает (без удаления) первый доступный элемент из Channel. Примечание: fetch не поддерживается для небухгалтерского (0-размерного) Channel.

Примеры

Буферизированный канал:

julia> c = Channel(3) do ch
           foreach(i -> put!(ch, i), 1:3)
       end;

julia> fetch(c)
1

julia> collect(c)  # элемент не удален
3-element Vector{Any}:
 1
 2
 3
source
Base.closeMethod
close(c::Channel[, excp::Exception])

Закрыть канал. Исключение (опционально заданное excp) выбрасывается при:

  • put! на закрытом канале.
  • take! и fetch на пустом, закрытом канале.
source
Base.bindMethod
bind(chnl::Channel, task::Task)

Ассоциирует время жизни chnl с задачей. Channel chnl автоматически закрывается, когда задача завершается. Любое необработанное исключение в задаче передается всем ожидающим на chnl.

Объект chnl можно явно закрыть независимо от завершения задачи. Завершающие задачи не влияют на уже закрытые объекты Channel.

Когда канал связан с несколькими задачами, первая завершившаяся задача закроет канал. Когда несколько каналов связаны с одной задачей, завершение задачи закроет все связанные каналы.

Примеры

julia> c = Channel(0);

julia> task = @async foreach(i->put!(c, i), 1:4);

julia> bind(c,task);

julia> for i in c
           @show i
       end;
i = 1
i = 2
i = 3
i = 4

julia> isopen(c)
false
julia> c = Channel(0);

julia> task = @async (put!(c, 1); error("foo"));

julia> bind(c, task);

julia> take!(c)
1

julia> put!(c, 1);
ERROR: TaskFailedException
Stacktrace:
[...]
    nested task error: foo
[...]
source

Low-level synchronization using schedule and wait

Самое простое правильное использование schedule происходит в Task, который еще не начат (запланирован). Однако возможно использовать 4d61726b646f776e2e436f64652822222c20227363686564756c652229_40726566 и wait в качестве очень низкоуровневого строительного блока для создания интерфейсов синхронизации. Ключевым предварительным условием для вызова schedule(task) является то, что вызывающий должен "владеть" task; т.е. он должен знать, что вызов wait в данном task происходит в местах, известных коду, вызывающему schedule(task). Одна из стратегий для обеспечения такого предварительного условия — использовать атомарные операции, как показано в следующем примере:

@enum OWEState begin
    OWE_EMPTY
    OWE_WAITING
    OWE_NOTIFYING
end

mutable struct OneWayEvent
    @atomic state::OWEState
    task::Task
    OneWayEvent() = new(OWE_EMPTY)
end

function Base.notify(ev::OneWayEvent)
    state = @atomic ev.state
    while state !== OWE_NOTIFYING
        # Spin until we successfully update the state to OWE_NOTIFYING:
        state, ok = @atomicreplace(ev.state, state => OWE_NOTIFYING)
        if ok
            if state == OWE_WAITING
                # OWE_WAITING -> OWE_NOTIFYING transition means that the waiter task is
                # already waiting or about to call `wait`. The notifier task must wake up
                # the waiter task.
                schedule(ev.task)
            else
                @assert state == OWE_EMPTY
                # Since we are assuming that there is only one notifier task (for
                # simplicity), we know that the other possible case here is OWE_EMPTY.
                # We do not need to do anything because we know that the waiter task has
                # not called `wait(ev::OneWayEvent)` yet.
            end
            break
        end
    end
    return
end

function Base.wait(ev::OneWayEvent)
    ev.task = current_task()
    state, ok = @atomicreplace(ev.state, OWE_EMPTY => OWE_WAITING)
    if ok
        # OWE_EMPTY -> OWE_WAITING transition means that the notifier task is guaranteed to
        # invoke OWE_WAITING -> OWE_NOTIFYING transition.  The waiter task must call
        # `wait()` immediately.  In particular, it MUST NOT invoke any function that may
        # yield to the scheduler at this point in code.
        wait()
    else
        @assert state == OWE_NOTIFYING
        # Otherwise, the `state` must have already been moved to OWE_NOTIFYING by the
        # notifier task.
    end
    return
end

ev = OneWayEvent()
@sync begin
    @async begin
        wait(ev)
        println("done")
    end
    println("notifying...")
    notify(ev)
end

# output
notifying...
done

OneWayEvent позволяет одной задаче ожидать уведомления от другой задачи. Это ограниченный интерфейс связи, так как ожидание может быть использовано только один раз из одной задачи (обратите внимание на неатомарное присваивание ev.task)

В этом примере notify(ev::OneWayEvent) разрешено вызывать schedule(ev.task), если и только если он изменяет состояние с OWE_WAITING на OWE_NOTIFYING. Это позволяет нам знать, что задача, выполняющая wait(ev::OneWayEvent), теперь находится в ветке ok, и не может быть других задач, которые пытаются вызвать schedule(ev.task), поскольку их @atomicreplace(ev.state, state => OWE_NOTIFYING) потерпит неудачу.