Asynchronous Programming

Lorsque un programme doit interagir avec le monde extérieur, par exemple en communiquant avec une autre machine via Internet, les opérations dans le programme peuvent devoir se dérouler dans un ordre imprévisible. Supposons que votre programme doive télécharger un fichier. Nous aimerions initier l'opération de téléchargement, effectuer d'autres opérations pendant que nous attendons qu'elle se termine, puis reprendre le code qui a besoin du fichier téléchargé lorsqu'il est disponible. Ce type de scénario relève du domaine de la programmation asynchrone, parfois également appelée programmation concurrente (puisque, conceptuellement, plusieurs choses se produisent en même temps).

Pour aborder ces scénarios, Julia fournit des Task (également connus sous plusieurs autres noms, tels que coroutines symétriques, threads légers, multitâche coopératif ou continuations à usage unique). Lorsqu'un travail de calcul (en pratique, l'exécution d'une fonction particulière) est désigné comme un 4d61726b646f776e2e436f64652822222c20225461736b2229_40726566, il devient possible de l'interrompre en passant à un autre 4d61726b646f776e2e436f64652822222c20225461736b2229_40726566. Le 4d61726b646f776e2e436f64652822222c20225461736b2229_40726566 original peut ensuite être repris, moment auquel il reprendra exactement là où il s'était arrêté. Au début, cela peut sembler similaire à un appel de fonction. Cependant, il y a deux différences clés. Premièrement, le changement de tâches n'utilise aucun espace, donc un nombre quelconque de changements de tâches peut se produire sans consommer la pile d'appels. Deuxièmement, le changement entre les tâches peut se produire dans n'importe quel ordre, contrairement aux appels de fonction, où la fonction appelée doit terminer son exécution avant que le contrôle ne retourne à la fonction appelante.

Basic Task operations

Vous pouvez considérer une Task comme un identifiant pour une unité de travail computationnel à réaliser. Elle a un cycle de vie de création-démarrage-exécution-terminaison. Les tâches sont créées en appelant le constructeur Task sur une fonction à 0 argument à exécuter, ou en utilisant la macro @task :

julia> t = @task begin; sleep(5); println("done"); end
Task (runnable) @0x00007f13a40c0eb0

@task x est équivalent à Task(()->x).

Cette tâche attendra cinq secondes, puis imprimera fait. Cependant, elle n'a pas encore commencé à s'exécuter. Nous pouvons l'exécuter quand nous sommes prêts en appelant schedule :

julia> schedule(t);

Si vous essayez cela dans le REPL, vous verrez que schedule retourne immédiatement. Cela est dû au fait qu'il ajoute simplement t à une file d'attente interne de tâches à exécuter. Ensuite, le REPL affichera l'invite suivante et attendra plus d'entrées. Attendre une entrée au clavier offre une opportunité pour que d'autres tâches s'exécutent, donc à ce moment-là, t commencera. t appelle sleep, qui définit un minuteur et arrête l'exécution. Si d'autres tâches ont été planifiées, elles pourraient s'exécuter alors. Après cinq secondes, le minuteur se déclenche et redémarre t, et vous verrez done affiché. t est alors terminé.

La fonction wait bloque la tâche appelante jusqu'à ce qu'une autre tâche se termine. Donc, par exemple, si vous tapez

julia> schedule(t); wait(t)

au lieu d'appeler uniquement schedule, vous verrez une pause de cinq secondes avant que le prochain invite d'entrée n'apparaisse. Cela est dû au fait que le REPL attend que t se termine avant de continuer.

Il est courant de vouloir créer une tâche et de la planifier immédiatement, donc la macro @async est fournie à cet effet –- @async x est équivalent à schedule(@task x).

Communicating with Channels

