[RUBY] Technique de self-pipe utile pour le traitement IO.select

introduction

Je pense qu'il y a un problème de blocage en tant que problème lors du traitement IO.select. Le blocage est un phénomène qui se produit lorsqu'il n'y a pas de données dans le tampon et que le processus de lecture de données attend, ou lorsqu'il y a des données dans le tampon mais qu'il n'y a pas de données pour le nombre d'octets que vous souhaitez lire et que le processus de lecture de données attend. (Idem pour l'écriture). Si un blocage se produit dans le processus IO.select, le processus suivant sera attendu, ce qui est inefficace. Il existe une technique d'auto-pipe pour résoudre ce problème. L'histoire de la technique du self-pipe a commencé vers 1990 et est toujours utilisée dans les bibliothèques que nous utilisons quotidiennement. Technique d'auto-pipe

Exemples de blocage dans IO.select et solutions

Par exemple, si vous souhaitez effectuer un traitement IO.select avec ruby, vous pouvez l'écrire comme ceci en utilisant quelque chose appelé IO.pipe. Ceci est un exemple de blocage.

Exemple 1


eg_1 = -> {
 async_heavy_process = -> { puts "heavy" }
 
 r, w = IO.pipe
 fork { sleep 5; w.puts "hoge" }
 
 IO.select([r])
 async_heavy_process.()
 puts r.gets 
}
eg_1.()

Si vous ne restez pas debout pendant 5 secondes, le processus async_heavy_process ne s'exécutera pas et le programme sera inefficace. Un traitement lourd exécuté de manière asynchrone est quelque chose que vous souhaitez exécuter rapidement. Pensons à résoudre le problème. ʻSi certains des objets IO transmis à IO.select sont toujours prêts, ils ne seront pas bloqués par IO.select. `La technique du self-pipe est née de cette idée. Jetons un œil à l'exemple de code 2 qui est une version modifiée de «l'exemple 1» qui intègre cette idée.

Exemple 2


eg_2 = -> {
  async_heavy_process = -> { puts "heavy"; }
  self_reader, self_writer = IO.pipe
  self_writer.puts 0

  r, w = IO.pipe
  fork { sleep 5; w.puts "hoge" }

  IO.select([r, self_reader])
  async_heavy_process.()
  puts r.gets
}
eg_2.()

Dès que vous l'exécutez, «" heavy "» sera affiché et vous verrez qu'il n'est pas bloqué dans la partie IO.select. Je pense qu'il y a d'autres utilisations, mais si vous le savez, faites-le moi savoir.

Exemples de bibliothèques utilisant la technique du self-pipe

L'exemple présenté dans l'exemple est très simple et peu pratique, donc à la fin je vais vous montrer comment cette technique d'auto-pipe est utilisée dans une courte bibliothèque.

foreman

Foreman lit le fichier Procfile qui définit la commande que vous souhaitez démarrer, l'exécute dans plusieurs processus et la sortie standard (erreur standard) générée dans chaque processus est transmise au programme en cours d'exécution dans le processus principal via le tube et affichée comme sortie standard. C'est un outil. Plus précisément, quelque chose comme ça

Procfile


app: sleep 5 && echo 'app' && exit 1;          #Processus enfant 1
web: while :; do sleep 1 && echo 'web'; done;  #Processus enfant 2

bash


$ foreman start
00:57:43 app.1  | started with pid 21149              #Processus principal/Sortie dans le fil principal
00:57:43 web.1  | started with pid 21150              #Processus principal/Sortie dans le fil principal
00:57:44 web.1  | web                                 #Passez l'écrivain au processus enfant 2 pour écrire, et via le lecteur, le processus/Sortie sur le fil 2
00:57:45 web.1  | web                                 #Passez l'écrivain au processus enfant 2 pour écrire, et via le lecteur, le processus/Sortie sur le fil 2
00:57:46 web.1  | web                                 #Passez l'écrivain au processus enfant 2 pour écrire, et via le lecteur, le processus/Sortie sur le fil 2
00:57:47 web.1  | web                                 #Passez l'écrivain au processus enfant 2 pour écrire, et via le lecteur, le processus principal/Sortie sur le fil 2
00:57:48 app.1  | app                                 #Passez l'écrivain au processus enfant 1 pour écrire, et à travers le lecteur, le processus principal/Sortie sur le fil 2
00:57:48 web.1  | web                                 #Passez l'écrivain au processus enfant 1 pour écrire, et à travers le lecteur, le processus principal/Sortie sur le fil 2
00:57:48 app.1  | exited with code 1                  #Processus principal/Confirmer la fin du processus enfant 1 dans le thread 2
00:57:48 system | sending SIGTERM to all processes    #Processus principal/Sortie lorsque SIGTERM est envoyé du thread principal au processus enfant(SIGKILL pour Windows)
00:57:48 web.1  | terminated by SIGTERM               #Processus principal/Sortie lorsque la fin de tous les processus enfants est confirmée à partir du thread principal

