Asynchronous Programming
Когда программе необходимо взаимодействовать с внешним миром, например, общаться с другой машиной через интернет, операции в программе могут происходить в непредсказуемом порядке. Допустим, ваша программа должна загрузить файл. Мы хотели бы инициировать операцию загрузки, выполнять другие операции, пока ждем ее завершения, а затем возобновить код, который нуждается в загруженном файле, когда он станет доступен. Этот тип сценария попадает в область асинхронного программирования, иногда также называемого конкурентным программированием (поскольку концептуально несколько вещей происходят одновременно).
Чтобы решить эти сценарии, Julia предоставляет Task
(также известные под несколькими другими названиями, такими как симметричные корутины, легковесные потоки, кооперативная многозадачность или одноразовые продолжения). Когда часть вычислительной работы (на практике, выполнение определенной функции) обозначается как 4d61726b646f776e2e436f64652822222c20225461736b2229_40726566
, становится возможным прервать ее, переключившись на другую 4d61726b646f776e2e436f64652822222c20225461736b2229_40726566
. Исходный 4d61726b646f776e2e436f64652822222c20225461736b2229_40726566
может быть позже возобновлен, в этот момент он продолжит с того места, где остановился. Сначала это может показаться похожим на вызов функции. Однако есть два ключевых отличия. Во-первых, переключение задач не использует никакого пространства, поэтому может происходить любое количество переключений задач без потребления стека вызовов. Во-вторых, переключение между задачами может происходить в любом порядке, в отличие от вызовов функций, где вызываемая функция должна завершить выполнение, прежде чем управление вернется к вызывающей функции.
Basic Task
operations
Вы можете рассматривать Task
как дескриптор для единицы вычислительной работы, которую необходимо выполнить. У него есть жизненный цикл создания-старта-запуска-завершения. Задачи создаются путем вызова конструктора Task
на функции без аргументов, которую нужно выполнить, или с использованием макроса @task
:
julia> t = @task begin; sleep(5); println("done"); end
Task (runnable) @0x00007f13a40c0eb0
@task x
эквивалентен Task(()->x)
.
Эта задача будет ждать пять секунд, а затем напечатает done
. Однако она еще не начала выполняться. Мы можем запустить ее, когда будем готовы, вызвав schedule
:
julia> schedule(t);
Если вы попробуете это в REPL, вы увидите, что schedule
возвращает немедленно. Это происходит потому, что он просто добавляет t
в внутреннюю очередь задач для выполнения. Затем REPL напечатает следующий запрос и будет ждать дальнейшего ввода. Ожидание ввода с клавиатуры предоставляет возможность для выполнения других задач, поэтому в этот момент t
начнется. t
вызывает sleep
, который устанавливает таймер и останавливает выполнение. Если другие задачи были запланированы, они могут выполниться в это время. Через пять секунд таймер срабатывает и перезапускает t
, и вы увидите, что напечатано done
. t
затем завершен.
Функция wait
блокирует вызываемую задачу до тех пор, пока не завершится какая-то другая задача. Так что, например, если вы введете
julia> schedule(t); wait(t)
вместо того чтобы просто вызывать schedule
, вы увидите пятиминутную паузу перед тем, как появится следующий запрос на ввод. Это происходит потому, что REPL ждет завершения t
, прежде чем продолжить.
Обычно возникает желание создать задачу и запланировать её сразу, поэтому для этой цели предоставлен макрос @async
–- @async x
эквивалентно schedule(@task x)
.
Communicating with Channels
В некоторых задачах различные части необходимой работы не связаны между собой естественным образом через вызовы функций; среди задач, которые необходимо выполнить, нет очевидного "вызывающего" или "вызываемого". Примером является проблема производителя-потребителя, где одна сложная процедура генерирует значения, а другая сложная процедура их потребляет. Потребитель не может просто вызвать функцию производителя, чтобы получить значение, потому что производитель может иметь больше значений для генерации и, следовательно, может еще не быть готов вернуть результат. С помощью задач производитель и потребитель могут работать столько, сколько им нужно, передавая значения друг другу по мере необходимости.
Julia предоставляет механизм Channel
для решения этой проблемы. 4d61726b646f776e2e436f64652822222c20224368616e6e656c2229_40726566
— это ожидаемая очередь с приоритетом, которая может иметь несколько задач, читающих из нее и записывающих в нее.
Давайте определим задачу производителя, которая производит значения через вызов put!
. Чтобы потреблять значения, нам нужно запланировать выполнение производителя в новой задаче. Специальный конструктор Channel
, который принимает функцию с одним аргументом в качестве аргумента, можно использовать для выполнения задачи, связанной с каналом. Затем мы можем take!
значения повторно из объекта канала:
julia> function producer(c::Channel)
put!(c, "start")
for n=1:4
put!(c, 2n)
end
put!(c, "stop")
end;
julia> chnl = Channel(producer);
julia> take!(chnl)
"start"
julia> take!(chnl)
2
julia> take!(chnl)
4
julia> take!(chnl)
6
julia> take!(chnl)
8
julia> take!(chnl)
"stop"
Один из способов рассмотреть это поведение заключается в том, что producer
смог вернуть несколько раз. Между вызовами put!
выполнение продюсера приостанавливается, и управление переходит к потребителю.
Возвращенный Channel
может быть использован как итерируемый объект в цикле for
, в котором переменная цикла принимает все сгенерированные значения. Цикл завершается, когда канал закрывается.
julia> for x in Channel(producer)
println(x)
end
start
2
4
6
8
stop
Обратите внимание, что нам не нужно было явно закрывать канал в производителе. Это связано с тем, что действие связывания Channel
с Task
ассоциирует открытый срок жизни канала с жизненным циклом связанной задачи. Объект канала автоматически закрывается, когда задача завершается. Несколько каналов могут быть связаны с задачей и наоборот.
В то время как конструктор Task
ожидает функцию без аргументов, метод Channel
, который создает канал, привязанный к задаче, ожидает функцию, принимающую один аргумент типа 4d61726b646f776e2e436f64652822222c20224368616e6e656c2229_40726566
. Распространенной схемой является параметризация производителя, в этом случае необходимо частичное применение функции для создания функции с 0 или 1 аргументом anonymous function.
Для Task
объектов это можно сделать либо напрямую, либо с помощью удобной макроса:
function mytask(myarg)
...
end
taskHdl = Task(() -> mytask(7))
# or, equivalently
taskHdl = @task mytask(7)
Чтобы организовать более сложные схемы распределения работы, bind
и schedule
могут использоваться вместе с конструкторами Task
и Channel
для явного связывания набора каналов с набором задач производитель/потребитель.
More on Channels
Канал можно представить как трубу, т.е. у него есть конец для записи и конец для чтения:
Несколько авторов в разных задачах могут одновременно писать в один и тот же канал через
put!
вызовы.Несколько читателей в разных задачах могут одновременно считывать данные через
take!
вызовы.В качестве примера:
# Given Channels c1 and c2, c1 = Channel(32) c2 = Channel(32) # and a function `foo` which reads items from c1, processes the item read # and writes a result to c2, function foo() while true data = take!(c1) [...] # process data put!(c2, result) # write out result end end # we can schedule `n` instances of `foo` to be active concurrently. for _ in 1:n errormonitor(@async foo()) end
Каналы создаются с помощью конструктора
Channel{T}(sz)
. Канал будет хранить только объекты типаT
. Если тип не указан, канал может хранить объекты любого типа.sz
относится к максимальному количеству элементов, которые могут храниться в канале в любой момент времени. Например,Channel(32)
создает канал, который может хранить максимум 32 объекта любого типа.Channel{MyType}(64)
может хранить до 64 объектов типаMyType
в любой момент времени.Если
Channel
пуст, читатели (при вызовеtake!
) будут блокироваться до тех пор, пока данные не станут доступны.Если
Channel
заполнен, писатели (при вызовеput!
) будут блокироваться до тех пор, пока не появится место.isready
тестирует наличие любого объекта в канале, в то время какwait
ожидает, пока объект не станет доступным.Channel
находится в открытом состоянии изначально. Это означает, что к нему можно свободно обращаться для чтения и записи через вызовыtake!
иput!
.close
закрывает4d61726b646f776e2e436f64652822222c20224368616e6e656c2229_40726566
. При закрытом4d61726b646f776e2e436f64652822222c20224368616e6e656c2229_40726566
вызов4d61726b646f776e2e436f64652822222c2022707574212229_40726566
завершится неудачей. Например:julia> c = Channel(2); julia> put!(c, 1) # `put!` on an open channel succeeds 1 julia> close(c); julia> put!(c, 2) # `put!` on a closed channel throws an exception. ERROR: InvalidStateException: Channel is closed. Stacktrace: [...]
take!
иfetch
(которые извлекают, но не удаляют значение) на закрытом канале успешно возвращают любые существующие значения, пока он не будет опустошен. Продолжая приведенный выше пример:julia> fetch(c) # Any number of `fetch` calls succeed. 1 julia> fetch(c) 1 julia> take!(c) # The first `take!` removes the value. 1 julia> take!(c) # No more data available on a closed channel. ERROR: InvalidStateException: Channel is closed. Stacktrace: [...]
Рассмотрим простой пример использования каналов для межзадачной коммуникации. Мы запускаем 4 задачи для обработки данных из единственного канала jobs
. Задания, идентифицируемые по id (job_id
), записываются в канал. Каждая задача в этой симуляции считывает job_id
, ждет случайное количество времени и записывает обратно кортеж из job_id
и смоделированного времени в канал результатов. В конце все results
выводятся на экран.
julia> const jobs = Channel{Int}(32);
julia> const results = Channel{Tuple}(32);
julia> function do_work()
for job_id in jobs
exec_time = rand()
sleep(exec_time) # simulates elapsed time doing actual work
# typically performed externally.
put!(results, (job_id, exec_time))
end
end;
julia> function make_jobs(n)
for i in 1:n
put!(jobs, i)
end
end;
julia> n = 12;
julia> errormonitor(@async make_jobs(n)); # feed the jobs channel with "n" jobs
julia> for i in 1:4 # start 4 tasks to process requests in parallel
errormonitor(@async do_work())
end
julia> @elapsed while n > 0 # print out results
job_id, exec_time = take!(results)
println("$job_id finished in $(round(exec_time; digits=2)) seconds")
global n = n - 1
end
4 finished in 0.22 seconds
3 finished in 0.45 seconds
1 finished in 0.5 seconds
7 finished in 0.14 seconds
2 finished in 0.78 seconds
5 finished in 0.9 seconds
9 finished in 0.36 seconds
6 finished in 0.87 seconds
8 finished in 0.79 seconds
10 finished in 0.64 seconds
12 finished in 0.5 seconds
11 finished in 0.97 seconds
0.029772311
Вместо errormonitor(t)
более надежным решением может быть использование bind(results, t)
, так как это не только зафиксирует любые неожиданные сбои, но и заставит связанные ресурсы закрыться и распространит исключение повсюду.
More task operations
Операции задач построены на низкоуровневом примитиве, называемом yieldto
. yieldto(task, value)
приостанавливает текущую задачу, переключается на указанную task
и заставляет последний вызов 4d61726b646f776e2e436f64652822222c20227969656c64746f2229_40726566
этой задачи вернуть указанное value
. Обратите внимание, что 4d61726b646f776e2e436f64652822222c20227969656c64746f2229_40726566
является единственной операцией, необходимой для использования управления потоком в стиле задач; вместо вызова и возврата мы всегда просто переключаемся на другую задачу. Вот почему эта функция также называется "симметричные корутины"; каждая задача переключается на и с использованием одного и того же механизма.
yieldto
мощный, но большинство использований задач не вызывает его напрямую. Подумайте, почему это может быть. Если вы переключитесь с текущей задачи, вы, вероятно, захотите вернуться к ней в какой-то момент, но знать, когда вернуться, и знать, какая задача несет ответственность за возврат, может потребовать значительной координации. Например, put!
и take!
являются блокирующими операциями, которые, когда используются в контексте каналов, поддерживают состояние, чтобы запомнить, кто являются потребителями. Не нужно вручную отслеживать потребляющую задачу — это то, что делает 4d61726b646f776e2e436f64652822222c2022707574212229_40726566
более удобным в использовании, чем низкоуровневый 4d61726b646f776e2e436f64652822222c20227969656c64746f2229_40726566
.
В дополнение к yieldto
, для эффективного использования задач необходимо несколько других базовых функций.
current_task
получает ссылку на текущую выполняемую задачу.istaskdone
запрашивает, завершилась ли задача.istaskstarted
запрашивает, была ли задача выполнена.task_local_storage
манипулирует хранилищем ключ-значение, специфичным для текущей задачи.
Tasks and events
Большинство переключений задач происходит в результате ожидания событий, таких как запросы ввода-вывода, и выполняется планировщиком, включенным в Julia Base. Планировщик поддерживает очередь выполняемых задач и выполняет цикл событий, который перезапускает задачи на основе внешних событий, таких как поступление сообщений.
The basic function for waiting for an event is wait
. Several objects implement wait
; for example, given a Process
object, wait
will wait for it to exit. wait
is often implicit; for example, a wait
can happen inside a call to read
to wait for data to be available.
Во всех этих случаях, wait
в конечном итоге работает с объектом Condition
, который отвечает за постановку задач в очередь и их перезапуск. Когда задача вызывает 4d61726b646f776e2e436f64652822222c2022776169742229_40726566
на 4d61726b646f776e2e436f64652822222c2022436f6e646974696f6e2229_40726566
, задача помечается как неисполняемая, добавляется в очередь условия и переключается на планировщик. Затем планировщик выберет другую задачу для выполнения или будет заблокирован в ожидании внешних событий. Если все пройдет хорошо, в конечном итоге обработчик событий вызовет notify
на условии, что приведет к тому, что задачи, ожидающие этого условия, снова станут исполняемыми.
Задача, созданная явно с помощью вызова Task
, изначально не известна планировщику. Это позволяет вам управлять задачами вручную с помощью yieldto
, если вы хотите. Однако, когда такая задача ожидает события, она все равно автоматически перезапускается, когда событие происходит, как вы и ожидали.