Use sync.Cond to implement keepalive dormancy. (#2987)
The existing code used a buffered channel to signal whether the keepalive goroutine should enter dormancy or be awakened from it. Using sync.Cond makes the code much simpler to read, and also improves the performance numbers on all but one front.
This commit is contained in:

committed by
GitHub

parent
ad20eafd48
commit
45bd2846a3
@ -108,7 +108,7 @@ type headerFrame struct {
|
|||||||
streamID uint32
|
streamID uint32
|
||||||
hf []hpack.HeaderField
|
hf []hpack.HeaderField
|
||||||
endStream bool // Valid on server side.
|
endStream bool // Valid on server side.
|
||||||
initStream func(uint32) (bool, error) // Used only on the client side.
|
initStream func(uint32) error // Used only on the client side.
|
||||||
onWrite func()
|
onWrite func()
|
||||||
wq *writeQuota // write quota for the stream created.
|
wq *writeQuota // write quota for the stream created.
|
||||||
cleanup *cleanupStream // Valid on the server side.
|
cleanup *cleanupStream // Valid on the server side.
|
||||||
@ -637,21 +637,17 @@ func (l *loopyWriter) headerHandler(h *headerFrame) error {
|
|||||||
|
|
||||||
func (l *loopyWriter) originateStream(str *outStream) error {
|
func (l *loopyWriter) originateStream(str *outStream) error {
|
||||||
hdr := str.itl.dequeue().(*headerFrame)
|
hdr := str.itl.dequeue().(*headerFrame)
|
||||||
sendPing, err := hdr.initStream(str.id)
|
if err := hdr.initStream(str.id); err != nil {
|
||||||
if err != nil {
|
|
||||||
if err == ErrConnClosing {
|
if err == ErrConnClosing {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// Other errors(errStreamDrain) need not close transport.
|
// Other errors(errStreamDrain) need not close transport.
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if err = l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
|
if err := l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
l.estdStreams[str.id] = str
|
l.estdStreams[str.id] = str
|
||||||
if sendPing {
|
|
||||||
return l.pingHandler(&ping{data: [8]byte{}})
|
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -62,8 +62,6 @@ type http2Client struct {
|
|||||||
// goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
|
// goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
|
||||||
// that the server sent GoAway on this transport.
|
// that the server sent GoAway on this transport.
|
||||||
goAway chan struct{}
|
goAway chan struct{}
|
||||||
// awakenKeepalive is used to wake up keepalive when after it has gone dormant.
|
|
||||||
awakenKeepalive chan struct{}
|
|
||||||
|
|
||||||
framer *framer
|
framer *framer
|
||||||
// controlBuf delivers all the control related tasks (e.g., window
|
// controlBuf delivers all the control related tasks (e.g., window
|
||||||
@ -110,6 +108,16 @@ type http2Client struct {
|
|||||||
// goAwayReason records the http2.ErrCode and debug data received with the
|
// goAwayReason records the http2.ErrCode and debug data received with the
|
||||||
// GoAway frame.
|
// GoAway frame.
|
||||||
goAwayReason GoAwayReason
|
goAwayReason GoAwayReason
|
||||||
|
// A condition variable used to signal when the keepalive goroutine should
|
||||||
|
// go dormant. The condition for dormancy is based on the number of active
|
||||||
|
// streams and the `PermitWithoutStream` keepalive client parameter. And
|
||||||
|
// since the number of active streams is guarded by the above mutex, we use
|
||||||
|
// the same for this condition variable as well.
|
||||||
|
kpDormancyCond *sync.Cond
|
||||||
|
// A boolean to track whether the keepalive goroutine is dormant or not.
|
||||||
|
// This is checked before attempting to signal the above condition
|
||||||
|
// variable.
|
||||||
|
kpDormant bool
|
||||||
|
|
||||||
// Fields below are for channelz metric collection.
|
// Fields below are for channelz metric collection.
|
||||||
channelzID int64 // channelz unique identification number
|
channelzID int64 // channelz unique identification number
|
||||||
@ -232,7 +240,6 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne
|
|||||||
readerDone: make(chan struct{}),
|
readerDone: make(chan struct{}),
|
||||||
writerDone: make(chan struct{}),
|
writerDone: make(chan struct{}),
|
||||||
goAway: make(chan struct{}),
|
goAway: make(chan struct{}),
|
||||||
awakenKeepalive: make(chan struct{}, 1),
|
|
||||||
framer: newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize),
|
framer: newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize),
|
||||||
fc: &trInFlow{limit: uint32(icwz)},
|
fc: &trInFlow{limit: uint32(icwz)},
|
||||||
scheme: scheme,
|
scheme: scheme,
|
||||||
@ -264,9 +271,6 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne
|
|||||||
updateFlowControl: t.updateFlowControl,
|
updateFlowControl: t.updateFlowControl,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Make sure awakenKeepalive can't be written upon.
|
|
||||||
// keepalive routine will make it writable, if need be.
|
|
||||||
t.awakenKeepalive <- struct{}{}
|
|
||||||
if t.statsHandler != nil {
|
if t.statsHandler != nil {
|
||||||
t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{
|
t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{
|
||||||
RemoteAddr: t.remoteAddr,
|
RemoteAddr: t.remoteAddr,
|
||||||
@ -281,6 +285,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne
|
|||||||
t.channelzID = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))
|
t.channelzID = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))
|
||||||
}
|
}
|
||||||
if t.keepaliveEnabled {
|
if t.keepaliveEnabled {
|
||||||
|
t.kpDormancyCond = sync.NewCond(&t.mu)
|
||||||
go t.keepalive()
|
go t.keepalive()
|
||||||
}
|
}
|
||||||
// Start the reader goroutine for incoming message. Each transport has
|
// Start the reader goroutine for incoming message. Each transport has
|
||||||
@ -564,7 +569,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
|
|||||||
hdr := &headerFrame{
|
hdr := &headerFrame{
|
||||||
hf: headerFields,
|
hf: headerFields,
|
||||||
endStream: false,
|
endStream: false,
|
||||||
initStream: func(id uint32) (bool, error) {
|
initStream: func(id uint32) error {
|
||||||
t.mu.Lock()
|
t.mu.Lock()
|
||||||
if state := t.state; state != reachable {
|
if state := t.state; state != reachable {
|
||||||
t.mu.Unlock()
|
t.mu.Unlock()
|
||||||
@ -574,29 +579,19 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
|
|||||||
err = ErrConnClosing
|
err = ErrConnClosing
|
||||||
}
|
}
|
||||||
cleanup(err)
|
cleanup(err)
|
||||||
return false, err
|
return err
|
||||||
}
|
}
|
||||||
t.activeStreams[id] = s
|
t.activeStreams[id] = s
|
||||||
if channelz.IsOn() {
|
if channelz.IsOn() {
|
||||||
atomic.AddInt64(&t.czData.streamsStarted, 1)
|
atomic.AddInt64(&t.czData.streamsStarted, 1)
|
||||||
atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
|
atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
|
||||||
}
|
}
|
||||||
var sendPing bool
|
// If the keepalive goroutine has gone dormant, wake it up.
|
||||||
// If the number of active streams change from 0 to 1, then check if keepalive
|
if t.kpDormant {
|
||||||
// has gone dormant. If so, wake it up.
|
t.kpDormancyCond.Signal()
|
||||||
if len(t.activeStreams) == 1 && t.keepaliveEnabled {
|
|
||||||
select {
|
|
||||||
case t.awakenKeepalive <- struct{}{}:
|
|
||||||
sendPing = true
|
|
||||||
// Fill the awakenKeepalive channel again as this channel must be
|
|
||||||
// kept non-writable except at the point that the keepalive()
|
|
||||||
// goroutine is waiting either to be awaken or shutdown.
|
|
||||||
t.awakenKeepalive <- struct{}{}
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
t.mu.Unlock()
|
t.mu.Unlock()
|
||||||
return sendPing, nil
|
return nil
|
||||||
},
|
},
|
||||||
onOrphaned: cleanup,
|
onOrphaned: cleanup,
|
||||||
wq: s.wq,
|
wq: s.wq,
|
||||||
@ -778,6 +773,11 @@ func (t *http2Client) Close() error {
|
|||||||
t.state = closing
|
t.state = closing
|
||||||
streams := t.activeStreams
|
streams := t.activeStreams
|
||||||
t.activeStreams = nil
|
t.activeStreams = nil
|
||||||
|
if t.kpDormant {
|
||||||
|
// If the keepalive goroutine is blocked on this condition variable, we
|
||||||
|
// should unblock it so that the goroutine eventually exits.
|
||||||
|
t.kpDormancyCond.Signal()
|
||||||
|
}
|
||||||
t.mu.Unlock()
|
t.mu.Unlock()
|
||||||
t.controlBuf.finish()
|
t.controlBuf.finish()
|
||||||
t.cancel()
|
t.cancel()
|
||||||
@ -1303,29 +1303,32 @@ func (t *http2Client) keepalive() {
|
|||||||
timer.Reset(t.kp.Time)
|
timer.Reset(t.kp.Time)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// Check if keepalive should go dormant.
|
|
||||||
t.mu.Lock()
|
t.mu.Lock()
|
||||||
if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
|
if t.state == closing {
|
||||||
// Make awakenKeepalive writable.
|
// If the transport is closing, we should exit from the
|
||||||
<-t.awakenKeepalive
|
// keepalive goroutine here. If not, we could have a race
|
||||||
|
// between the call to Signal() from Close() and the call to
|
||||||
|
// Wait() here, whereby the keepalive goroutine ends up
|
||||||
|
// blocking on the condition variable which will never be
|
||||||
|
// signalled again.
|
||||||
t.mu.Unlock()
|
t.mu.Unlock()
|
||||||
select {
|
|
||||||
case <-t.awakenKeepalive:
|
|
||||||
// If the control gets here a ping has been sent
|
|
||||||
// need to reset the timer with keepalive.Timeout.
|
|
||||||
case <-t.ctx.Done():
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
} else {
|
if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
|
||||||
|
t.kpDormant = true
|
||||||
|
t.kpDormancyCond.Wait()
|
||||||
|
}
|
||||||
|
t.kpDormant = false
|
||||||
t.mu.Unlock()
|
t.mu.Unlock()
|
||||||
|
|
||||||
if channelz.IsOn() {
|
if channelz.IsOn() {
|
||||||
atomic.AddInt64(&t.czData.kpCount, 1)
|
atomic.AddInt64(&t.czData.kpCount, 1)
|
||||||
}
|
}
|
||||||
// Send ping.
|
// We get here either because we were dormant and a new stream was
|
||||||
|
// created which unblocked the Wait() call, or because the
|
||||||
|
// keepalive timer expired. In both cases, we need to send a ping.
|
||||||
t.controlBuf.put(p)
|
t.controlBuf.put(p)
|
||||||
}
|
|
||||||
|
|
||||||
// By the time control gets here a ping has been sent one way or the other.
|
|
||||||
timer.Reset(t.kp.Timeout)
|
timer.Reset(t.kp.Timeout)
|
||||||
select {
|
select {
|
||||||
case <-timer.C:
|
case <-timer.C:
|
||||||
|
Reference in New Issue
Block a user