diff --git a/balancer/xds/edsbalancer/edsbalancer.go b/balancer/xds/edsbalancer/edsbalancer.go index 97e329d6..483228fe 100644 --- a/balancer/xds/edsbalancer/edsbalancer.go +++ b/balancer/xds/edsbalancer/edsbalancer.go @@ -32,6 +32,7 @@ import ( "google.golang.org/grpc/balancer/roundrobin" edspb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/eds" percentpb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/type/percent" + "google.golang.org/grpc/balancer/xds/lrs" "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/grpclog" @@ -56,6 +57,7 @@ type EDSBalancer struct { bg *balancerGroup subBalancerBuilder balancer.Builder lidToConfig map[string]*localityConfig + loadStore lrs.Store pickerMu sync.Mutex drops []*dropper @@ -64,12 +66,13 @@ type EDSBalancer struct { } // NewXDSBalancer create a new EDSBalancer. -func NewXDSBalancer(cc balancer.ClientConn) *EDSBalancer { +func NewXDSBalancer(cc balancer.ClientConn, loadStore lrs.Store) *EDSBalancer { xdsB := &EDSBalancer{ ClientConn: cc, subBalancerBuilder: balancer.Get(roundrobin.Name), lidToConfig: make(map[string]*localityConfig), + loadStore: loadStore, } // Don't start balancer group here. Start it when handling the first EDS // response. Otherwise the balancer group will be started with round-robin, @@ -158,7 +161,7 @@ func (xdsB *EDSBalancer) updateDrops(dropPolicies []*edspb.ClusterLoadAssignment xdsB.drops = newDrops if xdsB.innerPicker != nil { // Update picker with old inner picker, new drops. - xdsB.ClientConn.UpdateBalancerState(xdsB.innerState, newDropPicker(xdsB.innerPicker, newDrops)) + xdsB.ClientConn.UpdateBalancerState(xdsB.innerState, newDropPicker(xdsB.innerPicker, newDrops, xdsB.loadStore)) } xdsB.pickerMu.Unlock() } @@ -269,7 +272,7 @@ func (xdsB *EDSBalancer) UpdateBalancerState(s connectivity.State, p balancer.Pi xdsB.innerPicker = p xdsB.innerState = s // Don't reset drops when it's a state change. - xdsB.ClientConn.UpdateBalancerState(s, newDropPicker(p, xdsB.drops)) + xdsB.ClientConn.UpdateBalancerState(s, newDropPicker(p, xdsB.drops, xdsB.loadStore)) } // Close closes the balancer. @@ -278,26 +281,35 @@ func (xdsB *EDSBalancer) Close() { } type dropPicker struct { - drops []*dropper - p balancer.Picker + drops []*dropper + p balancer.Picker + loadStore lrs.Store } -func newDropPicker(p balancer.Picker, drops []*dropper) *dropPicker { +func newDropPicker(p balancer.Picker, drops []*dropper, loadStore lrs.Store) *dropPicker { return &dropPicker{ - drops: drops, - p: p, + drops: drops, + p: p, + loadStore: loadStore, } } func (d *dropPicker) Pick(ctx context.Context, opts balancer.PickOptions) (conn balancer.SubConn, done func(balancer.DoneInfo), err error) { - var drop bool + var ( + drop bool + category string + ) for _, dp := range d.drops { if dp.drop() { drop = true + category = dp.category break } } if drop { + if d.loadStore != nil { + d.loadStore.CallDropped(category) + } return nil, nil, status.Errorf(codes.Unavailable, "RPC is dropped") } // TODO: (eds) don't drop unless the inner picker is READY. Similar to diff --git a/balancer/xds/edsbalancer/edsbalancer_test.go b/balancer/xds/edsbalancer/edsbalancer_test.go index b9b037f3..b1d7c339 100644 --- a/balancer/xds/edsbalancer/edsbalancer_test.go +++ b/balancer/xds/edsbalancer/edsbalancer_test.go @@ -117,7 +117,7 @@ func (clab *clusterLoadAssignmentBuilder) build() *edspb.ClusterLoadAssignment { // - change drop rate func TestEDS_OneLocality(t *testing.T) { cc := newTestClientConn(t) - edsb := NewXDSBalancer(cc) + edsb := NewXDSBalancer(cc, nil) // One locality with one backend. clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) @@ -226,7 +226,7 @@ func TestEDS_OneLocality(t *testing.T) { // - update locality weight func TestEDS_TwoLocalities(t *testing.T) { cc := newTestClientConn(t) - edsb := NewXDSBalancer(cc) + edsb := NewXDSBalancer(cc, nil) // Two localities, each with one backend. clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) @@ -385,7 +385,7 @@ func (tcp *testConstPicker) Pick(ctx context.Context, opts balancer.PickOptions) // eds response. func TestEDS_UpdateSubBalancerName(t *testing.T) { cc := newTestClientConn(t) - edsb := NewXDSBalancer(cc) + edsb := NewXDSBalancer(cc, nil) t.Logf("update sub-balancer to test-const-balancer") edsb.HandleChildPolicy("test-const-balancer", nil) @@ -502,7 +502,7 @@ func TestDropPicker(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - p := newDropPicker(constPicker, tt.drops) + p := newDropPicker(constPicker, tt.drops, nil) // scCount is the number of sc's returned by pick. The opposite of // drop-count. diff --git a/balancer/xds/lrs/lrs.go b/balancer/xds/lrs/lrs.go index e6689edd..a332ada0 100644 --- a/balancer/xds/lrs/lrs.go +++ b/balancer/xds/lrs/lrs.go @@ -32,6 +32,13 @@ import ( "google.golang.org/grpc/internal/backoff" ) +// Store defines the interface for a load store. It keeps loads and can report +// them to a server when requested. +type Store interface { + CallDropped(category string) + ReportTo(ctx context.Context, cc *grpc.ClientConn) +} + // lrsStore collects loads from xds balancer, and periodically sends load to the // server. type lrsStore struct { @@ -45,7 +52,8 @@ type lrsStore struct { const grpcHostname = "com.googleapis.trafficdirector.grpc_hostname" -func newStore(serviceName string) *lrsStore { +// NewStore creates a store for load reports. +func NewStore(serviceName string) Store { return &lrsStore{ serviceName: serviceName, node: &basepb.Node{ @@ -67,7 +75,8 @@ func newStore(serviceName string) *lrsStore { // Update functions are called by picker for each RPC. To avoid contention, all // updates are done atomically. -func (ls *lrsStore) callDropped(category string) { +// CallDropped adds one drop record with the given category to store. +func (ls *lrsStore) CallDropped(category string) { p, ok := ls.drops.Load(category) if !ok { tp := new(uint64) @@ -114,10 +123,10 @@ func (ls *lrsStore) buildStats() []*loadreportpb.ClusterStats { return ret } -// reportTo makes a streaming lrs call to cc and blocks. +// ReportTo makes a streaming lrs call to cc and blocks. // // It retries the call (with backoff) until ctx is canceled. -func (ls *lrsStore) reportTo(ctx context.Context, cc *grpc.ClientConn) { +func (ls *lrsStore) ReportTo(ctx context.Context, cc *grpc.ClientConn) { c := lrspb.NewLoadReportingServiceClient(cc) var ( retryCount int diff --git a/balancer/xds/lrs/lrs_test.go b/balancer/xds/lrs/lrs_test.go index ab3b3229..91c4aaa0 100644 --- a/balancer/xds/lrs/lrs_test.go +++ b/balancer/xds/lrs/lrs_test.go @@ -85,7 +85,7 @@ func Test_lrsStore_buildStats(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - ls := newStore(testService) + ls := NewStore(testService).(*lrsStore) for _, ds := range tt.drops { var ( @@ -112,7 +112,7 @@ func Test_lrsStore_buildStats(t *testing.T) { for i := 0; i < int(count); i++ { wg.Add(1) go func(i int, c string) { - ls.callDropped(c) + ls.CallDropped(c) wg.Done() }(i, c) } @@ -196,7 +196,7 @@ func setupServer(t *testing.T, reportingInterval *durationpb.Duration) (addr str } } -func Test_lrsStore_reportTo(t *testing.T) { +func Test_lrsStore_ReportTo(t *testing.T) { const intervalNano = 1000 * 1000 * 50 addr, lrss, cleanup := setupServer(t, &durationpb.Duration{ Seconds: 0, @@ -204,7 +204,7 @@ func Test_lrsStore_reportTo(t *testing.T) { }) defer cleanup() - ls := newStore(testService) + ls := NewStore(testService) cc, err := grpc.Dial(addr, grpc.WithInsecure()) if err != nil { t.Fatalf("failed to dial: %v", err) @@ -213,7 +213,7 @@ func Test_lrsStore_reportTo(t *testing.T) { defer cancel() done := make(chan struct{}) go func() { - ls.reportTo(ctx, cc) + ls.ReportTo(ctx, cc) close(done) }() @@ -224,7 +224,7 @@ func Test_lrsStore_reportTo(t *testing.T) { for c, d := range drops { for i := 0; i < int(d); i++ { - ls.callDropped(c) + ls.CallDropped(c) time.Sleep(time.Nanosecond * intervalNano / 10) } } diff --git a/balancer/xds/xds.go b/balancer/xds/xds.go index ba8485a9..c305c828 100644 --- a/balancer/xds/xds.go +++ b/balancer/xds/xds.go @@ -35,6 +35,7 @@ import ( "google.golang.org/grpc/balancer/xds/edsbalancer" cdspb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/cds" edspb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/eds" + "google.golang.org/grpc/balancer/xds/lrs" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/grpclog" @@ -51,8 +52,8 @@ var ( // TODO: if later we make startupTimeout configurable through BuildOptions(maybe?), then we can remove // this field and configure through BuildOptions instead. startupTimeout = defaultTimeout - newEDSBalancer = func(cc balancer.ClientConn) edsBalancerInterface { - return edsbalancer.NewXDSBalancer(cc) + newEDSBalancer = func(cc balancer.ClientConn, loadStore lrs.Store) edsBalancerInterface { + return edsbalancer.NewXDSBalancer(cc, loadStore) } ) @@ -78,6 +79,7 @@ func (b *xdsBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOp grpcUpdate: make(chan interface{}), xdsClientUpdate: make(chan interface{}), timer: createDrainedTimer(), // initialized a timer that won't fire without reset + loadStore: lrs.NewStore(cc.Target()), } x.cc = &xdsClientConn{ updateState: x.connStateMgr.updateState, @@ -130,6 +132,7 @@ type xdsBalancer struct { xdsLB edsBalancerInterface fallbackLB balancer.Balancer fallbackInitData *resolver.State // may change when HandleResolved address is called + loadStore lrs.Store } func (x *xdsBalancer) startNewXDSClient(u *xdsConfig) { @@ -184,7 +187,7 @@ func (x *xdsBalancer) startNewXDSClient(u *xdsConfig) { prevClient.close() } } - x.client = newXDSClient(u.BalancerName, x.cc.Target(), u.ChildPolicy == nil, x.buildOpts, newADS, loseContact, exitCleanup) + x.client = newXDSClient(u.BalancerName, x.cc.Target(), u.ChildPolicy == nil, x.buildOpts, x.loadStore, newADS, loseContact, exitCleanup) go x.client.run() } @@ -519,7 +522,7 @@ func (x *xdsBalancer) cancelFallbackAndSwitchEDSBalancerIfNecessary() { x.fallbackLB.Close() x.fallbackLB = nil } - x.xdsLB = newEDSBalancer(x.cc) + x.xdsLB = newEDSBalancer(x.cc, x.loadStore) if x.config.ChildPolicy != nil { x.xdsLB.HandleChildPolicy(x.config.ChildPolicy.Name, x.config.ChildPolicy.Config) } diff --git a/balancer/xds/xds_client.go b/balancer/xds/xds_client.go index 0cc8d4c0..60d86254 100644 --- a/balancer/xds/xds_client.go +++ b/balancer/xds/xds_client.go @@ -35,6 +35,7 @@ import ( discoverypb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/discovery" edspb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/eds" adspb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/service/discovery/v2/ads" + "google.golang.org/grpc/balancer/xds/lrs" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/internal/backoff" "google.golang.org/grpc/internal/channelz" @@ -69,6 +70,9 @@ type client struct { cleanup func() backoff backoff.Strategy + loadStore lrs.Store + loadReportOnce sync.Once + mu sync.Mutex cc *grpc.ClientConn } @@ -249,9 +253,18 @@ func (c *client) adsCallAttempt() (firstRespReceived bool) { grpclog.Warningf("xds: processing new ADS message failed due to %v.", err) return } + // Only start load reporting after ADS resp is received. + // + // Also, newADS() will close the previous load reporting stream, so we + // don't have double reporting. + c.loadReportOnce.Do(func() { + if c.loadStore != nil { + go c.loadStore.ReportTo(c.ctx, c.cc) + } + }) } } -func newXDSClient(balancerName string, serviceName string, enableCDS bool, opts balancer.BuildOptions, newADS func(context.Context, proto.Message) error, loseContact func(ctx context.Context), exitCleanup func()) *client { +func newXDSClient(balancerName string, serviceName string, enableCDS bool, opts balancer.BuildOptions, loadStore lrs.Store, newADS func(context.Context, proto.Message) error, loseContact func(ctx context.Context), exitCleanup func()) *client { c := &client{ balancerName: balancerName, serviceName: serviceName, @@ -261,6 +274,7 @@ func newXDSClient(balancerName string, serviceName string, enableCDS bool, opts loseContact: loseContact, cleanup: exitCleanup, backoff: defaultBackoffConfig, + loadStore: loadStore, } c.ctx, c.cancel = context.WithCancel(context.Background()) diff --git a/balancer/xds/xds_client_test.go b/balancer/xds/xds_client_test.go index 416a97fd..a2b34e2c 100644 --- a/balancer/xds/xds_client_test.go +++ b/balancer/xds/xds_client_test.go @@ -30,6 +30,7 @@ import ( "github.com/golang/protobuf/proto" anypb "github.com/golang/protobuf/ptypes/any" + durationpb "github.com/golang/protobuf/ptypes/duration" structpb "github.com/golang/protobuf/ptypes/struct" wrpb "github.com/golang/protobuf/ptypes/wrappers" "google.golang.org/grpc" @@ -41,6 +42,7 @@ import ( edspb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/eds" endpointpb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/endpoint/endpoint" adspb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/service/discovery/v2/ads" + lrspb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/service/load_stats/v2/lrs" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -266,16 +268,24 @@ type testConfig struct { svrErr error } -func setupServer(t *testing.T) (addr string, td *testTrafficDirector, cleanup func()) { +func setupServer(t *testing.T) (addr string, td *testTrafficDirector, lrss *lrsServer, cleanup func()) { lis, err := net.Listen("tcp", "localhost:0") if err != nil { t.Fatalf("listen failed due to: %v", err) } svr := grpc.NewServer() td = newTestTrafficDirector() + lrss = &lrsServer{ + drops: make(map[string]uint64), + reportingInterval: &durationpb.Duration{ + Seconds: 60 * 60, // 1 hour, each test can override this to a shorter duration. + Nanos: 0, + }, + } adspb.RegisterAggregatedDiscoveryServiceServer(svr, td) + lrspb.RegisterLoadReportingServiceServer(svr, lrss) go svr.Serve(lis) - return lis.Addr().String(), td, func() { + return lis.Addr().String(), td, lrss, func() { svr.Stop() lis.Close() } @@ -301,14 +311,14 @@ func (s) TestXdsClientResponseHandling(t *testing.T) { } func testXdsClientResponseHandling(t *testing.T, test *testConfig) { - addr, td, cleanup := setupServer(t) + addr, td, _, cleanup := setupServer(t) defer cleanup() adsChan := make(chan proto.Message, 10) newADS := func(ctx context.Context, i proto.Message) error { adsChan <- i return nil } - client := newXDSClient(addr, testServiceName, test.doCDS, balancer.BuildOptions{}, newADS, func(context.Context) {}, func() {}) + client := newXDSClient(addr, testServiceName, test.doCDS, balancer.BuildOptions{}, nil, newADS, func(context.Context) {}, func() {}) defer client.close() go client.run() @@ -365,7 +375,7 @@ func (s) TestXdsClientLoseContact(t *testing.T) { } func testXdsClientLoseContactRemoteClose(t *testing.T, test *testConfig) { - addr, td, cleanup := setupServer(t) + addr, td, _, cleanup := setupServer(t) defer cleanup() adsChan := make(chan proto.Message, 10) newADS := func(ctx context.Context, i proto.Message) error { @@ -376,7 +386,7 @@ func testXdsClientLoseContactRemoteClose(t *testing.T, test *testConfig) { loseContactFunc := func(context.Context) { contactChan <- &loseContact{} } - client := newXDSClient(addr, testServiceName, test.doCDS, balancer.BuildOptions{}, newADS, loseContactFunc, func() {}) + client := newXDSClient(addr, testServiceName, test.doCDS, balancer.BuildOptions{}, nil, newADS, loseContactFunc, func() {}) defer client.close() go client.run() @@ -398,7 +408,7 @@ func testXdsClientLoseContactRemoteClose(t *testing.T, test *testConfig) { } func testXdsClientLoseContactADSRelatedErrorOccur(t *testing.T, test *testConfig) { - addr, td, cleanup := setupServer(t) + addr, td, _, cleanup := setupServer(t) defer cleanup() adsChan := make(chan proto.Message, 10) @@ -410,7 +420,7 @@ func testXdsClientLoseContactADSRelatedErrorOccur(t *testing.T, test *testConfig loseContactFunc := func(context.Context) { contactChan <- &loseContact{} } - client := newXDSClient(addr, testServiceName, test.doCDS, balancer.BuildOptions{}, newADS, loseContactFunc, func() {}) + client := newXDSClient(addr, testServiceName, test.doCDS, balancer.BuildOptions{}, nil, newADS, loseContactFunc, func() {}) defer client.close() go client.run() @@ -432,7 +442,7 @@ func (s) TestXdsClientExponentialRetry(t *testing.T) { cfg := &testConfig{ svrErr: status.Errorf(codes.Aborted, "abort the stream to trigger retry"), } - addr, td, cleanup := setupServer(t) + addr, td, _, cleanup := setupServer(t) defer cleanup() adsChan := make(chan proto.Message, 10) @@ -444,7 +454,7 @@ func (s) TestXdsClientExponentialRetry(t *testing.T) { loseContactFunc := func(context.Context) { contactChan <- &loseContact{} } - client := newXDSClient(addr, testServiceName, cfg.doCDS, balancer.BuildOptions{}, newADS, loseContactFunc, func() {}) + client := newXDSClient(addr, testServiceName, cfg.doCDS, balancer.BuildOptions{}, nil, newADS, loseContactFunc, func() {}) defer client.close() go client.run() diff --git a/balancer/xds/xds_lrs_test.go b/balancer/xds/xds_lrs_test.go new file mode 100644 index 00000000..848d157a --- /dev/null +++ b/balancer/xds/xds_lrs_test.go @@ -0,0 +1,157 @@ +// +build go1.12 + +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package xds + +import ( + "io" + "sync" + "testing" + "time" + + "github.com/golang/protobuf/proto" + durationpb "github.com/golang/protobuf/ptypes/duration" + structpb "github.com/golang/protobuf/ptypes/struct" + "github.com/google/go-cmp/cmp" + "google.golang.org/grpc/balancer" + basepb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/core/base" + lrspb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/service/load_stats/v2/lrs" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/status" +) + +type lrsServer struct { + mu sync.Mutex + dropTotal uint64 + drops map[string]uint64 + reportingInterval *durationpb.Duration +} + +func (lrss *lrsServer) StreamLoadStats(stream lrspb.LoadReportingService_StreamLoadStatsServer) error { + req, err := stream.Recv() + if err != nil { + return err + } + if !proto.Equal(req, &lrspb.LoadStatsRequest{ + Node: &basepb.Node{ + Metadata: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + grpcHostname: { + Kind: &structpb.Value_StringValue{StringValue: testServiceName}, + }, + }, + }, + }, + }) { + return status.Errorf(codes.FailedPrecondition, "unexpected req: %+v", req) + } + if err := stream.Send(&lrspb.LoadStatsResponse{ + Clusters: []string{testServiceName}, + LoadReportingInterval: lrss.reportingInterval, + }); err != nil { + return err + } + + for { + req, err := stream.Recv() + if err != nil { + if err == io.EOF { + return nil + } + return err + } + stats := req.ClusterStats[0] + lrss.mu.Lock() + lrss.dropTotal += stats.TotalDroppedRequests + for _, d := range stats.DroppedRequests { + lrss.drops[d.Category] += d.DroppedCount + } + lrss.mu.Unlock() + } +} + +func (s) TestXdsLoadReporting(t *testing.T) { + originalNewEDSBalancer := newEDSBalancer + newEDSBalancer = newFakeEDSBalancer + defer func() { + newEDSBalancer = originalNewEDSBalancer + }() + + builder := balancer.Get("xds") + cc := newTestClientConn() + lb, ok := builder.Build(cc, balancer.BuildOptions{}).(*xdsBalancer) + if !ok { + t.Fatalf("unable to type assert to *xdsBalancer") + } + defer lb.Close() + + addr, td, lrss, cleanup := setupServer(t) + defer cleanup() + + const intervalNano = 1000 * 1000 * 50 + lrss.reportingInterval = &durationpb.Duration{ + Seconds: 0, + Nanos: intervalNano, + } + + cfg := &testBalancerConfig{ + BalancerName: addr, + ChildPolicy: []lbPolicy{fakeBalancerA}, // Set this to skip cds. + } + lb.UpdateResolverState(resolver.State{ + ServiceConfig: constructServiceConfigFromXdsConfig(cfg), + }) + td.sendResp(&response{resp: testEDSRespWithoutEndpoints}) + var ( + i int + edsLB *fakeEDSBalancer + ) + for i = 0; i < 10; i++ { + edsLB = getLatestEdsBalancer() + if edsLB != nil { + break + } + time.Sleep(100 * time.Millisecond) + } + if i == 10 { + t.Fatal("edsBalancer instance has not been created and assigned to lb.xdsLB after 1s") + } + + var dropCategories = []string{"drop_for_real", "drop_for_fun"} + drops := map[string]uint64{ + dropCategories[0]: 31, + dropCategories[1]: 41, + } + + for c, d := range drops { + for i := 0; i < int(d); i++ { + edsLB.loadStore.CallDropped(c) + time.Sleep(time.Nanosecond * intervalNano / 10) + } + } + time.Sleep(time.Nanosecond * intervalNano * 2) + + lrss.mu.Lock() + defer lrss.mu.Unlock() + if !cmp.Equal(lrss.drops, drops) { + t.Errorf("different: %v", cmp.Diff(lrss.drops, drops)) + } +} diff --git a/balancer/xds/xds_test.go b/balancer/xds/xds_test.go index aacbdcf1..89334364 100644 --- a/balancer/xds/xds_test.go +++ b/balancer/xds/xds_test.go @@ -31,6 +31,7 @@ import ( "google.golang.org/grpc/balancer" discoverypb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/discovery" edspb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/eds" + "google.golang.org/grpc/balancer/xds/lrs" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/internal/grpctest" "google.golang.org/grpc/internal/leakcheck" @@ -214,6 +215,7 @@ type fakeEDSBalancer struct { childPolicy chan *loadBalancingConfig fallbackPolicy chan *loadBalancingConfig subconnStateChange chan *scStateChange + loadStore lrs.Store } func (f *fakeEDSBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) { @@ -237,13 +239,14 @@ func (f *fakeEDSBalancer) HandleChildPolicy(name string, config json.RawMessage) } } -func newFakeEDSBalancer(cc balancer.ClientConn) edsBalancerInterface { +func newFakeEDSBalancer(cc balancer.ClientConn, loadStore lrs.Store) edsBalancerInterface { lb := &fakeEDSBalancer{ cc: cc, edsChan: make(chan *edspb.ClusterLoadAssignment, 10), childPolicy: make(chan *loadBalancingConfig, 10), fallbackPolicy: make(chan *loadBalancingConfig, 10), subconnStateChange: make(chan *scStateChange, 10), + loadStore: loadStore, } mu.Lock() latestFakeEdsBalancer = lb @@ -337,7 +340,7 @@ func (s) TestXdsBalanceHandleBalancerConfigBalancerNameUpdate(t *testing.T) { // In the first iteration, an eds balancer takes over fallback balancer // In the second iteration, a new xds client takes over previous one. for i := 0; i < 2; i++ { - addr, td, cleanup := setupServer(t) + addr, td, _, cleanup := setupServer(t) cleanups = append(cleanups, cleanup) workingServiceConfig := constructServiceConfigFromXdsConfig(&testBalancerConfig{ BalancerName: addr, @@ -426,7 +429,7 @@ func (s) TestXdsBalanceHandleBalancerConfigChildPolicyUpdate(t *testing.T) { }, }, } { - addr, td, cleanup := setupServer(t) + addr, td, _, cleanup := setupServer(t) cleanups = append(cleanups, cleanup) test.cfg.BalancerName = addr @@ -474,7 +477,7 @@ func (s) TestXdsBalanceHandleBalancerConfigFallbackUpdate(t *testing.T) { } defer lb.Close() - addr, td, cleanup := setupServer(t) + addr, td, _, cleanup := setupServer(t) cfg := &testBalancerConfig{ BalancerName: addr, @@ -549,7 +552,7 @@ func (s) TestXdsBalancerHandlerSubConnStateChange(t *testing.T) { } defer lb.Close() - addr, td, cleanup := setupServer(t) + addr, td, _, cleanup := setupServer(t) defer cleanup() cfg := &testBalancerConfig{ BalancerName: addr, @@ -629,7 +632,7 @@ func (s) TestXdsBalancerFallbackSignalFromEdsBalancer(t *testing.T) { } defer lb.Close() - addr, td, cleanup := setupServer(t) + addr, td, _, cleanup := setupServer(t) defer cleanup() cfg := &testBalancerConfig{ BalancerName: addr,