BDP estimation and window update. (#1310)
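In a nutshell: the bandwidth-delay product (BDP) is the link bandwidth multiplied by the round-trip time, and a flow control window smaller than the BDP caps throughput below what the network can carry. For example, at 100 ms RTT and 100 Mbit/s (~12.5 MB/s), the BDP is about 1.25 MB — roughly twenty times the 64 KB default window. This commit measures the BDP with dedicated pings and grows the windows toward it, capped at bdpLimit (4 MB).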
transport/bdp_estimator.go (new file, 125 lines)
@@ -0,0 +1,125 @@
package transport

import (
	"sync"
	"time"
)

const (
	// bdpLimit is the maximum value the flow control windows
	// will be increased to.
	bdpLimit = (1 << 20) * 4
	// alpha is a constant factor used to keep a moving average
	// of RTTs.
	alpha = 0.9
	// If the current bdp sample is greater than or equal to
	// beta * our estimated bdp and the current bandwidth
	// sample is the maximum bandwidth observed so far, we
	// increase our bdp estimate by a factor of gamma.
	beta = 0.66
	// On a saturated connection the current sample is at most 1.5
	// times the real BDP, so multiplying it by 4/3 would keep our
	// estimate at or below twice the real BDP; to round things out
	// we use 2 as the multiplication factor.
	gamma = 2
)

var (
	// Adding arbitrary data to ping so that its ack can be
	// identified.
	// Easter-egg: what does the ping message say?
	bdpPing = &ping{data: [8]byte{2, 4, 16, 16, 9, 14, 7, 7}}
)

type bdpEstimator struct {
	// sentAt is the time when the ping was sent.
	sentAt time.Time

	mu sync.Mutex
	// bdp is the current bdp estimate.
	bdp uint32
	// sample is the number of bytes received in one measurement cycle.
	sample uint32
	// bwMax is the maximum bandwidth noted so far (bytes/sec).
	bwMax float64
	// isSent keeps track of the beginning of a new measurement cycle.
	isSent bool
	// updateFlowControl is the callback to update the window sizes.
	updateFlowControl func(n uint32)
	// sampleCount is the number of samples taken so far.
	sampleCount uint64
	// rtt is the smoothed round-trip time (seconds).
	rtt float64
}

// timesnap registers the time the bdp ping was sent out so that
// network rtt can be calculated when its ack is received.
// It is called (by controller) when the bdpPing is
// being written on the wire.
func (b *bdpEstimator) timesnap(d [8]byte) {
	if bdpPing.data != d {
		return
	}
	b.sentAt = time.Now()
}

// add adds bytes to the current sample for calculating bdp.
// It returns true only if a ping must be sent. This can be used
// by the caller (handleData) to make a decision about batching
// a window update with it.
func (b *bdpEstimator) add(n uint32) bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.bdp == bdpLimit {
		return false
	}
	if !b.isSent {
		b.isSent = true
		b.sample = n
		b.sentAt = time.Time{}
		b.sampleCount++
		return true
	}
	b.sample += n
	return false
}

// calculate is called when an ack for a bdp ping is received.
// Here we calculate the current bdp and bandwidth sample and
// decide if the flow control windows should go up.
func (b *bdpEstimator) calculate(d [8]byte) {
	// Check if the ping acked for was the bdp ping.
	if bdpPing.data != d {
		return
	}
	b.mu.Lock()
	rttSample := time.Since(b.sentAt).Seconds()
	if b.sampleCount < 10 {
		// Bootstrap rtt with an average of the first 10 rtt samples.
		b.rtt += (rttSample - b.rtt) / float64(b.sampleCount)
	} else {
		// Heed the recent past more.
		b.rtt += (rttSample - b.rtt) * float64(alpha)
	}
	b.isSent = false
	// The number of bytes accumulated so far in the sample is smaller
	// than or equal to 1.5 times the real BDP on a saturated connection.
	bwCurrent := float64(b.sample) / (b.rtt * float64(1.5))
	if bwCurrent > b.bwMax {
		b.bwMax = bwCurrent
	}
	// If the current sample (which is smaller than or equal to 1.5 times
	// the real BDP) is greater than or equal to 2/3 of our perceived bdp
	// AND this is the maximum bandwidth seen so far, we should update our
	// perception of the network BDP.
	if float64(b.sample) >= beta*float64(b.bdp) && bwCurrent == b.bwMax && b.bdp != bdpLimit {
		sampleFloat := float64(b.sample)
		b.bdp = uint32(gamma * sampleFloat)
		if b.bdp > bdpLimit {
			b.bdp = bdpLimit
		}
		bdp := b.bdp
		b.mu.Unlock()
		b.updateFlowControl(bdp)
		return
	}
	b.mu.Unlock()
}
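The update rule above can be seen end to end in a small, self-contained simulation. This is not part of the commit; all names and numbers are illustrative. It shows the bootstrap running average over the first ten RTT samples, the alpha-weighted EWMA afterwards, and the window doubling whenever a sample reaches beta of the current estimate at record bandwidth:

```go
package main

import "fmt"

const (
	alpha = 0.9  // weight on the newest RTT sample after bootstrap
	beta  = 0.66 // a sample must reach beta*bdp to trigger growth
	gamma = 2.0  // growth factor applied to the triggering sample
)

func main() {
	var (
		rtt         float64 // smoothed RTT in seconds
		bwMax       float64 // best bandwidth seen so far (bytes/sec)
		bdp         = 65535.0
		sampleCount int
	)
	// Synthetic (bytes received per cycle, measured RTT) pairs.
	for _, m := range []struct{ sample, rttSample float64 }{
		{65535, 0.100}, {90000, 0.105}, {200000, 0.095},
	} {
		sampleCount++
		if sampleCount < 10 {
			// Bootstrap rtt with a running average of the first samples.
			rtt += (m.rttSample - rtt) / float64(sampleCount)
		} else {
			// Afterwards, heed the recent past more (EWMA).
			rtt += (m.rttSample - rtt) * alpha
		}
		// On a saturated link the sample is at most 1.5x the real BDP.
		bw := m.sample / (rtt * 1.5)
		if bw > bwMax {
			bwMax = bw
		}
		// Near-saturating sample at record bandwidth: double the window.
		if m.sample >= beta*bdp && bw == bwMax {
			bdp = gamma * m.sample
		}
		fmt.Printf("rtt=%.3fs bw=%.0f B/s bdp=%.0f\n", rtt, bw, bdp)
	}
}
```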
@@ -31,8 +31,7 @@ const (
 	// The default value of flow control window size in HTTP2 spec.
 	defaultWindowSize = 65535
 	// The initial window size for flow control.
-	initialWindowSize     = defaultWindowSize      // for an RPC
-	initialConnWindowSize = defaultWindowSize * 16 // for a connection
+	initialWindowSize = defaultWindowSize // for an RPC
 	infinity = time.Duration(math.MaxInt64)
 	defaultClientKeepaliveTime = infinity
 	defaultClientKeepaliveTimeout = time.Duration(20 * time.Second)
@@ -145,10 +144,9 @@ func (qb *quotaPool) acquire() <-chan int {

 // inFlow deals with inbound flow control
 type inFlow struct {
+	mu sync.Mutex
 	// The inbound flow control limit for pending data.
 	limit uint32
-
-	mu sync.Mutex
 	// pendingData is the overall data which have been received but not been
 	// consumed by applications.
 	pendingData uint32
@@ -160,6 +158,16 @@ type inFlow struct {
 	delta uint32
 }

+// newLimit updates the inflow window to a new value n.
+// It assumes that n is always greater than the old limit.
+func (f *inFlow) newLimit(n uint32) uint32 {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+	d := n - f.limit
+	f.limit = n
+	return d
+}
+
 func (f *inFlow) maybeAdjust(n uint32) uint32 {
 	if n > uint32(math.MaxInt32) {
 		n = uint32(math.MaxInt32)
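The delta that newLimit returns is exactly the increment the transports below advertise in a connection-level WINDOW_UPDATE. A toy sketch of that bookkeeping — toyInFlow is an illustrative stand-in, not the real inFlow struct:

```go
package main

import "fmt"

// toyInFlow mirrors only the limit bookkeeping of inFlow.
type toyInFlow struct{ limit uint32 }

// newLimit grows the window to n and returns the increase,
// assuming (as the real method does) that n > limit.
func (f *toyInFlow) newLimit(n uint32) uint32 {
	d := n - f.limit
	f.limit = n
	return d
}

func main() {
	f := &toyInFlow{limit: 65535}
	// Doubling the window yields an increment of 65535 -- the value
	// advertised to the peer in a connection-level WINDOW_UPDATE.
	fmt.Println(f.newLimit(131070)) // prints 65535
}
```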
@@ -99,6 +99,8 @@ type http2Client struct {

 	initialWindowSize int32

+	bdpEst *bdpEstimator
+
 	mu            sync.Mutex // guard the following variables
 	state         transportState // the state of underlying connection
 	activeStreams map[uint32]*Stream
@@ -191,9 +193,11 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
 	if kp.Timeout == 0 {
 		kp.Timeout = defaultClientKeepaliveTimeout
 	}
-	icwz := int32(initialConnWindowSize)
+	dynamicWindow := true
+	icwz := int32(initialWindowSize)
 	if opts.InitialConnWindowSize >= defaultWindowSize {
 		icwz = opts.InitialConnWindowSize
+		dynamicWindow = false
 	}
 	var buf bytes.Buffer
 	t := &http2Client{
|
||||
}
|
||||
if opts.InitialWindowSize >= defaultWindowSize {
|
||||
t.initialWindowSize = opts.InitialWindowSize
|
||||
dynamicWindow = false
|
||||
}
|
||||
if dynamicWindow {
|
||||
t.bdpEst = &bdpEstimator{
|
||||
bdp: initialWindowSize,
|
||||
updateFlowControl: t.updateFlowControl,
|
||||
}
|
||||
}
|
||||
// Make sure awakenKeepalive can't be written upon.
|
||||
// keepalive routine will make it writable, if need be.
|
||||
@@ -827,11 +838,33 @@ func (t *http2Client) updateWindow(s *Stream, n uint32) {
 	}
 }

+// updateFlowControl updates the incoming flow control windows
+// for the transport and the stream based on the current bdp
+// estimation.
+func (t *http2Client) updateFlowControl(n uint32) {
+	t.mu.Lock()
+	for _, s := range t.activeStreams {
+		s.fc.newLimit(n)
+	}
+	t.initialWindowSize = int32(n)
+	t.mu.Unlock()
+	t.controlBuf.put(&windowUpdate{0, t.fc.newLimit(n), false})
+	t.controlBuf.put(&settings{
+		ack: false,
+		ss: []http2.Setting{
+			{
+				ID:  http2.SettingInitialWindowSize,
+				Val: uint32(n),
+			},
+		},
+	})
+}
+
 func (t *http2Client) handleData(f *http2.DataFrame) {
 	size := f.Header().Length
-	if err := t.fc.onData(uint32(size)); err != nil {
-		t.notifyError(connectionErrorf(true, err, "%v", err))
-		return
+	var sendBDPPing bool
+	if t.bdpEst != nil {
+		sendBDPPing = t.bdpEst.add(uint32(size))
 	}
 	// Decouple connection's flow control from application's read.
 	// An update on connection's flow control should not depend on
@@ -841,8 +874,20 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
 	// Decoupling the connection flow control will prevent other
 	// active(fast) streams from starving in presence of slow or
 	// inactive streams.
-	if w := t.fc.onRead(uint32(size)); w > 0 {
-		t.controlBuf.put(&windowUpdate{0, w, true})
+	//
+	// Furthermore, if a bdpPing is being sent out we can piggyback
+	// connection's window update for the bytes we just received.
+	if sendBDPPing {
+		t.controlBuf.put(&windowUpdate{0, uint32(size), false})
+		t.controlBuf.put(bdpPing)
+	} else {
+		if err := t.fc.onData(uint32(size)); err != nil {
+			t.notifyError(connectionErrorf(true, err, "%v", err))
+			return
+		}
+		if w := t.fc.onRead(uint32(size)); w > 0 {
+			t.controlBuf.put(&windowUpdate{0, w, true})
+		}
 	}
 	// Select the right stream to dispatch.
 	s, ok := t.getStream(f)
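Note the asymmetry between the two branches above: when a bdpPing goes out, the connection-level accounting (onData/onRead) is skipped and the full frame size is credited back to the sender immediately, so the measurement sample keeps filling at network speed instead of being throttled by the connection window. The normal accounting path runs only when no ping is in flight. Stream-level flow control is unaffected either way; it is handled separately when the frame is dispatched to its stream below.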
@@ -930,7 +975,11 @@ func (t *http2Client) handleSettings(f *http2.SettingsFrame) {
 }

 func (t *http2Client) handlePing(f *http2.PingFrame) {
-	if f.IsAck() { // Do nothing.
+	if f.IsAck() {
+		// Maybe it's a BDP ping.
+		if t.bdpEst != nil {
+			t.bdpEst.calculate(f.Data)
+		}
 		return
 	}
 	pingAck := &ping{ack: true}
@@ -1202,6 +1251,9 @@ func (t *http2Client) controller() {
 			case *flushIO:
 				t.framer.flushWrite()
 			case *ping:
+				if !i.ack {
+					t.bdpEst.timesnap(i.data)
+				}
 				t.framer.writePing(true, i.ack, i.data)
 			default:
 				grpclog.Printf("transport: http2Client.controller got unexpected item type %v\n", i)
@@ -100,6 +100,8 @@ type http2Server struct {

 	initialWindowSize int32

+	bdpEst *bdpEstimator
+
 	mu sync.Mutex // guard the following
 	state transportState
 	activeStreams map[uint32]*Stream
@@ -117,32 +119,35 @@ type http2Server struct {
 func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
 	framer := newFramer(conn)
 	// Send initial settings as connection preface to client.
-	var settings []http2.Setting
+	var isettings []http2.Setting
 	// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
 	// permitted in the HTTP2 spec.
 	maxStreams := config.MaxStreams
 	if maxStreams == 0 {
 		maxStreams = math.MaxUint32
 	} else {
-		settings = append(settings, http2.Setting{
+		isettings = append(isettings, http2.Setting{
 			ID: http2.SettingMaxConcurrentStreams,
 			Val: maxStreams,
 		})
 	}
+	dynamicWindow := true
 	iwz := int32(initialWindowSize)
 	if config.InitialWindowSize >= defaultWindowSize {
 		iwz = config.InitialWindowSize
+		dynamicWindow = false
 	}
-	icwz := int32(initialConnWindowSize)
+	icwz := int32(initialWindowSize)
 	if config.InitialConnWindowSize >= defaultWindowSize {
 		icwz = config.InitialConnWindowSize
+		dynamicWindow = false
 	}
 	if iwz != defaultWindowSize {
-		settings = append(settings, http2.Setting{
+		isettings = append(isettings, http2.Setting{
 			ID: http2.SettingInitialWindowSize,
 			Val: uint32(iwz)})
 	}
-	if err := framer.writeSettings(true, settings...); err != nil {
+	if err := framer.writeSettings(true, isettings...); err != nil {
 		return nil, connectionErrorf(true, err, "transport: %v", err)
 	}
 	// Adjust the connection flow control window if needed.
@@ -199,6 +204,12 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
 		kep: kep,
 		initialWindowSize: iwz,
 	}
+	if dynamicWindow {
+		t.bdpEst = &bdpEstimator{
+			bdp:               initialWindowSize,
+			updateFlowControl: t.updateFlowControl,
+		}
+	}
 	if t.stats != nil {
 		t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
 			RemoteAddr: t.remoteAddr,
@@ -458,12 +469,34 @@ func (t *http2Server) updateWindow(s *Stream, n uint32) {
 	}
 }

+// updateFlowControl updates the incoming flow control windows
+// for the transport and the stream based on the current bdp
+// estimation.
+func (t *http2Server) updateFlowControl(n uint32) {
+	t.mu.Lock()
+	for _, s := range t.activeStreams {
+		s.fc.newLimit(n)
+	}
+	t.initialWindowSize = int32(n)
+	t.mu.Unlock()
+	t.controlBuf.put(&windowUpdate{0, t.fc.newLimit(n), false})
+	t.controlBuf.put(&settings{
+		ack: false,
+		ss: []http2.Setting{
+			{
+				ID:  http2.SettingInitialWindowSize,
+				Val: uint32(n),
+			},
+		},
+	})
+
+}
+
 func (t *http2Server) handleData(f *http2.DataFrame) {
 	size := f.Header().Length
-	if err := t.fc.onData(uint32(size)); err != nil {
-		grpclog.Printf("transport: http2Server %v", err)
-		t.Close()
-		return
+	var sendBDPPing bool
+	if t.bdpEst != nil {
+		sendBDPPing = t.bdpEst.add(uint32(size))
 	}
 	// Decouple connection's flow control from application's read.
 	// An update on connection's flow control should not depend on
@@ -473,8 +506,21 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
 	// Decoupling the connection flow control will prevent other
 	// active(fast) streams from starving in presence of slow or
 	// inactive streams.
-	if w := t.fc.onRead(uint32(size)); w > 0 {
-		t.controlBuf.put(&windowUpdate{0, w, true})
+	//
+	// Furthermore, if a bdpPing is being sent out we can piggyback
+	// connection's window update for the bytes we just received.
+	if sendBDPPing {
+		t.controlBuf.put(&windowUpdate{0, uint32(size), false})
+		t.controlBuf.put(bdpPing)
+	} else {
+		if err := t.fc.onData(uint32(size)); err != nil {
+			grpclog.Printf("transport: http2Server %v", err)
+			t.Close()
+			return
+		}
+		if w := t.fc.onRead(uint32(size)); w > 0 {
+			t.controlBuf.put(&windowUpdate{0, w, true})
+		}
 	}
 	// Select the right stream to dispatch.
 	s, ok := t.getStream(f)
@@ -546,7 +592,11 @@ const (
 )

 func (t *http2Server) handlePing(f *http2.PingFrame) {
-	if f.IsAck() { // Do nothing.
+	if f.IsAck() {
+		// Maybe it's a BDP ping.
+		if t.bdpEst != nil {
+			t.bdpEst.calculate(f.Data)
+		}
 		return
 	}
 	pingAck := &ping{ack: true}
@@ -778,13 +828,6 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) (err error) {
 	if writeHeaderFrame {
 		t.WriteHeader(s, nil)
 	}
-	defer func() {
-		if err == nil {
-			// Reset ping strikes when sending data since this might cause
-			// the peer to send ping.
-			atomic.StoreUint32(&t.resetPingStrikes, 1)
-		}
-	}()
 	r := bytes.NewBuffer(data)
 	for {
 		if r.Len() == 0 {
@@ -848,6 +891,9 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) (err error) {
 		if r.Len() == 0 && t.framer.adjustNumWriters(0) == 1 && !opts.Last {
 			forceFlush = true
 		}
+		// Reset ping strikes when sending data since this might cause
+		// the peer to send ping.
+		atomic.StoreUint32(&t.resetPingStrikes, 1)
 		if err := t.framer.writeData(forceFlush, s.id, false, p); err != nil {
 			t.Close()
 			return connectionErrorf(true, err, "transport: %v", err)
@@ -997,6 +1043,9 @@ func (t *http2Server) controller() {
 			case *flushIO:
 				t.framer.flushWrite()
 			case *ping:
+				if !i.ack {
+					t.bdpEst.timesnap(i.data)
+				}
 				t.framer.writePing(true, i.ack, i.data)
 			default:
 				grpclog.Printf("transport: http2Server.controller got unexpected item type %v\n", i)
@@ -1033,7 +1033,8 @@ func TestLargeMessageSuspension(t *testing.T) {
 		t.Fatalf("failed to open stream: %v", err)
 	}
 	// Write should not be done successfully due to flow control.
-	err = ct.Write(s, expectedRequestLarge, &Options{Last: true, Delay: false})
+	msg := make([]byte, initialWindowSize*8)
+	err = ct.Write(s, msg, &Options{Last: true, Delay: false})
 	expectedErr := streamErrorf(codes.DeadlineExceeded, "%v", context.DeadlineExceeded)
 	if err != expectedErr {
 		t.Fatalf("Write got %v, want %v", err, expectedErr)
@@ -1426,8 +1427,8 @@ func TestServerWithMisbehavedClient(t *testing.T) {
 		}
 		ss.fc.mu.Unlock()
 	}
-	if ss.fc.pendingData != http2MaxFrameLen || ss.fc.pendingUpdate != 0 || sc.fc.pendingData != 0 || sc.fc.pendingUpdate != http2MaxFrameLen {
-		t.Fatalf("Server mistakenly updates inbound flow control params: got %d, %d, %d, %d; want %d, %d, %d, %d", ss.fc.pendingData, ss.fc.pendingUpdate, sc.fc.pendingData, sc.fc.pendingUpdate, http2MaxFrameLen, 0, http2MaxFrameLen, 0)
+	if ss.fc.pendingData != http2MaxFrameLen || ss.fc.pendingUpdate != 0 || sc.fc.pendingData != 0 || sc.fc.pendingUpdate != 0 {
+		t.Fatalf("Server mistakenly updates inbound flow control params: got %d, %d, %d, %d; want %d, %d, %d, %d", ss.fc.pendingData, ss.fc.pendingUpdate, sc.fc.pendingData, sc.fc.pendingUpdate, http2MaxFrameLen, 0, 0, 0)
 	}
 	// Keep sending until the server inbound window is drained for that stream.
 	for sent <= initialWindowSize {
@@ -1447,16 +1448,18 @@ func TestServerWithMisbehavedClient(t *testing.T) {
 		t.Fatalf("%v got status %v; want Code=%v", s, s.status, code)
 	}

-	if sc.fc.pendingData != 0 || sc.fc.pendingUpdate <= initialWindowSize {
-		t.Fatalf("Server mistakenly resets inbound flow control params: got %d, %d; want 0, >%d", sc.fc.pendingData, sc.fc.pendingUpdate, initialWindowSize)
-	}
 	ct.CloseStream(s, nil)
 	ct.Close()
 	server.stop()
 }

 func TestClientWithMisbehavedServer(t *testing.T) {
-	server, ct := setUp(t, 0, math.MaxUint32, misbehaved)
+	// Turn off BDP estimation so that the server can
+	// violate stream window.
+	connectOptions := ConnectOptions{
+		InitialWindowSize: initialWindowSize,
+	}
+	server, ct := setUpWithOptions(t, 0, &ServerConfig{}, misbehaved, connectOptions)
 	callHdr := &CallHdr{
 		Host: "localhost",
 		Method: "foo.Stream",
@@ -1481,8 +1484,8 @@ func TestClientWithMisbehavedServer(t *testing.T) {
 			break
 		}
 	}
-	if s.fc.pendingData <= initialWindowSize || s.fc.pendingUpdate != 0 || conn.fc.pendingData != 0 || conn.fc.pendingUpdate <= initialWindowSize {
-		t.Fatalf("Client mistakenly updates inbound flow control params: got %d, %d, %d, %d; want >%d, %d, >%d, %d", s.fc.pendingData, s.fc.pendingUpdate, conn.fc.pendingData, conn.fc.pendingUpdate, initialWindowSize, 0, initialWindowSize, 0)
+	if s.fc.pendingData <= initialWindowSize || s.fc.pendingUpdate != 0 || conn.fc.pendingData != 0 || conn.fc.pendingUpdate != 0 {
+		t.Fatalf("Client mistakenly updates inbound flow control params: got %d, %d, %d, %d; want >%d, %d, %d, >%d", s.fc.pendingData, s.fc.pendingUpdate, conn.fc.pendingData, conn.fc.pendingUpdate, initialWindowSize, 0, 0, 0)
 	}

 	if err != io.EOF {
@@ -1493,9 +1496,6 @@ func TestClientWithMisbehavedServer(t *testing.T) {
 	}

 	conn.CloseStream(s, err)
-	if conn.fc.pendingData != 0 || conn.fc.pendingUpdate <= initialWindowSize {
-		t.Fatalf("Client mistakenly resets inbound flow control params: got %d, %d; want 0, >%d", conn.fc.pendingData, conn.fc.pendingUpdate, initialWindowSize)
-	}
 	ct.Close()
 	server.stop()
 }
@@ -1686,7 +1686,10 @@ func testAccountCheckWindowSize(t *testing.T, wc windowSizeConfig) {
 	time.Sleep(time.Second)

 	waitWhileTrue(t, func() (bool, error) {
-		if lim := st.fc.limit; lim != uint32(serverConfig.InitialConnWindowSize) {
+		st.fc.mu.Lock()
+		lim := st.fc.limit
+		st.fc.mu.Unlock()
+		if lim != uint32(serverConfig.InitialConnWindowSize) {
 			return true, fmt.Errorf("Server transport flow control window size: got %v, want %v", lim, serverConfig.InitialConnWindowSize)
 		}
 		return false, nil
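This hunk and the remaining test hunks apply the same race fix: instead of reading fc.limit (or streamSendQuota) directly while another goroutine may be resizing the window, each read takes the owning mutex, snapshots the value into a local variable, and does the comparison and error formatting on that snapshot. Since windows now change at runtime under BDP estimation, the old unguarded reads could trip the race detector.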
@@ -1703,12 +1706,16 @@ func testAccountCheckWindowSize(t *testing.T, wc windowSizeConfig) {
 		t.Fatalf("Server send quota(%v) not equal to client's window size(%v) on conn.", serverSendQuota, connectOptions.InitialConnWindowSize)
 	}
 	st.mu.Lock()
-	if st.streamSendQuota != uint32(connectOptions.InitialWindowSize) {
-		t.Fatalf("Server stream send quota(%v) not equal to client's window size(%v) on stream.", ct.streamSendQuota, connectOptions.InitialWindowSize)
-	}
+	ssq := st.streamSendQuota
 	st.mu.Unlock()
-	if ct.fc.limit != uint32(connectOptions.InitialConnWindowSize) {
-		t.Fatalf("Client transport flow control window size is %v, want %v", ct.fc.limit, connectOptions.InitialConnWindowSize)
+	if ssq != uint32(connectOptions.InitialWindowSize) {
+		t.Fatalf("Server stream send quota(%v) not equal to client's window size(%v) on stream.", ssq, connectOptions.InitialWindowSize)
 	}
+	ct.fc.mu.Lock()
+	limit := ct.fc.limit
+	ct.fc.mu.Unlock()
+	if limit != uint32(connectOptions.InitialConnWindowSize) {
+		t.Fatalf("Client transport flow control window size is %v, want %v", limit, connectOptions.InitialConnWindowSize)
+	}
 	ctx, cancel = context.WithTimeout(context.Background(), time.Second)
 	clientSendQuota, err := wait(ctx, nil, nil, nil, ct.sendQuotaPool.acquire())
@@ -1721,12 +1728,16 @@ func testAccountCheckWindowSize(t *testing.T, wc windowSizeConfig) {
 		t.Fatalf("Client send quota(%v) not equal to server's window size(%v) on conn.", clientSendQuota, serverConfig.InitialConnWindowSize)
 	}
 	ct.mu.Lock()
-	if ct.streamSendQuota != uint32(serverConfig.InitialWindowSize) {
-		t.Fatalf("Client stream send quota(%v) not equal to server's window size(%v) on stream.", ct.streamSendQuota, serverConfig.InitialWindowSize)
-	}
+	ssq = ct.streamSendQuota
 	ct.mu.Unlock()
-	if cstream.fc.limit != uint32(connectOptions.InitialWindowSize) {
-		t.Fatalf("Client stream flow control window size is %v, want %v", cstream.fc.limit, connectOptions.InitialWindowSize)
+	if ssq != uint32(serverConfig.InitialWindowSize) {
+		t.Fatalf("Client stream send quota(%v) not equal to server's window size(%v) on stream.", ssq, serverConfig.InitialWindowSize)
 	}
+	cstream.fc.mu.Lock()
+	limit = cstream.fc.limit
+	cstream.fc.mu.Unlock()
+	if limit != uint32(connectOptions.InitialWindowSize) {
+		t.Fatalf("Client stream flow control window size is %v, want %v", limit, connectOptions.InitialWindowSize)
+	}
 	var sstream *Stream
 	st.mu.Lock()
@@ -1734,8 +1745,11 @@ func testAccountCheckWindowSize(t *testing.T, wc windowSizeConfig) {
 		sstream = v
 	}
 	st.mu.Unlock()
-	if sstream.fc.limit != uint32(serverConfig.InitialWindowSize) {
-		t.Fatalf("Server stream flow control window size is %v, want %v", sstream.fc.limit, serverConfig.InitialWindowSize)
+	sstream.fc.mu.Lock()
+	limit = sstream.fc.limit
+	sstream.fc.mu.Unlock()
+	if limit != uint32(serverConfig.InitialWindowSize) {
+		t.Fatalf("Server stream flow control window size is %v, want %v", limit, serverConfig.InitialWindowSize)
 	}
 }
@@ -1846,11 +1860,11 @@ func TestAccountCheckExpandingWindow(t *testing.T) {
 		}
 		sstream.sendQuotaPool.add(serverStreamSendQuota)
 		cstream.fc.mu.Lock()
-		if uint32(serverStreamSendQuota) != cstream.fc.limit-cstream.fc.pendingUpdate {
-			cstream.fc.mu.Unlock()
-			return true, fmt.Errorf("server stream outflow: %v, estimated by client: %v", serverStreamSendQuota, cstream.fc.limit-cstream.fc.pendingUpdate)
-		}
+		clientEst := cstream.fc.limit - cstream.fc.pendingUpdate
 		cstream.fc.mu.Unlock()
+		if uint32(serverStreamSendQuota) != clientEst {
+			return true, fmt.Errorf("server stream outflow: %v, estimated by client: %v", serverStreamSendQuota, clientEst)
+		}

 		// Check flow control window on server stream is equal to out flow on client stream.
 		ctx, _ = context.WithTimeout(context.Background(), time.Second)
@@ -1860,11 +1874,11 @@ func TestAccountCheckExpandingWindow(t *testing.T) {
 		}
 		cstream.sendQuotaPool.add(clientStreamSendQuota)
 		sstream.fc.mu.Lock()
-		if uint32(clientStreamSendQuota) != sstream.fc.limit-sstream.fc.pendingUpdate {
-			sstream.fc.mu.Unlock()
-			return true, fmt.Errorf("client stream outflow: %v. estimated by server: %v", clientStreamSendQuota, sstream.fc.limit-sstream.fc.pendingUpdate)
-		}
+		serverEst := sstream.fc.limit - sstream.fc.pendingUpdate
 		sstream.fc.mu.Unlock()
+		if uint32(clientStreamSendQuota) != serverEst {
+			return true, fmt.Errorf("client stream outflow: %v. estimated by server: %v", clientStreamSendQuota, serverEst)
+		}

 		// Check flow control window on client transport is equal to out flow of server transport.
 		ctx, _ = context.WithTimeout(context.Background(), time.Second)
@@ -1874,11 +1888,11 @@ func TestAccountCheckExpandingWindow(t *testing.T) {
 		}
 		st.sendQuotaPool.add(serverTrSendQuota)
 		ct.fc.mu.Lock()
-		if uint32(serverTrSendQuota) != ct.fc.limit-ct.fc.pendingUpdate {
-			ct.fc.mu.Unlock()
-			return true, fmt.Errorf("server transport outflow: %v, estimated by client: %v", serverTrSendQuota, ct.fc.limit-ct.fc.pendingUpdate)
-		}
+		clientEst = ct.fc.limit - ct.fc.pendingUpdate
 		ct.fc.mu.Unlock()
+		if uint32(serverTrSendQuota) != clientEst {
+			return true, fmt.Errorf("server transport outflow: %v, estimated by client: %v", serverTrSendQuota, clientEst)
+		}

 		// Check flow control window on server transport is equal to out flow of client transport.
 		ctx, _ = context.WithTimeout(context.Background(), time.Second)
@@ -1888,11 +1902,11 @@ func TestAccountCheckExpandingWindow(t *testing.T) {
 		}
 		ct.sendQuotaPool.add(clientTrSendQuota)
 		st.fc.mu.Lock()
-		if uint32(clientTrSendQuota) != st.fc.limit-st.fc.pendingUpdate {
-			st.fc.mu.Unlock()
-			return true, fmt.Errorf("client transport outflow: %v, estimated by client: %v", clientTrSendQuota, st.fc.limit-st.fc.pendingUpdate)
-		}
+		serverEst = st.fc.limit - st.fc.pendingUpdate
 		st.fc.mu.Unlock()
+		if uint32(clientTrSendQuota) != serverEst {
+			return true, fmt.Errorf("client transport outflow: %v, estimated by client: %v", clientTrSendQuota, serverEst)
+		}

 		return false, nil
 	})