[RUBY] Self-Pipe-Technik, die bei der IO.select-Verarbeitung nützlich ist

Einführung

Ich denke, es gibt ein Blockierungsproblem als Problem bei der IO.select-Verarbeitung. Das Blockieren ist ein Phänomen, das auftritt, wenn sich keine Daten im Puffer befinden und der Datenleseprozess wartet, oder wenn sich Daten im Puffer befinden, aber keine Daten für die Anzahl der zu lesenden Bytes vorhanden sind und der Datenleseprozess wartet. (Gleiches gilt für das Schreiben). Wenn im IO.select-Prozess eine Blockierung auftritt, wird auf den nachfolgenden Prozess gewartet, was ineffizient ist. Es gibt eine Self-Pipe-Technik, um dieses Problem zu lösen. Die Geschichte der Selbstpfeifentechnik begann um 1990 und wird immer noch in den Bibliotheken verwendet, die wir jeden Tag benutzen. Self-Pipe-Technik

Beispiele für das Blockieren in IO.select und Lösungen

Wenn Sie beispielsweise die IO.select-Verarbeitung mit Ruby durchführen möchten, können Sie sie wie folgt mit IO.pipe schreiben. Dies ist ein Beispiel für das Blockieren.

Beispiel 1


eg_1 = -> {
 async_heavy_process = -> { puts "heavy" }
 
 r, w = IO.pipe
 fork { sleep 5; w.puts "hoge" }
 
 IO.select([r])
 async_heavy_process.()
 puts r.gets 
}
eg_1.()

Wenn Sie 5 Sekunden lang nicht stehen, wird der Prozess async_heavy_process nicht ausgeführt und das Programm ist ineffizient. Schwere Verarbeitung, die asynchron ausgeführt wird, möchten Sie schnell ausführen. Lassen Sie uns über die Lösung des Problems nachdenken. Wenn einige der an IO.select übergebenen E / A-Objekte immer bereit sind, werden sie von IO.select nicht blockiert. Die Self-Pipe-Technik wurde aus dieser Idee geboren. Werfen wir einen Blick auf Codebeispiel 2, eine modifizierte Version von "Beispiel 1", in die diese Idee eingebettet ist.

Beispiel 2


eg_2 = -> {
  async_heavy_process = -> { puts "heavy"; }
  self_reader, self_writer = IO.pipe
  self_writer.puts 0

  r, w = IO.pipe
  fork { sleep 5; w.puts "hoge" }

  IO.select([r, self_reader])
  async_heavy_process.()
  puts r.gets
}
eg_2.()

Sobald Sie es ausführen, wird "schwer" angezeigt und Sie werden sehen, dass es im IO.select-Teil nicht blockiert ist. Ich denke, es gibt andere Verwendungszwecke, aber wenn Sie es wissen, lassen Sie es mich bitte wissen.

Beispiele für Bibliotheken, die die Self-Pipe-Technik verwenden

Das im Beispiel vorgestellte Beispiel ist sehr einfach und unpraktisch, daher werde ich Ihnen schließlich zeigen, wie diese Selbstpfeifentechnik in einer kurzen Bibliothek verwendet wird.

foreman

Foreman liest die Procfile, die den Befehl definiert, den Sie starten möchten, führt ihn in mehreren Prozessen aus, und die in jedem Prozess generierte Standardausgabe (Standardfehler) wird über die Pipe an das im Hauptprozess ausgeführte Programm übergeben und als Standardausgabe angezeigt. Es ist ein Werkzeug. Speziell so etwas

Procfile


app: sleep 5 && echo 'app' && exit 1;          #Untergeordneter Prozess 1
web: while :; do sleep 1 && echo 'web'; done;  #Untergeordneter Prozess 2

bash


$ foreman start
00:57:43 app.1  | started with pid 21149              #Hauptprozess/Ausgabe im Haupt-Thread
00:57:43 web.1  | started with pid 21150              #Hauptprozess/Ausgabe im Haupt-Thread
00:57:44 web.1  | web                                 #Übergeben Sie den Schreiber an den untergeordneten Prozess 2, um den Hauptprozess zu schreiben, und über den Leser/Ausgabe auf Thread 2
00:57:45 web.1  | web                                 #Übergeben Sie den Schreiber an den untergeordneten Prozess 2, um den Hauptprozess zu schreiben, und über den Leser/Ausgabe auf Thread 2
00:57:46 web.1  | web                                 #Übergeben Sie den Schreiber an den untergeordneten Prozess 2, um den Hauptprozess zu schreiben, und über den Leser/Ausgabe auf Thread 2
00:57:47 web.1  | web                                 #Übergeben Sie den Schreiber an den untergeordneten Prozess 2, um den Hauptprozess zu schreiben, und über den Leser/Ausgabe auf Thread 2
00:57:48 app.1  | app                                 #Übergeben Sie den Schreiber an den untergeordneten Prozess 1, um den Hauptprozess zu schreiben, und über den Leser/Ausgabe auf Thread 2
00:57:48 web.1  | web                                 #Übergeben Sie den Schreiber an den untergeordneten Prozess 1, um den Hauptprozess zu schreiben, und über den Leser/Ausgabe auf Thread 2
00:57:48 app.1  | exited with code 1                  #Hauptprozess/Bestätigen Sie die Beendigung des untergeordneten Prozesses 1 in Thread 2
00:57:48 system | sending SIGTERM to all processes    #Hauptprozess/Ausgabe, wenn SIGTERM vom Hauptthread an den untergeordneten Prozess gesendet wird(SIGKILL für Fenster)
00:57:48 web.1  | terminated by SIGTERM               #Hauptprozess/Ausgabe, wenn die Beendigung aller untergeordneten Prozesse vom Hauptthread bestätigt wird