Dans certains problèmes, les différentes tâches requises ne sont pas naturellement liées par des appels de fonction ; il n'y a pas de "caller" ou de "callee" évident parmi les tâches à accomplir. Un exemple est le problème du producteur-consommateur, où une procédure complexe génère des valeurs et une autre procédure complexe les consomme. Le consommateur ne peut pas simplement appeler une fonction de producteur pour obtenir une valeur, car le producteur peut avoir d'autres valeurs à générer et peut donc ne pas être encore prêt à retourner. Avec des tâches, le producteur et le consommateur peuvent tous deux s'exécuter aussi longtemps qu'ils en ont besoin, passant des valeurs d'avant en arrière si nécessaire.

Julia fournit un mécanisme Channel pour résoudre ce problème. Un 4d61726b646f776e2e436f64652822222c20224368616e6e656c2229_40726566 est une file d'attente FIFO (premier entré, premier sorti) attendable qui peut avoir plusieurs tâches lisant et écrivant dedans.

Définissons une tâche de producteur, qui produit des valeurs via l'appel put!. Pour consommer des valeurs, nous devons planifier le producteur pour qu'il s'exécute dans une nouvelle tâche. Un constructeur spécial Channel qui accepte une fonction à un argument comme argument peut être utilisé pour exécuter une tâche liée à un canal. Nous pouvons ensuite take! des valeurs de manière répétée à partir de l'objet canal :

julia> function producer(c::Channel)
           put!(c, "start")
           for n=1:4
               put!(c, 2n)
           end
           put!(c, "stop")
       end;

julia> chnl = Channel(producer);

julia> take!(chnl)
"start"

julia> take!(chnl)
2

julia> take!(chnl)
4

julia> take!(chnl)
6

julia> take!(chnl)
8

julia> take!(chnl)
"stop"

Une façon de penser à ce comportement est que producer a pu retourner plusieurs fois. Entre les appels à put!, l'exécution du producteur est suspendue et le consommateur a le contrôle.

Le Channel retourné peut être utilisé comme un objet itérable dans une boucle for, auquel cas la variable de boucle prend toutes les valeurs produites. La boucle est terminée lorsque le canal est fermé.

julia> for x in Channel(producer)
           println(x)
       end
start
2
4
6
8
stop

Notez que nous n'avons pas eu besoin de fermer explicitement le canal dans le producteur. Cela est dû au fait que l'acte de lier un Channel à un Task associe la durée de vie ouverte d'un canal à celle de la tâche liée. L'objet canal est fermé automatiquement lorsque la tâche se termine. Plusieurs canaux peuvent être liés à une tâche, et vice versa.

Alors que le constructeur Task attend une fonction sans argument, la méthode Channel qui crée un canal lié à une tâche attend une fonction qui accepte un seul argument de type 4d61726b646f776e2e436f64652822222c20224368616e6e656c2229_40726566. Un modèle courant est que le producteur soit paramétré, auquel cas une application partielle de fonction est nécessaire pour créer une fonction avec 0 ou 1 argument anonymous function.

Pour Task objets, cela peut être fait soit directement, soit en utilisant une macro de commodité :

function mytask(myarg)
    ...
end

taskHdl = Task(() -> mytask(7))
# or, equivalently
taskHdl = @task mytask(7)

Pour orchestrer des modèles de distribution de travail plus avancés, bind et schedule peuvent être utilisés en conjonction avec Task et Channel pour lier explicitement un ensemble de canaux avec un ensemble de tâches de producteur/consommateur.

More on Channels

