From f3b5bf53ce10aed1cf288098af846095bf5f5233 Mon Sep 17 00:00:00 2001 From: MakMukhi Date: Fri, 5 May 2017 13:26:56 -0700 Subject: [PATCH] Make window size configurable. (#1210) * Make window size configurable. --- clientconn.go | 16 ++++ server.go | 60 ++++++++++----- test/end2end_test.go | 131 +++++++++++++++++++++++++++---- transport/http2_client.go | 60 +++++++++------ transport/http2_server.go | 63 ++++++++------- transport/transport.go | 18 +++-- transport/transport_test.go | 150 ++++++++++++++++++++++++++++++++++++ 7 files changed, 407 insertions(+), 91 deletions(-) diff --git a/clientconn.go b/clientconn.go index 0b09b3fd..434cab43 100644 --- a/clientconn.go +++ b/clientconn.go @@ -105,6 +105,22 @@ const defaultClientMaxMsgSize = math.MaxInt32 // DialOption configures how we set up the connection. type DialOption func(*dialOptions) +// WithInitialWindowSize returns a DialOption which sets the value for initial window size on a stream. +// The lower bound for window size is 64K and any value smaller than that will be ignored. +func WithInitialWindowSize(s int32) DialOption { + return func(o *dialOptions) { + o.copts.InitialWindowSize = s + } +} + +// WithInitialConnWindowSize returns a DialOption which sets the value for initial window size on a connection. +// The lower bound for window size is 64K and any value smaller than that will be ignored. +func WithInitialConnWindowSize(s int32) DialOption { + return func(o *dialOptions) { + o.copts.InitialConnWindowSize = s + } +} + // WithMaxMsgSize returns a DialOption which sets the maximum message size the client can receive. func WithMaxMsgSize(s int) DialOption { return func(o *dialOptions) { diff --git a/server.go b/server.go index 54085eee..8bae2985 100644 --- a/server.go +++ b/server.go @@ -107,20 +107,22 @@ type Server struct { } type options struct { - creds credentials.TransportCredentials - codec Codec - cp Compressor - dc Decompressor - maxMsgSize int - unaryInt UnaryServerInterceptor - streamInt StreamServerInterceptor - inTapHandle tap.ServerInHandle - statsHandler stats.Handler - maxConcurrentStreams uint32 - useHandlerImpl bool // use http.Handler-based server - unknownStreamDesc *StreamDesc - keepaliveParams keepalive.ServerParameters - keepalivePolicy keepalive.EnforcementPolicy + creds credentials.TransportCredentials + codec Codec + cp Compressor + dc Decompressor + maxMsgSize int + unaryInt UnaryServerInterceptor + streamInt StreamServerInterceptor + inTapHandle tap.ServerInHandle + statsHandler stats.Handler + maxConcurrentStreams uint32 + useHandlerImpl bool // use http.Handler-based server + unknownStreamDesc *StreamDesc + keepaliveParams keepalive.ServerParameters + keepalivePolicy keepalive.EnforcementPolicy + initialWindowSize int32 + initialConnWindowSize int32 } var defaultMaxMsgSize = 1024 * 1024 * 4 // use 4MB as the default message size limit @@ -128,6 +130,22 @@ var defaultMaxMsgSize = 1024 * 1024 * 4 // use 4MB as the default message size l // A ServerOption sets options such as credentials, codec and keepalive parameters, etc. type ServerOption func(*options) +// InitialWindowSize returns a ServerOption that sets window size for stream. +// The lower bound for window size is 64K and any value smaller than that will be ignored. +func InitialWindowSize(s int32) ServerOption { + return func(o *options) { + o.initialWindowSize = s + } +} + +// InitialConnWindowSize returns a ServerOption that sets window size for a connection. +// The lower bound for window size is 64K and any value smaller than that will be ignored. +func InitialConnWindowSize(s int32) ServerOption { + return func(o *options) { + o.initialConnWindowSize = s + } +} + // KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server. func KeepaliveParams(kp keepalive.ServerParameters) ServerOption { return func(o *options) { @@ -483,12 +501,14 @@ func (s *Server) handleRawConn(rawConn net.Conn) { // transport.NewServerTransport). func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) { config := &transport.ServerConfig{ - MaxStreams: s.opts.maxConcurrentStreams, - AuthInfo: authInfo, - InTapHandle: s.opts.inTapHandle, - StatsHandler: s.opts.statsHandler, - KeepaliveParams: s.opts.keepaliveParams, - KeepalivePolicy: s.opts.keepalivePolicy, + MaxStreams: s.opts.maxConcurrentStreams, + AuthInfo: authInfo, + InTapHandle: s.opts.inTapHandle, + StatsHandler: s.opts.statsHandler, + KeepaliveParams: s.opts.keepaliveParams, + KeepalivePolicy: s.opts.keepalivePolicy, + InitialWindowSize: s.opts.initialWindowSize, + InitialConnWindowSize: s.opts.initialConnWindowSize, } st, err := transport.NewServerTransport("http2", c, config) if err != nil { diff --git a/test/end2end_test.go b/test/end2end_test.go index 7c835919..01b3e4f7 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -431,20 +431,24 @@ type test struct { cancel context.CancelFunc // Configurable knobs, after newTest returns: - testServer testpb.TestServiceServer // nil means none - healthServer *health.Server // nil means disabled - maxStream uint32 - tapHandle tap.ServerInHandle - maxMsgSize int - userAgent string - clientCompression bool - serverCompression bool - unaryClientInt grpc.UnaryClientInterceptor - streamClientInt grpc.StreamClientInterceptor - unaryServerInt grpc.UnaryServerInterceptor - streamServerInt grpc.StreamServerInterceptor - unknownHandler grpc.StreamHandler - sc <-chan grpc.ServiceConfig + testServer testpb.TestServiceServer // nil means none + healthServer *health.Server // nil means disabled + maxStream uint32 + tapHandle tap.ServerInHandle + maxMsgSize int + userAgent string + clientCompression bool + serverCompression bool + unaryClientInt grpc.UnaryClientInterceptor + streamClientInt grpc.StreamClientInterceptor + unaryServerInt grpc.UnaryServerInterceptor + streamServerInt grpc.StreamServerInterceptor + unknownHandler grpc.StreamHandler + sc <-chan grpc.ServiceConfig + serverInitialWindowSize int32 + serverInitialConnWindowSize int32 + clientInitialWindowSize int32 + clientInitialConnWindowSize int32 // srv and srvAddr are set once startServer is called. srv *grpc.Server @@ -512,6 +516,12 @@ func (te *test) startServer(ts testpb.TestServiceServer) { if te.unknownHandler != nil { sopts = append(sopts, grpc.UnknownServiceHandler(te.unknownHandler)) } + if te.serverInitialWindowSize > 0 { + sopts = append(sopts, grpc.InitialWindowSize(te.serverInitialWindowSize)) + } + if te.serverInitialConnWindowSize > 0 { + sopts = append(sopts, grpc.InitialConnWindowSize(te.serverInitialConnWindowSize)) + } la := "localhost:0" switch te.e.network { case "unix": @@ -605,6 +615,12 @@ func (te *test) clientConn() *grpc.ClientConn { if te.e.balancer { opts = append(opts, grpc.WithBalancer(grpc.RoundRobin(nil))) } + if te.clientInitialWindowSize > 0 { + opts = append(opts, grpc.WithInitialWindowSize(te.clientInitialWindowSize)) + } + if te.clientInitialConnWindowSize > 0 { + opts = append(opts, grpc.WithInitialConnWindowSize(te.clientInitialConnWindowSize)) + } var err error te.cc, err = grpc.Dial(te.srvAddr, opts...) if err != nil { @@ -3881,3 +3897,90 @@ func TestStreamingProxyDoesNotForwardMetadata(t *testing.T) { t.Fatalf("doFDC(_, proxy.client) = %v; want nil", err) } } + +type windowSizeConfig struct { + serverStream int32 + serverConn int32 + clientStream int32 + clientConn int32 +} + +func max(a, b int32) int32 { + if a > b { + return a + } + return b +} + +func TestConfigurableWindowSizeWithLargeWindow(t *testing.T) { + defer leakCheck(t)() + wc := windowSizeConfig{ + serverStream: 8 * 1024 * 1024, + serverConn: 12 * 1024 * 1024, + clientStream: 6 * 1024 * 1024, + clientConn: 8 * 1024 * 1024, + } + for _, e := range listTestEnv() { + testConfigurableWindowSize(t, e, wc) + } +} + +func TestConfigurableWindowSizeWithSmallWindow(t *testing.T) { + defer leakCheck(t)() + wc := windowSizeConfig{ + serverStream: 1, + serverConn: 1, + clientStream: 1, + clientConn: 1, + } + for _, e := range listTestEnv() { + testConfigurableWindowSize(t, e, wc) + } +} + +func testConfigurableWindowSize(t *testing.T, e env, wc windowSizeConfig) { + te := newTest(t, e) + te.serverInitialWindowSize = wc.serverStream + te.serverInitialConnWindowSize = wc.serverConn + te.clientInitialWindowSize = wc.clientStream + te.clientInitialConnWindowSize = wc.clientConn + + te.startServer(&testServer{security: e.security}) + defer te.tearDown() + + cc := te.clientConn() + tc := testpb.NewTestServiceClient(cc) + stream, err := tc.FullDuplexCall(context.Background()) + if err != nil { + t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) + } + numOfIter := 11 + // Set message size to exhaust largest of window sizes. + messageSize := max(max(wc.serverStream, wc.serverConn), max(wc.clientStream, wc.clientConn)) / int32(numOfIter-1) + messageSize = max(messageSize, 64*1024) + payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, messageSize) + if err != nil { + t.Fatal(err) + } + respParams := []*testpb.ResponseParameters{ + { + Size: proto.Int32(messageSize), + }, + } + req := &testpb.StreamingOutputCallRequest{ + ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(), + ResponseParameters: respParams, + Payload: payload, + } + for i := 0; i < numOfIter; i++ { + if err := stream.Send(req); err != nil { + t.Fatalf("%v.Send(%v) = %v, want ", stream, req, err) + } + if _, err := stream.Recv(); err != nil { + t.Fatalf("%v.Recv() = _, %v, want _, ", stream, err) + } + } + if err := stream.CloseSend(); err != nil { + t.Fatalf("%v.CloseSend() = %v, want ", stream, err) + } +} diff --git a/transport/http2_client.go b/transport/http2_client.go index 380fff66..bc202df2 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -110,6 +110,8 @@ type http2Client struct { statsHandler stats.Handler + initialWindowSize int32 + mu sync.Mutex // guard the following variables state transportState // the state of underlying connection activeStreams map[uint32]*Stream @@ -198,6 +200,10 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( if kp.Timeout == 0 { kp.Timeout = defaultClientKeepaliveTimeout } + icwz := int32(initialConnWindowSize) + if opts.InitialConnWindowSize >= defaultWindowSize { + icwz = opts.InitialConnWindowSize + } var buf bytes.Buffer t := &http2Client{ ctx: ctx, @@ -209,27 +215,31 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( localAddr: conn.LocalAddr(), authInfo: authInfo, // The client initiated stream id is odd starting from 1. - nextID: 1, - writableChan: make(chan int, 1), - shutdownChan: make(chan struct{}), - errorChan: make(chan struct{}), - goAway: make(chan struct{}), - awakenKeepalive: make(chan struct{}, 1), - framer: newFramer(conn), - hBuf: &buf, - hEnc: hpack.NewEncoder(&buf), - controlBuf: newRecvBuffer(), - fc: &inFlow{limit: initialConnWindowSize}, - sendQuotaPool: newQuotaPool(defaultWindowSize), - scheme: scheme, - state: reachable, - activeStreams: make(map[uint32]*Stream), - creds: opts.PerRPCCredentials, - maxStreams: defaultMaxStreamsClient, - streamsQuota: newQuotaPool(defaultMaxStreamsClient), - streamSendQuota: defaultWindowSize, - kp: kp, - statsHandler: opts.StatsHandler, + nextID: 1, + writableChan: make(chan int, 1), + shutdownChan: make(chan struct{}), + errorChan: make(chan struct{}), + goAway: make(chan struct{}), + awakenKeepalive: make(chan struct{}, 1), + framer: newFramer(conn), + hBuf: &buf, + hEnc: hpack.NewEncoder(&buf), + controlBuf: newRecvBuffer(), + fc: &inFlow{limit: uint32(icwz)}, + sendQuotaPool: newQuotaPool(defaultWindowSize), + scheme: scheme, + state: reachable, + activeStreams: make(map[uint32]*Stream), + creds: opts.PerRPCCredentials, + maxStreams: defaultMaxStreamsClient, + streamsQuota: newQuotaPool(defaultMaxStreamsClient), + streamSendQuota: defaultWindowSize, + kp: kp, + statsHandler: opts.StatsHandler, + initialWindowSize: initialWindowSize, + } + if opts.InitialWindowSize >= defaultWindowSize { + t.initialWindowSize = opts.InitialWindowSize } // Make sure awakenKeepalive can't be written upon. // keepalive routine will make it writable, if need be. @@ -258,10 +268,10 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( t.Close() return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface)) } - if initialWindowSize != defaultWindowSize { + if t.initialWindowSize != defaultWindowSize { err = t.framer.writeSettings(true, http2.Setting{ ID: http2.SettingInitialWindowSize, - Val: uint32(initialWindowSize), + Val: uint32(t.initialWindowSize), }) } else { err = t.framer.writeSettings(true) @@ -271,7 +281,7 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( return nil, connectionErrorf(true, err, "transport: %v", err) } // Adjust the connection flow control window if needed. - if delta := uint32(initialConnWindowSize - defaultWindowSize); delta > 0 { + if delta := uint32(icwz - defaultWindowSize); delta > 0 { if err := t.framer.writeWindowUpdate(true, 0, delta); err != nil { t.Close() return nil, connectionErrorf(true, err, "transport: %v", err) @@ -294,7 +304,7 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream { method: callHdr.Method, sendCompress: callHdr.SendCompress, buf: newRecvBuffer(), - fc: &inFlow{limit: initialWindowSize}, + fc: &inFlow{limit: uint32(t.initialWindowSize)}, sendQuotaPool: newQuotaPool(int(t.streamSendQuota)), headerChan: make(chan struct{}), } diff --git a/transport/http2_server.go b/transport/http2_server.go index 14cd19c6..94bd4b57 100644 --- a/transport/http2_server.go +++ b/transport/http2_server.go @@ -113,6 +113,8 @@ type http2Server struct { // 1 means yes. resetPingStrikes uint32 // Accessed atomically. + initialWindowSize int32 + mu sync.Mutex // guard the following state transportState activeStreams map[uint32]*Stream @@ -142,16 +144,24 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err Val: maxStreams, }) } - if initialWindowSize != defaultWindowSize { + iwz := int32(initialWindowSize) + if config.InitialWindowSize >= defaultWindowSize { + iwz = config.InitialWindowSize + } + icwz := int32(initialConnWindowSize) + if config.InitialConnWindowSize >= defaultWindowSize { + icwz = config.InitialConnWindowSize + } + if iwz != defaultWindowSize { settings = append(settings, http2.Setting{ ID: http2.SettingInitialWindowSize, - Val: uint32(initialWindowSize)}) + Val: uint32(iwz)}) } if err := framer.writeSettings(true, settings...); err != nil { return nil, connectionErrorf(true, err, "transport: %v", err) } // Adjust the connection flow control window if needed. - if delta := uint32(initialConnWindowSize - defaultWindowSize); delta > 0 { + if delta := uint32(icwz - defaultWindowSize); delta > 0 { if err := framer.writeWindowUpdate(true, 0, delta); err != nil { return nil, connectionErrorf(true, err, "transport: %v", err) } @@ -180,28 +190,29 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err } var buf bytes.Buffer t := &http2Server{ - ctx: context.Background(), - conn: conn, - remoteAddr: conn.RemoteAddr(), - localAddr: conn.LocalAddr(), - authInfo: config.AuthInfo, - framer: framer, - hBuf: &buf, - hEnc: hpack.NewEncoder(&buf), - maxStreams: maxStreams, - inTapHandle: config.InTapHandle, - controlBuf: newRecvBuffer(), - fc: &inFlow{limit: initialConnWindowSize}, - sendQuotaPool: newQuotaPool(defaultWindowSize), - state: reachable, - writableChan: make(chan int, 1), - shutdownChan: make(chan struct{}), - activeStreams: make(map[uint32]*Stream), - streamSendQuota: defaultWindowSize, - stats: config.StatsHandler, - kp: kp, - idle: time.Now(), - kep: kep, + ctx: context.Background(), + conn: conn, + remoteAddr: conn.RemoteAddr(), + localAddr: conn.LocalAddr(), + authInfo: config.AuthInfo, + framer: framer, + hBuf: &buf, + hEnc: hpack.NewEncoder(&buf), + maxStreams: maxStreams, + inTapHandle: config.InTapHandle, + controlBuf: newRecvBuffer(), + fc: &inFlow{limit: uint32(icwz)}, + sendQuotaPool: newQuotaPool(defaultWindowSize), + state: reachable, + writableChan: make(chan int, 1), + shutdownChan: make(chan struct{}), + activeStreams: make(map[uint32]*Stream), + streamSendQuota: defaultWindowSize, + stats: config.StatsHandler, + kp: kp, + idle: time.Now(), + kep: kep, + initialWindowSize: iwz, } if t.stats != nil { t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{ @@ -224,7 +235,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func( id: frame.Header().StreamID, st: t, buf: buf, - fc: &inFlow{limit: initialWindowSize}, + fc: &inFlow{limit: uint32(t.initialWindowSize)}, } var state decodeState diff --git a/transport/transport.go b/transport/transport.go index 88c2c987..c22333cf 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -392,12 +392,14 @@ const ( // ServerConfig consists of all the configurations to establish a server transport. type ServerConfig struct { - MaxStreams uint32 - AuthInfo credentials.AuthInfo - InTapHandle tap.ServerInHandle - StatsHandler stats.Handler - KeepaliveParams keepalive.ServerParameters - KeepalivePolicy keepalive.EnforcementPolicy + MaxStreams uint32 + AuthInfo credentials.AuthInfo + InTapHandle tap.ServerInHandle + StatsHandler stats.Handler + KeepaliveParams keepalive.ServerParameters + KeepalivePolicy keepalive.EnforcementPolicy + InitialWindowSize int32 + InitialConnWindowSize int32 } // NewServerTransport creates a ServerTransport with conn or non-nil error @@ -425,6 +427,10 @@ type ConnectOptions struct { KeepaliveParams keepalive.ClientParameters // StatsHandler stores the handler for stats. StatsHandler stats.Handler + // InitialWindowSize sets the intial window size for a stream. + InitialWindowSize int32 + // InitialConnWindowSize sets the intial window size for a connection. + InitialConnWindowSize int32 } // TargetInfo contains the information of the target such as network address and metadata. diff --git a/transport/transport_test.go b/transport/transport_test.go index 4e986e56..7429f2e2 100644 --- a/transport/transport_test.go +++ b/transport/transport_test.go @@ -1266,3 +1266,153 @@ func TestContextErr(t *testing.T) { } } } + +type windowSizeConfig struct { + serverStream int32 + serverConn int32 + clientStream int32 + clientConn int32 +} + +func TestAccountCheckWindowSizeWithLargeWindow(t *testing.T) { + wc := windowSizeConfig{ + serverStream: 10 * 1024 * 1024, + serverConn: 12 * 1024 * 1024, + clientStream: 6 * 1024 * 1024, + clientConn: 8 * 1024 * 1024, + } + testAccountCheckWindowSize(t, wc) +} + +func TestAccountCheckWindowSizeWithSmallWindow(t *testing.T) { + wc := windowSizeConfig{ + serverStream: defaultWindowSize, + // Note this is smaller than initialConnWindowSize which is the current default. + serverConn: defaultWindowSize, + clientStream: defaultWindowSize, + clientConn: defaultWindowSize, + } + testAccountCheckWindowSize(t, wc) +} + +func testAccountCheckWindowSize(t *testing.T, wc windowSizeConfig) { + serverConfig := &ServerConfig{ + InitialWindowSize: wc.serverStream, + InitialConnWindowSize: wc.serverConn, + } + connectOptions := ConnectOptions{ + InitialWindowSize: wc.clientStream, + InitialConnWindowSize: wc.clientConn, + } + server, client := setUpWithOptions(t, 0, serverConfig, suspended, connectOptions) + defer server.stop() + defer client.Close() + + // Wait for server conns to be populated with new server transport. + waitWhileTrue(t, func() (bool, error) { + server.mu.Lock() + defer server.mu.Unlock() + if len(server.conns) == 0 { + return true, fmt.Errorf("timed out waiting for server transport to be created") + } + return false, nil + }) + var st *http2Server + server.mu.Lock() + for k := range server.conns { + st = k.(*http2Server) + } + server.mu.Unlock() + ct := client.(*http2Client) + cstream, err := client.NewStream(context.Background(), &CallHdr{Flush: true}) + if err != nil { + t.Fatalf("Failed to create stream. Err: %v", err) + } + // Wait for server to receive headers. + waitWhileTrue(t, func() (bool, error) { + st.mu.Lock() + defer st.mu.Unlock() + if len(st.activeStreams) == 0 { + return true, fmt.Errorf("timed out waiting for server to receive headers") + } + return false, nil + }) + // Sleeping to make sure the settings are applied in case of negative test. + time.Sleep(time.Second) + + waitWhileTrue(t, func() (bool, error) { + if lim := st.fc.limit; lim != uint32(serverConfig.InitialConnWindowSize) { + return true, fmt.Errorf("Server transport flow control window size: got %v, want %v", lim, serverConfig.InitialConnWindowSize) + } + return false, nil + }) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + serverSendQuota, err := wait(ctx, nil, nil, nil, st.sendQuotaPool.acquire()) + if err != nil { + t.Fatalf("Error while acquiring sendQuota on server. Err: %v", err) + } + cancel() + st.sendQuotaPool.add(serverSendQuota) + if serverSendQuota != int(connectOptions.InitialConnWindowSize) { + t.Fatalf("Server send quota(%v) not equal to client's window size(%v) on conn.", serverSendQuota, connectOptions.InitialConnWindowSize) + } + st.mu.Lock() + if st.streamSendQuota != uint32(connectOptions.InitialWindowSize) { + t.Fatalf("Server stream send quota(%v) not equal to client's window size(%v) on stream.", ct.streamSendQuota, connectOptions.InitialWindowSize) + } + st.mu.Unlock() + if ct.fc.limit != uint32(connectOptions.InitialConnWindowSize) { + t.Fatalf("Client transport flow control window size is %v, want %v", ct.fc.limit, connectOptions.InitialConnWindowSize) + } + ctx, cancel = context.WithTimeout(context.Background(), time.Second) + clientSendQuota, err := wait(ctx, nil, nil, nil, ct.sendQuotaPool.acquire()) + if err != nil { + t.Fatalf("Error while acquiring sendQuota on client. Err: %v", err) + } + cancel() + ct.sendQuotaPool.add(clientSendQuota) + if clientSendQuota != int(serverConfig.InitialConnWindowSize) { + t.Fatalf("Client send quota(%v) not equal to server's window size(%v) on conn.", clientSendQuota, serverConfig.InitialConnWindowSize) + } + ct.mu.Lock() + if ct.streamSendQuota != uint32(serverConfig.InitialWindowSize) { + t.Fatalf("Client stream send quota(%v) not equal to server's window size(%v) on stream.", ct.streamSendQuota, serverConfig.InitialWindowSize) + } + ct.mu.Unlock() + if cstream.fc.limit != uint32(connectOptions.InitialWindowSize) { + t.Fatalf("Client stream flow control window size is %v, want %v", cstream.fc.limit, connectOptions.InitialWindowSize) + } + var sstream *Stream + st.mu.Lock() + for _, v := range st.activeStreams { + sstream = v + } + st.mu.Unlock() + if sstream.fc.limit != uint32(serverConfig.InitialWindowSize) { + t.Fatalf("Server stream flow control window size is %v, want %v", sstream.fc.limit, serverConfig.InitialWindowSize) + } +} + +func waitWhileTrue(t *testing.T, condition func() (bool, error)) { + var ( + wait bool + err error + ) + timer := time.NewTimer(time.Second * 5) + for { + wait, err = condition() + if wait { + select { + case <-timer.C: + t.Fatalf(err.Error()) + default: + time.Sleep(50 * time.Millisecond) + continue + } + } + if !timer.Stop() { + <-timer.C + } + break + } +}