don't apply defaultSC upon receipt of invalid service config (#5257)
This commit is contained in:

committed by
GitHub

parent
51ddcbad16
commit
597e5d1b1a
@ -653,16 +653,14 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
|
||||
} else {
|
||||
ret = balancer.ErrBadResolverState
|
||||
if cc.sc == nil {
|
||||
if cc.dopts.defaultServiceConfig != nil {
|
||||
cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, &defaultConfigSelector{cc.dopts.defaultServiceConfig}, s.Addresses)
|
||||
} else {
|
||||
// Apply the failing LB only if we haven't received valid service config
|
||||
// from the name resolver in the past.
|
||||
cc.applyFailingLB(s.ServiceConfig)
|
||||
cc.mu.Unlock()
|
||||
return ret
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var balCfg serviceconfig.LoadBalancingConfig
|
||||
if cc.sc != nil && cc.sc.lbConfig != nil {
|
||||
|
@ -1,94 +0,0 @@
|
||||
/*
|
||||
*
|
||||
* Copyright 2017 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package grpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc/balancer"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
"google.golang.org/grpc/internal/balancer/stub"
|
||||
"google.golang.org/grpc/resolver"
|
||||
"google.golang.org/grpc/resolver/manual"
|
||||
"google.golang.org/grpc/serviceconfig"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
const happyBalancerName = "happy balancer"
|
||||
|
||||
func init() {
|
||||
// Register a balancer that never returns an error from
|
||||
// UpdateClientConnState, and doesn't do anything else either.
|
||||
bf := stub.BalancerFuncs{
|
||||
UpdateClientConnState: func(*stub.BalancerData, balancer.ClientConnState) error {
|
||||
return nil
|
||||
},
|
||||
}
|
||||
stub.Register(happyBalancerName, bf)
|
||||
}
|
||||
|
||||
// TestResolverErrorInBuild makes the resolver.Builder call into the ClientConn
|
||||
// during the Build call. We use two separate mutexes in the code which make
|
||||
// sure there is no data race in this code path, and also that there is no
|
||||
// deadlock.
|
||||
func (s) TestResolverErrorInBuild(t *testing.T) {
|
||||
r := manual.NewBuilderWithScheme("whatever")
|
||||
r.InitialState(resolver.State{ServiceConfig: &serviceconfig.ParseResult{Err: errors.New("resolver build err")}})
|
||||
|
||||
cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r))
|
||||
if err != nil {
|
||||
t.Fatalf("Dial(_, _) = _, %v; want _, nil", err)
|
||||
}
|
||||
defer cc.Close()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
var dummy int
|
||||
const wantMsg = "error parsing service config"
|
||||
const wantCode = codes.Unavailable
|
||||
if err := cc.Invoke(ctx, "/foo/bar", &dummy, &dummy); status.Code(err) != wantCode || !strings.Contains(status.Convert(err).Message(), wantMsg) {
|
||||
t.Fatalf("cc.Invoke(_, _, _, _) = %v; want status.Code()==%v, status.Message() contains %q", err, wantCode, wantMsg)
|
||||
}
|
||||
}
|
||||
|
||||
func (s) TestServiceConfigErrorRPC(t *testing.T) {
|
||||
r := manual.NewBuilderWithScheme("whatever")
|
||||
|
||||
cc, err := Dial(r.Scheme()+":///test.server", WithTransportCredentials(insecure.NewCredentials()), WithResolvers(r))
|
||||
if err != nil {
|
||||
t.Fatalf("Dial(_, _) = _, %v; want _, nil", err)
|
||||
}
|
||||
defer cc.Close()
|
||||
badsc := r.CC.ParseServiceConfig("bad config")
|
||||
r.UpdateState(resolver.State{ServiceConfig: badsc})
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
var dummy int
|
||||
const wantMsg = "error parsing service config"
|
||||
const wantCode = codes.Unavailable
|
||||
if err := cc.Invoke(ctx, "/foo/bar", &dummy, &dummy); status.Code(err) != wantCode || !strings.Contains(status.Convert(err).Message(), wantMsg) {
|
||||
t.Fatalf("cc.Invoke(_, _, _, _) = %v; want status.Code()==%v, status.Message() contains %q", err, wantCode, wantMsg)
|
||||
}
|
||||
}
|
@ -381,6 +381,9 @@ func init() {
|
||||
//
|
||||
// If any of them is NOT *ServiceConfig, return false.
|
||||
func equalServiceConfig(a, b serviceconfig.Config) bool {
|
||||
if a == nil && b == nil {
|
||||
return true
|
||||
}
|
||||
aa, ok := a.(*ServiceConfig)
|
||||
if !ok {
|
||||
return false
|
||||
|
@ -211,5 +211,4 @@ func (s) TestConfigSelector(t *testing.T) {
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
}
|
280
test/resolver_update_test.go
Normal file
280
test/resolver_update_test.go
Normal file
@ -0,0 +1,280 @@
|
||||
/*
|
||||
*
|
||||
* Copyright 2022 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/balancer"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
"google.golang.org/grpc/internal"
|
||||
"google.golang.org/grpc/internal/stubserver"
|
||||
"google.golang.org/grpc/internal/testutils"
|
||||
"google.golang.org/grpc/resolver"
|
||||
"google.golang.org/grpc/resolver/manual"
|
||||
"google.golang.org/grpc/serviceconfig"
|
||||
"google.golang.org/grpc/status"
|
||||
testpb "google.golang.org/grpc/test/grpc_testing"
|
||||
)
|
||||
|
||||
// TestResolverUpdateDuringBuild_ServiceConfigParseError makes the
|
||||
// resolver.Builder call into the ClientConn, during the Build call, with a
|
||||
// service config parsing error.
|
||||
//
|
||||
// We use two separate mutexes in the code which make sure there is no data race
|
||||
// in this code path, and also that there is no deadlock.
|
||||
func (s) TestResolverUpdateDuringBuild_ServiceConfigParseError(t *testing.T) {
|
||||
// Setting InitialState on the manual resolver makes it call into the
|
||||
// ClientConn during the Build call.
|
||||
r := manual.NewBuilderWithScheme("whatever")
|
||||
r.InitialState(resolver.State{ServiceConfig: &serviceconfig.ParseResult{Err: errors.New("resolver build err")}})
|
||||
|
||||
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
|
||||
if err != nil {
|
||||
t.Fatalf("Dial(_, _) = _, %v; want _, nil", err)
|
||||
}
|
||||
defer cc.Close()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
||||
defer cancel()
|
||||
client := testpb.NewTestServiceClient(cc)
|
||||
const wantMsg = "error parsing service config"
|
||||
const wantCode = codes.Unavailable
|
||||
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != wantCode || !strings.Contains(status.Convert(err).Message(), wantMsg) {
|
||||
t.Fatalf("EmptyCall RPC failed: %v; want code: %v, want message: %q", err, wantCode, wantMsg)
|
||||
}
|
||||
}
|
||||
|
||||
type fakeConfig struct {
|
||||
serviceconfig.Config
|
||||
}
|
||||
|
||||
// TestResolverUpdateDuringBuild_ServiceConfigInvalidTypeError makes the
|
||||
// resolver.Builder call into the ClientConn, during the Build call, with an
|
||||
// invalid service config type.
|
||||
//
|
||||
// We use two separate mutexes in the code which make sure there is no data race
|
||||
// in this code path, and also that there is no deadlock.
|
||||
func (s) TestResolverUpdateDuringBuild_ServiceConfigInvalidTypeError(t *testing.T) {
|
||||
// Setting InitialState on the manual resolver makes it call into the
|
||||
// ClientConn during the Build call.
|
||||
r := manual.NewBuilderWithScheme("whatever")
|
||||
r.InitialState(resolver.State{ServiceConfig: &serviceconfig.ParseResult{Config: fakeConfig{}}})
|
||||
|
||||
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
|
||||
if err != nil {
|
||||
t.Fatalf("Dial(_, _) = _, %v; want _, nil", err)
|
||||
}
|
||||
defer cc.Close()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
||||
defer cancel()
|
||||
client := testpb.NewTestServiceClient(cc)
|
||||
const wantMsg = "illegal service config type"
|
||||
const wantCode = codes.Unavailable
|
||||
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != wantCode || !strings.Contains(status.Convert(err).Message(), wantMsg) {
|
||||
t.Fatalf("EmptyCall RPC failed: %v; want code: %v, want message: %q", err, wantCode, wantMsg)
|
||||
}
|
||||
}
|
||||
|
||||
// TestResolverUpdate_InvalidServiceConfigAsFirstUpdate makes the resolver send
|
||||
// an update with an invalid service config as its first update. This should
|
||||
// make the ClientConn apply the failing LB policy, and should result in RPC
|
||||
// errors indicating the failing service config.
|
||||
func (s) TestResolverUpdate_InvalidServiceConfigAsFirstUpdate(t *testing.T) {
|
||||
r := manual.NewBuilderWithScheme("whatever")
|
||||
|
||||
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
|
||||
if err != nil {
|
||||
t.Fatalf("Dial(_, _) = _, %v; want _, nil", err)
|
||||
}
|
||||
defer cc.Close()
|
||||
|
||||
scpr := r.CC.ParseServiceConfig("bad json service config")
|
||||
r.UpdateState(resolver.State{ServiceConfig: scpr})
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
||||
defer cancel()
|
||||
client := testpb.NewTestServiceClient(cc)
|
||||
const wantMsg = "error parsing service config"
|
||||
const wantCode = codes.Unavailable
|
||||
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != wantCode || !strings.Contains(status.Convert(err).Message(), wantMsg) {
|
||||
t.Fatalf("EmptyCall RPC failed: %v; want code: %v, want message: %q", err, wantCode, wantMsg)
|
||||
}
|
||||
}
|
||||
|
||||
// The wrappingBalancer wraps a pick_first balancer and writes to a channel when
|
||||
// it receives a ClientConn update. This is different to a stub balancer which
|
||||
// only notifies of updates from grpc, but does not contain a real balanacer.
|
||||
//
|
||||
// The wrappingBalancer allows us to write tests with a real backend and make
|
||||
// real RPCs.
|
||||
type wrappingBalancerBuilder struct {
|
||||
name string
|
||||
updateCh *testutils.Channel
|
||||
}
|
||||
|
||||
func (bb wrappingBalancerBuilder) Name() string { return bb.name }
|
||||
|
||||
func (bb wrappingBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
|
||||
pf := balancer.Get(grpc.PickFirstBalancerName)
|
||||
b := &wrappingBalancer{
|
||||
Balancer: pf.Build(cc, opts),
|
||||
updateCh: bb.updateCh,
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
func (bb wrappingBalancerBuilder) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
|
||||
cfg := &wrappingBalancerConfig{}
|
||||
if err := json.Unmarshal(c, cfg); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
type wrappingBalancer struct {
|
||||
balancer.Balancer
|
||||
updateCh *testutils.Channel
|
||||
}
|
||||
|
||||
func (b *wrappingBalancer) UpdateClientConnState(c balancer.ClientConnState) error {
|
||||
if _, ok := c.BalancerConfig.(*wrappingBalancerConfig); !ok {
|
||||
return fmt.Errorf("received balancer config of unsupported type %T", c.BalancerConfig)
|
||||
}
|
||||
b.updateCh.Send(c)
|
||||
return b.Balancer.UpdateClientConnState(c)
|
||||
}
|
||||
|
||||
type wrappingBalancerConfig struct {
|
||||
serviceconfig.LoadBalancingConfig
|
||||
Config string `json:"config,omitempty"`
|
||||
}
|
||||
|
||||
func verifyClientConnStateUpdate(got, want balancer.ClientConnState) error {
|
||||
if got, want := got.ResolverState.Addresses, want.ResolverState.Addresses; !cmp.Equal(got, want) {
|
||||
return fmt.Errorf("update got unexpected addresses: %v, want %v", got, want)
|
||||
}
|
||||
if got, want := got.ResolverState.ServiceConfig.Config, want.ResolverState.ServiceConfig.Config; !internal.EqualServiceConfigForTesting(got, want) {
|
||||
return fmt.Errorf("received unexpected service config: \ngot: %v \nwant: %v", got, want)
|
||||
}
|
||||
if got, want := got.BalancerConfig, want.BalancerConfig; !cmp.Equal(got, want) {
|
||||
return fmt.Errorf("received unexpected balancer config: \ngot: %v \nwant: %v", cmp.Diff(nil, got), cmp.Diff(nil, want))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// TestResolverUpdate_InvalidServiceConfigAfterGoodUpdate tests the scenario
|
||||
// where the resolver sends an update with an invalid service config after
|
||||
// having sent a good update. This should result in the ClientConn discarding
|
||||
// the new invalid service config, and continuing to use the old good config.
|
||||
func (s) TestResolverUpdate_InvalidServiceConfigAfterGoodUpdate(t *testing.T) {
|
||||
// Register a wrapper balancer to get notified of ClientConn updates.
|
||||
ccsCh := testutils.NewChannel()
|
||||
balancer.Register(wrappingBalancerBuilder{
|
||||
name: t.Name(),
|
||||
updateCh: ccsCh,
|
||||
})
|
||||
|
||||
// Start a backend exposing the test service.
|
||||
backend := &stubserver.StubServer{
|
||||
EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil },
|
||||
}
|
||||
if err := backend.StartServer(); err != nil {
|
||||
t.Fatalf("Failed to start backend: %v", err)
|
||||
}
|
||||
t.Logf("Started TestService backend at: %q", backend.Address)
|
||||
defer backend.Stop()
|
||||
|
||||
r := manual.NewBuilderWithScheme("whatever")
|
||||
|
||||
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
|
||||
if err != nil {
|
||||
t.Fatalf("Dial(_, _) = _, %v; want _, nil", err)
|
||||
}
|
||||
defer cc.Close()
|
||||
|
||||
// Push a resolver update and verify that our balancer receives the update.
|
||||
addrs := []resolver.Address{{Addr: backend.Address}}
|
||||
const lbCfg = "wrapping balancer LB policy config"
|
||||
goodSC := r.CC.ParseServiceConfig(fmt.Sprintf(`
|
||||
{
|
||||
"loadBalancingConfig": [
|
||||
{
|
||||
"%v": {
|
||||
"config": "%s"
|
||||
}
|
||||
}
|
||||
]
|
||||
}`, t.Name(), lbCfg))
|
||||
r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: goodSC})
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
||||
defer cancel()
|
||||
wantCCS := balancer.ClientConnState{
|
||||
ResolverState: resolver.State{
|
||||
Addresses: addrs,
|
||||
ServiceConfig: goodSC,
|
||||
},
|
||||
BalancerConfig: &wrappingBalancerConfig{Config: lbCfg},
|
||||
}
|
||||
ccs, err := ccsCh.Receive(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("Timeout when waiting for ClientConnState update from grpc")
|
||||
}
|
||||
gotCCS := ccs.(balancer.ClientConnState)
|
||||
if err := verifyClientConnStateUpdate(gotCCS, wantCCS); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Ensure RPCs are successful.
|
||||
client := testpb.NewTestServiceClient(cc)
|
||||
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
|
||||
t.Fatalf("EmptyCall RPC failed: %v", err)
|
||||
}
|
||||
|
||||
// Push a bad resolver update and ensure that the update is propagated to our
|
||||
// stub balancer. But since the pushed update contains an invalid service
|
||||
// config, our balancer should continue to see the old loadBalancingConfig.
|
||||
badSC := r.CC.ParseServiceConfig("bad json service config")
|
||||
wantCCS.ResolverState.ServiceConfig = badSC
|
||||
r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: badSC})
|
||||
ccs, err = ccsCh.Receive(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("Timeout when waiting for ClientConnState update from grpc")
|
||||
}
|
||||
gotCCS = ccs.(balancer.ClientConnState)
|
||||
if err := verifyClientConnStateUpdate(gotCCS, wantCCS); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// RPCs should continue to be successful since the ClientConn is using the old
|
||||
// good service config.
|
||||
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
|
||||
t.Fatalf("EmptyCall RPC failed: %v", err)
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user