Écrire une thread pool en Ruby

Publié le 10 janvier 2019 par Hugo Fabre | back

Cet article est publié sous licence CC BY-NC-SA

Pouvoir exécuter plusieurs tâches en parallèle, que ce soit dans un script ou une application, peut être vraiment très utile, surtout dans le cas où le traitement de ces tâches peut être très long. Pour pallier ce problème, la plupart des langages de programmation nous offrent plusieurs solutions. Dans notre cas, nous utiliserons des threads. Pour ceux qui ne sont pas familiers avec l’utilisation des threads en Ruby, je vous invite à regarder cette vidéo qui traite du sujet.

Pourquoi une thread pool ?

Pourquoi utiliser une thread pool alors qu’on pourrait juste générer un nouveau thread pour chaque tâche ?

Effectivement, on pourrait se contenter de lancer un thread par tâche et d’attendre le résultat. Le souci avec cette approche c’est que si vous avez beaucoup de tâches à effectuer, vous allez surcharger votre machine qui a de grandes chances de planter.

C’est parti !

Avant toute chose, si vous rencontrez des can't alloc thread (ThreadError) en utilisant Ruby 2.5.0, je vous conseille d’utiliser la version 2.4.2, 2.5.3, ou 2.6.0. Je suppose que le problème est lié à ce bug

Les tâches à effectuer

Nous allons d’abord écrire une classe qui représentera les différentes tâches à exécuter via nos threads.

class LongTask
  def initialize(value)
    @value = value
  end

  def run
    sleep(10)
    print "#{@value}\n"
  end
end

C’est une tâche très simple, mais grâce à l’utilisation de sleep(10), on s’assure qu’elle prendra au moins 10 secondes à s’exécuter ce qui nous permettra de mieux visualiser le fonctionnement de notre thread pool.

Pour tester, vous pouvez déjà lancer quelques tâches :

En synchrone :

5.times do |i|
  LongTask.new(i).run
end

En asynchrone :

tasks = []
threads = []

5.times do |i|
  tasks << LongTask.new(i)
end

tasks.each do |task|
  threads << Thread.new do
    task.run
  end
end

threads.each(&:join)

Comme vous avez pu le remarquer si vous avez testé les deux façons de faire présentées ci-dessus, lorsqu’on lance les tâches dans des threads on ne sait pas dans quel ordre elles seront exécutées, il faudra être conscient de ça dans l’utilisation de notre thread pool.

La thread pool

Pour notre thread pool nous aurons besoin d’un tableau pour stocker les threads en cours d’exécution, d’une file pour stocker les tâches en attentes et enfin d’une taille maximum, qui nous permettra justement d’éviter les problèmes qu’on pourrait causer en générant trop de threads en même temps. Je vous propose une implémentation très simple pour commencer :

class ThreadPool
  def initialize(size: 10)
    @size = size
    @tasks = Queue.new
    @pool = []
  end

  def schedule(*args, &block)
    @tasks << [block, args]
  end
end

Effectivement c’est simple, mais comment on l’utilise ?

Nous allons avoir besoin d’une méthode pour démarrer notre thread pool. Mais avant de la coder, il faut savoir ce qu’on va y faire. Donc l’objectif de notre classe sera d’attendre en continu la présence d’une ou plusieurs tâches dans la file puis de les dépiler pour les exécuter une par une dans des threads séparés dans la limite du nombre de threads disponibles™. Chaque thread aura la charge de s’enlever de notre thread pool une fois son traitement terminé.

Facile !

class ThreadPool
  # ...

  def start
    Thread.new do
      loop do
        next if @pool.size >= @size

        task, args = @tasks.pop
        thread = Thread.new do
          task.call(*args)
          end_thread(thread)
        end
        @pool << thread
      end
    end
  end

  def end_thread(thread)
    @pool.delete(thread)
    thread.kill
  end
  # ...
end

Attention, à noter l’encapsulation du travail de la méthode dans un thread. Si on ne le fait pas notre méthode devient bloquante et impossible d’y ajouter des tâches après l’avoir lancée. Et si on testait pour voir ?

thread_pool = ThreadPool.new
thread_pool.start

15.times do |i|
  thread_pool.schedule do
    LongTask.new(i).run
  end
end

Eh bien ? Il ne se passe rien…

Effectivement il ne se passe pas grand-chose, mais pourquoi ? Ici le souci c’est que l’on démarre notre thread pool en asynchrone qui attend d’avoir des tâches pour les effectuer. Le problème c’est que comme elle tourne en asynchrone, notre programme se termine avant qu’elle ait fini de jouer toutes ses tâches.

