Make window size configurable. (#1210)

* Make window size configurable.
This commit is contained in:
MakMukhi
2017-05-05 13:26:56 -07:00
committed by GitHub
parent 07f4e6bf4e
commit f3b5bf53ce
7 changed files with 407 additions and 91 deletions

View File

@ -105,6 +105,22 @@ const defaultClientMaxMsgSize = math.MaxInt32
// DialOption configures how we set up the connection.
type DialOption func(*dialOptions)
// WithInitialWindowSize returns a DialOption which sets the value for initial window size on a stream.
// The lower bound for window size is 64K and any value smaller than that will be ignored.
func WithInitialWindowSize(s int32) DialOption {
return func(o *dialOptions) {
o.copts.InitialWindowSize = s
}
}
// WithInitialConnWindowSize returns a DialOption which sets the value for initial window size on a connection.
// The lower bound for window size is 64K and any value smaller than that will be ignored.
func WithInitialConnWindowSize(s int32) DialOption {
return func(o *dialOptions) {
o.copts.InitialConnWindowSize = s
}
}
// WithMaxMsgSize returns a DialOption which sets the maximum message size the client can receive.
func WithMaxMsgSize(s int) DialOption {
return func(o *dialOptions) {

View File

@ -121,6 +121,8 @@ type options struct {
unknownStreamDesc *StreamDesc
keepaliveParams keepalive.ServerParameters
keepalivePolicy keepalive.EnforcementPolicy
initialWindowSize int32
initialConnWindowSize int32
}
var defaultMaxMsgSize = 1024 * 1024 * 4 // use 4MB as the default message size limit
@ -128,6 +130,22 @@ var defaultMaxMsgSize = 1024 * 1024 * 4 // use 4MB as the default message size l
// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
type ServerOption func(*options)
// InitialWindowSize returns a ServerOption that sets window size for stream.
// The lower bound for window size is 64K and any value smaller than that will be ignored.
func InitialWindowSize(s int32) ServerOption {
return func(o *options) {
o.initialWindowSize = s
}
}
// InitialConnWindowSize returns a ServerOption that sets window size for a connection.
// The lower bound for window size is 64K and any value smaller than that will be ignored.
func InitialConnWindowSize(s int32) ServerOption {
return func(o *options) {
o.initialConnWindowSize = s
}
}
// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
return func(o *options) {
@ -489,6 +507,8 @@ func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo)
StatsHandler: s.opts.statsHandler,
KeepaliveParams: s.opts.keepaliveParams,
KeepalivePolicy: s.opts.keepalivePolicy,
InitialWindowSize: s.opts.initialWindowSize,
InitialConnWindowSize: s.opts.initialConnWindowSize,
}
st, err := transport.NewServerTransport("http2", c, config)
if err != nil {

View File

@ -445,6 +445,10 @@ type test struct {
streamServerInt grpc.StreamServerInterceptor
unknownHandler grpc.StreamHandler
sc <-chan grpc.ServiceConfig
serverInitialWindowSize int32
serverInitialConnWindowSize int32
clientInitialWindowSize int32
clientInitialConnWindowSize int32
// srv and srvAddr are set once startServer is called.
srv *grpc.Server
@ -512,6 +516,12 @@ func (te *test) startServer(ts testpb.TestServiceServer) {
if te.unknownHandler != nil {
sopts = append(sopts, grpc.UnknownServiceHandler(te.unknownHandler))
}
if te.serverInitialWindowSize > 0 {
sopts = append(sopts, grpc.InitialWindowSize(te.serverInitialWindowSize))
}
if te.serverInitialConnWindowSize > 0 {
sopts = append(sopts, grpc.InitialConnWindowSize(te.serverInitialConnWindowSize))
}
la := "localhost:0"
switch te.e.network {
case "unix":
@ -605,6 +615,12 @@ func (te *test) clientConn() *grpc.ClientConn {
if te.e.balancer {
opts = append(opts, grpc.WithBalancer(grpc.RoundRobin(nil)))
}
if te.clientInitialWindowSize > 0 {
opts = append(opts, grpc.WithInitialWindowSize(te.clientInitialWindowSize))
}
if te.clientInitialConnWindowSize > 0 {
opts = append(opts, grpc.WithInitialConnWindowSize(te.clientInitialConnWindowSize))
}
var err error
te.cc, err = grpc.Dial(te.srvAddr, opts...)
if err != nil {
@ -3881,3 +3897,90 @@ func TestStreamingProxyDoesNotForwardMetadata(t *testing.T) {
t.Fatalf("doFDC(_, proxy.client) = %v; want nil", err)
}
}
type windowSizeConfig struct {
serverStream int32
serverConn int32
clientStream int32
clientConn int32
}
func max(a, b int32) int32 {
if a > b {
return a
}
return b
}
func TestConfigurableWindowSizeWithLargeWindow(t *testing.T) {
defer leakCheck(t)()
wc := windowSizeConfig{
serverStream: 8 * 1024 * 1024,
serverConn: 12 * 1024 * 1024,
clientStream: 6 * 1024 * 1024,
clientConn: 8 * 1024 * 1024,
}
for _, e := range listTestEnv() {
testConfigurableWindowSize(t, e, wc)
}
}
func TestConfigurableWindowSizeWithSmallWindow(t *testing.T) {
defer leakCheck(t)()
wc := windowSizeConfig{
serverStream: 1,
serverConn: 1,
clientStream: 1,
clientConn: 1,
}
for _, e := range listTestEnv() {
testConfigurableWindowSize(t, e, wc)
}
}
func testConfigurableWindowSize(t *testing.T, e env, wc windowSizeConfig) {
te := newTest(t, e)
te.serverInitialWindowSize = wc.serverStream
te.serverInitialConnWindowSize = wc.serverConn
te.clientInitialWindowSize = wc.clientStream
te.clientInitialConnWindowSize = wc.clientConn
te.startServer(&testServer{security: e.security})
defer te.tearDown()
cc := te.clientConn()
tc := testpb.NewTestServiceClient(cc)
stream, err := tc.FullDuplexCall(context.Background())
if err != nil {
t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
}
numOfIter := 11
// Set message size to exhaust largest of window sizes.
messageSize := max(max(wc.serverStream, wc.serverConn), max(wc.clientStream, wc.clientConn)) / int32(numOfIter-1)
messageSize = max(messageSize, 64*1024)
payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, messageSize)
if err != nil {
t.Fatal(err)
}
respParams := []*testpb.ResponseParameters{
{
Size: proto.Int32(messageSize),
},
}
req := &testpb.StreamingOutputCallRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(),
ResponseParameters: respParams,
Payload: payload,
}
for i := 0; i < numOfIter; i++ {
if err := stream.Send(req); err != nil {
t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
}
if _, err := stream.Recv(); err != nil {
t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
}
}
if err := stream.CloseSend(); err != nil {
t.Fatalf("%v.CloseSend() = %v, want <nil>", stream, err)
}
}

View File

@ -110,6 +110,8 @@ type http2Client struct {
statsHandler stats.Handler
initialWindowSize int32
mu sync.Mutex // guard the following variables
state transportState // the state of underlying connection
activeStreams map[uint32]*Stream
@ -198,6 +200,10 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
if kp.Timeout == 0 {
kp.Timeout = defaultClientKeepaliveTimeout
}
icwz := int32(initialConnWindowSize)
if opts.InitialConnWindowSize >= defaultWindowSize {
icwz = opts.InitialConnWindowSize
}
var buf bytes.Buffer
t := &http2Client{
ctx: ctx,
@ -219,7 +225,7 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
hBuf: &buf,
hEnc: hpack.NewEncoder(&buf),
controlBuf: newRecvBuffer(),
fc: &inFlow{limit: initialConnWindowSize},
fc: &inFlow{limit: uint32(icwz)},
sendQuotaPool: newQuotaPool(defaultWindowSize),
scheme: scheme,
state: reachable,
@ -230,6 +236,10 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
streamSendQuota: defaultWindowSize,
kp: kp,
statsHandler: opts.StatsHandler,
initialWindowSize: initialWindowSize,
}
if opts.InitialWindowSize >= defaultWindowSize {
t.initialWindowSize = opts.InitialWindowSize
}
// Make sure awakenKeepalive can't be written upon.
// keepalive routine will make it writable, if need be.
@ -258,10 +268,10 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
t.Close()
return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
}
if initialWindowSize != defaultWindowSize {
if t.initialWindowSize != defaultWindowSize {
err = t.framer.writeSettings(true, http2.Setting{
ID: http2.SettingInitialWindowSize,
Val: uint32(initialWindowSize),
Val: uint32(t.initialWindowSize),
})
} else {
err = t.framer.writeSettings(true)
@ -271,7 +281,7 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
return nil, connectionErrorf(true, err, "transport: %v", err)
}
// Adjust the connection flow control window if needed.
if delta := uint32(initialConnWindowSize - defaultWindowSize); delta > 0 {
if delta := uint32(icwz - defaultWindowSize); delta > 0 {
if err := t.framer.writeWindowUpdate(true, 0, delta); err != nil {
t.Close()
return nil, connectionErrorf(true, err, "transport: %v", err)
@ -294,7 +304,7 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
method: callHdr.Method,
sendCompress: callHdr.SendCompress,
buf: newRecvBuffer(),
fc: &inFlow{limit: initialWindowSize},
fc: &inFlow{limit: uint32(t.initialWindowSize)},
sendQuotaPool: newQuotaPool(int(t.streamSendQuota)),
headerChan: make(chan struct{}),
}

View File

@ -113,6 +113,8 @@ type http2Server struct {
// 1 means yes.
resetPingStrikes uint32 // Accessed atomically.
initialWindowSize int32
mu sync.Mutex // guard the following
state transportState
activeStreams map[uint32]*Stream
@ -142,16 +144,24 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
Val: maxStreams,
})
}
if initialWindowSize != defaultWindowSize {
iwz := int32(initialWindowSize)
if config.InitialWindowSize >= defaultWindowSize {
iwz = config.InitialWindowSize
}
icwz := int32(initialConnWindowSize)
if config.InitialConnWindowSize >= defaultWindowSize {
icwz = config.InitialConnWindowSize
}
if iwz != defaultWindowSize {
settings = append(settings, http2.Setting{
ID: http2.SettingInitialWindowSize,
Val: uint32(initialWindowSize)})
Val: uint32(iwz)})
}
if err := framer.writeSettings(true, settings...); err != nil {
return nil, connectionErrorf(true, err, "transport: %v", err)
}
// Adjust the connection flow control window if needed.
if delta := uint32(initialConnWindowSize - defaultWindowSize); delta > 0 {
if delta := uint32(icwz - defaultWindowSize); delta > 0 {
if err := framer.writeWindowUpdate(true, 0, delta); err != nil {
return nil, connectionErrorf(true, err, "transport: %v", err)
}
@ -191,7 +201,7 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
maxStreams: maxStreams,
inTapHandle: config.InTapHandle,
controlBuf: newRecvBuffer(),
fc: &inFlow{limit: initialConnWindowSize},
fc: &inFlow{limit: uint32(icwz)},
sendQuotaPool: newQuotaPool(defaultWindowSize),
state: reachable,
writableChan: make(chan int, 1),
@ -202,6 +212,7 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
kp: kp,
idle: time.Now(),
kep: kep,
initialWindowSize: iwz,
}
if t.stats != nil {
t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
@ -224,7 +235,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
id: frame.Header().StreamID,
st: t,
buf: buf,
fc: &inFlow{limit: initialWindowSize},
fc: &inFlow{limit: uint32(t.initialWindowSize)},
}
var state decodeState

View File

@ -398,6 +398,8 @@ type ServerConfig struct {
StatsHandler stats.Handler
KeepaliveParams keepalive.ServerParameters
KeepalivePolicy keepalive.EnforcementPolicy
InitialWindowSize int32
InitialConnWindowSize int32
}
// NewServerTransport creates a ServerTransport with conn or non-nil error
@ -425,6 +427,10 @@ type ConnectOptions struct {
KeepaliveParams keepalive.ClientParameters
// StatsHandler stores the handler for stats.
StatsHandler stats.Handler
// InitialWindowSize sets the intial window size for a stream.
InitialWindowSize int32
// InitialConnWindowSize sets the intial window size for a connection.
InitialConnWindowSize int32
}
// TargetInfo contains the information of the target such as network address and metadata.

View File

@ -1266,3 +1266,153 @@ func TestContextErr(t *testing.T) {
}
}
}
type windowSizeConfig struct {
serverStream int32
serverConn int32
clientStream int32
clientConn int32
}
func TestAccountCheckWindowSizeWithLargeWindow(t *testing.T) {
wc := windowSizeConfig{
serverStream: 10 * 1024 * 1024,
serverConn: 12 * 1024 * 1024,
clientStream: 6 * 1024 * 1024,
clientConn: 8 * 1024 * 1024,
}
testAccountCheckWindowSize(t, wc)
}
func TestAccountCheckWindowSizeWithSmallWindow(t *testing.T) {
wc := windowSizeConfig{
serverStream: defaultWindowSize,
// Note this is smaller than initialConnWindowSize which is the current default.
serverConn: defaultWindowSize,
clientStream: defaultWindowSize,
clientConn: defaultWindowSize,
}
testAccountCheckWindowSize(t, wc)
}
func testAccountCheckWindowSize(t *testing.T, wc windowSizeConfig) {
serverConfig := &ServerConfig{
InitialWindowSize: wc.serverStream,
InitialConnWindowSize: wc.serverConn,
}
connectOptions := ConnectOptions{
InitialWindowSize: wc.clientStream,
InitialConnWindowSize: wc.clientConn,
}
server, client := setUpWithOptions(t, 0, serverConfig, suspended, connectOptions)
defer server.stop()
defer client.Close()
// Wait for server conns to be populated with new server transport.
waitWhileTrue(t, func() (bool, error) {
server.mu.Lock()
defer server.mu.Unlock()
if len(server.conns) == 0 {
return true, fmt.Errorf("timed out waiting for server transport to be created")
}
return false, nil
})
var st *http2Server
server.mu.Lock()
for k := range server.conns {
st = k.(*http2Server)
}
server.mu.Unlock()
ct := client.(*http2Client)
cstream, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
if err != nil {
t.Fatalf("Failed to create stream. Err: %v", err)
}
// Wait for server to receive headers.
waitWhileTrue(t, func() (bool, error) {
st.mu.Lock()
defer st.mu.Unlock()
if len(st.activeStreams) == 0 {
return true, fmt.Errorf("timed out waiting for server to receive headers")
}
return false, nil
})
// Sleeping to make sure the settings are applied in case of negative test.
time.Sleep(time.Second)
waitWhileTrue(t, func() (bool, error) {
if lim := st.fc.limit; lim != uint32(serverConfig.InitialConnWindowSize) {
return true, fmt.Errorf("Server transport flow control window size: got %v, want %v", lim, serverConfig.InitialConnWindowSize)
}
return false, nil
})
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
serverSendQuota, err := wait(ctx, nil, nil, nil, st.sendQuotaPool.acquire())
if err != nil {
t.Fatalf("Error while acquiring sendQuota on server. Err: %v", err)
}
cancel()
st.sendQuotaPool.add(serverSendQuota)
if serverSendQuota != int(connectOptions.InitialConnWindowSize) {
t.Fatalf("Server send quota(%v) not equal to client's window size(%v) on conn.", serverSendQuota, connectOptions.InitialConnWindowSize)
}
st.mu.Lock()
if st.streamSendQuota != uint32(connectOptions.InitialWindowSize) {
t.Fatalf("Server stream send quota(%v) not equal to client's window size(%v) on stream.", ct.streamSendQuota, connectOptions.InitialWindowSize)
}
st.mu.Unlock()
if ct.fc.limit != uint32(connectOptions.InitialConnWindowSize) {
t.Fatalf("Client transport flow control window size is %v, want %v", ct.fc.limit, connectOptions.InitialConnWindowSize)
}
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
clientSendQuota, err := wait(ctx, nil, nil, nil, ct.sendQuotaPool.acquire())
if err != nil {
t.Fatalf("Error while acquiring sendQuota on client. Err: %v", err)
}
cancel()
ct.sendQuotaPool.add(clientSendQuota)
if clientSendQuota != int(serverConfig.InitialConnWindowSize) {
t.Fatalf("Client send quota(%v) not equal to server's window size(%v) on conn.", clientSendQuota, serverConfig.InitialConnWindowSize)
}
ct.mu.Lock()
if ct.streamSendQuota != uint32(serverConfig.InitialWindowSize) {
t.Fatalf("Client stream send quota(%v) not equal to server's window size(%v) on stream.", ct.streamSendQuota, serverConfig.InitialWindowSize)
}
ct.mu.Unlock()
if cstream.fc.limit != uint32(connectOptions.InitialWindowSize) {
t.Fatalf("Client stream flow control window size is %v, want %v", cstream.fc.limit, connectOptions.InitialWindowSize)
}
var sstream *Stream
st.mu.Lock()
for _, v := range st.activeStreams {
sstream = v
}
st.mu.Unlock()
if sstream.fc.limit != uint32(serverConfig.InitialWindowSize) {
t.Fatalf("Server stream flow control window size is %v, want %v", sstream.fc.limit, serverConfig.InitialWindowSize)
}
}
func waitWhileTrue(t *testing.T, condition func() (bool, error)) {
var (
wait bool
err error
)
timer := time.NewTimer(time.Second * 5)
for {
wait, err = condition()
if wait {
select {
case <-timer.C:
t.Fatalf(err.Error())
default:
time.Sleep(50 * time.Millisecond)
continue
}
}
if !timer.Stop() {
<-timer.C
}
break
}
}