From 6f3b6ff46b28cdc2e871df2e1176e48d4204bd33 Mon Sep 17 00:00:00 2001 From: lyuxuan Date: Thu, 19 Oct 2017 12:09:19 -0700 Subject: [PATCH] Parse ServiceConfig JSON string (#1515) --- clientconn.go | 18 + resolver_conn_wrapper.go | 1 + rpc_util.go | 59 --- service_config.go | 185 +++++++++ service_config_test.go | 323 +++++++++++++++ test/end2end_test.go | 855 ++++++++++++++++++++++++++++++++------- 6 files changed, 1228 insertions(+), 213 deletions(-) create mode 100644 service_config.go create mode 100644 service_config_test.go diff --git a/clientconn.go b/clientconn.go index 83563e94..5eec8e66 100644 --- a/clientconn.go +++ b/clientconn.go @@ -191,6 +191,8 @@ func WithBalancerBuilder(b balancer.Builder) DialOption { } // WithServiceConfig returns a DialOption which has a channel to read the service configuration. +// DEPRECATED: service config should be received through name resolver, as specified here. +// https://github.com/grpc/grpc/blob/master/doc/service_config.md func WithServiceConfig(c <-chan ServiceConfig) DialOption { return func(o *dialOptions) { o.scChan = c @@ -550,6 +552,7 @@ type ClientConn struct { mu sync.RWMutex sc ServiceConfig + scRaw string conns map[*addrConn]struct{} // Keepalive parameter can be updated if a GoAway is received. mkp keepalive.ClientParameters @@ -591,6 +594,7 @@ func (cc *ClientConn) scWatcher() { // TODO: load balance policy runtime change is ignored. // We may revist this decision in the future. cc.sc = sc + cc.scRaw = "" cc.mu.Unlock() case <-cc.ctx.Done(): return @@ -800,6 +804,20 @@ func (cc *ClientConn) getTransport(ctx context.Context, failfast bool) (transpor return t, done, nil } +// handleServiceConfig parses the service config string in JSON format to Go native +// struct ServiceConfig, and store both the struct and the JSON string in ClientConn. +func (cc *ClientConn) handleServiceConfig(js string) error { + sc, err := parseServiceConfig(js) + if err != nil { + return err + } + cc.mu.Lock() + cc.scRaw = js + cc.sc = sc + cc.mu.Unlock() + return nil +} + // Close tears down the ClientConn and all underlying connections. func (cc *ClientConn) Close() error { cc.cancel() diff --git a/resolver_conn_wrapper.go b/resolver_conn_wrapper.go index 2f617148..7bb843ef 100644 --- a/resolver_conn_wrapper.go +++ b/resolver_conn_wrapper.go @@ -109,6 +109,7 @@ func (ccr *ccResolverWrapper) watcher() { default: } grpclog.Infof("ccResolverWrapper: got new service config: %v", sc) + ccr.cc.handleServiceConfig(sc) case <-ccr.done: return } diff --git a/rpc_util.go b/rpc_util.go index 1703661e..d1f42fa4 100644 --- a/rpc_util.go +++ b/rpc_util.go @@ -492,65 +492,6 @@ func Errorf(c codes.Code, format string, a ...interface{}) error { return status.Errorf(c, format, a...) } -// MethodConfig defines the configuration recommended by the service providers for a -// particular method. -// This is EXPERIMENTAL and subject to change. -type MethodConfig struct { - // WaitForReady indicates whether RPCs sent to this method should wait until - // the connection is ready by default (!failfast). The value specified via the - // gRPC client API will override the value set here. - WaitForReady *bool - // Timeout is the default timeout for RPCs sent to this method. The actual - // deadline used will be the minimum of the value specified here and the value - // set by the application via the gRPC client API. If either one is not set, - // then the other will be used. If neither is set, then the RPC has no deadline. 
- Timeout *time.Duration - // MaxReqSize is the maximum allowed payload size for an individual request in a - // stream (client->server) in bytes. The size which is measured is the serialized - // payload after per-message compression (but before stream compression) in bytes. - // The actual value used is the minimum of the value specified here and the value set - // by the application via the gRPC client API. If either one is not set, then the other - // will be used. If neither is set, then the built-in default is used. - MaxReqSize *int - // MaxRespSize is the maximum allowed payload size for an individual response in a - // stream (server->client) in bytes. - MaxRespSize *int -} - -// ServiceConfig is provided by the service provider and contains parameters for how -// clients that connect to the service should behave. -// This is EXPERIMENTAL and subject to change. -type ServiceConfig struct { - // LB is the load balancer the service providers recommends. The balancer specified - // via grpc.WithBalancer will override this. - LB Balancer - // Methods contains a map for the methods in this service. - // If there is an exact match for a method (i.e. /service/method) in the map, use the corresponding MethodConfig. - // If there's no exact match, look for the default config for the service (/service/) and use the corresponding MethodConfig if it exists. - // Otherwise, the method has no MethodConfig to use. - Methods map[string]MethodConfig -} - -func min(a, b *int) *int { - if *a < *b { - return a - } - return b -} - -func getMaxSize(mcMax, doptMax *int, defaultVal int) *int { - if mcMax == nil && doptMax == nil { - return &defaultVal - } - if mcMax != nil && doptMax != nil { - return min(mcMax, doptMax) - } - if mcMax != nil { - return mcMax - } - return doptMax -} - // SupportPackageIsVersion3 is referenced from generated protocol buffer files. // The latest support package version is 4. // SupportPackageIsVersion3 is kept for compatibility. It will be removed in the diff --git a/service_config.go b/service_config.go new file mode 100644 index 00000000..0631e7d2 --- /dev/null +++ b/service_config.go @@ -0,0 +1,185 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package grpc + +import ( + "encoding/json" + "time" + + "google.golang.org/grpc/grpclog" +) + +// MethodConfig defines the configuration recommended by the service providers for a +// particular method. +// DEPRECATED: Users should not use this struct. Service config should be received +// through name resolver, as specified here +// https://github.com/grpc/grpc/blob/master/doc/service_config.md +type MethodConfig struct { + // WaitForReady indicates whether RPCs sent to this method should wait until + // the connection is ready by default (!failfast). The value specified via the + // gRPC client API will override the value set here. + WaitForReady *bool + // Timeout is the default timeout for RPCs sent to this method. 
The actual + // deadline used will be the minimum of the value specified here and the value + // set by the application via the gRPC client API. If either one is not set, + // then the other will be used. If neither is set, then the RPC has no deadline. + Timeout *time.Duration + // MaxReqSize is the maximum allowed payload size for an individual request in a + // stream (client->server) in bytes. The size which is measured is the serialized + // payload after per-message compression (but before stream compression) in bytes. + // The actual value used is the minimum of the value specified here and the value set + // by the application via the gRPC client API. If either one is not set, then the other + // will be used. If neither is set, then the built-in default is used. + MaxReqSize *int + // MaxRespSize is the maximum allowed payload size for an individual response in a + // stream (server->client) in bytes. + MaxRespSize *int +} + +// ServiceConfig is provided by the service provider and contains parameters for how +// clients that connect to the service should behave. +// DEPRECATED: Users should not use this struct. Service config should be received +// through name resolver, as specified here +// https://github.com/grpc/grpc/blob/master/doc/service_config.md +type ServiceConfig struct { + // LB is the load balancer the service providers recommends. The balancer specified + // via grpc.WithBalancer will override this. + LB *string + // Methods contains a map for the methods in this service. + // If there is an exact match for a method (i.e. /service/method) in the map, use the corresponding MethodConfig. + // If there's no exact match, look for the default config for the service (/service/) and use the corresponding MethodConfig if it exists. + // Otherwise, the method has no MethodConfig to use. + Methods map[string]MethodConfig +} + +func parseTimeout(t *string) (*time.Duration, error) { + if t == nil { + return nil, nil + } + d, err := time.ParseDuration(*t) + return &d, err +} + +type jsonName struct { + Service *string + Method *string +} + +func (j jsonName) generatePath() (string, bool) { + if j.Service == nil { + return "", false + } + res := "/" + *j.Service + "/" + if j.Method != nil { + res += *j.Method + } + return res, true +} + +// TODO(lyuxuan): delete this struct after cleaning up old service config implementation. +type jsonMC struct { + Name *[]jsonName + WaitForReady *bool + Timeout *string + MaxRequestMessageBytes *int + MaxResponseMessageBytes *int +} + +// TODO(lyuxuan): delete this struct after cleaning up old service config implementation. 
+type jsonSC struct { + LoadBalancingPolicy *string + MethodConfig *[]jsonMC +} + +func parseServiceConfig(js string) (ServiceConfig, error) { + var rsc jsonSC + err := json.Unmarshal([]byte(js), &rsc) + if err != nil { + grpclog.Warningf("grpc: parseServiceConfig error unmarshaling %s due to %v", js, err) + return ServiceConfig{}, err + } + sc := ServiceConfig{ + LB: rsc.LoadBalancingPolicy, + Methods: make(map[string]MethodConfig), + } + if rsc.MethodConfig == nil { + return sc, nil + } + + for _, m := range *rsc.MethodConfig { + if m.Name == nil { + continue + } + d, err := parseTimeout(m.Timeout) + if err != nil { + grpclog.Warningf("grpc: parseServiceConfig error unmarshaling %s due to %v", js, err) + return ServiceConfig{}, err + } + + mc := MethodConfig{ + WaitForReady: m.WaitForReady, + Timeout: d, + MaxReqSize: m.MaxRequestMessageBytes, + MaxRespSize: m.MaxResponseMessageBytes, + } + for _, n := range *m.Name { + if path, valid := n.generatePath(); valid { + sc.Methods[path] = mc + } + } + } + + return sc, nil +} + +func min(a, b *int) *int { + if *a < *b { + return a + } + return b +} + +func getMaxSize(mcMax, doptMax *int, defaultVal int) *int { + if mcMax == nil && doptMax == nil { + return &defaultVal + } + if mcMax != nil && doptMax != nil { + return min(mcMax, doptMax) + } + if mcMax != nil { + return mcMax + } + return doptMax +} + +func newBool(b bool) *bool { + return &b +} + +func newInt(b int) *int { + return &b +} + +func newDuration(b time.Duration) *time.Duration { + return &b +} + +func newString(b string) *string { + return &b +} diff --git a/service_config_test.go b/service_config_test.go new file mode 100644 index 00000000..737cc2c4 --- /dev/null +++ b/service_config_test.go @@ -0,0 +1,323 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package grpc + +import ( + "reflect" + "testing" + "time" +) + +func TestParseLoadBalancer(t *testing.T) { + testcases := []struct { + scjs string + wantSC ServiceConfig + wantErr bool + }{ + { + `{ + "loadBalancingPolicy": "round_robin", + "methodConfig": [ + { + "name": [ + { + "service": "foo", + "method": "Bar" + } + ], + "waitForReady": true + } + ] +}`, + ServiceConfig{ + LB: newString("round_robin"), + Methods: map[string]MethodConfig{ + "/foo/Bar": { + WaitForReady: newBool(true), + }, + }, + }, + false, + }, + { + `{ + "loadBalancingPolicy": 1, + "methodConfig": [ + { + "name": [ + { + "service": "foo", + "method": "Bar" + } + ], + "waitForReady": false + } + ] +}`, + ServiceConfig{}, + true, + }, + } + + for _, c := range testcases { + sc, err := parseServiceConfig(c.scjs) + if c.wantErr != (err != nil) || !reflect.DeepEqual(sc, c.wantSC) { + t.Fatalf("parseServiceConfig(%s) = %+v, %v, want %+v, %v", c.scjs, sc, err, c.wantSC, c.wantErr) + } + } +} + +func TestPraseWaitForReady(t *testing.T) { + testcases := []struct { + scjs string + wantSC ServiceConfig + wantErr bool + }{ + { + `{ + "methodConfig": [ + { + "name": [ + { + "service": "foo", + "method": "Bar" + } + ], + "waitForReady": true + } + ] +}`, + ServiceConfig{ + Methods: map[string]MethodConfig{ + "/foo/Bar": { + WaitForReady: newBool(true), + }, + }, + }, + false, + }, + { + `{ + "methodConfig": [ + { + "name": [ + { + "service": "foo", + "method": "Bar" + } + ], + "waitForReady": false + } + ] +}`, + ServiceConfig{ + Methods: map[string]MethodConfig{ + "/foo/Bar": { + WaitForReady: newBool(false), + }, + }, + }, + false, + }, + { + `{ + "methodConfig": [ + { + "name": [ + { + "service": "foo", + "method": "Bar" + } + ], + "waitForReady": fall + }, + { + "name": [ + { + "service": "foo", + "method": "Bar" + } + ], + "waitForReady": true + } + ] +}`, + ServiceConfig{}, + true, + }, + } + + for _, c := range testcases { + sc, err := parseServiceConfig(c.scjs) + if c.wantErr != (err != nil) || !reflect.DeepEqual(sc, c.wantSC) { + t.Fatalf("parseServiceConfig(%s) = %+v, %v, want %+v, %v", c.scjs, sc, err, c.wantSC, c.wantErr) + } + } +} + +func TestPraseTimeOut(t *testing.T) { + testcases := []struct { + scjs string + wantSC ServiceConfig + wantErr bool + }{ + { + `{ + "methodConfig": [ + { + "name": [ + { + "service": "foo", + "method": "Bar" + } + ], + "timeout": "1s" + } + ] +}`, + ServiceConfig{ + Methods: map[string]MethodConfig{ + "/foo/Bar": { + Timeout: newDuration(time.Second), + }, + }, + }, + false, + }, + { + `{ + "methodConfig": [ + { + "name": [ + { + "service": "foo", + "method": "Bar" + } + ], + "timeout": "3c" + } + ] +}`, + ServiceConfig{}, + true, + }, + { + `{ + "methodConfig": [ + { + "name": [ + { + "service": "foo", + "method": "Bar" + } + ], + "timeout": "3c" + }, + { + "name": [ + { + "service": "foo", + "method": "Bar" + } + ], + "timeout": "1s" + } + ] +}`, + ServiceConfig{}, + true, + }, + } + + for _, c := range testcases { + sc, err := parseServiceConfig(c.scjs) + if c.wantErr != (err != nil) || !reflect.DeepEqual(sc, c.wantSC) { + t.Fatalf("parseServiceConfig(%s) = %+v, %v, want %+v, %v", c.scjs, sc, err, c.wantSC, c.wantErr) + } + } +} + +func TestPraseMsgSize(t *testing.T) { + testcases := []struct { + scjs string + wantSC ServiceConfig + wantErr bool + }{ + { + `{ + "methodConfig": [ + { + "name": [ + { + "service": "foo", + "method": "Bar" + } + ], + "maxRequestMessageBytes": 1024, + "maxResponseMessageBytes": 2048 + } + ] +}`, + ServiceConfig{ + Methods: 
map[string]MethodConfig{ + "/foo/Bar": { + MaxReqSize: newInt(1024), + MaxRespSize: newInt(2048), + }, + }, + }, + false, + }, + { + `{ + "methodConfig": [ + { + "name": [ + { + "service": "foo", + "method": "Bar" + } + ], + "maxRequestMessageBytes": "1024", + "maxResponseMessageBytes": "2048" + }, + { + "name": [ + { + "service": "foo", + "method": "Bar" + } + ], + "maxRequestMessageBytes": 1024, + "maxResponseMessageBytes": 2048 + } + ] +}`, + ServiceConfig{}, + true, + }, + } + + for _, c := range testcases { + sc, err := parseServiceConfig(c.scjs) + if c.wantErr != (err != nil) || !reflect.DeepEqual(sc, c.wantSC) { + t.Fatalf("parseServiceConfig(%s) = %+v, %v, want %+v, %v", c.scjs, sc, err, c.wantSC, c.wantErr) + } + } +} diff --git a/test/end2end_test.go b/test/end2end_test.go index 49ef85e2..6f0c0fa1 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -57,6 +57,8 @@ import ( "google.golang.org/grpc/internal" "google.golang.org/grpc/metadata" "google.golang.org/grpc/peer" + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/resolver/manual" _ "google.golang.org/grpc/resolver/passthrough" "google.golang.org/grpc/stats" "google.golang.org/grpc/status" @@ -459,6 +461,7 @@ type test struct { clientInitialWindowSize int32 clientInitialConnWindowSize int32 perRPCCreds credentials.PerRPCCredentials + resolverScheme string // All test dialing is blocking by default. Set this to true if dial // should be non-blocking. @@ -638,7 +641,12 @@ func (te *test) clientConn() *grpc.ClientConn { opts = append(opts, grpc.WithInsecure()) } // TODO(bar) switch balancer case "pickfirst". - scheme := "passthrough:///" + var scheme string + if te.resolverScheme == "" { + scheme = "passthrough:///" + } else { + scheme = te.resolverScheme + ":///" + } switch te.e.balancer { case "v1": opts = append(opts, grpc.WithBalancer(grpc.RoundRobin(nil))) @@ -1192,11 +1200,8 @@ func testFailFast(t *testing.T, e env) { awaitNewConnLogOutput() } -func testServiceConfigSetup(t *testing.T, e env) (*test, chan grpc.ServiceConfig) { +func testServiceConfigSetup(t *testing.T, e env) *test { te := newTest(t, e) - // We write before read. 
- ch := make(chan grpc.ServiceConfig, 1) - te.sc = ch te.userAgent = testAppUA te.declareLogNoise( "transport: http2Client.notifyError got notified that the client transport was broken EOF", @@ -1204,7 +1209,7 @@ func testServiceConfigSetup(t *testing.T, e env) (*test, chan grpc.ServiceConfig "grpc: addrConn.resetTransport failed to create client transport: connection error", "Failed to dial : context canceled; please retry.", ) - return te, ch + return te } func newBool(b bool) (a *bool) { @@ -1221,85 +1226,132 @@ func newDuration(b time.Duration) (a *time.Duration) { return } -func TestServiceConfigGetMethodConfig(t *testing.T) { - defer leakcheck.Check(t) - for _, e := range listTestEnv() { - testGetMethodConfig(t, e) - } -} - -func testGetMethodConfig(t *testing.T, e env) { - te, ch := testServiceConfigSetup(t, e) +func TestGetMethodConfig(t *testing.T) { + te := testServiceConfigSetup(t, tcpClearRREnv) defer te.tearDown() + r, rcleanup := manual.GenerateAndRegisterManualResolver() + defer rcleanup() - mc1 := grpc.MethodConfig{ - WaitForReady: newBool(true), - Timeout: newDuration(time.Millisecond), - } - mc2 := grpc.MethodConfig{WaitForReady: newBool(false)} - m := make(map[string]grpc.MethodConfig) - m["/grpc.testing.TestService/EmptyCall"] = mc1 - m["/grpc.testing.TestService/"] = mc2 - sc := grpc.ServiceConfig{ - Methods: m, - } - ch <- sc - + te.resolverScheme = r.Scheme() cc := te.clientConn() + r.NewAddress([]resolver.Address{{Addr: te.srvAddr}}) + r.NewServiceConfig(`{ + "methodConfig": [ + { + "name": [ + { + "service": "grpc.testing.TestService", + "method": "EmptyCall" + } + ], + "waitForReady": true, + "timeout": "1ms" + }, + { + "name": [ + { + "service": "grpc.testing.TestService" + } + ], + "waitForReady": false + } + ] +}`) + tc := testpb.NewTestServiceClient(cc) + + // Make sure service config has been processed by grpc. + for { + if cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall").WaitForReady != nil { + break + } + time.Sleep(time.Millisecond) + } + // The following RPCs are expected to become non-fail-fast ones with 1ms deadline. - if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.DeadlineExceeded { + var err error + if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.DeadlineExceeded { t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) } - m = make(map[string]grpc.MethodConfig) - m["/grpc.testing.TestService/UnaryCall"] = mc1 - m["/grpc.testing.TestService/"] = mc2 - sc = grpc.ServiceConfig{ - Methods: m, - } - ch <- sc - // Wait for the new service config to propagate. + r.NewServiceConfig(`{ + "methodConfig": [ + { + "name": [ + { + "service": "grpc.testing.TestService", + "method": "UnaryCall" + } + ], + "waitForReady": true, + "timeout": "1ms" + }, + { + "name": [ + { + "service": "grpc.testing.TestService" + } + ], + "waitForReady": false + } + ] +}`) + + // Make sure service config has been processed by grpc. for { - if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) == codes.DeadlineExceeded { - continue + if mc := cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall"); mc.WaitForReady != nil && !*mc.WaitForReady { + break } - break + time.Sleep(time.Millisecond) } // The following RPCs are expected to become fail-fast. 
- if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.Unavailable { + if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.Unavailable { t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.Unavailable) } } func TestServiceConfigWaitForReady(t *testing.T) { - defer leakcheck.Check(t) - for _, e := range listTestEnv() { - testServiceConfigWaitForReady(t, e) - } -} - -func testServiceConfigWaitForReady(t *testing.T, e env) { - te, ch := testServiceConfigSetup(t, e) + te := testServiceConfigSetup(t, tcpClearRREnv) defer te.tearDown() + r, rcleanup := manual.GenerateAndRegisterManualResolver() + defer rcleanup() // Case1: Client API set failfast to be false, and service config set wait_for_ready to be false, Client API should win, and the rpc will wait until deadline exceeds. - mc := grpc.MethodConfig{ - WaitForReady: newBool(false), - Timeout: newDuration(time.Millisecond), - } - m := make(map[string]grpc.MethodConfig) - m["/grpc.testing.TestService/EmptyCall"] = mc - m["/grpc.testing.TestService/FullDuplexCall"] = mc - sc := grpc.ServiceConfig{ - Methods: m, - } - ch <- sc - + te.resolverScheme = r.Scheme() cc := te.clientConn() + r.NewAddress([]resolver.Address{{Addr: te.srvAddr}}) + r.NewServiceConfig(`{ + "methodConfig": [ + { + "name": [ + { + "service": "grpc.testing.TestService", + "method": "EmptyCall" + }, + { + "service": "grpc.testing.TestService", + "method": "FullDuplexCall" + } + ], + "waitForReady": false, + "timeout": "1ms" + } + ] +}`) + tc := testpb.NewTestServiceClient(cc) + + // Make sure service config has been processed by grpc. + for { + if cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").WaitForReady != nil { + break + } + time.Sleep(time.Millisecond) + } + // The following RPCs are expected to become non-fail-fast ones with 1ms deadline. - if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded { + var err error + if _, err = tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded { t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) } if _, err := tc.FullDuplexCall(context.Background(), grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded { @@ -1307,25 +1359,32 @@ func testServiceConfigWaitForReady(t *testing.T, e env) { } // Generate a service config update. - // Case2: Client API does not set failfast, and service config set wait_for_ready to be true, and the rpc will wait until deadline exceeds. - mc.WaitForReady = newBool(true) - m = make(map[string]grpc.MethodConfig) - m["/grpc.testing.TestService/EmptyCall"] = mc - m["/grpc.testing.TestService/FullDuplexCall"] = mc - sc = grpc.ServiceConfig{ - Methods: m, - } - ch <- sc + // Case2:Client API set failfast to be false, and service config set wait_for_ready to be true, and the rpc will wait until deadline exceeds. + r.NewServiceConfig(`{ + "methodConfig": [ + { + "name": [ + { + "service": "grpc.testing.TestService", + "method": "EmptyCall" + }, + { + "service": "grpc.testing.TestService", + "method": "FullDuplexCall" + } + ], + "waitForReady": true, + "timeout": "1ms" + } + ] +}`) // Wait for the new service config to take effect. 
- mc = cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall") for { - if !*mc.WaitForReady { - time.Sleep(100 * time.Millisecond) - mc = cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall") - continue + if mc := cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall"); mc.WaitForReady != nil && *mc.WaitForReady { + break } - break + time.Sleep(time.Millisecond) } // The following RPCs are expected to become non-fail-fast ones with 1ms deadline. if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.DeadlineExceeded { @@ -1337,85 +1396,105 @@ func testServiceConfigWaitForReady(t *testing.T, e env) { } func TestServiceConfigTimeout(t *testing.T) { - defer leakcheck.Check(t) - for _, e := range listTestEnv() { - testServiceConfigTimeout(t, e) - } -} - -func testServiceConfigTimeout(t *testing.T, e env) { - te, ch := testServiceConfigSetup(t, e) + te := testServiceConfigSetup(t, tcpClearRREnv) defer te.tearDown() + r, rcleanup := manual.GenerateAndRegisterManualResolver() + defer rcleanup() // Case1: Client API sets timeout to be 1ns and ServiceConfig sets timeout to be 1hr. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will wait until deadline exceeds. - mc := grpc.MethodConfig{ - Timeout: newDuration(time.Hour), - } - m := make(map[string]grpc.MethodConfig) - m["/grpc.testing.TestService/EmptyCall"] = mc - m["/grpc.testing.TestService/FullDuplexCall"] = mc - sc := grpc.ServiceConfig{ - Methods: m, - } - ch <- sc - + te.resolverScheme = r.Scheme() cc := te.clientConn() + r.NewAddress([]resolver.Address{{Addr: te.srvAddr}}) + r.NewServiceConfig(`{ + "methodConfig": [ + { + "name": [ + { + "service": "grpc.testing.TestService", + "method": "EmptyCall" + }, + { + "service": "grpc.testing.TestService", + "method": "FullDuplexCall" + } + ], + "waitForReady": true, + "timeout": "1h" + } + ] +}`) + tc := testpb.NewTestServiceClient(cc) + + // Make sure service config has been processed by grpc. + for { + if cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").Timeout != nil { + break + } + time.Sleep(time.Millisecond) + } + // The following RPCs are expected to become non-fail-fast ones with 1ns deadline. + var err error ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond) - if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded { + if _, err = tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded { t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) } cancel() + ctx, cancel = context.WithTimeout(context.Background(), time.Nanosecond) - if _, err := tc.FullDuplexCall(ctx, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded { + if _, err = tc.FullDuplexCall(ctx, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded { t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded) } cancel() // Generate a service config update. // Case2: Client API sets timeout to be 1hr and ServiceConfig sets timeout to be 1ns. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will wait until deadline exceeds. 
- mc.Timeout = newDuration(time.Nanosecond) - m = make(map[string]grpc.MethodConfig) - m["/grpc.testing.TestService/EmptyCall"] = mc - m["/grpc.testing.TestService/FullDuplexCall"] = mc - sc = grpc.ServiceConfig{ - Methods: m, - } - ch <- sc + r.NewServiceConfig(`{ + "methodConfig": [ + { + "name": [ + { + "service": "grpc.testing.TestService", + "method": "EmptyCall" + }, + { + "service": "grpc.testing.TestService", + "method": "FullDuplexCall" + } + ], + "waitForReady": true, + "timeout": "1ns" + } + ] +}`) // Wait for the new service config to take effect. - mc = cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall") for { - if *mc.Timeout != time.Nanosecond { - time.Sleep(100 * time.Millisecond) - mc = cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall") - continue + if mc := cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall"); mc.Timeout != nil && *mc.Timeout == time.Nanosecond { + break } - break + time.Sleep(time.Millisecond) } ctx, cancel = context.WithTimeout(context.Background(), time.Hour) - if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded { + if _, err = tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded { t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) } cancel() ctx, cancel = context.WithTimeout(context.Background(), time.Hour) - if _, err := tc.FullDuplexCall(ctx, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded { + if _, err = tc.FullDuplexCall(ctx, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded { t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded) } cancel() } func TestServiceConfigMaxMsgSize(t *testing.T) { - defer leakcheck.Check(t) - for _, e := range listTestEnv() { - testServiceConfigMaxMsgSize(t, e) - } -} + e := tcpClearRREnv + r, rcleanup := manual.GenerateAndRegisterManualResolver() + defer rcleanup() -func testServiceConfigMaxMsgSize(t *testing.T, e env) { // Setting up values and objects shared across all test cases. const smallSize = 1 const largeSize = 1024 @@ -1434,32 +1513,53 @@ func testServiceConfigMaxMsgSize(t *testing.T, e env) { t.Fatal(err) } - mc := grpc.MethodConfig{ - MaxReqSize: newInt(extraLargeSize), - MaxRespSize: newInt(extraLargeSize), - } + scjs := `{ + "methodConfig": [ + { + "name": [ + { + "service": "grpc.testing.TestService", + "method": "UnaryCall" + }, + { + "service": "grpc.testing.TestService", + "method": "FullDuplexCall" + } + ], + "maxRequestMessageBytes": 2048, + "maxResponseMessageBytes": 2048 + } + ] +}` - m := make(map[string]grpc.MethodConfig) - m["/grpc.testing.TestService/UnaryCall"] = mc - m["/grpc.testing.TestService/FullDuplexCall"] = mc - sc := grpc.ServiceConfig{ - Methods: m, - } // Case1: sc set maxReqSize to 2048 (send), maxRespSize to 2048 (recv). 
- te1, ch1 := testServiceConfigSetup(t, e) - te1.startServer(&testServer{security: e.security}) + te1 := testServiceConfigSetup(t, e) defer te1.tearDown() - ch1 <- sc - tc := testpb.NewTestServiceClient(te1.clientConn()) + te1.resolverScheme = r.Scheme() + te1.nonBlockingDial = true + te1.startServer(&testServer{security: e.security}) + cc1 := te1.clientConn() + + r.NewAddress([]resolver.Address{{Addr: te1.srvAddr}}) + r.NewServiceConfig(scjs) + tc := testpb.NewTestServiceClient(cc1) req := &testpb.SimpleRequest{ ResponseType: testpb.PayloadType_COMPRESSABLE, ResponseSize: int32(extraLargeSize), Payload: smallPayload, } + + for { + if cc1.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").MaxReqSize != nil { + break + } + time.Sleep(time.Millisecond) + } + // Test for unary RPC recv. - if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted { + if _, err = tc.UnaryCall(context.Background(), req, grpc.FailFast(false)); err == nil || grpc.Code(err) != codes.ResourceExhausted { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) } @@ -1485,10 +1585,10 @@ func testServiceConfigMaxMsgSize(t *testing.T, e env) { if err != nil { t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) } - if err := stream.Send(sreq); err != nil { + if err = stream.Send(sreq); err != nil { t.Fatalf("%v.Send(%v) = %v, want ", stream, sreq, err) } - if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted { + if _, err = stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted { t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) } @@ -1499,24 +1599,36 @@ func testServiceConfigMaxMsgSize(t *testing.T, e env) { if err != nil { t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) } - if err := stream.Send(sreq); err == nil || grpc.Code(err) != codes.ResourceExhausted { + if err = stream.Send(sreq); err == nil || grpc.Code(err) != codes.ResourceExhausted { t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted) } // Case2: Client API set maxReqSize to 1024 (send), maxRespSize to 1024 (recv). Sc sets maxReqSize to 2048 (send), maxRespSize to 2048 (recv). - te2, ch2 := testServiceConfigSetup(t, e) + te2 := testServiceConfigSetup(t, e) + te2.resolverScheme = r.Scheme() + te2.nonBlockingDial = true te2.maxClientReceiveMsgSize = newInt(1024) te2.maxClientSendMsgSize = newInt(1024) + te2.startServer(&testServer{security: e.security}) defer te2.tearDown() - ch2 <- sc - tc = testpb.NewTestServiceClient(te2.clientConn()) + cc2 := te2.clientConn() + r.NewAddress([]resolver.Address{{Addr: te2.srvAddr}}) + r.NewServiceConfig(scjs) + tc = testpb.NewTestServiceClient(cc2) + + for { + if cc2.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").MaxReqSize != nil { + break + } + time.Sleep(time.Millisecond) + } // Test for unary RPC recv. 
req.Payload = smallPayload req.ResponseSize = int32(largeSize) - if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted { + if _, err = tc.UnaryCall(context.Background(), req, grpc.FailFast(false)); err == nil || grpc.Code(err) != codes.ResourceExhausted { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) } @@ -1534,10 +1646,10 @@ func testServiceConfigMaxMsgSize(t *testing.T, e env) { if err != nil { t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) } - if err := stream.Send(sreq); err != nil { + if err = stream.Send(sreq); err != nil { t.Fatalf("%v.Send(%v) = %v, want ", stream, sreq, err) } - if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted { + if _, err = stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted { t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) } @@ -1548,24 +1660,37 @@ func testServiceConfigMaxMsgSize(t *testing.T, e env) { if err != nil { t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) } - if err := stream.Send(sreq); err == nil || grpc.Code(err) != codes.ResourceExhausted { + if err = stream.Send(sreq); err == nil || grpc.Code(err) != codes.ResourceExhausted { t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted) } // Case3: Client API set maxReqSize to 4096 (send), maxRespSize to 4096 (recv). Sc sets maxReqSize to 2048 (send), maxRespSize to 2048 (recv). - te3, ch3 := testServiceConfigSetup(t, e) + te3 := testServiceConfigSetup(t, e) + te3.resolverScheme = r.Scheme() + te3.nonBlockingDial = true te3.maxClientReceiveMsgSize = newInt(4096) te3.maxClientSendMsgSize = newInt(4096) + te3.startServer(&testServer{security: e.security}) defer te3.tearDown() - ch3 <- sc - tc = testpb.NewTestServiceClient(te3.clientConn()) + + cc3 := te3.clientConn() + r.NewAddress([]resolver.Address{{Addr: te3.srvAddr}}) + r.NewServiceConfig(scjs) + tc = testpb.NewTestServiceClient(cc3) + + for { + if cc3.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall").MaxReqSize != nil { + break + } + time.Sleep(time.Millisecond) + } // Test for unary RPC recv. 
req.Payload = smallPayload req.ResponseSize = int32(largeSize) - if _, err := tc.UnaryCall(context.Background(), req); err != nil { + if _, err = tc.UnaryCall(context.Background(), req, grpc.FailFast(false)); err != nil { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want ", err) } @@ -1582,7 +1707,7 @@ func testServiceConfigMaxMsgSize(t *testing.T, e env) { } req.Payload = extraLargePayload - if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted { + if _, err = tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted { t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) } @@ -1594,19 +1719,19 @@ func testServiceConfigMaxMsgSize(t *testing.T, e env) { respParam[0].Size = int32(largeSize) sreq.Payload = smallPayload - if err := stream.Send(sreq); err != nil { + if err = stream.Send(sreq); err != nil { t.Fatalf("%v.Send(%v) = %v, want ", stream, sreq, err) } - if _, err := stream.Recv(); err != nil { + if _, err = stream.Recv(); err != nil { t.Fatalf("%v.Recv() = _, %v, want ", stream, err) } respParam[0].Size = int32(extraLargeSize) - if err := stream.Send(sreq); err != nil { + if err = stream.Send(sreq); err != nil { t.Fatalf("%v.Send(%v) = %v, want ", stream, sreq, err) } - if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted { + if _, err = stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted { t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) } @@ -4988,3 +5113,425 @@ func testSvrWriteStatusEarlyWrite(t *testing.T, e env) { t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) } } + +// The following functions with function name ending with TD indicates that they +// should be deleted after old service config API is deprecated and deleted. +func testServiceConfigSetupTD(t *testing.T, e env) (*test, chan grpc.ServiceConfig) { + te := newTest(t, e) + // We write before read. + ch := make(chan grpc.ServiceConfig, 1) + te.sc = ch + te.userAgent = testAppUA + te.declareLogNoise( + "transport: http2Client.notifyError got notified that the client transport was broken EOF", + "grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing", + "grpc: addrConn.resetTransport failed to create client transport: connection error", + "Failed to dial : context canceled; please retry.", + ) + return te, ch +} + +func TestServiceConfigGetMethodConfigTD(t *testing.T) { + defer leakcheck.Check(t) + for _, e := range listTestEnv() { + testGetMethodConfigTD(t, e) + } +} + +func testGetMethodConfigTD(t *testing.T, e env) { + te, ch := testServiceConfigSetupTD(t, e) + defer te.tearDown() + + mc1 := grpc.MethodConfig{ + WaitForReady: newBool(true), + Timeout: newDuration(time.Millisecond), + } + mc2 := grpc.MethodConfig{WaitForReady: newBool(false)} + m := make(map[string]grpc.MethodConfig) + m["/grpc.testing.TestService/EmptyCall"] = mc1 + m["/grpc.testing.TestService/"] = mc2 + sc := grpc.ServiceConfig{ + Methods: m, + } + ch <- sc + + cc := te.clientConn() + tc := testpb.NewTestServiceClient(cc) + // The following RPCs are expected to become non-fail-fast ones with 1ms deadline. 
+ if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.DeadlineExceeded { + t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) + } + + m = make(map[string]grpc.MethodConfig) + m["/grpc.testing.TestService/UnaryCall"] = mc1 + m["/grpc.testing.TestService/"] = mc2 + sc = grpc.ServiceConfig{ + Methods: m, + } + ch <- sc + // Wait for the new service config to propagate. + for { + if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) == codes.DeadlineExceeded { + continue + } + break + } + // The following RPCs are expected to become fail-fast. + if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.Unavailable { + t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.Unavailable) + } +} + +func TestServiceConfigWaitForReadyTD(t *testing.T) { + defer leakcheck.Check(t) + for _, e := range listTestEnv() { + testServiceConfigWaitForReadyTD(t, e) + } +} + +func testServiceConfigWaitForReadyTD(t *testing.T, e env) { + te, ch := testServiceConfigSetupTD(t, e) + defer te.tearDown() + + // Case1: Client API set failfast to be false, and service config set wait_for_ready to be false, Client API should win, and the rpc will wait until deadline exceeds. + mc := grpc.MethodConfig{ + WaitForReady: newBool(false), + Timeout: newDuration(time.Millisecond), + } + m := make(map[string]grpc.MethodConfig) + m["/grpc.testing.TestService/EmptyCall"] = mc + m["/grpc.testing.TestService/FullDuplexCall"] = mc + sc := grpc.ServiceConfig{ + Methods: m, + } + ch <- sc + + cc := te.clientConn() + tc := testpb.NewTestServiceClient(cc) + // The following RPCs are expected to become non-fail-fast ones with 1ms deadline. + if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded { + t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) + } + if _, err := tc.FullDuplexCall(context.Background(), grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded { + t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded) + } + + // Generate a service config update. + // Case2: Client API does not set failfast, and service config set wait_for_ready to be true, and the rpc will wait until deadline exceeds. + mc.WaitForReady = newBool(true) + m = make(map[string]grpc.MethodConfig) + m["/grpc.testing.TestService/EmptyCall"] = mc + m["/grpc.testing.TestService/FullDuplexCall"] = mc + sc = grpc.ServiceConfig{ + Methods: m, + } + ch <- sc + + // Wait for the new service config to take effect. + mc = cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall") + for { + if !*mc.WaitForReady { + time.Sleep(100 * time.Millisecond) + mc = cc.GetMethodConfig("/grpc.testing.TestService/EmptyCall") + continue + } + break + } + // The following RPCs are expected to become non-fail-fast ones with 1ms deadline. 
+ if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.DeadlineExceeded { + t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) + } + if _, err := tc.FullDuplexCall(context.Background()); grpc.Code(err) != codes.DeadlineExceeded { + t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded) + } +} + +func TestServiceConfigTimeoutTD(t *testing.T) { + defer leakcheck.Check(t) + for _, e := range listTestEnv() { + testServiceConfigTimeoutTD(t, e) + } +} + +func testServiceConfigTimeoutTD(t *testing.T, e env) { + te, ch := testServiceConfigSetupTD(t, e) + defer te.tearDown() + + // Case1: Client API sets timeout to be 1ns and ServiceConfig sets timeout to be 1hr. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will wait until deadline exceeds. + mc := grpc.MethodConfig{ + Timeout: newDuration(time.Hour), + } + m := make(map[string]grpc.MethodConfig) + m["/grpc.testing.TestService/EmptyCall"] = mc + m["/grpc.testing.TestService/FullDuplexCall"] = mc + sc := grpc.ServiceConfig{ + Methods: m, + } + ch <- sc + + cc := te.clientConn() + tc := testpb.NewTestServiceClient(cc) + // The following RPCs are expected to become non-fail-fast ones with 1ns deadline. + ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond) + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded { + t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) + } + cancel() + ctx, cancel = context.WithTimeout(context.Background(), time.Nanosecond) + if _, err := tc.FullDuplexCall(ctx, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded { + t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded) + } + cancel() + + // Generate a service config update. + // Case2: Client API sets timeout to be 1hr and ServiceConfig sets timeout to be 1ns. Timeout should be 1ns (min of 1ns and 1hr) and the rpc will wait until deadline exceeds. + mc.Timeout = newDuration(time.Nanosecond) + m = make(map[string]grpc.MethodConfig) + m["/grpc.testing.TestService/EmptyCall"] = mc + m["/grpc.testing.TestService/FullDuplexCall"] = mc + sc = grpc.ServiceConfig{ + Methods: m, + } + ch <- sc + + // Wait for the new service config to take effect. + mc = cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall") + for { + if *mc.Timeout != time.Nanosecond { + time.Sleep(100 * time.Millisecond) + mc = cc.GetMethodConfig("/grpc.testing.TestService/FullDuplexCall") + continue + } + break + } + + ctx, cancel = context.WithTimeout(context.Background(), time.Hour) + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded { + t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded) + } + cancel() + + ctx, cancel = context.WithTimeout(context.Background(), time.Hour) + if _, err := tc.FullDuplexCall(ctx, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded { + t.Fatalf("TestService/FullDuplexCall(_) = _, %v, want %s", err, codes.DeadlineExceeded) + } + cancel() +} + +func TestServiceConfigMaxMsgSizeTD(t *testing.T) { + defer leakcheck.Check(t) + for _, e := range listTestEnv() { + testServiceConfigMaxMsgSizeTD(t, e) + } +} + +func testServiceConfigMaxMsgSizeTD(t *testing.T, e env) { + // Setting up values and objects shared across all test cases. 
+ const smallSize = 1 + const largeSize = 1024 + const extraLargeSize = 2048 + + smallPayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, smallSize) + if err != nil { + t.Fatal(err) + } + largePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, largeSize) + if err != nil { + t.Fatal(err) + } + extraLargePayload, err := newPayload(testpb.PayloadType_COMPRESSABLE, extraLargeSize) + if err != nil { + t.Fatal(err) + } + + mc := grpc.MethodConfig{ + MaxReqSize: newInt(extraLargeSize), + MaxRespSize: newInt(extraLargeSize), + } + + m := make(map[string]grpc.MethodConfig) + m["/grpc.testing.TestService/UnaryCall"] = mc + m["/grpc.testing.TestService/FullDuplexCall"] = mc + sc := grpc.ServiceConfig{ + Methods: m, + } + // Case1: sc set maxReqSize to 2048 (send), maxRespSize to 2048 (recv). + te1, ch1 := testServiceConfigSetupTD(t, e) + te1.startServer(&testServer{security: e.security}) + defer te1.tearDown() + + ch1 <- sc + tc := testpb.NewTestServiceClient(te1.clientConn()) + + req := &testpb.SimpleRequest{ + ResponseType: testpb.PayloadType_COMPRESSABLE, + ResponseSize: int32(extraLargeSize), + Payload: smallPayload, + } + // Test for unary RPC recv. + if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted { + t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) + } + + // Test for unary RPC send. + req.Payload = extraLargePayload + req.ResponseSize = int32(smallSize) + if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted { + t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) + } + + // Test for streaming RPC recv. + respParam := []*testpb.ResponseParameters{ + { + Size: int32(extraLargeSize), + }, + } + sreq := &testpb.StreamingOutputCallRequest{ + ResponseType: testpb.PayloadType_COMPRESSABLE, + ResponseParameters: respParam, + Payload: smallPayload, + } + stream, err := tc.FullDuplexCall(te1.ctx) + if err != nil { + t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) + } + if err := stream.Send(sreq); err != nil { + t.Fatalf("%v.Send(%v) = %v, want ", stream, sreq, err) + } + if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted { + t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) + } + + // Test for streaming RPC send. + respParam[0].Size = int32(smallSize) + sreq.Payload = extraLargePayload + stream, err = tc.FullDuplexCall(te1.ctx) + if err != nil { + t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) + } + if err := stream.Send(sreq); err == nil || grpc.Code(err) != codes.ResourceExhausted { + t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted) + } + + // Case2: Client API set maxReqSize to 1024 (send), maxRespSize to 1024 (recv). Sc sets maxReqSize to 2048 (send), maxRespSize to 2048 (recv). + te2, ch2 := testServiceConfigSetupTD(t, e) + te2.maxClientReceiveMsgSize = newInt(1024) + te2.maxClientSendMsgSize = newInt(1024) + te2.startServer(&testServer{security: e.security}) + defer te2.tearDown() + ch2 <- sc + tc = testpb.NewTestServiceClient(te2.clientConn()) + + // Test for unary RPC recv. 
+ req.Payload = smallPayload + req.ResponseSize = int32(largeSize) + + if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted { + t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) + } + + // Test for unary RPC send. + req.Payload = largePayload + req.ResponseSize = int32(smallSize) + if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted { + t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) + } + + // Test for streaming RPC recv. + stream, err = tc.FullDuplexCall(te2.ctx) + respParam[0].Size = int32(largeSize) + sreq.Payload = smallPayload + if err != nil { + t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) + } + if err := stream.Send(sreq); err != nil { + t.Fatalf("%v.Send(%v) = %v, want ", stream, sreq, err) + } + if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted { + t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) + } + + // Test for streaming RPC send. + respParam[0].Size = int32(smallSize) + sreq.Payload = largePayload + stream, err = tc.FullDuplexCall(te2.ctx) + if err != nil { + t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) + } + if err := stream.Send(sreq); err == nil || grpc.Code(err) != codes.ResourceExhausted { + t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted) + } + + // Case3: Client API set maxReqSize to 4096 (send), maxRespSize to 4096 (recv). Sc sets maxReqSize to 2048 (send), maxRespSize to 2048 (recv). + te3, ch3 := testServiceConfigSetupTD(t, e) + te3.maxClientReceiveMsgSize = newInt(4096) + te3.maxClientSendMsgSize = newInt(4096) + te3.startServer(&testServer{security: e.security}) + defer te3.tearDown() + ch3 <- sc + tc = testpb.NewTestServiceClient(te3.clientConn()) + + // Test for unary RPC recv. + req.Payload = smallPayload + req.ResponseSize = int32(largeSize) + + if _, err := tc.UnaryCall(context.Background(), req); err != nil { + t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want ", err) + } + + req.ResponseSize = int32(extraLargeSize) + if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted { + t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) + } + + // Test for unary RPC send. + req.Payload = largePayload + req.ResponseSize = int32(smallSize) + if _, err := tc.UnaryCall(context.Background(), req); err != nil { + t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want ", err) + } + + req.Payload = extraLargePayload + if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.ResourceExhausted { + t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.ResourceExhausted) + } + + // Test for streaming RPC recv. 
+ stream, err = tc.FullDuplexCall(te3.ctx) + if err != nil { + t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) + } + respParam[0].Size = int32(largeSize) + sreq.Payload = smallPayload + + if err := stream.Send(sreq); err != nil { + t.Fatalf("%v.Send(%v) = %v, want ", stream, sreq, err) + } + if _, err := stream.Recv(); err != nil { + t.Fatalf("%v.Recv() = _, %v, want ", stream, err) + } + + respParam[0].Size = int32(extraLargeSize) + + if err := stream.Send(sreq); err != nil { + t.Fatalf("%v.Send(%v) = %v, want ", stream, sreq, err) + } + if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.ResourceExhausted { + t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.ResourceExhausted) + } + + // Test for streaming RPC send. + respParam[0].Size = int32(smallSize) + sreq.Payload = largePayload + stream, err = tc.FullDuplexCall(te3.ctx) + if err != nil { + t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) + } + if err := stream.Send(sreq); err != nil { + t.Fatalf("%v.Send(%v) = %v, want ", stream, sreq, err) + } + sreq.Payload = extraLargePayload + if err := stream.Send(sreq); err == nil || grpc.Code(err) != codes.ResourceExhausted { + t.Fatalf("%v.Send(%v) = %v, want _, error code: %s", stream, sreq, err, codes.ResourceExhausted) + } +}
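
For reference, below is a minimal sketch of how a client is expected to pick up the
JSON service config through a name resolver after this change, following the pattern
used by the new end2end tests above. The manual resolver, target endpoint, backend
address, and method names here are illustrative placeholders, not values defined by
this patch.

package main

import (
	"fmt"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/resolver"
	"google.golang.org/grpc/resolver/manual"
)

func main() {
	// Register a manual resolver so addresses and a service config can be
	// pushed to the ClientConn by hand, as the new end2end tests do.
	r, cleanup := manual.GenerateAndRegisterManualResolver()
	defer cleanup()

	// The endpoint after ":///" is a placeholder; the manual resolver ignores
	// it and serves whatever NewAddress provides.
	cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure())
	if err != nil {
		panic(err)
	}
	defer cc.Close()

	// "localhost:50051" stands in for a real backend address.
	r.NewAddress([]resolver.Address{{Addr: "localhost:50051"}})

	// Push a service config in the JSON form handled by parseServiceConfig.
	r.NewServiceConfig(`{
  "loadBalancingPolicy": "round_robin",
  "methodConfig": [{
    "name": [{"service": "grpc.testing.TestService", "method": "UnaryCall"}],
    "waitForReady": true,
    "timeout": "100ms",
    "maxRequestMessageBytes": 2048,
    "maxResponseMessageBytes": 2048
  }]
}`)

	// The config is applied asynchronously by the resolver watcher, so poll
	// until it becomes visible, as the tests do.
	for cc.GetMethodConfig("/grpc.testing.TestService/UnaryCall").Timeout == nil {
		time.Sleep(time.Millisecond)
	}
	mc := cc.GetMethodConfig("/grpc.testing.TestService/UnaryCall")
	fmt.Printf("waitForReady=%v timeout=%v\n", *mc.WaitForReady, *mc.Timeout)
}

Note that GetMethodConfig falls back to the default "/service/" entry when there is
no exact "/service/method" match, per the ServiceConfig documentation added in
service_config.go above.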