Un canal peut être visualisé comme un tuyau, c'est-à-dire qu'il a une extrémité d'écriture et une extrémité de lecture :

  • Plusieurs écrivains dans différentes tâches peuvent écrire dans le même canal simultanément via des appels put!.

  • Plusieurs lecteurs dans différentes tâches peuvent lire des données de manière concurrente via des appels take!.

  • En tant qu'exemple :

    # Given Channels c1 and c2,
    c1 = Channel(32)
    c2 = Channel(32)
    
    # and a function `foo` which reads items from c1, processes the item read
    # and writes a result to c2,
    function foo()
        while true
            data = take!(c1)
            [...]               # process data
            put!(c2, result)    # write out result
        end
    end
    
    # we can schedule `n` instances of `foo` to be active concurrently.
    for _ in 1:n
        errormonitor(@async foo())
    end
  • Les canaux sont créés via le constructeur Channel{T}(sz). Le canal ne contiendra que des objets de type T. Si le type n'est pas spécifié, le canal peut contenir des objets de n'importe quel type. sz fait référence au nombre maximum d'éléments qui peuvent être contenus dans le canal à tout moment. Par exemple, Channel(32) crée un canal qui peut contenir un maximum de 32 objets de n'importe quel type. Un Channel{MyType}(64) peut contenir jusqu'à 64 objets de MyType à tout moment.

  • Si un Channel est vide, les lecteurs (sur un take! appel) bloqueront jusqu'à ce que des données soient disponibles.

  • Si un Channel est plein, les écrivains (sur un put! appel) seront bloqués jusqu'à ce qu'un espace devienne disponible.

  • isready teste la présence de tout objet dans le canal, tandis que wait attend qu'un objet devienne disponible.

  • Un Channel est dans un état ouvert initialement. Cela signifie qu'il peut être lu et écrit librement via take! et put! appels. close ferme un 4d61726b646f776e2e436f64652822222c20224368616e6e656c2229_40726566. Sur un 4d61726b646f776e2e436f64652822222c20224368616e6e656c2229_40726566 fermé, 4d61726b646f776e2e436f64652822222c2022707574212229_40726566 échouera. Par exemple :

    julia> c = Channel(2);
    
    julia> put!(c, 1) # `put!` on an open channel succeeds
    1
    
    julia> close(c);
    
    julia> put!(c, 2) # `put!` on a closed channel throws an exception.
    ERROR: InvalidStateException: Channel is closed.
    Stacktrace:
    [...]
  • take! et fetch (qui récupère mais ne supprime pas la valeur) sur un canal fermé renvoient avec succès toutes les valeurs existantes jusqu'à ce qu'il soit vidé. En continuant l'exemple ci-dessus :

    julia> fetch(c) # Any number of `fetch` calls succeed.
    1
    
    julia> fetch(c)
    1
    
    julia> take!(c) # The first `take!` removes the value.
    1
    
    julia> take!(c) # No more data available on a closed channel.
    ERROR: InvalidStateException: Channel is closed.
    Stacktrace:
    [...]

Considérons un exemple simple utilisant des canaux pour la communication inter-tâches. Nous démarrons 4 tâches pour traiter des données à partir d'un seul canal jobs. Les travaux, identifiés par un id (job_id), sont écrits dans le canal. Chaque tâche dans cette simulation lit un job_id, attend un temps aléatoire et renvoie un tuple de job_id et du temps simulé au canal des résultats. Enfin, tous les results sont imprimés.

julia> const jobs = Channel{Int}(32);

julia> const results = Channel{Tuple}(32);

julia> function do_work()
           for job_id in jobs
               exec_time = rand()
               sleep(exec_time)                # simulates elapsed time doing actual work
                                               # typically performed externally.
               put!(results, (job_id, exec_time))
           end
       end;

julia> function make_jobs(n)
           for i in 1:n
               put!(jobs, i)
           end
       end;

julia> n = 12;

julia> errormonitor(@async make_jobs(n)); # feed the jobs channel with "n" jobs

julia> for i in 1:4 # start 4 tasks to process requests in parallel
           errormonitor(@async do_work())
       end

julia> @elapsed while n > 0 # print out results
           job_id, exec_time = take!(results)
           println("$job_id finished in $(round(exec_time; digits=2)) seconds")
           global n = n - 1
       end
4 finished in 0.22 seconds
3 finished in 0.45 seconds
1 finished in 0.5 seconds
7 finished in 0.14 seconds
2 finished in 0.78 seconds
5 finished in 0.9 seconds
9 finished in 0.36 seconds
6 finished in 0.87 seconds
8 finished in 0.79 seconds
10 finished in 0.64 seconds
12 finished in 0.5 seconds
11 finished in 0.97 seconds
0.029772311

