transport: derive transport context from context.Background (#2930)

This commit is contained in:
James Protzman
2019-08-29 17:36:45 -04:00
committed by Doug Fawley
parent d5a36f00e6
commit 4d39b48954

View File

@ -65,8 +65,7 @@ var (
// http2Server implements the ServerTransport interface with HTTP2. // http2Server implements the ServerTransport interface with HTTP2.
type http2Server struct { type http2Server struct {
ctx context.Context ctx context.Context
ctxDone <-chan struct{} // Cache the context.Done() chan done chan struct{}
cancel context.CancelFunc
conn net.Conn conn net.Conn
loopy *loopyWriter loopy *loopyWriter
readerDone chan struct{} // sync point to enable testing. readerDone chan struct{} // sync point to enable testing.
@ -203,11 +202,10 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
if kep.MinTime == 0 { if kep.MinTime == 0 {
kep.MinTime = defaultKeepalivePolicyMinTime kep.MinTime = defaultKeepalivePolicyMinTime
} }
ctx, cancel := context.WithCancel(context.Background()) done := make(chan struct{})
t := &http2Server{ t := &http2Server{
ctx: ctx, ctx: context.Background(),
cancel: cancel, done: done,
ctxDone: ctx.Done(),
conn: conn, conn: conn,
remoteAddr: conn.RemoteAddr(), remoteAddr: conn.RemoteAddr(),
localAddr: conn.LocalAddr(), localAddr: conn.LocalAddr(),
@ -228,7 +226,7 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
czData: new(channelzData), czData: new(channelzData),
bufferPool: newBufferPool(), bufferPool: newBufferPool(),
} }
t.controlBuf = newControlBuffer(t.ctxDone) t.controlBuf = newControlBuffer(t.done)
if dynamicWindow { if dynamicWindow {
t.bdpEst = &bdpEstimator{ t.bdpEst = &bdpEstimator{
bdp: initialWindowSize, bdp: initialWindowSize,
@ -886,7 +884,7 @@ func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) e
// TODO(mmukhi, dfawley): Should the server write also return io.EOF? // TODO(mmukhi, dfawley): Should the server write also return io.EOF?
s.cancel() s.cancel()
select { select {
case <-t.ctx.Done(): case <-t.done:
return ErrConnClosing return ErrConnClosing
default: default:
} }
@ -908,7 +906,7 @@ func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) e
} }
if err := s.wq.get(int32(len(hdr) + len(data))); err != nil { if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
select { select {
case <-t.ctx.Done(): case <-t.done:
return ErrConnClosing return ErrConnClosing
default: default:
} }
@ -974,7 +972,7 @@ func (t *http2Server) keepalive() {
t.Close() t.Close()
// Resetting the timer so that the clean-up doesn't deadlock. // Resetting the timer so that the clean-up doesn't deadlock.
maxAge.Reset(infinity) maxAge.Reset(infinity)
case <-t.ctx.Done(): case <-t.done:
} }
return return
case <-keepalive.C: case <-keepalive.C:
@ -996,7 +994,7 @@ func (t *http2Server) keepalive() {
} }
t.controlBuf.put(p) t.controlBuf.put(p)
keepalive.Reset(t.kp.Timeout) keepalive.Reset(t.kp.Timeout)
case <-t.ctx.Done(): case <-t.done:
return return
} }
} }
@ -1016,7 +1014,7 @@ func (t *http2Server) Close() error {
t.activeStreams = nil t.activeStreams = nil
t.mu.Unlock() t.mu.Unlock()
t.controlBuf.finish() t.controlBuf.finish()
t.cancel() close(t.done)
err := t.conn.Close() err := t.conn.Close()
if channelz.IsOn() { if channelz.IsOn() {
channelz.RemoveEntry(t.channelzID) channelz.RemoveEntry(t.channelzID)
@ -1156,7 +1154,7 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
select { select {
case <-t.drainChan: case <-t.drainChan:
case <-timer.C: case <-timer.C:
case <-t.ctx.Done(): case <-t.done:
return return
} }
t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData}) t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
@ -1206,7 +1204,7 @@ func (t *http2Server) getOutFlowWindow() int64 {
select { select {
case sz := <-resp: case sz := <-resp:
return int64(sz) return int64(sz)
case <-t.ctxDone: case <-t.done:
return -1 return -1
case <-timer.C: case <-timer.C:
return -2 return -2