Asynchronous Programming

Когда программе необходимо взаимодействовать с внешним миром, например, общаться с другой машиной через интернет, операции в программе могут происходить в непредсказуемом порядке. Допустим, ваша программа должна загрузить файл. Мы хотели бы инициировать операцию загрузки, выполнять другие операции, пока ждем ее завершения, а затем возобновить код, который нуждается в загруженном файле, когда он станет доступен. Этот тип сценария попадает в область асинхронного программирования, иногда также называемого конкурентным программированием (поскольку концептуально несколько вещей происходят одновременно).

Чтобы решить эти сценарии, Julia предоставляет Task (также известные под несколькими другими названиями, такими как симметричные корутины, легковесные потоки, кооперативная многозадачность или одноразовые продолжения). Когда часть вычислительной работы (на практике, выполнение определенной функции) обозначается как 4d61726b646f776e2e436f64652822222c20225461736b2229_40726566, становится возможным прервать ее, переключившись на другую 4d61726b646f776e2e436f64652822222c20225461736b2229_40726566. Исходный 4d61726b646f776e2e436f64652822222c20225461736b2229_40726566 может быть позже возобновлен, в этот момент он продолжит с того места, где остановился. Сначала это может показаться похожим на вызов функции. Однако есть два ключевых отличия. Во-первых, переключение задач не использует никакого пространства, поэтому может происходить любое количество переключений задач без потребления стека вызовов. Во-вторых, переключение между задачами может происходить в любом порядке, в отличие от вызовов функций, где вызываемая функция должна завершить выполнение, прежде чем управление вернется к вызывающей функции.

Basic Task operations

Вы можете рассматривать Task как дескриптор для единицы вычислительной работы, которую необходимо выполнить. У него есть жизненный цикл создания-старта-запуска-завершения. Задачи создаются путем вызова конструктора Task на функции без аргументов, которую нужно выполнить, или с использованием макроса @task:

julia> t = @task begin; sleep(5); println("done"); end
Task (runnable) @0x00007f13a40c0eb0

@task x эквивалентен Task(()->x).

Эта задача будет ждать пять секунд, а затем напечатает done. Однако она еще не начала выполняться. Мы можем запустить ее, когда будем готовы, вызвав schedule:

julia> schedule(t);

Если вы попробуете это в REPL, вы увидите, что schedule возвращает немедленно. Это происходит потому, что он просто добавляет t в внутреннюю очередь задач для выполнения. Затем REPL напечатает следующий запрос и будет ждать дальнейшего ввода. Ожидание ввода с клавиатуры предоставляет возможность для выполнения других задач, поэтому в этот момент t начнется. t вызывает sleep, который устанавливает таймер и останавливает выполнение. Если другие задачи были запланированы, они могут выполниться в это время. Через пять секунд таймер срабатывает и перезапускает t, и вы увидите, что напечатано done. t затем завершен.

Функция wait блокирует вызываемую задачу до тех пор, пока не завершится какая-то другая задача. Так что, например, если вы введете

julia> schedule(t); wait(t)

вместо того чтобы просто вызывать schedule, вы увидите пятиминутную паузу перед тем, как появится следующий запрос на ввод. Это происходит потому, что REPL ждет завершения t, прежде чем продолжить.

Обычно возникает желание создать задачу и запланировать её сразу, поэтому для этой цели предоставлен макрос @async –- @async x эквивалентно schedule(@task x).

Communicating with Channels

В некоторых задачах различные части необходимой работы не связаны между собой естественным образом через вызовы функций; среди задач, которые необходимо выполнить, нет очевидного "вызывающего" или "вызываемого". Примером является проблема производителя-потребителя, где одна сложная процедура генерирует значения, а другая сложная процедура их потребляет. Потребитель не может просто вызвать функцию производителя, чтобы получить значение, потому что производитель может иметь больше значений для генерации и, следовательно, может еще не быть готов вернуть результат. С помощью задач производитель и потребитель могут работать столько, сколько им нужно, передавая значения друг другу по мере необходимости.

Julia предоставляет механизм Channel для решения этой проблемы. 4d61726b646f776e2e436f64652822222c20224368616e6e656c2229_40726566 — это ожидаемая очередь с приоритетом, которая может иметь несколько задач, читающих из нее и записывающих в нее.

Давайте определим задачу производителя, которая производит значения через вызов put!. Чтобы потреблять значения, нам нужно запланировать выполнение производителя в новой задаче. Специальный конструктор Channel, который принимает функцию с одним аргументом в качестве аргумента, можно использовать для выполнения задачи, связанной с каналом. Затем мы можем take! значения повторно из объекта канала:

julia> function producer(c::Channel)
           put!(c, "start")
           for n=1:4
               put!(c, 2n)
           end
           put!(c, "stop")
       end;

julia> chnl = Channel(producer);

julia> take!(chnl)
"start"

