Self-pipe technique useful in IO.select processing

Introduction

I think there is a blocking problem as a problem when doing IO.select processing. Blocking is a phenomenon that occurs when there is no data in the buffer and the data read process waits, or when there is data in the buffer but there is no data for the number of bytes you want to read and the data read process waits. (Same for writing). If blocking occurs in IO.select processing, the subsequent processing will be waited and it is inefficient. Self-pipe technique is a way to solve this problem. The history of self-pipe technique began around 1990 and is still used in the libraries we use every day. Self-pipe technique

Example of blocking in IO.select and solution

For example, if you want to perform IO.select processing with ruby, you can write it like this using something called IO.pipe. This is an example of blocking.

Example 1


eg_1 = -> {
 async_heavy_process = -> { puts "heavy" }
 
 r, w = IO.pipe
 fork { sleep 5; w.puts "hoge" }
 
 IO.select([r])
 async_heavy_process.()
 puts r.gets 
}
eg_1.()

If you do not stand for 5 seconds, the async_heavy_process process will not run and the program will be inefficient. Heavy processing that is executed asynchronously is something that you want to execute quickly. Let's think about solving the problem. ╩╗If some of the IO objects passed to IO.select are always ready, they will not be blocked by IO.select. The self-pipe technique was born from this idea. Let's take a look at Code Example 2 which is a modified version of Example 1` that embeds this idea.

Example 2


eg_2 = -> {
  async_heavy_process = -> { puts "heavy"; }
  self_reader, self_writer = IO.pipe
  self_writer.puts 0

  r, w = IO.pipe
  fork { sleep 5; w.puts "hoge" }

  IO.select([r, self_reader])
  async_heavy_process.()
  puts r.gets
}
eg_2.()

As soon as you execute it, " heavy " is displayed and you can see that it is not blocked in the IO.select part. I think there are other uses, but if you know it, please let me know.

Examples of libraries that use the self-pipe technique

The example presented in the example is very simple and impractical, so I'll finally show you how this self-pipe technique is used in a short library.

foreman

foreman reads the Procfile that defines the command you want to start, executes it in multiple processes, and the standard output (standard error) generated in each process is passed to the program running in the main process through a pipe and displayed as standard output. It is a tool. Specifically, something like this

Procfile


app: sleep 5 && echo 'app' && exit 1;          #Child process 1
web: while :; do sleep 1 && echo 'web'; done;  #Child process 2

bash


$ foreman start
00:57:43 app.1  | started with pid 21149              #Main process/Output in main thread
00:57:43 web.1  | started with pid 21150              #Main process/Output in main thread
00:57:44 web.1  | web                                 #Pass the writer to the child process 2 to write, and through the reader, the main process/Output on thread 2
00:57:45 web.1  | web                                 #Pass the writer to the child process 2 to write, and through the reader, the main process/Output on thread 2
00:57:46 web.1  | web                                 #Pass the writer to the child process 2 to write, and through the reader, the main process/Output on thread 2
00:57:47 web.1  | web                                 #Pass the writer to the child process 2 to write, and through the reader, the main process/Output on thread 2
00:57:48 app.1  | app                                 #Pass the writer to the child process 1 to write, and through the reader, the main process/Output on thread 2
00:57:48 web.1  | web                                 #Pass the writer to the child process 1 to write, and through the reader, the main process/Output on thread 2
00:57:48 app.1  | exited with code 1                  #Main process/Confirm the termination of child process 1 in thread 2
00:57:48 system | sending SIGTERM to all processes    #Main process/Output when SIGTERM is sent from the main thread to the child process(SIGKILL for windows)
00:57:48 web.1  | terminated by SIGTERM               #Main process/Output when the terminated of all child processes is confirmed from the main thread

Is. As for what process is used in, it is used in the process (wait_for_output) to acquire the standard output (standard error) from the pipe passed to the child process. This is the code.

  # https://github.com/ddollar/foreman/blob/5b815c5d8077511664a712aca90b070229ca6413/lib/foreman/engine.rb#L406-L420
  def watch_for_output
    Thread.new do
      begin
        loop do
          io = IO.select([@selfpipe[:reader]] + @readers.values, nil, nil, 30)
          read_self_pipe
          handle_signals
          handle_io(io ? io.first : [])
        end
      rescue Exception => ex
        puts ex.message
        puts ex.backtrace
      end
    end
  end
io = IO.select([@selfpipe[:reader]] + @readers.values, nil, nil, 30)

What happens without this self-pipe? If IO.select is permanently blocked for some reason, the child process termination confirmation check process (wait_for_shutdown_or_child_termination), which is the subsequent process of watch_for_output, will not be executed. It means that it will end up. That means that foreman will not be able to kill child processes, which is the worst case.

 # https://github.com/ddollar/foreman/blob/5b815c5d8077511664a712aca90b070229ca6413/lib/foreman/engine.rb#L54-L63
 def start
    register_signal_handlers
    startup
    spawn_processes
    watch_for_output
    sleep 0.1
    wait_for_shutdown_or_child_termination
    shutdown
    exit(@exitstatus) if @exitstatus
  end

unicorn

I haven't read the code carefully so I won't go into too much detail here, but if you grep the code you can see that self-pipe is used.

# https://github.com/defunkt/unicorn/blob/2c347116305338710331d238fefa23f00e98cf54/lib/unicorn/http_server.rb#L82-L91

    # We use @self_pipe differently in the master and worker processes:
    #
    # * The master process never closes or reinitializes this once
    # initialized.  Signal handlers in the master process will write to
    # it to wake up the master from IO.select in exactly the same manner
    # djb describes in https://cr.yp.to/docs/selfpipe.html
    #
    # * The workers immediately close the pipe they inherit.  See the
    # Unicorn::Worker class for the pipe workers use.
    @self_pipe = []

And maybe this process helps to avoid blocking.

# https://github.com/defunkt/unicorn/blob/2c347116305338710331d238fefa23f00e98cf54/lib/unicorn/http_server.rb#L748

  def worker_loop(worker)
   #
   #abridgement
   #
   ret = IO.select(readers, nil, nil, @timeout) and ready = ret[0]
   #
   #abridgement
   #
  end

If you are interested, you can read the code.

Summary

I introduced a self-pipe technique to avoid blocking that occurs in IO.select processing. It's a technique that is used quite a bit in libraries such as foreman and unicorn, which I usually take care of, so why not try learning it at this time? There was no article in Japan, so I made an article. I think there are some areas that cannot be reached, but I hope it will be useful.

reference

Recommended Posts

Self-pipe technique useful in IO.select processing
Use MouseListener in Processing
Write Processing in IntelliJ IDEA
Measured parallel processing in Java
Mazume judgment processing in fishing
Simultaneous key press in Processing