Est. Quant au processus utilisé, il est utilisé dans le processus (wait_for_output) pour acquérir la sortie standard (erreur standard) du tube passé au processus enfant. C'est le code.

  # https://github.com/ddollar/foreman/blob/5b815c5d8077511664a712aca90b070229ca6413/lib/foreman/engine.rb#L406-L420
  def watch_for_output
    Thread.new do
      begin
        loop do
          io = IO.select([@selfpipe[:reader]] + @readers.values, nil, nil, 30)
          read_self_pipe
          handle_signals
          handle_io(io ? io.first : [])
        end
      rescue Exception => ex
        puts ex.message
        puts ex.backtrace
      end
    end
  end
io = IO.select([@selfpipe[:reader]] + @readers.values, nil, nil, 30)

Si IO.select est bloqué de manière permanente pour une raison quelconque, le processus de vérification de confirmation de fin du processus enfant (wait_for_shutdown_or_child_termination), qui est le processus suivant de watch_for_output, ne sera pas exécuté. Cela signifie que cela finira. Cela signifie que le contremaître ne pourra pas tuer les processus enfants, ce qui est le pire des cas.

 # https://github.com/ddollar/foreman/blob/5b815c5d8077511664a712aca90b070229ca6413/lib/foreman/engine.rb#L54-L63
 def start
    register_signal_handlers
    startup
    spawn_processes
    watch_for_output
    sleep 0.1
    wait_for_shutdown_or_child_termination
    shutdown
    exit(@exitstatus) if @exitstatus
  end

unicorn

Je n'ai pas lu le code attentivement, donc je n'entrerai pas trop dans les détails ici, mais si vous grep le code, vous pouvez voir que self-pipe est utilisé.

# https://github.com/defunkt/unicorn/blob/2c347116305338710331d238fefa23f00e98cf54/lib/unicorn/http_server.rb#L82-L91

    # We use @self_pipe differently in the master and worker processes:
    #
    # * The master process never closes or reinitializes this once
    # initialized.  Signal handlers in the master process will write to
    # it to wake up the master from IO.select in exactly the same manner
    # djb describes in https://cr.yp.to/docs/selfpipe.html
    #
    # * The workers immediately close the pipe they inherit.  See the
    # Unicorn::Worker class for the pipe workers use.
    @self_pipe = []

Et peut-être que ce processus permet d'éviter le blocage.

# https://github.com/defunkt/unicorn/blob/2c347116305338710331d238fefa23f00e98cf54/lib/unicorn/http_server.rb#L748

  def worker_loop(worker)
   #
   #réduction
   #
   ret = IO.select(readers, nil, nil, @timeout) and ready = ret[0]
   #
   #réduction
   #
  end

Si vous êtes intéressé, vous pouvez lire le code.

Sommaire

Introduction d'une technique d'auto-pipe pour éviter le blocage qui se produit dans le traitement IO.select. C'est une technique qui est un peu utilisée dans les bibliothèques telles que contremaître et licorne, dont je m'occupe habituellement, alors pourquoi ne pas essayer de l'apprendre en ce moment? Il n'y avait pas d'article au Japon, alors j'en ai fait un article. Je pense que certains domaines ne peuvent être atteints, mais j'espère que cela sera utile.

référence

Recommended Posts

Technique de self-pipe utile pour le traitement IO.select
Utiliser MouseListener avec le traitement
Traitement d'écriture dans IntelliJ IDEA
Traitement parallèle mesuré avec Java
Traitement du jugement Mazume dans la pêche
Appui simultané sur une touche en traitement