julia> take!(chnl)
2

julia> take!(chnl)
4

julia> take!(chnl)
6

julia> take!(chnl)
8

julia> take!(chnl)
"stop"

Один из способов рассмотреть это поведение заключается в том, что producer смог вернуть несколько раз. Между вызовами put! выполнение продюсера приостанавливается, и управление переходит к потребителю.

Возвращенный Channel может быть использован как итерируемый объект в цикле for, в котором переменная цикла принимает все сгенерированные значения. Цикл завершается, когда канал закрывается.

julia> for x in Channel(producer)
           println(x)
       end
start
2
4
6
8
stop

Обратите внимание, что нам не нужно было явно закрывать канал в производителе. Это связано с тем, что действие связывания Channel с Task ассоциирует открытый срок жизни канала с жизненным циклом связанной задачи. Объект канала автоматически закрывается, когда задача завершается. Несколько каналов могут быть связаны с задачей и наоборот.

В то время как конструктор Task ожидает функцию без аргументов, метод Channel, который создает канал, привязанный к задаче, ожидает функцию, принимающую один аргумент типа 4d61726b646f776e2e436f64652822222c20224368616e6e656c2229_40726566. Распространенной схемой является параметризация производителя, в этом случае необходимо частичное применение функции для создания функции с 0 или 1 аргументом anonymous function.

Для Task объектов это можно сделать либо напрямую, либо с помощью удобной макроса:

function mytask(myarg)
    ...
end

taskHdl = Task(() -> mytask(7))
# or, equivalently
taskHdl = @task mytask(7)

Чтобы организовать более сложные схемы распределения работы, bind и schedule могут использоваться вместе с конструкторами Task и Channel для явного связывания набора каналов с набором задач производитель/потребитель.

More on Channels

Канал можно представить как трубу, т.е. у него есть конец для записи и конец для чтения:

  • Несколько авторов в разных задачах могут одновременно писать в один и тот же канал через put! вызовы.

  • Несколько читателей в разных задачах могут одновременно считывать данные через take! вызовы.

  • В качестве примера:

    # Given Channels c1 and c2,
    c1 = Channel(32)
    c2 = Channel(32)
    
    # and a function `foo` which reads items from c1, processes the item read
    # and writes a result to c2,
    function foo()
        while true
            data = take!(c1)
            [...]               # process data
            put!(c2, result)    # write out result
        end
    end
    
    # we can schedule `n` instances of `foo` to be active concurrently.
    for _ in 1:n
        errormonitor(@async foo())
    end
  • Каналы создаются с помощью конструктора Channel{T}(sz). Канал будет хранить только объекты типа T. Если тип не указан, канал может хранить объекты любого типа. sz относится к максимальному количеству элементов, которые могут храниться в канале в любой момент времени. Например, Channel(32) создает канал, который может хранить максимум 32 объекта любого типа. Channel{MyType}(64) может хранить до 64 объектов типа MyType в любой момент времени.

  • Если Channel пуст, читатели (при вызове take!) будут блокироваться до тех пор, пока данные не станут доступны.

  • Если Channel заполнен, писатели (при вызове put!) будут блокироваться до тех пор, пока не появится место.

  • isready тестирует наличие любого объекта в канале, в то время как wait ожидает, пока объект не станет доступным.

  • Channel находится в открытом состоянии изначально. Это означает, что к нему можно свободно обращаться для чтения и записи через вызовы take! и put!. close закрывает 4d61726b646f776e2e436f64652822222c20224368616e6e656c2229_40726566. При закрытом 4d61726b646f776e2e436f64652822222c20224368616e6e656c2229_40726566 вызов 4d61726b646f776e2e436f64652822222c2022707574212229_40726566 завершится неудачей. Например:

    julia> c = Channel(2);
    
    julia> put!(c, 1) # `put!` on an open channel succeeds
    1
    
    julia> close(c);
    
    julia> put!(c, 2) # `put!` on a closed channel throws an exception.
    ERROR: InvalidStateException: Channel is closed.
    Stacktrace:
    [...]
  • take! и fetch (которые извлекают, но не удаляют значение) на закрытом канале успешно возвращают любые существующие значения, пока он не будет опустошен. Продолжая приведенный выше пример:

    julia> fetch(c) # Any number of `fetch` calls succeed.
    1
    
    julia> fetch(c)
    1
    
    julia> take!(c) # The first `take!` removes the value.
    1
    
    julia> take!(c) # No more data available on a closed channel.
    ERROR: InvalidStateException: Channel is closed.
    Stacktrace:
    [...]

Рассмотрим простой пример использования каналов для межзадачной коммуникации. Мы запускаем 4 задачи для обработки данных из единственного канала jobs. Задания, идентифицируемые по id (job_id), записываются в канал. Каждая задача в этой симуляции считывает job_id, ждет случайное количество времени и записывает обратно кортеж из job_id и смоделированного времени в канал результатов. В конце все results выводятся на экран.