Ist. Welcher Prozess verwendet wird, wird im Prozess (wait_for_output) verwendet, um die Standardausgabe (Standardfehler) aus der an den untergeordneten Prozess übergebenen Pipe abzurufen. Dies ist der Code.

  # https://github.com/ddollar/foreman/blob/5b815c5d8077511664a712aca90b070229ca6413/lib/foreman/engine.rb#L406-L420
  def watch_for_output
    Thread.new do
      begin
        loop do
          io = IO.select([@selfpipe[:reader]] + @readers.values, nil, nil, 30)
          read_self_pipe
          handle_signals
          handle_io(io ? io.first : [])
        end
      rescue Exception => ex
        puts ex.message
        puts ex.backtrace
      end
    end
  end
io = IO.select([@selfpipe[:reader]] + @readers.values, nil, nil, 30)

Was würde ohne diese Selbstleitung passieren? Wenn IO.select aus irgendeinem Grund dauerhaft blockiert ist, wird der Überprüfungsprozess zur Bestätigung der Beendigung des untergeordneten Prozesses (wait_for_shutdown_or_child_termination), der der nachfolgende Prozess von watch_for_output ist, nicht ausgeführt. Es bedeutet, dass es enden wird. Das bedeutet, dass der Vorarbeiter keine untergeordneten Prozesse abbrechen kann, was der schlimmste Fall ist.

 # https://github.com/ddollar/foreman/blob/5b815c5d8077511664a712aca90b070229ca6413/lib/foreman/engine.rb#L54-L63
 def start
    register_signal_handlers
    startup
    spawn_processes
    watch_for_output
    sleep 0.1
    wait_for_shutdown_or_child_termination
    shutdown
    exit(@exitstatus) if @exitstatus
  end

unicorn

Ich habe den Code nicht sorgfältig gelesen, damit ich hier nicht zu sehr ins Detail gehe, aber wenn Sie den Code lesen, können Sie sehen, dass Self-Pipe verwendet wird.

# https://github.com/defunkt/unicorn/blob/2c347116305338710331d238fefa23f00e98cf54/lib/unicorn/http_server.rb#L82-L91

    # We use @self_pipe differently in the master and worker processes:
    #
    # * The master process never closes or reinitializes this once
    # initialized.  Signal handlers in the master process will write to
    # it to wake up the master from IO.select in exactly the same manner
    # djb describes in https://cr.yp.to/docs/selfpipe.html
    #
    # * The workers immediately close the pipe they inherit.  See the
    # Unicorn::Worker class for the pipe workers use.
    @self_pipe = []

Und vielleicht hilft dieser Prozess, Blockierungen zu vermeiden.

# https://github.com/defunkt/unicorn/blob/2c347116305338710331d238fefa23f00e98cf54/lib/unicorn/http_server.rb#L748

  def worker_loop(worker)
   #
   #Kürzung
   #
   ret = IO.select(readers, nil, nil, @timeout) and ready = ret[0]
   #
   #Kürzung
   #
  end

Wenn Sie interessiert sind, können Sie den Code lesen.

Zusammenfassung

Einführung einer Self-Pipe-Technik zur Vermeidung von Blockierungen bei der IO.select-Verarbeitung. Diese Technik wird häufig in Bibliotheken wie Vorarbeiter und Einhorn verwendet, um die ich mich normalerweise kümmere. Warum also nicht gleich versuchen, sie zu lernen? Da es in Japan keinen Artikel gab, habe ich daraus einen Artikel gemacht. Ich denke, es gibt einige Bereiche, die nicht erreicht werden können, aber ich hoffe, dass es nützlich sein wird.

Referenz

Recommended Posts

Self-Pipe-Technik, die bei der IO.select-Verarbeitung nützlich ist
Verwenden Sie MouseListener für die Verarbeitung
Schreibverarbeitung in IntelliJ IDEA
Gemessene Parallelverarbeitung mit Java
Mazume Urteilsverarbeitung in der Fischerei
Gleichzeitiger Tastendruck in der Verarbeitung