send settings/window update frames if the initial window size is not equal to http2 default.
This commit is contained in:
@ -39,15 +39,12 @@ import (
|
|||||||
"github.com/bradfitz/http2"
|
"github.com/bradfitz/http2"
|
||||||
)
|
)
|
||||||
|
|
||||||
// TODO(zhaoq): Make the following configurable.
|
|
||||||
const (
|
const (
|
||||||
|
// The default value of flow control window size in HTTP2 spec.
|
||||||
|
defaultWindowSize = 65535
|
||||||
// The initial window size for flow control.
|
// The initial window size for flow control.
|
||||||
initialWindowSize = 65535 // for an RPC
|
initialWindowSize = defaultWindowSize // for an RPC
|
||||||
initialConnWindowSize = 65535 // for a connection
|
initialConnWindowSize = defaultWindowSize * 16 // for a connection
|
||||||
// Window update is only sent when the inbound quota reaches
|
|
||||||
// this threshold. Used to reduce the flow control traffic.
|
|
||||||
windowUpdateThreshold = 16384 // for an RPC
|
|
||||||
connWindowUpdateThreshold = 16384 // for a connection
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// The following defines various control items which could flow through
|
// The following defines various control items which could flow through
|
||||||
@ -129,6 +126,27 @@ func (qb *quotaPool) cancel() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// reset cancels the pending quota sent on acquired, incremented by v and sends
|
||||||
|
// it back on acquire.
|
||||||
|
func (qb *quotaPool) reset(v int) {
|
||||||
|
qb.mu.Lock()
|
||||||
|
defer qb.mu.Unlock()
|
||||||
|
select {
|
||||||
|
case n := <-qb.c:
|
||||||
|
qb.quota += n
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
qb.quota += v
|
||||||
|
if qb.quota <= 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case qb.c <- qb.quota:
|
||||||
|
qb.quota = 0
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// acquire returns the channel on which available quota amounts are sent.
|
// acquire returns the channel on which available quota amounts are sent.
|
||||||
func (qb *quotaPool) acquire() <-chan int {
|
func (qb *quotaPool) acquire() <-chan int {
|
||||||
return qb.c
|
return qb.c
|
||||||
|
@ -76,6 +76,8 @@ type http2Client struct {
|
|||||||
// controlBuf delivers all the control related tasks (e.g., window
|
// controlBuf delivers all the control related tasks (e.g., window
|
||||||
// updates, reset streams, and various settings) to the controller.
|
// updates, reset streams, and various settings) to the controller.
|
||||||
controlBuf *recvBuffer
|
controlBuf *recvBuffer
|
||||||
|
// The inbound quota being set
|
||||||
|
recvQuota uint32
|
||||||
// sendQuotaPool provides flow control to outbound message.
|
// sendQuotaPool provides flow control to outbound message.
|
||||||
sendQuotaPool *quotaPool
|
sendQuotaPool *quotaPool
|
||||||
|
|
||||||
@ -89,8 +91,10 @@ type http2Client struct {
|
|||||||
activeStreams map[uint32]*Stream
|
activeStreams map[uint32]*Stream
|
||||||
// The max number of concurrent streams
|
// The max number of concurrent streams
|
||||||
maxStreams uint32
|
maxStreams uint32
|
||||||
// Inbound quota for flow control
|
// The accumulated inbound quota pending for updating the peer.
|
||||||
recvQuota int
|
updateQuota uint32
|
||||||
|
// the per-stream outbound flow control window size set by the peer.
|
||||||
|
streamSendQuota uint32
|
||||||
}
|
}
|
||||||
|
|
||||||
// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
|
// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
|
||||||
@ -133,9 +137,19 @@ func newHTTP2Client(addr string, opts *DialOptions) (_ ClientTransport, err erro
|
|||||||
return nil, ConnectionErrorf("transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
|
return nil, ConnectionErrorf("transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
|
||||||
}
|
}
|
||||||
framer := newFramer(conn)
|
framer := newFramer(conn)
|
||||||
if err := framer.writeSettings(true); err != nil {
|
var setting http2.Setting
|
||||||
|
if initialWindowSize != defaultWindowSize {
|
||||||
|
setting = http2.Setting{http2.SettingInitialWindowSize, uint32(initialWindowSize)}
|
||||||
|
}
|
||||||
|
if err := framer.writeSettings(true, setting); err != nil {
|
||||||
return nil, ConnectionErrorf("transport: %v", err)
|
return nil, ConnectionErrorf("transport: %v", err)
|
||||||
}
|
}
|
||||||
|
// Adjust the connection flow control window if needed.
|
||||||
|
if delta := uint32(initialConnWindowSize - defaultWindowSize); delta > 0 {
|
||||||
|
if err := framer.writeWindowUpdate(true, 0, delta); err != nil {
|
||||||
|
return nil, ConnectionErrorf("transport: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
var buf bytes.Buffer
|
var buf bytes.Buffer
|
||||||
t := &http2Client{
|
t := &http2Client{
|
||||||
target: addr,
|
target: addr,
|
||||||
@ -149,12 +163,14 @@ func newHTTP2Client(addr string, opts *DialOptions) (_ ClientTransport, err erro
|
|||||||
hBuf: &buf,
|
hBuf: &buf,
|
||||||
hEnc: hpack.NewEncoder(&buf),
|
hEnc: hpack.NewEncoder(&buf),
|
||||||
controlBuf: newRecvBuffer(),
|
controlBuf: newRecvBuffer(),
|
||||||
sendQuotaPool: newQuotaPool(initialConnWindowSize),
|
recvQuota: initialConnWindowSize,
|
||||||
|
sendQuotaPool: newQuotaPool(defaultWindowSize),
|
||||||
scheme: scheme,
|
scheme: scheme,
|
||||||
state: reachable,
|
state: reachable,
|
||||||
activeStreams: make(map[uint32]*Stream),
|
activeStreams: make(map[uint32]*Stream),
|
||||||
maxStreams: math.MaxUint32,
|
maxStreams: math.MaxUint32,
|
||||||
authCreds: opts.AuthOptions,
|
authCreds: opts.AuthOptions,
|
||||||
|
streamSendQuota: defaultWindowSize,
|
||||||
}
|
}
|
||||||
go t.controller()
|
go t.controller()
|
||||||
t.writableChan <- 0
|
t.writableChan <- 0
|
||||||
@ -172,12 +188,13 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
|
|||||||
id: t.nextID,
|
id: t.nextID,
|
||||||
method: callHdr.Method,
|
method: callHdr.Method,
|
||||||
buf: newRecvBuffer(),
|
buf: newRecvBuffer(),
|
||||||
sendQuotaPool: newQuotaPool(initialWindowSize),
|
recvQuota: initialWindowSize,
|
||||||
|
sendQuotaPool: newQuotaPool(int(t.streamSendQuota)),
|
||||||
headerChan: make(chan struct{}),
|
headerChan: make(chan struct{}),
|
||||||
}
|
}
|
||||||
t.nextID += 2
|
t.nextID += 2
|
||||||
s.windowHandler = func(n int) {
|
s.windowHandler = func(n int) {
|
||||||
t.addRecvQuota(s, n)
|
t.updateWindow(s, uint32(n))
|
||||||
}
|
}
|
||||||
// Make a stream be able to cancel the pending operations by itself.
|
// Make a stream be able to cancel the pending operations by itself.
|
||||||
s.ctx, s.cancel = context.WithCancel(ctx)
|
s.ctx, s.cancel = context.WithCancel(ctx)
|
||||||
@ -453,22 +470,22 @@ func (t *http2Client) getStream(f http2.Frame) (*Stream, bool) {
|
|||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
|
|
||||||
// addRecvQuota adjusts the inbound quota for the stream and the transport.
|
// updateWindow adjusts the inbound quota for the stream and the transport.
|
||||||
// Window updates will deliver to the controller for sending when
|
// Window updates will deliver to the controller for sending when
|
||||||
// the cumulative quota exceeds the corresponding threshold.
|
// the cumulative quota exceeds the corresponding threshold.
|
||||||
func (t *http2Client) addRecvQuota(s *Stream, n int) {
|
func (t *http2Client) updateWindow(s *Stream, n uint32) {
|
||||||
t.mu.Lock()
|
t.mu.Lock()
|
||||||
t.recvQuota += n
|
t.updateQuota += n
|
||||||
if t.recvQuota >= connWindowUpdateThreshold {
|
if t.updateQuota >= t.recvQuota/4 {
|
||||||
t.controlBuf.put(&windowUpdate{0, uint32(t.recvQuota)})
|
t.controlBuf.put(&windowUpdate{0, t.updateQuota})
|
||||||
t.recvQuota = 0
|
t.updateQuota = 0
|
||||||
}
|
}
|
||||||
t.mu.Unlock()
|
t.mu.Unlock()
|
||||||
|
|
||||||
s.recvQuota += n
|
s.updateQuota += n
|
||||||
if s.recvQuota >= windowUpdateThreshold {
|
if s.updateQuota >= s.recvQuota/4 {
|
||||||
t.controlBuf.put(&windowUpdate{s.id, uint32(s.recvQuota)})
|
t.controlBuf.put(&windowUpdate{s.id, s.updateQuota})
|
||||||
s.recvQuota = 0
|
s.updateQuota = 0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -506,11 +523,23 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (t *http2Client) handleSettings(f *http2.SettingsFrame) {
|
func (t *http2Client) handleSettings(f *http2.SettingsFrame) {
|
||||||
if v, ok := f.Value(http2.SettingMaxConcurrentStreams); ok {
|
f.ForeachSetting(func(s http2.Setting) error {
|
||||||
t.mu.Lock()
|
if v, ok := f.Value(s.ID); ok {
|
||||||
t.maxStreams = v
|
t.mu.Lock()
|
||||||
t.mu.Unlock()
|
defer t.mu.Unlock()
|
||||||
}
|
switch s.ID {
|
||||||
|
case http2.SettingMaxConcurrentStreams:
|
||||||
|
t.maxStreams = v
|
||||||
|
case http2.SettingInitialWindowSize:
|
||||||
|
for _, s := range t.activeStreams {
|
||||||
|
// Adjust the sending quota for each s.
|
||||||
|
s.sendQuotaPool.reset(int(v - t.streamSendQuota))
|
||||||
|
}
|
||||||
|
t.streamSendQuota = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *http2Client) handlePing(f *http2.PingFrame) {
|
func (t *http2Client) handlePing(f *http2.PingFrame) {
|
||||||
|
@ -75,14 +75,18 @@ type http2Server struct {
|
|||||||
// controlBuf delivers all the control related tasks (e.g., window
|
// controlBuf delivers all the control related tasks (e.g., window
|
||||||
// updates, reset streams, and various settings) to the controller.
|
// updates, reset streams, and various settings) to the controller.
|
||||||
controlBuf *recvBuffer
|
controlBuf *recvBuffer
|
||||||
|
// The inbound quota being set
|
||||||
|
recvQuota uint32
|
||||||
// sendQuotaPool provides flow control to outbound message.
|
// sendQuotaPool provides flow control to outbound message.
|
||||||
sendQuotaPool *quotaPool
|
sendQuotaPool *quotaPool
|
||||||
|
|
||||||
mu sync.Mutex // guard the following
|
mu sync.Mutex // guard the following
|
||||||
state transportState
|
state transportState
|
||||||
activeStreams map[uint32]*Stream
|
activeStreams map[uint32]*Stream
|
||||||
// Inbound quota for flow control
|
// The accumulated inbound quota pending for updating the peer.
|
||||||
recvQuota int
|
updateQuota uint32
|
||||||
|
// the per-stream outbound flow control window size set by the peer.
|
||||||
|
streamSendQuota uint32
|
||||||
}
|
}
|
||||||
|
|
||||||
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
|
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
|
||||||
@ -92,14 +96,25 @@ func newHTTP2Server(conn net.Conn, maxStreams uint32) (_ ServerTransport, err er
|
|||||||
// Send initial settings as connection preface to client.
|
// Send initial settings as connection preface to client.
|
||||||
// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
|
// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
|
||||||
// permitted in the HTTP2 spec.
|
// permitted in the HTTP2 spec.
|
||||||
|
var settings []http2.Setting
|
||||||
|
// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
|
||||||
|
// permitted in the HTTP2 spec.
|
||||||
if maxStreams == 0 {
|
if maxStreams == 0 {
|
||||||
err = framer.writeSettings(true)
|
|
||||||
maxStreams = math.MaxUint32
|
maxStreams = math.MaxUint32
|
||||||
} else {
|
} else {
|
||||||
err = framer.writeSettings(true, http2.Setting{http2.SettingMaxConcurrentStreams, maxStreams})
|
settings = append(settings, http2.Setting{http2.SettingMaxConcurrentStreams, maxStreams})
|
||||||
}
|
}
|
||||||
if err != nil {
|
if initialWindowSize != defaultWindowSize {
|
||||||
return
|
settings = append(settings, http2.Setting{http2.SettingInitialWindowSize, uint32(initialWindowSize)})
|
||||||
|
}
|
||||||
|
if err := framer.writeSettings(true, settings...); err != nil {
|
||||||
|
return nil, ConnectionErrorf("transport: %v", err)
|
||||||
|
}
|
||||||
|
// Adjust the connection flow control window if needed.
|
||||||
|
if delta := uint32(initialConnWindowSize - defaultWindowSize); delta > 0 {
|
||||||
|
if err := framer.writeWindowUpdate(true, 0, delta); err != nil {
|
||||||
|
return nil, ConnectionErrorf("transport: %v", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
var buf bytes.Buffer
|
var buf bytes.Buffer
|
||||||
t := &http2Server{
|
t := &http2Server{
|
||||||
@ -109,11 +124,13 @@ func newHTTP2Server(conn net.Conn, maxStreams uint32) (_ ServerTransport, err er
|
|||||||
hEnc: hpack.NewEncoder(&buf),
|
hEnc: hpack.NewEncoder(&buf),
|
||||||
maxStreams: maxStreams,
|
maxStreams: maxStreams,
|
||||||
controlBuf: newRecvBuffer(),
|
controlBuf: newRecvBuffer(),
|
||||||
sendQuotaPool: newQuotaPool(initialConnWindowSize),
|
recvQuota: initialConnWindowSize,
|
||||||
|
sendQuotaPool: newQuotaPool(defaultWindowSize),
|
||||||
state: reachable,
|
state: reachable,
|
||||||
writableChan: make(chan int, 1),
|
writableChan: make(chan int, 1),
|
||||||
shutdownChan: make(chan struct{}),
|
shutdownChan: make(chan struct{}),
|
||||||
activeStreams: make(map[uint32]*Stream),
|
activeStreams: make(map[uint32]*Stream),
|
||||||
|
streamSendQuota: defaultWindowSize,
|
||||||
}
|
}
|
||||||
go t.controller()
|
go t.controller()
|
||||||
t.writableChan <- 0
|
t.writableChan <- 0
|
||||||
@ -154,10 +171,11 @@ func (t *http2Server) operateHeaders(hDec *hpackDecoder, s *Stream, frame header
|
|||||||
t.controlBuf.put(&resetStream{s.id, http2.ErrCodeRefusedStream})
|
t.controlBuf.put(&resetStream{s.id, http2.ErrCodeRefusedStream})
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
s.sendQuotaPool = newQuotaPool(int(t.streamSendQuota))
|
||||||
t.activeStreams[s.id] = s
|
t.activeStreams[s.id] = s
|
||||||
t.mu.Unlock()
|
t.mu.Unlock()
|
||||||
s.windowHandler = func(n int) {
|
s.windowHandler = func(n int) {
|
||||||
t.addRecvQuota(s, n)
|
t.updateWindow(s, uint32(n))
|
||||||
}
|
}
|
||||||
if hDec.state.timeoutSet {
|
if hDec.state.timeoutSet {
|
||||||
s.ctx, s.cancel = context.WithTimeout(context.TODO(), hDec.state.timeout)
|
s.ctx, s.cancel = context.WithTimeout(context.TODO(), hDec.state.timeout)
|
||||||
@ -242,7 +260,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) {
|
|||||||
id: frame.Header().StreamID,
|
id: frame.Header().StreamID,
|
||||||
st: t,
|
st: t,
|
||||||
buf: buf,
|
buf: buf,
|
||||||
sendQuotaPool: newQuotaPool(initialWindowSize),
|
recvQuota: initialWindowSize,
|
||||||
}
|
}
|
||||||
endStream := frame.Header().Flags.Has(http2.FlagHeadersEndStream)
|
endStream := frame.Header().Flags.Has(http2.FlagHeadersEndStream)
|
||||||
curStream = t.operateHeaders(hDec, curStream, frame, endStream, handle, &wg)
|
curStream = t.operateHeaders(hDec, curStream, frame, endStream, handle, &wg)
|
||||||
@ -282,22 +300,22 @@ func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
|
|||||||
return s, true
|
return s, true
|
||||||
}
|
}
|
||||||
|
|
||||||
// addRecvQuota adjusts the inbound quota for the stream and the transport.
|
// updateWindow adjusts the inbound quota for the stream and the transport.
|
||||||
// Window updates will deliver to the controller for sending when
|
// Window updates will deliver to the controller for sending when
|
||||||
// the cumulative quota exceeds the corresponding threshold.
|
// the cumulative quota exceeds the corresponding threshold.
|
||||||
func (t *http2Server) addRecvQuota(s *Stream, n int) {
|
func (t *http2Server) updateWindow(s *Stream, n uint32) {
|
||||||
t.mu.Lock()
|
t.mu.Lock()
|
||||||
t.recvQuota += n
|
t.updateQuota += n
|
||||||
if t.recvQuota >= connWindowUpdateThreshold {
|
if t.updateQuota >= t.recvQuota/4 {
|
||||||
t.controlBuf.put(&windowUpdate{0, uint32(t.recvQuota)})
|
t.controlBuf.put(&windowUpdate{0, t.updateQuota})
|
||||||
t.recvQuota = 0
|
t.updateQuota = 0
|
||||||
}
|
}
|
||||||
t.mu.Unlock()
|
t.mu.Unlock()
|
||||||
|
|
||||||
s.recvQuota += n
|
s.updateQuota += n
|
||||||
if s.recvQuota >= windowUpdateThreshold {
|
if s.updateQuota >= s.recvQuota/4 {
|
||||||
t.controlBuf.put(&windowUpdate{s.id, uint32(s.recvQuota)})
|
t.controlBuf.put(&windowUpdate{s.id, s.updateQuota})
|
||||||
s.recvQuota = 0
|
s.updateQuota = 0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -342,7 +360,17 @@ func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
|
func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
|
||||||
// TODO(zhaoq): Handle the useful settings from client.
|
f.ForeachSetting(func(s http2.Setting) error {
|
||||||
|
if v, ok := f.Value(http2.SettingInitialWindowSize); ok {
|
||||||
|
t.mu.Lock()
|
||||||
|
defer t.mu.Unlock()
|
||||||
|
for _, s := range t.activeStreams {
|
||||||
|
s.sendQuotaPool.reset(int(v - t.streamSendQuota))
|
||||||
|
}
|
||||||
|
t.streamSendQuota = v
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *http2Server) handlePing(f *http2.PingFrame) {
|
func (t *http2Server) handlePing(f *http2.PingFrame) {
|
||||||
|
@ -173,8 +173,10 @@ type Stream struct {
|
|||||||
buf *recvBuffer
|
buf *recvBuffer
|
||||||
dec io.Reader
|
dec io.Reader
|
||||||
|
|
||||||
// Inbound quota for flow control
|
// The inbound quota being set
|
||||||
recvQuota int
|
recvQuota uint32
|
||||||
|
// The accumulated inbound quota pending for updating the peer.
|
||||||
|
updateQuota uint32
|
||||||
// The handler to control the window update procedure for both this
|
// The handler to control the window update procedure for both this
|
||||||
// particular stream and the associated transport.
|
// particular stream and the associated transport.
|
||||||
windowHandler func(int)
|
windowHandler func(int)
|
||||||
|
Reference in New Issue
Block a user