diff --git a/transport/control.go b/transport/control.go index f7e236a7..056d8ac9 100644 --- a/transport/control.go +++ b/transport/control.go @@ -39,15 +39,12 @@ import ( "github.com/bradfitz/http2" ) -// TODO(zhaoq): Make the following configurable. const ( + // The default value of flow control window size in HTTP2 spec. + defaultWindowSize = 65535 // The initial window size for flow control. - initialWindowSize = 65535 // for an RPC - initialConnWindowSize = 65535 // for a connection - // Window update is only sent when the inbound quota reaches - // this threshold. Used to reduce the flow control traffic. - windowUpdateThreshold = 16384 // for an RPC - connWindowUpdateThreshold = 16384 // for a connection + initialWindowSize = defaultWindowSize // for an RPC + initialConnWindowSize = defaultWindowSize * 16 // for a connection ) // The following defines various control items which could flow through @@ -129,6 +126,27 @@ func (qb *quotaPool) cancel() { } } +// reset cancels the pending quota sent on acquired, incremented by v and sends +// it back on acquire. +func (qb *quotaPool) reset(v int) { + qb.mu.Lock() + defer qb.mu.Unlock() + select { + case n := <-qb.c: + qb.quota += n + default: + } + qb.quota += v + if qb.quota <= 0 { + return + } + select { + case qb.c <- qb.quota: + qb.quota = 0 + default: + } +} + // acquire returns the channel on which available quota amounts are sent. func (qb *quotaPool) acquire() <-chan int { return qb.c diff --git a/transport/http2_client.go b/transport/http2_client.go index 0226b43f..1f7245d6 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -76,6 +76,8 @@ type http2Client struct { // controlBuf delivers all the control related tasks (e.g., window // updates, reset streams, and various settings) to the controller. controlBuf *recvBuffer + // The inbound quota being set + recvQuota uint32 // sendQuotaPool provides flow control to outbound message. sendQuotaPool *quotaPool @@ -89,8 +91,10 @@ type http2Client struct { activeStreams map[uint32]*Stream // The max number of concurrent streams maxStreams uint32 - // Inbound quota for flow control - recvQuota int + // The accumulated inbound quota pending for updating the peer. + updateQuota uint32 + // the per-stream outbound flow control window size set by the peer. + streamSendQuota uint32 } // newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2 @@ -133,9 +137,19 @@ func newHTTP2Client(addr string, opts *DialOptions) (_ ClientTransport, err erro return nil, ConnectionErrorf("transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface)) } framer := newFramer(conn) - if err := framer.writeSettings(true); err != nil { + var setting http2.Setting + if initialWindowSize != defaultWindowSize { + setting = http2.Setting{http2.SettingInitialWindowSize, uint32(initialWindowSize)} + } + if err := framer.writeSettings(true, setting); err != nil { return nil, ConnectionErrorf("transport: %v", err) } + // Adjust the connection flow control window if needed. + if delta := uint32(initialConnWindowSize - defaultWindowSize); delta > 0 { + if err := framer.writeWindowUpdate(true, 0, delta); err != nil { + return nil, ConnectionErrorf("transport: %v", err) + } + } var buf bytes.Buffer t := &http2Client{ target: addr, @@ -149,12 +163,14 @@ func newHTTP2Client(addr string, opts *DialOptions) (_ ClientTransport, err erro hBuf: &buf, hEnc: hpack.NewEncoder(&buf), controlBuf: newRecvBuffer(), - sendQuotaPool: newQuotaPool(initialConnWindowSize), + recvQuota: initialConnWindowSize, + sendQuotaPool: newQuotaPool(defaultWindowSize), scheme: scheme, state: reachable, activeStreams: make(map[uint32]*Stream), maxStreams: math.MaxUint32, authCreds: opts.AuthOptions, + streamSendQuota: defaultWindowSize, } go t.controller() t.writableChan <- 0 @@ -172,12 +188,13 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream { id: t.nextID, method: callHdr.Method, buf: newRecvBuffer(), - sendQuotaPool: newQuotaPool(initialWindowSize), + recvQuota: initialWindowSize, + sendQuotaPool: newQuotaPool(int(t.streamSendQuota)), headerChan: make(chan struct{}), } t.nextID += 2 s.windowHandler = func(n int) { - t.addRecvQuota(s, n) + t.updateWindow(s, uint32(n)) } // Make a stream be able to cancel the pending operations by itself. s.ctx, s.cancel = context.WithCancel(ctx) @@ -453,22 +470,22 @@ func (t *http2Client) getStream(f http2.Frame) (*Stream, bool) { return nil, false } -// addRecvQuota adjusts the inbound quota for the stream and the transport. +// updateWindow adjusts the inbound quota for the stream and the transport. // Window updates will deliver to the controller for sending when // the cumulative quota exceeds the corresponding threshold. -func (t *http2Client) addRecvQuota(s *Stream, n int) { +func (t *http2Client) updateWindow(s *Stream, n uint32) { t.mu.Lock() - t.recvQuota += n - if t.recvQuota >= connWindowUpdateThreshold { - t.controlBuf.put(&windowUpdate{0, uint32(t.recvQuota)}) - t.recvQuota = 0 + t.updateQuota += n + if t.updateQuota >= t.recvQuota/4 { + t.controlBuf.put(&windowUpdate{0, t.updateQuota}) + t.updateQuota = 0 } t.mu.Unlock() - s.recvQuota += n - if s.recvQuota >= windowUpdateThreshold { - t.controlBuf.put(&windowUpdate{s.id, uint32(s.recvQuota)}) - s.recvQuota = 0 + s.updateQuota += n + if s.updateQuota >= s.recvQuota/4 { + t.controlBuf.put(&windowUpdate{s.id, s.updateQuota}) + s.updateQuota = 0 } } @@ -506,11 +523,23 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) { } func (t *http2Client) handleSettings(f *http2.SettingsFrame) { - if v, ok := f.Value(http2.SettingMaxConcurrentStreams); ok { - t.mu.Lock() - t.maxStreams = v - t.mu.Unlock() - } + f.ForeachSetting(func(s http2.Setting) error { + if v, ok := f.Value(s.ID); ok { + t.mu.Lock() + defer t.mu.Unlock() + switch s.ID { + case http2.SettingMaxConcurrentStreams: + t.maxStreams = v + case http2.SettingInitialWindowSize: + for _, s := range t.activeStreams { + // Adjust the sending quota for each s. + s.sendQuotaPool.reset(int(v - t.streamSendQuota)) + } + t.streamSendQuota = v + } + } + return nil + }) } func (t *http2Client) handlePing(f *http2.PingFrame) { diff --git a/transport/http2_server.go b/transport/http2_server.go index 8b279d58..6b4c2ad0 100644 --- a/transport/http2_server.go +++ b/transport/http2_server.go @@ -75,14 +75,18 @@ type http2Server struct { // controlBuf delivers all the control related tasks (e.g., window // updates, reset streams, and various settings) to the controller. controlBuf *recvBuffer + // The inbound quota being set + recvQuota uint32 // sendQuotaPool provides flow control to outbound message. sendQuotaPool *quotaPool mu sync.Mutex // guard the following state transportState activeStreams map[uint32]*Stream - // Inbound quota for flow control - recvQuota int + // The accumulated inbound quota pending for updating the peer. + updateQuota uint32 + // the per-stream outbound flow control window size set by the peer. + streamSendQuota uint32 } // newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is @@ -92,14 +96,25 @@ func newHTTP2Server(conn net.Conn, maxStreams uint32) (_ ServerTransport, err er // Send initial settings as connection preface to client. // TODO(zhaoq): Have a better way to signal "no limit" because 0 is // permitted in the HTTP2 spec. + var settings []http2.Setting + // TODO(zhaoq): Have a better way to signal "no limit" because 0 is + // permitted in the HTTP2 spec. if maxStreams == 0 { - err = framer.writeSettings(true) maxStreams = math.MaxUint32 } else { - err = framer.writeSettings(true, http2.Setting{http2.SettingMaxConcurrentStreams, maxStreams}) + settings = append(settings, http2.Setting{http2.SettingMaxConcurrentStreams, maxStreams}) } - if err != nil { - return + if initialWindowSize != defaultWindowSize { + settings = append(settings, http2.Setting{http2.SettingInitialWindowSize, uint32(initialWindowSize)}) + } + if err := framer.writeSettings(true, settings...); err != nil { + return nil, ConnectionErrorf("transport: %v", err) + } + // Adjust the connection flow control window if needed. + if delta := uint32(initialConnWindowSize - defaultWindowSize); delta > 0 { + if err := framer.writeWindowUpdate(true, 0, delta); err != nil { + return nil, ConnectionErrorf("transport: %v", err) + } } var buf bytes.Buffer t := &http2Server{ @@ -109,11 +124,13 @@ func newHTTP2Server(conn net.Conn, maxStreams uint32) (_ ServerTransport, err er hEnc: hpack.NewEncoder(&buf), maxStreams: maxStreams, controlBuf: newRecvBuffer(), - sendQuotaPool: newQuotaPool(initialConnWindowSize), + recvQuota: initialConnWindowSize, + sendQuotaPool: newQuotaPool(defaultWindowSize), state: reachable, writableChan: make(chan int, 1), shutdownChan: make(chan struct{}), activeStreams: make(map[uint32]*Stream), + streamSendQuota: defaultWindowSize, } go t.controller() t.writableChan <- 0 @@ -154,10 +171,11 @@ func (t *http2Server) operateHeaders(hDec *hpackDecoder, s *Stream, frame header t.controlBuf.put(&resetStream{s.id, http2.ErrCodeRefusedStream}) return nil } + s.sendQuotaPool = newQuotaPool(int(t.streamSendQuota)) t.activeStreams[s.id] = s t.mu.Unlock() s.windowHandler = func(n int) { - t.addRecvQuota(s, n) + t.updateWindow(s, uint32(n)) } if hDec.state.timeoutSet { s.ctx, s.cancel = context.WithTimeout(context.TODO(), hDec.state.timeout) @@ -242,7 +260,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) { id: frame.Header().StreamID, st: t, buf: buf, - sendQuotaPool: newQuotaPool(initialWindowSize), + recvQuota: initialWindowSize, } endStream := frame.Header().Flags.Has(http2.FlagHeadersEndStream) curStream = t.operateHeaders(hDec, curStream, frame, endStream, handle, &wg) @@ -282,22 +300,22 @@ func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) { return s, true } -// addRecvQuota adjusts the inbound quota for the stream and the transport. +// updateWindow adjusts the inbound quota for the stream and the transport. // Window updates will deliver to the controller for sending when // the cumulative quota exceeds the corresponding threshold. -func (t *http2Server) addRecvQuota(s *Stream, n int) { +func (t *http2Server) updateWindow(s *Stream, n uint32) { t.mu.Lock() - t.recvQuota += n - if t.recvQuota >= connWindowUpdateThreshold { - t.controlBuf.put(&windowUpdate{0, uint32(t.recvQuota)}) - t.recvQuota = 0 + t.updateQuota += n + if t.updateQuota >= t.recvQuota/4 { + t.controlBuf.put(&windowUpdate{0, t.updateQuota}) + t.updateQuota = 0 } t.mu.Unlock() - s.recvQuota += n - if s.recvQuota >= windowUpdateThreshold { - t.controlBuf.put(&windowUpdate{s.id, uint32(s.recvQuota)}) - s.recvQuota = 0 + s.updateQuota += n + if s.updateQuota >= s.recvQuota/4 { + t.controlBuf.put(&windowUpdate{s.id, s.updateQuota}) + s.updateQuota = 0 } } @@ -342,7 +360,17 @@ func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) { } func (t *http2Server) handleSettings(f *http2.SettingsFrame) { - // TODO(zhaoq): Handle the useful settings from client. + f.ForeachSetting(func(s http2.Setting) error { + if v, ok := f.Value(http2.SettingInitialWindowSize); ok { + t.mu.Lock() + defer t.mu.Unlock() + for _, s := range t.activeStreams { + s.sendQuotaPool.reset(int(v - t.streamSendQuota)) + } + t.streamSendQuota = v + } + return nil + }) } func (t *http2Server) handlePing(f *http2.PingFrame) { diff --git a/transport/transport.go b/transport/transport.go index 5529a3b7..34b22975 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -173,8 +173,10 @@ type Stream struct { buf *recvBuffer dec io.Reader - // Inbound quota for flow control - recvQuota int + // The inbound quota being set + recvQuota uint32 + // The accumulated inbound quota pending for updating the peer. + updateQuota uint32 // The handler to control the window update procedure for both this // particular stream and the associated transport. windowHandler func(int)