xds: report loads to traffic director (#2789)
@@ -32,6 +32,7 @@ import (
     "google.golang.org/grpc/balancer/roundrobin"
     edspb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/eds"
     percentpb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/type/percent"
+    "google.golang.org/grpc/balancer/xds/lrs"
     "google.golang.org/grpc/codes"
     "google.golang.org/grpc/connectivity"
     "google.golang.org/grpc/grpclog"
@@ -56,6 +57,7 @@ type EDSBalancer struct {
     bg                 *balancerGroup
     subBalancerBuilder balancer.Builder
     lidToConfig        map[string]*localityConfig
+    loadStore          lrs.Store

     pickerMu sync.Mutex
     drops    []*dropper
@@ -64,12 +66,13 @@ type EDSBalancer struct {
 }

 // NewXDSBalancer create a new EDSBalancer.
-func NewXDSBalancer(cc balancer.ClientConn) *EDSBalancer {
+func NewXDSBalancer(cc balancer.ClientConn, loadStore lrs.Store) *EDSBalancer {
     xdsB := &EDSBalancer{
         ClientConn:         cc,
         subBalancerBuilder: balancer.Get(roundrobin.Name),

         lidToConfig: make(map[string]*localityConfig),
+        loadStore:   loadStore,
     }
     // Don't start balancer group here. Start it when handling the first EDS
     // response. Otherwise the balancer group will be started with round-robin,
@@ -158,7 +161,7 @@ func (xdsB *EDSBalancer) updateDrops(dropPolicies []*edspb.ClusterLoadAssignment
         xdsB.drops = newDrops
         if xdsB.innerPicker != nil {
             // Update picker with old inner picker, new drops.
-            xdsB.ClientConn.UpdateBalancerState(xdsB.innerState, newDropPicker(xdsB.innerPicker, newDrops))
+            xdsB.ClientConn.UpdateBalancerState(xdsB.innerState, newDropPicker(xdsB.innerPicker, newDrops, xdsB.loadStore))
         }
         xdsB.pickerMu.Unlock()
     }
@@ -269,7 +272,7 @@ func (xdsB *EDSBalancer) UpdateBalancerState(s connectivity.State, p balancer.Pi
     xdsB.innerPicker = p
     xdsB.innerState = s
     // Don't reset drops when it's a state change.
-    xdsB.ClientConn.UpdateBalancerState(s, newDropPicker(p, xdsB.drops))
+    xdsB.ClientConn.UpdateBalancerState(s, newDropPicker(p, xdsB.drops, xdsB.loadStore))
 }

 // Close closes the balancer.
@@ -278,26 +281,35 @@ func (xdsB *EDSBalancer) Close() {
 }

 type dropPicker struct {
-    drops []*dropper
-    p     balancer.Picker
+    drops     []*dropper
+    p         balancer.Picker
+    loadStore lrs.Store
 }

-func newDropPicker(p balancer.Picker, drops []*dropper) *dropPicker {
+func newDropPicker(p balancer.Picker, drops []*dropper, loadStore lrs.Store) *dropPicker {
     return &dropPicker{
-        drops: drops,
-        p:     p,
+        drops:     drops,
+        p:         p,
+        loadStore: loadStore,
     }
 }

 func (d *dropPicker) Pick(ctx context.Context, opts balancer.PickOptions) (conn balancer.SubConn, done func(balancer.DoneInfo), err error) {
-    var drop bool
+    var (
+        drop     bool
+        category string
+    )
     for _, dp := range d.drops {
         if dp.drop() {
             drop = true
+            category = dp.category
             break
         }
     }
     if drop {
+        if d.loadStore != nil {
+            d.loadStore.CallDropped(category)
+        }
         return nil, nil, status.Errorf(codes.Unavailable, "RPC is dropped")
     }
     // TODO: (eds) don't drop unless the inner picker is READY. Similar to
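For context, a hedged sketch of how the pieces above combine at pick time. The wiring is hypothetical (innerPicker, drops, and ctx stand in for state the balancer already holds); it only illustrates that a dropped pick is counted before the RPC fails:

    // Hypothetical wiring of a drop picker to a load store: each pick that
    // matches a drop policy is recorded under its category, then fails.
    store := lrs.NewStore("example-service") // assumed service name
    picker := newDropPicker(innerPicker, drops, store)
    if _, _, err := picker.Pick(ctx, balancer.PickOptions{}); err != nil {
        // Dropped RPCs surface as codes.Unavailable; the drop has already
        // been counted via store.CallDropped(category).
    }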
@@ -117,7 +117,7 @@ func (clab *clusterLoadAssignmentBuilder) build() *edspb.ClusterLoadAssignment {
 // - change drop rate
 func TestEDS_OneLocality(t *testing.T) {
     cc := newTestClientConn(t)
-    edsb := NewXDSBalancer(cc)
+    edsb := NewXDSBalancer(cc, nil)

     // One locality with one backend.
     clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
@@ -226,7 +226,7 @@ func TestEDS_OneLocality(t *testing.T) {
 // - update locality weight
 func TestEDS_TwoLocalities(t *testing.T) {
     cc := newTestClientConn(t)
-    edsb := NewXDSBalancer(cc)
+    edsb := NewXDSBalancer(cc, nil)

     // Two localities, each with one backend.
     clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
@@ -385,7 +385,7 @@ func (tcp *testConstPicker) Pick(ctx context.Context, opts balancer.PickOptions)
 // eds response.
 func TestEDS_UpdateSubBalancerName(t *testing.T) {
     cc := newTestClientConn(t)
-    edsb := NewXDSBalancer(cc)
+    edsb := NewXDSBalancer(cc, nil)

     t.Logf("update sub-balancer to test-const-balancer")
     edsb.HandleChildPolicy("test-const-balancer", nil)
@@ -502,7 +502,7 @@ func TestDropPicker(t *testing.T) {
     for _, tt := range tests {
         t.Run(tt.name, func(t *testing.T) {

-            p := newDropPicker(constPicker, tt.drops)
+            p := newDropPicker(constPicker, tt.drops, nil)

             // scCount is the number of sc's returned by pick. The opposite of
             // drop-count.
@@ -32,6 +32,13 @@ import (
     "google.golang.org/grpc/internal/backoff"
 )

+// Store defines the interface for a load store. It keeps loads and can report
+// them to a server when requested.
+type Store interface {
+    CallDropped(category string)
+    ReportTo(ctx context.Context, cc *grpc.ClientConn)
+}
+
 // lrsStore collects loads from xds balancer, and periodically sends load to the
 // server.
 type lrsStore struct {
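As a sketch of what the new exported interface demands of an implementation (illustrative only; the real implementation is the unexported lrsStore below, and callers that don't report loads simply pass nil):

    // noopStore is a hypothetical Store that records nothing. ReportTo blocks
    // until ctx is canceled, matching the documented blocking behavior.
    type noopStore struct{}

    func (noopStore) CallDropped(category string) {}

    func (noopStore) ReportTo(ctx context.Context, cc *grpc.ClientConn) {
        <-ctx.Done()
    }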
@@ -45,7 +52,8 @@ type lrsStore struct {

 const grpcHostname = "com.googleapis.trafficdirector.grpc_hostname"

-func newStore(serviceName string) *lrsStore {
+// NewStore creates a store for load reports.
+func NewStore(serviceName string) Store {
     return &lrsStore{
         serviceName: serviceName,
         node: &basepb.Node{
@@ -67,7 +75,8 @@ func newStore(serviceName string) *lrsStore {
 // Update functions are called by picker for each RPC. To avoid contention, all
 // updates are done atomically.

-func (ls *lrsStore) callDropped(category string) {
+// CallDropped adds one drop record with the given category to store.
+func (ls *lrsStore) CallDropped(category string) {
     p, ok := ls.drops.Load(category)
     if !ok {
         tp := new(uint64)
@@ -114,10 +123,10 @@ func (ls *lrsStore) buildStats() []*loadreportpb.ClusterStats {
     return ret
 }

-// reportTo makes a streaming lrs call to cc and blocks.
+// ReportTo makes a streaming lrs call to cc and blocks.
 //
 // It retries the call (with backoff) until ctx is canceled.
-func (ls *lrsStore) reportTo(ctx context.Context, cc *grpc.ClientConn) {
+func (ls *lrsStore) ReportTo(ctx context.Context, cc *grpc.ClientConn) {
     c := lrspb.NewLoadReportingServiceClient(cc)
     var (
         retryCount int
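Because ReportTo blocks and retries internally, the intended usage is to run it on its own goroutine under a cancelable context, which is exactly what the xds client does later in this commit. A minimal sketch, assuming cc is an already-dialed *grpc.ClientConn:

    store := lrs.NewStore("example-service") // assumed service name
    ctx, cancel := context.WithCancel(context.Background())
    go store.ReportTo(ctx, cc) // streams load reports; retries with backoff
    // ...
    cancel() // canceling ctx is the only way to stop the reporting loop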
@@ -85,7 +85,7 @@ func Test_lrsStore_buildStats(t *testing.T) {
     }
     for _, tt := range tests {
         t.Run(tt.name, func(t *testing.T) {
-            ls := newStore(testService)
+            ls := NewStore(testService).(*lrsStore)

             for _, ds := range tt.drops {
                 var (
@@ -112,7 +112,7 @@ func Test_lrsStore_buildStats(t *testing.T) {
                 for i := 0; i < int(count); i++ {
                     wg.Add(1)
                     go func(i int, c string) {
-                        ls.callDropped(c)
+                        ls.CallDropped(c)
                         wg.Done()
                     }(i, c)
                 }
@@ -196,7 +196,7 @@ func setupServer(t *testing.T, reportingInterval *durationpb.Duration) (addr str
     }
 }

-func Test_lrsStore_reportTo(t *testing.T) {
+func Test_lrsStore_ReportTo(t *testing.T) {
     const intervalNano = 1000 * 1000 * 50
     addr, lrss, cleanup := setupServer(t, &durationpb.Duration{
         Seconds: 0,
@@ -204,7 +204,7 @@ func Test_lrsStore_reportTo(t *testing.T) {
     })
     defer cleanup()

-    ls := newStore(testService)
+    ls := NewStore(testService)
     cc, err := grpc.Dial(addr, grpc.WithInsecure())
     if err != nil {
         t.Fatalf("failed to dial: %v", err)
@@ -213,7 +213,7 @@ func Test_lrsStore_reportTo(t *testing.T) {
     defer cancel()
     done := make(chan struct{})
     go func() {
-        ls.reportTo(ctx, cc)
+        ls.ReportTo(ctx, cc)
         close(done)
     }()

@@ -224,7 +224,7 @@ func Test_lrsStore_reportTo(t *testing.T) {

     for c, d := range drops {
         for i := 0; i < int(d); i++ {
-            ls.callDropped(c)
+            ls.CallDropped(c)
             time.Sleep(time.Nanosecond * intervalNano / 10)
         }
     }
@@ -35,6 +35,7 @@ import (
     "google.golang.org/grpc/balancer/xds/edsbalancer"
     cdspb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/cds"
     edspb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/eds"
+    "google.golang.org/grpc/balancer/xds/lrs"

     "google.golang.org/grpc/connectivity"
     "google.golang.org/grpc/grpclog"
@@ -51,8 +52,8 @@ var (
     // TODO: if later we make startupTimeout configurable through BuildOptions(maybe?), then we can remove
     // this field and configure through BuildOptions instead.
     startupTimeout = defaultTimeout
-    newEDSBalancer = func(cc balancer.ClientConn) edsBalancerInterface {
-        return edsbalancer.NewXDSBalancer(cc)
+    newEDSBalancer = func(cc balancer.ClientConn, loadStore lrs.Store) edsBalancerInterface {
+        return edsbalancer.NewXDSBalancer(cc, loadStore)
     }
 )

@@ -78,6 +79,7 @@ func (b *xdsBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOp
         grpcUpdate:      make(chan interface{}),
         xdsClientUpdate: make(chan interface{}),
         timer:           createDrainedTimer(), // initialized a timer that won't fire without reset
+        loadStore:       lrs.NewStore(cc.Target()),
     }
     x.cc = &xdsClientConn{
         updateState: x.connStateMgr.updateState,
@@ -130,6 +132,7 @@ type xdsBalancer struct {
     xdsLB            edsBalancerInterface
     fallbackLB       balancer.Balancer
     fallbackInitData *resolver.State // may change when HandleResolved address is called
+    loadStore        lrs.Store
 }

 func (x *xdsBalancer) startNewXDSClient(u *xdsConfig) {
@@ -184,7 +187,7 @@ func (x *xdsBalancer) startNewXDSClient(u *xdsConfig) {
             prevClient.close()
         }
     }
-    x.client = newXDSClient(u.BalancerName, x.cc.Target(), u.ChildPolicy == nil, x.buildOpts, newADS, loseContact, exitCleanup)
+    x.client = newXDSClient(u.BalancerName, x.cc.Target(), u.ChildPolicy == nil, x.buildOpts, x.loadStore, newADS, loseContact, exitCleanup)
     go x.client.run()
 }

@@ -519,7 +522,7 @@ func (x *xdsBalancer) cancelFallbackAndSwitchEDSBalancerIfNecessary() {
         x.fallbackLB.Close()
         x.fallbackLB = nil
     }
-    x.xdsLB = newEDSBalancer(x.cc)
+    x.xdsLB = newEDSBalancer(x.cc, x.loadStore)
     if x.config.ChildPolicy != nil {
         x.xdsLB.HandleChildPolicy(x.config.ChildPolicy.Name, x.config.ChildPolicy.Config)
     }
@@ -35,6 +35,7 @@ import (
     discoverypb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/discovery"
     edspb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/eds"
     adspb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/service/discovery/v2/ads"
+    "google.golang.org/grpc/balancer/xds/lrs"
     "google.golang.org/grpc/grpclog"
     "google.golang.org/grpc/internal/backoff"
     "google.golang.org/grpc/internal/channelz"
@@ -69,6 +70,9 @@ type client struct {
     cleanup     func()
     backoff     backoff.Strategy

+    loadStore      lrs.Store
+    loadReportOnce sync.Once
+
     mu sync.Mutex
     cc *grpc.ClientConn
 }
@@ -249,9 +253,18 @@ func (c *client) adsCallAttempt() (firstRespReceived bool) {
             grpclog.Warningf("xds: processing new ADS message failed due to %v.", err)
             return
         }
+        // Only start load reporting after ADS resp is received.
+        //
+        // Also, newADS() will close the previous load reporting stream, so we
+        // don't have double reporting.
+        c.loadReportOnce.Do(func() {
+            if c.loadStore != nil {
+                go c.loadStore.ReportTo(c.ctx, c.cc)
+            }
+        })
     }
 }
-func newXDSClient(balancerName string, serviceName string, enableCDS bool, opts balancer.BuildOptions, newADS func(context.Context, proto.Message) error, loseContact func(ctx context.Context), exitCleanup func()) *client {
+func newXDSClient(balancerName string, serviceName string, enableCDS bool, opts balancer.BuildOptions, loadStore lrs.Store, newADS func(context.Context, proto.Message) error, loseContact func(ctx context.Context), exitCleanup func()) *client {
     c := &client{
         balancerName: balancerName,
         serviceName:  serviceName,
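The sync.Once guard above is the load-bearing detail: adsCallAttempt processes many ADS responses over the client's lifetime, but the reporting goroutine must start at most once per client. A standalone sketch of the pattern (names hypothetical):

    var loadReportOnce sync.Once

    // onADSResponse is a hypothetical stand-in for the response path above:
    // called repeatedly, it starts load reporting exactly once.
    onADSResponse := func(ctx context.Context, cc *grpc.ClientConn, store lrs.Store) {
        loadReportOnce.Do(func() {
            if store != nil {
                go store.ReportTo(ctx, cc) // runs until ctx is canceled
            }
        })
    }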
@@ -261,6 +274,7 @@ func newXDSClient(balancerName string, serviceName string, enableCDS bool, opts
         loseContact:  loseContact,
         cleanup:      exitCleanup,
         backoff:      defaultBackoffConfig,
+        loadStore:    loadStore,
     }

     c.ctx, c.cancel = context.WithCancel(context.Background())
@@ -30,6 +30,7 @@ import (

     "github.com/golang/protobuf/proto"
     anypb "github.com/golang/protobuf/ptypes/any"
+    durationpb "github.com/golang/protobuf/ptypes/duration"
     structpb "github.com/golang/protobuf/ptypes/struct"
     wrpb "github.com/golang/protobuf/ptypes/wrappers"
     "google.golang.org/grpc"
@@ -41,6 +42,7 @@ import (
     edspb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/eds"
     endpointpb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/endpoint/endpoint"
     adspb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/service/discovery/v2/ads"
+    lrspb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/service/load_stats/v2/lrs"
     "google.golang.org/grpc/codes"
     "google.golang.org/grpc/status"
 )
@@ -266,16 +268,24 @@ type testConfig struct {
     svrErr error
 }

-func setupServer(t *testing.T) (addr string, td *testTrafficDirector, cleanup func()) {
+func setupServer(t *testing.T) (addr string, td *testTrafficDirector, lrss *lrsServer, cleanup func()) {
     lis, err := net.Listen("tcp", "localhost:0")
     if err != nil {
         t.Fatalf("listen failed due to: %v", err)
     }
     svr := grpc.NewServer()
     td = newTestTrafficDirector()
+    lrss = &lrsServer{
+        drops: make(map[string]uint64),
+        reportingInterval: &durationpb.Duration{
+            Seconds: 60 * 60, // 1 hour, each test can override this to a shorter duration.
+            Nanos:   0,
+        },
+    }
     adspb.RegisterAggregatedDiscoveryServiceServer(svr, td)
+    lrspb.RegisterLoadReportingServiceServer(svr, lrss)
     go svr.Serve(lis)
-    return lis.Addr().String(), td, func() {
+    return lis.Addr().String(), td, lrss, func() {
         svr.Stop()
         lis.Close()
     }
@@ -301,14 +311,14 @@ func (s) TestXdsClientResponseHandling(t *testing.T) {
 }

 func testXdsClientResponseHandling(t *testing.T, test *testConfig) {
-    addr, td, cleanup := setupServer(t)
+    addr, td, _, cleanup := setupServer(t)
     defer cleanup()
     adsChan := make(chan proto.Message, 10)
     newADS := func(ctx context.Context, i proto.Message) error {
         adsChan <- i
         return nil
     }
-    client := newXDSClient(addr, testServiceName, test.doCDS, balancer.BuildOptions{}, newADS, func(context.Context) {}, func() {})
+    client := newXDSClient(addr, testServiceName, test.doCDS, balancer.BuildOptions{}, nil, newADS, func(context.Context) {}, func() {})
     defer client.close()
     go client.run()

@@ -365,7 +375,7 @@ func (s) TestXdsClientLoseContact(t *testing.T) {
 }

 func testXdsClientLoseContactRemoteClose(t *testing.T, test *testConfig) {
-    addr, td, cleanup := setupServer(t)
+    addr, td, _, cleanup := setupServer(t)
     defer cleanup()
     adsChan := make(chan proto.Message, 10)
     newADS := func(ctx context.Context, i proto.Message) error {
@@ -376,7 +386,7 @@ func testXdsClientLoseContactRemoteClose(t *testing.T, test *testConfig) {
     loseContactFunc := func(context.Context) {
         contactChan <- &loseContact{}
     }
-    client := newXDSClient(addr, testServiceName, test.doCDS, balancer.BuildOptions{}, newADS, loseContactFunc, func() {})
+    client := newXDSClient(addr, testServiceName, test.doCDS, balancer.BuildOptions{}, nil, newADS, loseContactFunc, func() {})
     defer client.close()
     go client.run()

@@ -398,7 +408,7 @@ func testXdsClientLoseContactRemoteClose(t *testing.T, test *testConfig) {
 }

 func testXdsClientLoseContactADSRelatedErrorOccur(t *testing.T, test *testConfig) {
-    addr, td, cleanup := setupServer(t)
+    addr, td, _, cleanup := setupServer(t)
     defer cleanup()

     adsChan := make(chan proto.Message, 10)
@@ -410,7 +420,7 @@ func testXdsClientLoseContactADSRelatedErrorOccur(t *testing.T, test *testConfig
     loseContactFunc := func(context.Context) {
         contactChan <- &loseContact{}
     }
-    client := newXDSClient(addr, testServiceName, test.doCDS, balancer.BuildOptions{}, newADS, loseContactFunc, func() {})
+    client := newXDSClient(addr, testServiceName, test.doCDS, balancer.BuildOptions{}, nil, newADS, loseContactFunc, func() {})
     defer client.close()
     go client.run()

@@ -432,7 +442,7 @@ func (s) TestXdsClientExponentialRetry(t *testing.T) {
     cfg := &testConfig{
         svrErr: status.Errorf(codes.Aborted, "abort the stream to trigger retry"),
     }
-    addr, td, cleanup := setupServer(t)
+    addr, td, _, cleanup := setupServer(t)
     defer cleanup()

     adsChan := make(chan proto.Message, 10)
@@ -444,7 +454,7 @@ func (s) TestXdsClientExponentialRetry(t *testing.T) {
     loseContactFunc := func(context.Context) {
         contactChan <- &loseContact{}
     }
-    client := newXDSClient(addr, testServiceName, cfg.doCDS, balancer.BuildOptions{}, newADS, loseContactFunc, func() {})
+    client := newXDSClient(addr, testServiceName, cfg.doCDS, balancer.BuildOptions{}, nil, newADS, loseContactFunc, func() {})
     defer client.close()
     go client.run()

balancer/xds/xds_lrs_test.go (new file, 157 lines)
@@ -0,0 +1,157 @@
+// +build go1.12
+
+/*
+ *
+ * Copyright 2019 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package xds
+
+import (
+    "io"
+    "sync"
+    "testing"
+    "time"
+
+    "github.com/golang/protobuf/proto"
+    durationpb "github.com/golang/protobuf/ptypes/duration"
+    structpb "github.com/golang/protobuf/ptypes/struct"
+    "github.com/google/go-cmp/cmp"
+    "google.golang.org/grpc/balancer"
+    basepb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/core/base"
+    lrspb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/service/load_stats/v2/lrs"
+    "google.golang.org/grpc/codes"
+    "google.golang.org/grpc/resolver"
+    "google.golang.org/grpc/status"
+)
+
+type lrsServer struct {
+    mu                sync.Mutex
+    dropTotal         uint64
+    drops             map[string]uint64
+    reportingInterval *durationpb.Duration
+}
+
+func (lrss *lrsServer) StreamLoadStats(stream lrspb.LoadReportingService_StreamLoadStatsServer) error {
+    req, err := stream.Recv()
+    if err != nil {
+        return err
+    }
+    if !proto.Equal(req, &lrspb.LoadStatsRequest{
+        Node: &basepb.Node{
+            Metadata: &structpb.Struct{
+                Fields: map[string]*structpb.Value{
+                    grpcHostname: {
+                        Kind: &structpb.Value_StringValue{StringValue: testServiceName},
+                    },
+                },
+            },
+        },
+    }) {
+        return status.Errorf(codes.FailedPrecondition, "unexpected req: %+v", req)
+    }
+    if err := stream.Send(&lrspb.LoadStatsResponse{
+        Clusters:              []string{testServiceName},
+        LoadReportingInterval: lrss.reportingInterval,
+    }); err != nil {
+        return err
+    }
+
+    for {
+        req, err := stream.Recv()
+        if err != nil {
+            if err == io.EOF {
+                return nil
+            }
+            return err
+        }
+        stats := req.ClusterStats[0]
+        lrss.mu.Lock()
+        lrss.dropTotal += stats.TotalDroppedRequests
+        for _, d := range stats.DroppedRequests {
+            lrss.drops[d.Category] += d.DroppedCount
+        }
+        lrss.mu.Unlock()
+    }
+}
+
+func (s) TestXdsLoadReporting(t *testing.T) {
+    originalNewEDSBalancer := newEDSBalancer
+    newEDSBalancer = newFakeEDSBalancer
+    defer func() {
+        newEDSBalancer = originalNewEDSBalancer
+    }()
+
+    builder := balancer.Get("xds")
+    cc := newTestClientConn()
+    lb, ok := builder.Build(cc, balancer.BuildOptions{}).(*xdsBalancer)
+    if !ok {
+        t.Fatalf("unable to type assert to *xdsBalancer")
+    }
+    defer lb.Close()
+
+    addr, td, lrss, cleanup := setupServer(t)
+    defer cleanup()
+
+    const intervalNano = 1000 * 1000 * 50
+    lrss.reportingInterval = &durationpb.Duration{
+        Seconds: 0,
+        Nanos:   intervalNano,
+    }
+
+    cfg := &testBalancerConfig{
+        BalancerName: addr,
+        ChildPolicy:  []lbPolicy{fakeBalancerA}, // Set this to skip cds.
+    }
+    lb.UpdateResolverState(resolver.State{
+        ServiceConfig: constructServiceConfigFromXdsConfig(cfg),
+    })
+    td.sendResp(&response{resp: testEDSRespWithoutEndpoints})
+    var (
+        i     int
+        edsLB *fakeEDSBalancer
+    )
+    for i = 0; i < 10; i++ {
+        edsLB = getLatestEdsBalancer()
+        if edsLB != nil {
+            break
+        }
+        time.Sleep(100 * time.Millisecond)
+    }
+    if i == 10 {
+        t.Fatal("edsBalancer instance has not been created and assigned to lb.xdsLB after 1s")
+    }
+
+    var dropCategories = []string{"drop_for_real", "drop_for_fun"}
+    drops := map[string]uint64{
+        dropCategories[0]: 31,
+        dropCategories[1]: 41,
+    }
+
+    for c, d := range drops {
+        for i := 0; i < int(d); i++ {
+            edsLB.loadStore.CallDropped(c)
+            time.Sleep(time.Nanosecond * intervalNano / 10)
+        }
+    }
+    time.Sleep(time.Nanosecond * intervalNano * 2)
+
+    lrss.mu.Lock()
+    defer lrss.mu.Unlock()
+    if !cmp.Equal(lrss.drops, drops) {
+        t.Errorf("different: %v", cmp.Diff(lrss.drops, drops))
+    }
+}
@@ -31,6 +31,7 @@ import (
     "google.golang.org/grpc/balancer"
     discoverypb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/discovery"
     edspb "google.golang.org/grpc/balancer/xds/internal/proto/envoy/api/v2/eds"
+    "google.golang.org/grpc/balancer/xds/lrs"
     "google.golang.org/grpc/connectivity"
     "google.golang.org/grpc/internal/grpctest"
     "google.golang.org/grpc/internal/leakcheck"
@@ -214,6 +215,7 @@ type fakeEDSBalancer struct {
     childPolicy        chan *loadBalancingConfig
     fallbackPolicy     chan *loadBalancingConfig
     subconnStateChange chan *scStateChange
+    loadStore          lrs.Store
 }

 func (f *fakeEDSBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
@@ -237,13 +239,14 @@ func (f *fakeEDSBalancer) HandleChildPolicy(name string, config json.RawMessage)
     }
 }

-func newFakeEDSBalancer(cc balancer.ClientConn) edsBalancerInterface {
+func newFakeEDSBalancer(cc balancer.ClientConn, loadStore lrs.Store) edsBalancerInterface {
     lb := &fakeEDSBalancer{
         cc:                 cc,
         edsChan:            make(chan *edspb.ClusterLoadAssignment, 10),
         childPolicy:        make(chan *loadBalancingConfig, 10),
         fallbackPolicy:     make(chan *loadBalancingConfig, 10),
         subconnStateChange: make(chan *scStateChange, 10),
+        loadStore:          loadStore,
     }
     mu.Lock()
     latestFakeEdsBalancer = lb
@@ -337,7 +340,7 @@ func (s) TestXdsBalanceHandleBalancerConfigBalancerNameUpdate(t *testing.T) {
     // In the first iteration, an eds balancer takes over fallback balancer
     // In the second iteration, a new xds client takes over previous one.
     for i := 0; i < 2; i++ {
-        addr, td, cleanup := setupServer(t)
+        addr, td, _, cleanup := setupServer(t)
         cleanups = append(cleanups, cleanup)
         workingServiceConfig := constructServiceConfigFromXdsConfig(&testBalancerConfig{
             BalancerName: addr,
@@ -426,7 +429,7 @@ func (s) TestXdsBalanceHandleBalancerConfigChildPolicyUpdate(t *testing.T) {
             },
         },
     } {
-        addr, td, cleanup := setupServer(t)
+        addr, td, _, cleanup := setupServer(t)
         cleanups = append(cleanups, cleanup)
         test.cfg.BalancerName = addr

@@ -474,7 +477,7 @@ func (s) TestXdsBalanceHandleBalancerConfigFallbackUpdate(t *testing.T) {
     }
     defer lb.Close()

-    addr, td, cleanup := setupServer(t)
+    addr, td, _, cleanup := setupServer(t)

     cfg := &testBalancerConfig{
         BalancerName: addr,
@@ -549,7 +552,7 @@ func (s) TestXdsBalancerHandlerSubConnStateChange(t *testing.T) {
     }
     defer lb.Close()

-    addr, td, cleanup := setupServer(t)
+    addr, td, _, cleanup := setupServer(t)
     defer cleanup()
     cfg := &testBalancerConfig{
         BalancerName: addr,
@@ -629,7 +632,7 @@ func (s) TestXdsBalancerFallbackSignalFromEdsBalancer(t *testing.T) {
     }
     defer lb.Close()

-    addr, td, cleanup := setupServer(t)
+    addr, td, _, cleanup := setupServer(t)
     defer cleanup()
     cfg := &testBalancerConfig{
         BalancerName: addr,