julia> const jobs = Channel{Int}(32);

julia> const results = Channel{Tuple}(32);

julia> function do_work()
           for job_id in jobs
               exec_time = rand()
               sleep(exec_time)                # simulates elapsed time doing actual work
                                               # typically performed externally.
               put!(results, (job_id, exec_time))
           end
       end;

julia> function make_jobs(n)
           for i in 1:n
               put!(jobs, i)
           end
       end;

julia> n = 12;

julia> errormonitor(@async make_jobs(n)); # feed the jobs channel with "n" jobs

julia> for i in 1:4 # start 4 tasks to process requests in parallel
           errormonitor(@async do_work())
       end

julia> @elapsed while n > 0 # print out results
           job_id, exec_time = take!(results)
           println("$job_id finished in $(round(exec_time; digits=2)) seconds")
           global n = n - 1
       end
4 finished in 0.22 seconds
3 finished in 0.45 seconds
1 finished in 0.5 seconds
7 finished in 0.14 seconds
2 finished in 0.78 seconds
5 finished in 0.9 seconds
9 finished in 0.36 seconds
6 finished in 0.87 seconds
8 finished in 0.79 seconds
10 finished in 0.64 seconds
12 finished in 0.5 seconds
11 finished in 0.97 seconds
0.029772311

Вместо errormonitor(t) более надежным решением может быть использование bind(results, t), так как это не только зафиксирует любые неожиданные сбои, но и заставит связанные ресурсы закрыться и распространит исключение повсюду.

More task operations

Операции задач построены на низкоуровневом примитиве, называемом yieldto. yieldto(task, value) приостанавливает текущую задачу, переключается на указанную task и заставляет последний вызов 4d61726b646f776e2e436f64652822222c20227969656c64746f2229_40726566 этой задачи вернуть указанное value. Обратите внимание, что 4d61726b646f776e2e436f64652822222c20227969656c64746f2229_40726566 является единственной операцией, необходимой для использования управления потоком в стиле задач; вместо вызова и возврата мы всегда просто переключаемся на другую задачу. Вот почему эта функция также называется "симметричные корутины"; каждая задача переключается на и с использованием одного и того же механизма.

yieldto мощный, но большинство использований задач не вызывает его напрямую. Подумайте, почему это может быть. Если вы переключитесь с текущей задачи, вы, вероятно, захотите вернуться к ней в какой-то момент, но знать, когда вернуться, и знать, какая задача несет ответственность за возврат, может потребовать значительной координации. Например, put! и take! являются блокирующими операциями, которые, когда используются в контексте каналов, поддерживают состояние, чтобы запомнить, кто являются потребителями. Не нужно вручную отслеживать потребляющую задачу — это то, что делает 4d61726b646f776e2e436f64652822222c2022707574212229_40726566 более удобным в использовании, чем низкоуровневый 4d61726b646f776e2e436f64652822222c20227969656c64746f2229_40726566.

В дополнение к yieldto, для эффективного использования задач необходимо несколько других базовых функций.

  • current_task получает ссылку на текущую выполняемую задачу.
  • istaskdone запрашивает, завершилась ли задача.
  • istaskstarted запрашивает, была ли задача выполнена.
  • task_local_storage манипулирует хранилищем ключ-значение, специфичным для текущей задачи.

Tasks and events

Большинство переключений задач происходит в результате ожидания событий, таких как запросы ввода-вывода, и выполняется планировщиком, включенным в Julia Base. Планировщик поддерживает очередь выполняемых задач и выполняет цикл событий, который перезапускает задачи на основе внешних событий, таких как поступление сообщений.

The basic function for waiting for an event is wait. Several objects implement wait; for example, given a Process object, wait will wait for it to exit. wait is often implicit; for example, a wait can happen inside a call to read to wait for data to be available.

Во всех этих случаях, wait в конечном итоге работает с объектом Condition, который отвечает за постановку задач в очередь и их перезапуск. Когда задача вызывает 4d61726b646f776e2e436f64652822222c2022776169742229_40726566 на 4d61726b646f776e2e436f64652822222c2022436f6e646974696f6e2229_40726566, задача помечается как неисполняемая, добавляется в очередь условия и переключается на планировщик. Затем планировщик выберет другую задачу для выполнения или будет заблокирован в ожидании внешних событий. Если все пройдет хорошо, в конечном итоге обработчик событий вызовет notify на условии, что приведет к тому, что задачи, ожидающие этого условия, снова станут исполняемыми.

Задача, созданная явно с помощью вызова Task, изначально не известна планировщику. Это позволяет вам управлять задачами вручную с помощью yieldto, если вы хотите. Однако, когда такая задача ожидает события, она все равно автоматически перезапускается, когда событие происходит, как вы и ожидали.