Au lieu de errormonitor(t), une solution plus robuste pourrait être d'utiliser bind(results, t), car cela permettra non seulement de consigner les échecs inattendus, mais aussi de forcer la fermeture des ressources associées et de propager l'exception partout.

More task operations

Les opérations de tâche sont basées sur une primitive de bas niveau appelée yieldto. yieldto(task, value) suspend la tâche actuelle, passe à la task spécifiée et fait en sorte que le dernier appel de 4d61726b646f776e2e436f64652822222c20227969656c64746f2229_40726566 de cette tâche retourne la value spécifiée. Remarquez que 4d61726b646f776e2e436f64652822222c20227969656c64746f2229_40726566 est la seule opération requise pour utiliser le flux de contrôle de style tâche ; au lieu d'appeler et de retourner, nous passons toujours à une tâche différente. C'est pourquoi cette fonctionnalité est également appelée "coroutines symétriques" ; chaque tâche est commutée vers et depuis en utilisant le même mécanisme.

yieldto est puissant, mais la plupart des utilisations des tâches ne l'invoquent pas directement. Considérez pourquoi cela pourrait être. Si vous passez à une autre tâche, vous voudrez probablement revenir à celle-ci à un moment donné, mais savoir quand revenir et savoir quelle tâche a la responsabilité de revenir peut nécessiter une coordination considérable. Par exemple, put! et take! sont des opérations bloquantes, qui, lorsqu'elles sont utilisées dans le contexte des canaux, maintiennent un état pour se souvenir de qui sont les consommateurs. Ne pas avoir besoin de suivre manuellement la tâche consommatrice est ce qui rend 4d61726b646f776e2e436f64652822222c2022707574212229_40726566 plus facile à utiliser que le 4d61726b646f776e2e436f64652822222c20227969656c64746f2229_40726566 de bas niveau.

En plus de yieldto, quelques autres fonctions de base sont nécessaires pour utiliser les tâches efficacement.

  • current_task obtient une référence à la tâche actuellement en cours d'exécution.
  • istaskdone interroge si une tâche a quitté.
  • istaskstarted interroge si une tâche a déjà été exécutée.
  • task_local_storage manipule un magasin de clés-valeurs spécifique à la tâche actuelle.

Tasks and events

La plupart des changements de tâche se produisent en raison de l'attente d'événements tels que des requêtes d'E/S, et sont effectués par un planificateur inclus dans Julia Base. Le planificateur maintient une file d'attente de tâches exécutables et exécute une boucle d'événements qui redémarre les tâches en fonction d'événements externes tels que l'arrivée de messages.

The basic function for waiting for an event is wait. Several objects implement wait; for example, given a Process object, wait will wait for it to exit. wait is often implicit; for example, a wait can happen inside a call to read to wait for data to be available.

Dans tous ces cas, wait opère finalement sur un objet Condition, qui est chargé de mettre en file d'attente et de redémarrer les tâches. Lorsqu'une tâche appelle 4d61726b646f776e2e436f64652822222c2022776169742229_40726566 sur un 4d61726b646f776e2e436f64652822222c2022436f6e646974696f6e2229_40726566, la tâche est marquée comme non exécutable, ajoutée à la file d'attente de la condition, et passe au planificateur. Le planificateur choisira alors une autre tâche à exécuter, ou se bloquera en attendant des événements externes. Si tout se passe bien, finalement un gestionnaire d'événements appellera notify sur la condition, ce qui fait que les tâches attendant cette condition redeviennent exécutables.

Une tâche créée explicitement en appelant Task n'est initialement pas connue du planificateur. Cela vous permet de gérer les tâches manuellement en utilisant yieldto si vous le souhaitez. Cependant, lorsqu'une telle tâche attend un événement, elle est toujours redémarrée automatiquement lorsque l'événement se produit, comme vous pouvez l'attendre.