Continuing from last time, this post covers a topic that came up while reading Go's HTTP implementation. Today's topic is channels. For background on the series, see here (https://qiita.com/behiron/items/78d023be96058224e583#%E3%81%AF%E3%81%98%E3%82%81%E3%81%AB)
Channel
Go has two kinds of channels: unbuffered and buffered. An unbuffered channel blocks a send or receive until the other side is ready, so it is generally preferable to use buffered channels where possible, and Go's HTTP implementation also uses buffered channels a lot.
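The difference in blocking behavior can be checked with a minimal sketch like the one below. The helper name sendWithoutReceiver is made up for this illustration; it uses select with a default case so that the program never actually blocks.

```go
package main

import "fmt"

// sendWithoutReceiver reports whether a send on ch completes while no
// goroutine is receiving from it. The default case keeps it from blocking.
// (Hypothetical helper, not part of the original sample.)
func sendWithoutReceiver(ch chan int) bool {
	select {
	case ch <- 1:
		return true // the send completed immediately
	default:
		return false // the send would have blocked
	}
}

func main() {
	fmt.Println("unbuffered:", sendWithoutReceiver(make(chan int)))
	fmt.Println("buffered:", sendWithoutReceiver(make(chan int, 1)))
}
```

The unbuffered case reports false because nobody is ready to receive, while the buffered case reports true because the value fits in the buffer.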
With a buffered channel, however, the fact that your side can carry on after a send or receive does not mean that the other side has actually processed the value, so some care is needed.
The following is a very simple example of this.
In the sample program below, newAdder creates resc with make(chan responseAndError), which is an unbuffered channel, so the result of the addition is always received as expected. (For simplicity each call just adds 1, so the values 1 through 10 are received, in no particular order.)
If you change it to a buffered channel with resc: make(chan responseAndError, 1), then at the select on the receiving side
	select {
	case <-adder.close:
		return false
	case rc := <-adder.resc:
it is not determined which case proceeds, and about half of the calls fail. With the buffer, the send completes immediately and the deferred close(adder.close) runs, so by the time handle reaches select both channels are ready. As the spec states in the passage quoted below, when multiple cases can proceed, one of them is chosen at random.
If one or more of the communications can proceed, a single one that can proceed is chosen via a uniform pseudo-random selection. Otherwise, if there is a default case, that case is chosen. If there is no default case, the "select" statement blocks until at least one of the communications can proceed.
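As an aside from the full sample, the random choice described in the spec can be observed in isolation with a sketch like this. The function name countChoices and the iteration count are assumptions made for the illustration; both cases are made ready before every select.

```go
package main

import "fmt"

// countChoices runs select n times with both cases ready and returns how
// often each case was chosen. (Hypothetical helper for illustration.)
func countChoices(n int) (closed, result int) {
	done := make(chan struct{})
	close(done) // receiving from a closed channel is always ready
	for i := 0; i < n; i++ {
		resc := make(chan int, 1)
		resc <- i // buffered, so the value is already waiting
		select {
		case <-done:
			closed++
		case <-resc:
			result++
		}
	}
	return closed, result
}

func main() {
	closed, result := countChoices(10000)
	fmt.Printf("closed: %d, result: %d\n", closed, result)
}
```

The exact counts differ from run to run, but each case should be chosen roughly half of the time.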
package main

import (
	"fmt"
	"sync"
	"time"
)

const debug = false

type responseAndError struct {
	response int
	err      error
}

type adder struct {
	// for cancel or timeout
	close chan struct{}
	resc  chan responseAndError
}

func newAdder() *adder {
	return &adder{
		close: make(chan struct{}),
		// must use unbuffered channel
		resc: make(chan responseAndError),
		// if use buffered channel, we would get FAILED log
		// resc: make(chan responseAndError, 1),
	}
}

func (adder *adder) handle(a int, b int) bool {
	adder.add(a, b)
	time.Sleep(time.Second * 1)
	select {
	case <-adder.close:
		return false
	case rc := <-adder.resc:
		if debug {
			fmt.Printf("result: %d, err: %v", rc.response, rc.err)
		}
		return true
	}
}

func (adder *adder) add(a int, b int) {
	go func(a int, b int) {
		defer func() {
			close(adder.close)
		}()
		res := a + b
		adder.resc <- responseAndError{res, nil}
	}(a, b)
}

func main() {
	wg := &sync.WaitGroup{}
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(i int) {
			adder := newAdder()
			ok := adder.handle(i, 1)
			if !ok {
				fmt.Println("================FAILED============")
			}
			wg.Done()
		}(i)
	}
	wg.Wait()
}
The source is available on GitHub.
The above example is actually based on the HTTP client implementation.
Specifically, it extracts only the essence of the following two parts:
The receive side (the handle method) corresponds to the part (persistConn.roundTrip) that waits for the response read by the persistent connection in another goroutine.
The send side (the add method) corresponds to the part (persistConn.readLoop) where the persistent connection reads in an infinite loop.
As a supplementary note, when the part of the persistent connection that reads in an infinite loop determines that the connection will not be reused, it signals this by closing a channel, just as this sample closes the close channel. The connection is actually closed at that point.
The comments in that code note that the channel must be unbuffered, for the same reason as in the sample. The relevant part is quoted below; rc.ch corresponds to adder.resc in this sample.
		// Put the idle conn back into the pool before we send the response
		// so if they process it quickly and make another request, they'll
		// get this same conn. But we use the unbuffered channel 'rc'
		// to guarantee that persistConn.roundTrip got out of its select
		// potentially waiting for this persistConn to close.
		alive = alive &&
			!pc.sawEOF &&
			pc.wroteRequest() &&
			replaced && tryPutIdleConn(trace)

		if bodyWritable {
			closeErr = errCallerOwnsConn
		}

		select {
		case rc.ch <- responseAndError{res: resp}:
		case <-rc.callerGone:
			return
		}

		// Now that they've read from the unbuffered channel, they're safely
		// out of the select that also waits on this goroutine to die, so
		// we're allowed to exit now if needed (if alive is false)
		testHookReadLoopBeforeNextRead()
		continue
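The guarantee described in those comments can be sketched in a condensed form of the sample. This is only an illustration under stated assumptions: countFailures, its parameters, and the 10 ms delay are made up here and do not come from net/http. With an unbuffered channel the sender's deferred close can only run after the receiver has taken the result, so the receiver never sees the close first, whatever the timing.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// countFailures runs n independent exchanges and returns how many times the
// waiter saw the done channel before the result. capacity is the buffer
// size of the result channel (0 means unbuffered). Hypothetical helper.
func countFailures(n, capacity int) int64 {
	var failures int64
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			done := make(chan struct{})
			resc := make(chan int, capacity)
			go func() {
				defer close(done) // runs only after the send below completes
				resc <- i + 1
			}()
			time.Sleep(10 * time.Millisecond) // let the sender run first
			select {
			case <-done:
				atomic.AddInt64(&failures, 1)
			case <-resc:
			}
		}(i)
	}
	wg.Wait()
	return failures
}

func main() {
	fmt.Println("unbuffered failures:", countFailures(100, 0))
	fmt.Println("buffered failures:", countFailures(100, 1))
}
```

The unbuffered run reports zero failures. The buffered run typically reports a count somewhere around half of n, because both channels are ready by the time select runs.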
In the above sample, inserting time.Sleep(time.Second * 1) creates a state where both adder.close and adder.resc can proceed by the time select is reached. But what happens if select is reached while both are still blocked?
Out of curiosity, I moved time.Sleep(time.Second * 1) to right after res := a + b. In that case, no failure occurred even with the buffered channel.
In other words, when the value was sent to the buffered channel, the blocked select judged that the buffered channel was receivable and selected that case.
The result was the same when I ran it 10000000 times instead of 10, and inserting a call to runtime.Gosched() did not change the result either.
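A condensed reconstruction of this experiment might look like the following. It is a sketch, not the author's exact code: the function name countFailuresBlockedSelect, the 10 ms delay, and the iteration count are assumptions. The delay sits in the sending goroutine, so the receiver is normally already blocked in select when the send on the buffered channel happens.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// countFailuresBlockedSelect returns how many of n waiters saw the done
// channel before the result when the delay is on the sending side.
// Hypothetical helper for illustration.
func countFailuresBlockedSelect(n int) int64 {
	var failures int64
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			done := make(chan struct{})
			resc := make(chan int, 1) // buffered
			go func() {
				defer close(done)
				res := i + 1
				time.Sleep(10 * time.Millisecond) // moved here from the receiver
				resc <- res
			}()
			select { // usually reached while both channels are still blocked
			case <-done:
				atomic.AddInt64(&failures, 1)
			case <-resc:
			}
		}(i)
	}
	wg.Wait()
	return failures
}

func main() {
	fmt.Println("failures:", countFailuresBlockedSelect(100))
}
```

The author observed no failures in this configuration, but the program deliberately makes no assertion about the count, since the spec does not promise it.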
When a goroutine is blocked in select waiting to send or receive on an unbuffered channel, a matching receive or send on that channel always causes that case to be selected. In this sample the same held for the buffered channel. However, this is not written in the spec, and I could not find it documented anywhere as far as I checked, so I think it should be treated as **not guaranteed behavior**.
I plan to read the implementation of Go's scheduler someday and will check this then, but if anyone already knows, please let me know.