Pouvoir exécuter plusieurs tâches en parallèle, que ce soit dans un script ou une application, peut être vraiment très utile, surtout dans le cas où le traitement de ces tâches peut être très long. Pour pallier ce problème, la plupart des langages de programmation nous offrent plusieurs solutions. Dans notre cas, nous utiliserons des threads. Pour ceux qui ne sont pas familiers avec l’utilisation des threads en Ruby, je vous invite à regarder cette vidéo qui traite du sujet.
Pourquoi utiliser une thread pool alors qu’on pourrait juste générer un nouveau thread pour chaque tâche ?
Effectivement, on pourrait se contenter de lancer un thread par tâche et d’attendre le résultat. Le souci avec cette approche c’est que si vous avez beaucoup de tâches à effectuer, vous allez surcharger votre machine qui a de grandes chances de planter.
Avant toute chose, si vous rencontrez des can't alloc thread (ThreadError)
en utilisant Ruby 2.5.0,
je vous conseille d’utiliser la version 2.4.2, 2.5.3, ou 2.6.0.
Je suppose que le problème est lié à ce bug
Nous allons d’abord écrire une classe qui représentera les différentes tâches à exécuter via nos threads.
class LongTask
def initialize(value)
@value = value
end
def run
sleep(10)
print "#{@value}\n"
end
end
C’est une tâche très simple, mais grâce à l’utilisation de sleep(10)
,
on s’assure qu’elle prendra au moins 10 secondes à s’exécuter ce qui nous permettra de mieux
visualiser le fonctionnement de notre thread pool.
Pour tester, vous pouvez déjà lancer quelques tâches :
En synchrone :
5.times do |i|
LongTask.new(i).run
end
En asynchrone :
tasks = []
threads = []
5.times do |i|
tasks << LongTask.new(i)
end
tasks.each do |task|
threads << Thread.new do
task.run
end
end
threads.each(&:join)
Comme vous avez pu le remarquer si vous avez testé les deux façons de faire présentées ci-dessus, lorsqu’on lance les tâches dans des threads on ne sait pas dans quel ordre elles seront exécutées, il faudra être conscient de ça dans l’utilisation de notre thread pool.
Pour notre thread pool nous aurons besoin d’un tableau pour stocker les threads en cours d’exécution, d’une file pour stocker les tâches en attentes et enfin d’une taille maximum, qui nous permettra justement d’éviter les problèmes qu’on pourrait causer en générant trop de threads en même temps. Je vous propose une implémentation très simple pour commencer :
class ThreadPool
def initialize(size: 10)
@size = size
@tasks = Queue.new
@pool = []
end
def schedule(*args, &block)
@tasks << [block, args]
end
end
Effectivement c’est simple, mais comment on l’utilise ?
Nous allons avoir besoin d’une méthode pour démarrer notre thread pool. Mais avant de la coder, il faut savoir ce qu’on va y faire. Donc l’objectif de notre classe sera d’attendre en continu la présence d’une ou plusieurs tâches dans la file puis de les dépiler pour les exécuter une par une dans des threads séparés dans la limite du nombre de threads disponibles™. Chaque thread aura la charge de s’enlever de notre thread pool une fois son traitement terminé.
Facile !
class ThreadPool
# ...
def start
Thread.new do
loop do
next if @pool.size >= @size
task, args = @tasks.pop
thread = Thread.new do
task.call(*args)
end_thread(thread)
end
@pool << thread
end
end
end
def end_thread(thread)
@pool.delete(thread)
thread.kill
end
# ...
end
Attention, à noter l’encapsulation du travail de la méthode dans un thread. Si on ne le fait pas notre méthode devient bloquante et impossible d’y ajouter des tâches après l’avoir lancée. Et si on testait pour voir ?
thread_pool = ThreadPool.new
thread_pool.start
15.times do |i|
thread_pool.schedule do
LongTask.new(i).run
end
end
Eh bien ? Il ne se passe rien…
Effectivement il ne se passe pas grand-chose, mais pourquoi ? Ici le souci c’est que l’on démarre notre thread pool en asynchrone qui attend d’avoir des tâches pour les effectuer. Le problème c’est que comme elle tourne en asynchrone, notre programme se termine avant qu’elle ait fini de jouer toutes ses tâches.
Ici c’est en fait un faux problème. Selon l’utilisation que vous aurez de votre thread pool on pourrait très bien s’arrêter ici. Par exemple, imaginons que j’aie une application qui écoute via la Gem listen un dossier donné, avec pour rôle de déplacer les fichiers entrants en suivant des règles spécifiques (dans des dossiers différents selon leurs noms par exemple). Cette application sera destinée à tourner sans s’arrêter et du coup le problème n’existe pas, puisqu’on ne terminera jamais volontairement l’application.
Mais pour rendre les choses plus intéressantes, je vous propose de moderniser un peu notre thread pool.
Très simple, il suffit de rajouter une méthode pour savoir si notre thread pool est active :
class ThreadPool
# ...
def inactive?
@tasks.empty? && @pool.empty?
end
# ...
end
Et ensuite, on vérifie régulièrement pour savoir si notre thread pool est toujours active, notre code de test devient donc :
thread_pool = ThreadPool.new
thread_pool.start
15.times do |i|
thread_pool.schedule do
LongTask.new(i).run
end
end
sleep(1) until thread_pool.inactive?
Avec notre implémentation, il n’est pas trop compliqué de gérer et reporter les erreurs qui
se produiraient lors du traitement d’une tâche. Pour cela nous allons encapsuler chaque lancement de tâche
et son thread associé dans un objet spécifique Processor
. Cela nous permettra d’identifier chaque tâche
qui a échoué et pourquoi. Ce Processor
aura connaissance de la tâche à effectuer, les arguments à lui passer,
son status (réussi ou raté), le thread dans lequel la tâche a été lancé, les éventuels messages d’erreur et enfin
un manager
qui est en fait notre thread pool. Nous en avons besoin pour pouvoir lui signifier qu’un traitement
est fini et le prévenir en cas d’erreur.
class Processor
attr_reader :thread
def initialize(task, args, manager)
@task = task
@args = args
@manager = manager
@success = false
@thread = nil
@error = nil
end
def run
@thread = Thread.new do
begin
@task.call(*args)
@success = true
rescue => e
@success = false
@error = e.message
@manager.add_failed_processor(self)
end
@manager.end_processor(self)
end
end
def success?
@success
end
def fail?
!success?
end
end
Ensuite on va devoir changer quelques petites choses dans notre thread pool.
D’abord, on va changer la méthode end_thread
qui deviendra end_processor
vu que l’on gère maintenant
des processeurs qui encapsulent nos threads et rajouter une méthode pour stocker les processeurs qui ont raté
class ThreadPool
# ...
def end_processor(processor)
@pool.delete(processor)
processor.thread.kill
end
def add_failed_processor(processor)
@failed_processors << processor
end
# ...
end
On pense à rajouter notre nouvelle variable d’instance @failed_processors = []
et
ensuite on change légèrement notre méthode start
de la thread pool :
class ThreadPool
# ...
def start
@waiting_thread = Thread.new do
loop do
next if @pool.size >= @size
task, args = @tasks.pop
processor = Processor.new(task, args, self)
processor.run
@pool << processor
end
end
end
# ...
end
Nous sommes fin prêts pour lancer tout ça et avoir un petit reporting à la fin.
Il faudra quand même rajouter un attr_reader :failed_processors
à notre thread pool.
thread_pool = ThreadPool.new
thread_pool.start
15.times do |i|
thread_pool.schedule do
LongTask.new(i).run
end
end
sleep(1) until thread_pool.inactive?
puts "Failed tasks: #{thread_pool.failed_processors}"
# Results =>
# ...
# 12
# 13
# 14
# Failed tasks: []
Évidemment il faut garder à l’esprit que c’est un choix d’implémentation parmi
tant d’autres, avec ses qualités et ses défauts.
J’ai choisi cette approche parce qu’elle me permet d’avoir quelque chose de simple
et de très facilement modulable, par exemple j’ai pu rajouter un système de Timeout
sur mes processeurs ainsi qu’un système de retry limité (4 retries maximum par exemple).
Pour voir une autre implémentation très intéressante, je vous invite à lire cet article dont je me suis pas mal inspiré (en anglais).
L’équipe Synbioz.
Libres d’être ensemble.
Nos conseils et ressources pour vos développements produit.