diff --git a/transport/bdp_estimator.go b/transport/bdp_estimator.go new file mode 100644 index 00000000..643652ad --- /dev/null +++ b/transport/bdp_estimator.go @@ -0,0 +1,125 @@ +package transport + +import ( + "sync" + "time" +) + +const ( + // bdpLimit is the maximum value the flow control windows + // will be increased to. + bdpLimit = (1 << 20) * 4 + // alpha is a constant factor used to keep a moving average + // of RTTs. + alpha = 0.9 + // If the current bdp sample is greater than or equal to + // our beta * our estimated bdp and the current bandwidth + // sample is the maximum bandwidth observed so far, we + // increase our bbp estimate by a factor of gamma. + beta = 0.66 + // To put our bdp to be smaller than or equal to twice the real BDP, + // we should multiply our current sample with 4/3, however to round things out + // we use 2 as the multiplication factor. + gamma = 2 +) + +var ( + // Adding arbitrary data to ping so that its ack can be + // identified. + // Easter-egg: what does the ping message say? + bdpPing = &ping{data: [8]byte{2, 4, 16, 16, 9, 14, 7, 7}} +) + +type bdpEstimator struct { + // sentAt is the time when the ping was sent. + sentAt time.Time + + mu sync.Mutex + // bdp is the current bdp estimate. + bdp uint32 + // sample is the number of bytes received in one measurement cycle. + sample uint32 + // bwMax is the maximum bandwidth noted so far (bytes/sec). + bwMax float64 + // bool to keep track of the begining of a new measurement cycle. + isSent bool + // Callback to update the window sizes. + updateFlowControl func(n uint32) + // sampleCount is the number of samples taken so far. + sampleCount uint64 + // round trip time (seconds) + rtt float64 +} + +// timesnap registers the time bdp ping was sent out so that +// network rtt can be calculated when its ack is recieved. +// It is called (by controller) when the bdpPing is +// being written on the wire. +func (b *bdpEstimator) timesnap(d [8]byte) { + if bdpPing.data != d { + return + } + b.sentAt = time.Now() +} + +// add adds bytes to the current sample for calculating bdp. +// It returns true only if a ping must be sent. This can be used +// by the caller (handleData) to make decision about batching +// a window update with it. +func (b *bdpEstimator) add(n uint32) bool { + b.mu.Lock() + defer b.mu.Unlock() + if b.bdp == bdpLimit { + return false + } + if !b.isSent { + b.isSent = true + b.sample = n + b.sentAt = time.Time{} + b.sampleCount++ + return true + } + b.sample += n + return false +} + +// calculate is called when an ack for a bdp ping is received. +// Here we calculate the current bdp and bandwidth sample and +// decide if the flow control windows should go up. +func (b *bdpEstimator) calculate(d [8]byte) { + // Check if the ping acked for was the bdp ping. + if bdpPing.data != d { + return + } + b.mu.Lock() + rttSample := time.Since(b.sentAt).Seconds() + if b.sampleCount < 10 { + // Bootstrap rtt with an average of first 10 rtt samples. + b.rtt += (rttSample - b.rtt) / float64(b.sampleCount) + } else { + // Heed to the recent past more. + b.rtt += (rttSample - b.rtt) * float64(alpha) + } + b.isSent = false + // The number of bytes accumalated so far in the sample is smaller + // than or equal to 1.5 times the real BDP on a saturated connection. + bwCurrent := float64(b.sample) / (b.rtt * float64(1.5)) + if bwCurrent > b.bwMax { + b.bwMax = bwCurrent + } + // If the current sample (which is smaller than or equal to the 1.5 times the real BDP) is + // greater than or equal to 2/3rd our perceived bdp AND this is the maximum bandwidth seen so far, we + // should update our perception of the network BDP. + if float64(b.sample) >= beta*float64(b.bdp) && bwCurrent == b.bwMax && b.bdp != bdpLimit { + sampleFloat := float64(b.sample) + b.bdp = uint32(gamma * sampleFloat) + if b.bdp > bdpLimit { + b.bdp = bdpLimit + } + bdp := b.bdp + b.mu.Unlock() + b.updateFlowControl(bdp) + return + } + b.mu.Unlock() +} diff --git a/transport/control.go b/transport/control.go index dc9276eb..de9420a3 100644 --- a/transport/control.go +++ b/transport/control.go @@ -31,8 +31,7 @@ const ( // The default value of flow control window size in HTTP2 spec. defaultWindowSize = 65535 // The initial window size for flow control. - initialWindowSize = defaultWindowSize // for an RPC - initialConnWindowSize = defaultWindowSize * 16 // for a connection + initialWindowSize = defaultWindowSize // for an RPC infinity = time.Duration(math.MaxInt64) defaultClientKeepaliveTime = infinity defaultClientKeepaliveTimeout = time.Duration(20 * time.Second) @@ -145,10 +144,9 @@ func (qb *quotaPool) acquire() <-chan int { // inFlow deals with inbound flow control type inFlow struct { + mu sync.Mutex // The inbound flow control limit for pending data. limit uint32 - - mu sync.Mutex // pendingData is the overall data which have been received but not been // consumed by applications. pendingData uint32 @@ -160,6 +158,16 @@ type inFlow struct { delta uint32 } +// newLimit updates the inflow window to a new value n. +// It assumes that n is always greater than the old limit. +func (f *inFlow) newLimit(n uint32) uint32 { + f.mu.Lock() + defer f.mu.Unlock() + d := n - f.limit + f.limit = n + return d +} + func (f *inFlow) maybeAdjust(n uint32) uint32 { if n > uint32(math.MaxInt32) { n = uint32(math.MaxInt32) diff --git a/transport/http2_client.go b/transport/http2_client.go index 09bc4f4a..094bb5c0 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -99,6 +99,8 @@ type http2Client struct { initialWindowSize int32 + bdpEst *bdpEstimator + mu sync.Mutex // guard the following variables state transportState // the state of underlying connection activeStreams map[uint32]*Stream @@ -191,9 +193,11 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( if kp.Timeout == 0 { kp.Timeout = defaultClientKeepaliveTimeout } - icwz := int32(initialConnWindowSize) + dynamicWindow := true + icwz := int32(initialWindowSize) if opts.InitialConnWindowSize >= defaultWindowSize { icwz = opts.InitialConnWindowSize + dynamicWindow = false } var buf bytes.Buffer t := &http2Client{ @@ -232,6 +236,13 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( } if opts.InitialWindowSize >= defaultWindowSize { t.initialWindowSize = opts.InitialWindowSize + dynamicWindow = false + } + if dynamicWindow { + t.bdpEst = &bdpEstimator{ + bdp: initialWindowSize, + updateFlowControl: t.updateFlowControl, + } } // Make sure awakenKeepalive can't be written upon. // keepalive routine will make it writable, if need be. @@ -827,11 +838,33 @@ func (t *http2Client) updateWindow(s *Stream, n uint32) { } } +// updateFlowControl updates the incoming flow control windows +// for the transport and the stream based on the current bdp +// estimation. +func (t *http2Client) updateFlowControl(n uint32) { + t.mu.Lock() + for _, s := range t.activeStreams { + s.fc.newLimit(n) + } + t.initialWindowSize = int32(n) + t.mu.Unlock() + t.controlBuf.put(&windowUpdate{0, t.fc.newLimit(n), false}) + t.controlBuf.put(&settings{ + ack: false, + ss: []http2.Setting{ + { + ID: http2.SettingInitialWindowSize, + Val: uint32(n), + }, + }, + }) +} + func (t *http2Client) handleData(f *http2.DataFrame) { size := f.Header().Length - if err := t.fc.onData(uint32(size)); err != nil { - t.notifyError(connectionErrorf(true, err, "%v", err)) - return + var sendBDPPing bool + if t.bdpEst != nil { + sendBDPPing = t.bdpEst.add(uint32(size)) } // Decouple connection's flow control from application's read. // An update on connection's flow control should not depend on @@ -841,8 +874,20 @@ func (t *http2Client) handleData(f *http2.DataFrame) { // Decoupling the connection flow control will prevent other // active(fast) streams from starving in presence of slow or // inactive streams. - if w := t.fc.onRead(uint32(size)); w > 0 { - t.controlBuf.put(&windowUpdate{0, w, true}) + // + // Furthermore, if a bdpPing is being sent out we can piggyback + // connection's window update for the bytes we just received. + if sendBDPPing { + t.controlBuf.put(&windowUpdate{0, uint32(size), false}) + t.controlBuf.put(bdpPing) + } else { + if err := t.fc.onData(uint32(size)); err != nil { + t.notifyError(connectionErrorf(true, err, "%v", err)) + return + } + if w := t.fc.onRead(uint32(size)); w > 0 { + t.controlBuf.put(&windowUpdate{0, w, true}) + } } // Select the right stream to dispatch. s, ok := t.getStream(f) @@ -930,7 +975,11 @@ func (t *http2Client) handleSettings(f *http2.SettingsFrame) { } func (t *http2Client) handlePing(f *http2.PingFrame) { - if f.IsAck() { // Do nothing. + if f.IsAck() { + // Maybe it's a BDP ping. + if t.bdpEst != nil { + t.bdpEst.calculate(f.Data) + } return } pingAck := &ping{ack: true} @@ -1202,6 +1251,9 @@ func (t *http2Client) controller() { case *flushIO: t.framer.flushWrite() case *ping: + if !i.ack { + t.bdpEst.timesnap(i.data) + } t.framer.writePing(true, i.ack, i.data) default: grpclog.Printf("transport: http2Client.controller got unexpected item type %v\n", i) diff --git a/transport/http2_server.go b/transport/http2_server.go index 2c6e4bca..d188bd88 100644 --- a/transport/http2_server.go +++ b/transport/http2_server.go @@ -100,6 +100,8 @@ type http2Server struct { initialWindowSize int32 + bdpEst *bdpEstimator + mu sync.Mutex // guard the following state transportState activeStreams map[uint32]*Stream @@ -117,32 +119,35 @@ type http2Server struct { func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) { framer := newFramer(conn) // Send initial settings as connection preface to client. - var settings []http2.Setting + var isettings []http2.Setting // TODO(zhaoq): Have a better way to signal "no limit" because 0 is // permitted in the HTTP2 spec. maxStreams := config.MaxStreams if maxStreams == 0 { maxStreams = math.MaxUint32 } else { - settings = append(settings, http2.Setting{ + isettings = append(isettings, http2.Setting{ ID: http2.SettingMaxConcurrentStreams, Val: maxStreams, }) } + dynamicWindow := true iwz := int32(initialWindowSize) if config.InitialWindowSize >= defaultWindowSize { iwz = config.InitialWindowSize + dynamicWindow = false } - icwz := int32(initialConnWindowSize) + icwz := int32(initialWindowSize) if config.InitialConnWindowSize >= defaultWindowSize { icwz = config.InitialConnWindowSize + dynamicWindow = false } if iwz != defaultWindowSize { - settings = append(settings, http2.Setting{ + isettings = append(isettings, http2.Setting{ ID: http2.SettingInitialWindowSize, Val: uint32(iwz)}) } - if err := framer.writeSettings(true, settings...); err != nil { + if err := framer.writeSettings(true, isettings...); err != nil { return nil, connectionErrorf(true, err, "transport: %v", err) } // Adjust the connection flow control window if needed. @@ -199,6 +204,12 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err kep: kep, initialWindowSize: iwz, } + if dynamicWindow { + t.bdpEst = &bdpEstimator{ + bdp: initialWindowSize, + updateFlowControl: t.updateFlowControl, + } + } if t.stats != nil { t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{ RemoteAddr: t.remoteAddr, @@ -458,12 +469,34 @@ func (t *http2Server) updateWindow(s *Stream, n uint32) { } } +// updateFlowControl updates the incoming flow control windows +// for the transport and the stream based on the current bdp +// estimation. +func (t *http2Server) updateFlowControl(n uint32) { + t.mu.Lock() + for _, s := range t.activeStreams { + s.fc.newLimit(n) + } + t.initialWindowSize = int32(n) + t.mu.Unlock() + t.controlBuf.put(&windowUpdate{0, t.fc.newLimit(n), false}) + t.controlBuf.put(&settings{ + ack: false, + ss: []http2.Setting{ + { + ID: http2.SettingInitialWindowSize, + Val: uint32(n), + }, + }, + }) + +} + func (t *http2Server) handleData(f *http2.DataFrame) { size := f.Header().Length - if err := t.fc.onData(uint32(size)); err != nil { - grpclog.Printf("transport: http2Server %v", err) - t.Close() - return + var sendBDPPing bool + if t.bdpEst != nil { + sendBDPPing = t.bdpEst.add(uint32(size)) } // Decouple connection's flow control from application's read. // An update on connection's flow control should not depend on @@ -473,8 +506,21 @@ func (t *http2Server) handleData(f *http2.DataFrame) { // Decoupling the connection flow control will prevent other // active(fast) streams from starving in presence of slow or // inactive streams. - if w := t.fc.onRead(uint32(size)); w > 0 { - t.controlBuf.put(&windowUpdate{0, w, true}) + // + // Furthermore, if a bdpPing is being sent out we can piggyback + // connection's window update for the bytes we just received. + if sendBDPPing { + t.controlBuf.put(&windowUpdate{0, uint32(size), false}) + t.controlBuf.put(bdpPing) + } else { + if err := t.fc.onData(uint32(size)); err != nil { + grpclog.Printf("transport: http2Server %v", err) + t.Close() + return + } + if w := t.fc.onRead(uint32(size)); w > 0 { + t.controlBuf.put(&windowUpdate{0, w, true}) + } } // Select the right stream to dispatch. s, ok := t.getStream(f) @@ -546,7 +592,11 @@ const ( ) func (t *http2Server) handlePing(f *http2.PingFrame) { - if f.IsAck() { // Do nothing. + if f.IsAck() { + // Maybe it's a BDP ping. + if t.bdpEst != nil { + t.bdpEst.calculate(f.Data) + } return } pingAck := &ping{ack: true} @@ -778,13 +828,6 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) (err error) { if writeHeaderFrame { t.WriteHeader(s, nil) } - defer func() { - if err == nil { - // Reset ping strikes when sending data since this might cause - // the peer to send ping. - atomic.StoreUint32(&t.resetPingStrikes, 1) - } - }() r := bytes.NewBuffer(data) for { if r.Len() == 0 { @@ -848,6 +891,9 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) (err error) { if r.Len() == 0 && t.framer.adjustNumWriters(0) == 1 && !opts.Last { forceFlush = true } + // Reset ping strikes when sending data since this might cause + // the peer to send ping. + atomic.StoreUint32(&t.resetPingStrikes, 1) if err := t.framer.writeData(forceFlush, s.id, false, p); err != nil { t.Close() return connectionErrorf(true, err, "transport: %v", err) @@ -997,6 +1043,9 @@ func (t *http2Server) controller() { case *flushIO: t.framer.flushWrite() case *ping: + if !i.ack { + t.bdpEst.timesnap(i.data) + } t.framer.writePing(true, i.ack, i.data) default: grpclog.Printf("transport: http2Server.controller got unexpected item type %v\n", i) diff --git a/transport/transport_test.go b/transport/transport_test.go index ec3f75c0..86104788 100644 --- a/transport/transport_test.go +++ b/transport/transport_test.go @@ -1033,7 +1033,8 @@ func TestLargeMessageSuspension(t *testing.T) { t.Fatalf("failed to open stream: %v", err) } // Write should not be done successfully due to flow control. - err = ct.Write(s, expectedRequestLarge, &Options{Last: true, Delay: false}) + msg := make([]byte, initialWindowSize*8) + err = ct.Write(s, msg, &Options{Last: true, Delay: false}) expectedErr := streamErrorf(codes.DeadlineExceeded, "%v", context.DeadlineExceeded) if err != expectedErr { t.Fatalf("Write got %v, want %v", err, expectedErr) @@ -1426,8 +1427,8 @@ func TestServerWithMisbehavedClient(t *testing.T) { } ss.fc.mu.Unlock() } - if ss.fc.pendingData != http2MaxFrameLen || ss.fc.pendingUpdate != 0 || sc.fc.pendingData != 0 || sc.fc.pendingUpdate != http2MaxFrameLen { - t.Fatalf("Server mistakenly updates inbound flow control params: got %d, %d, %d, %d; want %d, %d, %d, %d", ss.fc.pendingData, ss.fc.pendingUpdate, sc.fc.pendingData, sc.fc.pendingUpdate, http2MaxFrameLen, 0, http2MaxFrameLen, 0) + if ss.fc.pendingData != http2MaxFrameLen || ss.fc.pendingUpdate != 0 || sc.fc.pendingData != 0 || sc.fc.pendingUpdate != 0 { + t.Fatalf("Server mistakenly updates inbound flow control params: got %d, %d, %d, %d; want %d, %d, %d, %d", ss.fc.pendingData, ss.fc.pendingUpdate, sc.fc.pendingData, sc.fc.pendingUpdate, http2MaxFrameLen, 0, 0, 0) } // Keep sending until the server inbound window is drained for that stream. for sent <= initialWindowSize { @@ -1447,16 +1448,18 @@ func TestServerWithMisbehavedClient(t *testing.T) { t.Fatalf("%v got status %v; want Code=%v", s, s.status, code) } - if sc.fc.pendingData != 0 || sc.fc.pendingUpdate <= initialWindowSize { - t.Fatalf("Server mistakenly resets inbound flow control params: got %d, %d; want 0, >%d", sc.fc.pendingData, sc.fc.pendingUpdate, initialWindowSize) - } ct.CloseStream(s, nil) ct.Close() server.stop() } func TestClientWithMisbehavedServer(t *testing.T) { - server, ct := setUp(t, 0, math.MaxUint32, misbehaved) + // Turn off BDP estimation so that the server can + // violate stream window. + connectOptions := ConnectOptions{ + InitialWindowSize: initialWindowSize, + } + server, ct := setUpWithOptions(t, 0, &ServerConfig{}, misbehaved, connectOptions) callHdr := &CallHdr{ Host: "localhost", Method: "foo.Stream", @@ -1481,8 +1484,8 @@ func TestClientWithMisbehavedServer(t *testing.T) { break } } - if s.fc.pendingData <= initialWindowSize || s.fc.pendingUpdate != 0 || conn.fc.pendingData != 0 || conn.fc.pendingUpdate <= initialWindowSize { - t.Fatalf("Client mistakenly updates inbound flow control params: got %d, %d, %d, %d; want >%d, %d, >%d, %d", s.fc.pendingData, s.fc.pendingUpdate, conn.fc.pendingData, conn.fc.pendingUpdate, initialWindowSize, 0, initialWindowSize, 0) + if s.fc.pendingData <= initialWindowSize || s.fc.pendingUpdate != 0 || conn.fc.pendingData != 0 || conn.fc.pendingUpdate != 0 { + t.Fatalf("Client mistakenly updates inbound flow control params: got %d, %d, %d, %d; want >%d, %d, %d, >%d", s.fc.pendingData, s.fc.pendingUpdate, conn.fc.pendingData, conn.fc.pendingUpdate, initialWindowSize, 0, 0, 0) } if err != io.EOF { @@ -1493,9 +1496,6 @@ func TestClientWithMisbehavedServer(t *testing.T) { } conn.CloseStream(s, err) - if conn.fc.pendingData != 0 || conn.fc.pendingUpdate <= initialWindowSize { - t.Fatalf("Client mistakenly resets inbound flow control params: got %d, %d; want 0, >%d", conn.fc.pendingData, conn.fc.pendingUpdate, initialWindowSize) - } ct.Close() server.stop() } @@ -1686,7 +1686,10 @@ func testAccountCheckWindowSize(t *testing.T, wc windowSizeConfig) { time.Sleep(time.Second) waitWhileTrue(t, func() (bool, error) { - if lim := st.fc.limit; lim != uint32(serverConfig.InitialConnWindowSize) { + st.fc.mu.Lock() + lim := st.fc.limit + st.fc.mu.Unlock() + if lim != uint32(serverConfig.InitialConnWindowSize) { return true, fmt.Errorf("Server transport flow control window size: got %v, want %v", lim, serverConfig.InitialConnWindowSize) } return false, nil @@ -1703,12 +1706,16 @@ func testAccountCheckWindowSize(t *testing.T, wc windowSizeConfig) { t.Fatalf("Server send quota(%v) not equal to client's window size(%v) on conn.", serverSendQuota, connectOptions.InitialConnWindowSize) } st.mu.Lock() - if st.streamSendQuota != uint32(connectOptions.InitialWindowSize) { - t.Fatalf("Server stream send quota(%v) not equal to client's window size(%v) on stream.", ct.streamSendQuota, connectOptions.InitialWindowSize) - } + ssq := st.streamSendQuota st.mu.Unlock() - if ct.fc.limit != uint32(connectOptions.InitialConnWindowSize) { - t.Fatalf("Client transport flow control window size is %v, want %v", ct.fc.limit, connectOptions.InitialConnWindowSize) + if ssq != uint32(connectOptions.InitialWindowSize) { + t.Fatalf("Server stream send quota(%v) not equal to client's window size(%v) on stream.", ssq, connectOptions.InitialWindowSize) + } + ct.fc.mu.Lock() + limit := ct.fc.limit + ct.fc.mu.Unlock() + if limit != uint32(connectOptions.InitialConnWindowSize) { + t.Fatalf("Client transport flow control window size is %v, want %v", limit, connectOptions.InitialConnWindowSize) } ctx, cancel = context.WithTimeout(context.Background(), time.Second) clientSendQuota, err := wait(ctx, nil, nil, nil, ct.sendQuotaPool.acquire()) @@ -1721,12 +1728,16 @@ func testAccountCheckWindowSize(t *testing.T, wc windowSizeConfig) { t.Fatalf("Client send quota(%v) not equal to server's window size(%v) on conn.", clientSendQuota, serverConfig.InitialConnWindowSize) } ct.mu.Lock() - if ct.streamSendQuota != uint32(serverConfig.InitialWindowSize) { - t.Fatalf("Client stream send quota(%v) not equal to server's window size(%v) on stream.", ct.streamSendQuota, serverConfig.InitialWindowSize) - } + ssq = ct.streamSendQuota ct.mu.Unlock() - if cstream.fc.limit != uint32(connectOptions.InitialWindowSize) { - t.Fatalf("Client stream flow control window size is %v, want %v", cstream.fc.limit, connectOptions.InitialWindowSize) + if ssq != uint32(serverConfig.InitialWindowSize) { + t.Fatalf("Client stream send quota(%v) not equal to server's window size(%v) on stream.", ssq, serverConfig.InitialWindowSize) + } + cstream.fc.mu.Lock() + limit = cstream.fc.limit + cstream.fc.mu.Unlock() + if limit != uint32(connectOptions.InitialWindowSize) { + t.Fatalf("Client stream flow control window size is %v, want %v", limit, connectOptions.InitialWindowSize) } var sstream *Stream st.mu.Lock() @@ -1734,8 +1745,11 @@ func testAccountCheckWindowSize(t *testing.T, wc windowSizeConfig) { sstream = v } st.mu.Unlock() - if sstream.fc.limit != uint32(serverConfig.InitialWindowSize) { - t.Fatalf("Server stream flow control window size is %v, want %v", sstream.fc.limit, serverConfig.InitialWindowSize) + sstream.fc.mu.Lock() + limit = sstream.fc.limit + sstream.fc.mu.Unlock() + if limit != uint32(serverConfig.InitialWindowSize) { + t.Fatalf("Server stream flow control window size is %v, want %v", limit, serverConfig.InitialWindowSize) } } @@ -1846,11 +1860,11 @@ func TestAccountCheckExpandingWindow(t *testing.T) { } sstream.sendQuotaPool.add(serverStreamSendQuota) cstream.fc.mu.Lock() - if uint32(serverStreamSendQuota) != cstream.fc.limit-cstream.fc.pendingUpdate { - cstream.fc.mu.Unlock() - return true, fmt.Errorf("server stream outflow: %v, estimated by client: %v", serverStreamSendQuota, cstream.fc.limit-cstream.fc.pendingUpdate) - } + clientEst := cstream.fc.limit - cstream.fc.pendingUpdate cstream.fc.mu.Unlock() + if uint32(serverStreamSendQuota) != clientEst { + return true, fmt.Errorf("server stream outflow: %v, estimated by client: %v", serverStreamSendQuota, clientEst) + } // Check flow control window on server stream is equal to out flow on client stream. ctx, _ = context.WithTimeout(context.Background(), time.Second) @@ -1860,11 +1874,11 @@ func TestAccountCheckExpandingWindow(t *testing.T) { } cstream.sendQuotaPool.add(clientStreamSendQuota) sstream.fc.mu.Lock() - if uint32(clientStreamSendQuota) != sstream.fc.limit-sstream.fc.pendingUpdate { - sstream.fc.mu.Unlock() - return true, fmt.Errorf("client stream outflow: %v. estimated by server: %v", clientStreamSendQuota, sstream.fc.limit-sstream.fc.pendingUpdate) - } + serverEst := sstream.fc.limit - sstream.fc.pendingUpdate sstream.fc.mu.Unlock() + if uint32(clientStreamSendQuota) != serverEst { + return true, fmt.Errorf("client stream outflow: %v. estimated by server: %v", clientStreamSendQuota, serverEst) + } // Check flow control window on client transport is equal to out flow of server transport. ctx, _ = context.WithTimeout(context.Background(), time.Second) @@ -1874,11 +1888,11 @@ func TestAccountCheckExpandingWindow(t *testing.T) { } st.sendQuotaPool.add(serverTrSendQuota) ct.fc.mu.Lock() - if uint32(serverTrSendQuota) != ct.fc.limit-ct.fc.pendingUpdate { - ct.fc.mu.Unlock() - return true, fmt.Errorf("server transport outflow: %v, estimated by client: %v", serverTrSendQuota, ct.fc.limit-ct.fc.pendingUpdate) - } + clientEst = ct.fc.limit - ct.fc.pendingUpdate ct.fc.mu.Unlock() + if uint32(serverTrSendQuota) != clientEst { + return true, fmt.Errorf("server transport outflow: %v, estimated by client: %v", serverTrSendQuota, clientEst) + } // Check flow control window on server transport is equal to out flow of client transport. ctx, _ = context.WithTimeout(context.Background(), time.Second) @@ -1888,11 +1902,11 @@ func TestAccountCheckExpandingWindow(t *testing.T) { } ct.sendQuotaPool.add(clientTrSendQuota) st.fc.mu.Lock() - if uint32(clientTrSendQuota) != st.fc.limit-st.fc.pendingUpdate { - st.fc.mu.Unlock() - return true, fmt.Errorf("client transport outflow: %v, estimated by client: %v", clientTrSendQuota, st.fc.limit-st.fc.pendingUpdate) - } + serverEst = st.fc.limit - st.fc.pendingUpdate st.fc.mu.Unlock() + if uint32(clientTrSendQuota) != serverEst { + return true, fmt.Errorf("client transport outflow: %v, estimated by client: %v", clientTrSendQuota, serverEst) + } return false, nil })