Ici c’est en fait un faux problème. Selon l’utilisation que vous aurez de votre thread pool on pourrait très bien s’arrêter ici. Par exemple, imaginons que j’aie une application qui écoute via la Gem listen un dossier donné, avec pour rôle de déplacer les fichiers entrants en suivant des règles spécifiques (dans des dossiers différents selon leurs noms par exemple). Cette application sera destinée à tourner sans s’arrêter et du coup le problème n’existe pas, puisqu’on ne terminera jamais volontairement l’application.

Mais pour rendre les choses plus intéressantes, je vous propose de moderniser un peu notre thread pool.

Aller plus loin

Attendre que la thread pool n’ait plus de tâches à effectuer pour s’arrêter

Très simple, il suffit de rajouter une méthode pour savoir si notre thread pool est active :

class ThreadPool
  # ...

  def inactive?
    @tasks.empty? && @pool.empty?
  end

  # ...
end

Et ensuite, on vérifie régulièrement pour savoir si notre thread pool est toujours active, notre code de test devient donc :

thread_pool = ThreadPool.new
thread_pool.start

15.times do |i|
  thread_pool.schedule do
    LongTask.new(i).run
  end
end

sleep(1) until thread_pool.inactive?

Gestion d’erreur

Avec notre implémentation, il n’est pas trop compliqué de gérer et reporter les erreurs qui se produiraient lors du traitement d’une tâche. Pour cela nous allons encapsuler chaque lancement de tâche et son thread associé dans un objet spécifique Processor. Cela nous permettra d’identifier chaque tâche qui a échoué et pourquoi. Ce Processor aura connaissance de la tâche à effectuer, les arguments à lui passer, son status (réussi ou raté), le thread dans lequel la tâche a été lancé, les éventuels messages d’erreur et enfin un manager qui est en fait notre thread pool. Nous en avons besoin pour pouvoir lui signifier qu’un traitement est fini et le prévenir en cas d’erreur.

class Processor
  attr_reader :thread

  def initialize(task, args, manager)
    @task = task
    @args = args
    @manager = manager
    @success = false
    @thread = nil
    @error = nil
  end

  def run
    @thread = Thread.new do
      begin
        @task.call(*args)
        @success = true
      rescue => e
        @success = false
        @error = e.message
        @manager.add_failed_processor(self)
      end

      @manager.end_processor(self)
    end
  end

  def success?
    @success
  end

  def fail?
    !success?
  end
end

Ensuite on va devoir changer quelques petites choses dans notre thread pool. D’abord, on va changer la méthode end_thread qui deviendra end_processor vu que l’on gère maintenant des processeurs qui encapsulent nos threads et rajouter une méthode pour stocker les processeurs qui ont raté

class ThreadPool
  # ...

  def end_processor(processor)
    @pool.delete(processor)
    processor.thread.kill
  end

  def add_failed_processor(processor)
    @failed_processors << processor
  end

  # ...
end

On pense à rajouter notre nouvelle variable d’instance @failed_processors = [] et ensuite on change légèrement notre méthode start de la thread pool :

class ThreadPool
  # ...

  def start
    @waiting_thread = Thread.new do
      loop do
        next if @pool.size >= @size

        task, args = @tasks.pop
        processor = Processor.new(task, args, self)
        processor.run
        @pool << processor
      end
    end
  end

  # ...
end

Nous sommes fin prêts pour lancer tout ça et avoir un petit reporting à la fin. Il faudra quand même rajouter un attr_reader :failed_processors à notre thread pool.

thread_pool = ThreadPool.new
thread_pool.start

15.times do |i|
  thread_pool.schedule do
    LongTask.new(i).run
  end
end

sleep(1) until thread_pool.inactive?
puts "Failed tasks: #{thread_pool.failed_processors}"

# Results =>
# ...
# 12
# 13
# 14
# Failed tasks: []

C’est fini !

Évidemment il faut garder à l’esprit que c’est un choix d’implémentation parmi tant d’autres, avec ses qualités et ses défauts. J’ai choisi cette approche parce qu’elle me permet d’avoir quelque chose de simple et de très facilement modulable, par exemple j’ai pu rajouter un système de Timeout sur mes processeurs ainsi qu’un système de retry limité (4 retries maximum par exemple).

Pour voir une autre implémentation très intéressante, je vous invite à lire cet article dont je me suis pas mal inspiré (en anglais).


L’équipe Synbioz.
Libres d’être ensemble.