diff --git a/balancer/xds/edsbalancer/edsbalancer.go b/balancer/xds/edsbalancer/edsbalancer.go
new file mode 100644
index 00000000..6c1da917
--- /dev/null
+++ b/balancer/xds/edsbalancer/edsbalancer.go
@@ -0,0 +1,58 @@
+/*
+ *
+ * Copyright 2019 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package edsbalancer
+
+// TODO: this file is used as a placeholder. It should be deleted after the edsbalancer
+// implementation is merged.
+
+import (
+	"encoding/json"
+
+	v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2"
+	"google.golang.org/grpc/balancer"
+	"google.golang.org/grpc/connectivity"
+	"google.golang.org/grpc/resolver"
+)
+
+type dummyEdsBalancer struct{}
+
+func (d *dummyEdsBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
+	panic("implement me")
+}
+
+func (d *dummyEdsBalancer) HandleResolvedAddrs([]resolver.Address, error) {
+	panic("implement me")
+}
+
+func (d *dummyEdsBalancer) Close() {
+	panic("implement me")
+}
+
+func (d *dummyEdsBalancer) HandleEDSResponse(edsResp *v2.ClusterLoadAssignment) {
+	panic("implement me")
+}
+
+func (d *dummyEdsBalancer) HandleChildPolicy(name string, config json.RawMessage) {
+	panic("implement me")
+}
+
+// NewXDSBalancer creates an edsBalancer.
+func NewXDSBalancer(cc balancer.ClientConn) interface{} {
+	return &dummyEdsBalancer{}
+}
diff --git a/balancer/xds/xds.go b/balancer/xds/xds.go
index 2bd77d48..53f4c6e5 100644
--- a/balancer/xds/xds.go
+++ b/balancer/xds/xds.go
@@ -1,4 +1,5 @@
 /*
+ *
  * Copyright 2019 gRPC authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -12,7 +13,588 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
+ *
  */
 
-// Package xds implements xds balancer.
-package xds // import "google.golang.org/grpc/balancer/xds"
+// Package xds implements a balancer that communicates with a remote balancer using the Envoy xDS
+// protocol.
+package xds
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"reflect"
+	"sync"
+	"time"
+
+	xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
+	"github.com/gogo/protobuf/proto"
+	"google.golang.org/grpc/balancer"
+	"google.golang.org/grpc/balancer/xds/edsbalancer"
+	"google.golang.org/grpc/connectivity"
+	"google.golang.org/grpc/grpclog"
+	"google.golang.org/grpc/resolver"
+)
+
+const (
+	defaultTimeout = 10 * time.Second
+	xdsName        = "xds"
+)
+
+var (
+	// This field is for testing purposes.
+	// TODO: if we later make startupTimeout configurable through BuildOptions (maybe?), then we can
+	// remove this field and configure it through BuildOptions instead.
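+	// Tests shorten startupTimeout (and restore it to defaultTimeout afterwards) to exercise the
+	// fallback path quickly; see xds_test.go.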
+	startupTimeout = defaultTimeout
+	newEDSBalancer = edsbalancer.NewXDSBalancer
+)
+
+func init() {
+	balancer.Register(newXDSBalancerBuilder())
+}
+
+type xdsBalancerBuilder struct{}
+
+func newXDSBalancerBuilder() balancer.Builder {
+	return &xdsBalancerBuilder{}
+}
+
+func (b *xdsBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
+	ctx, cancel := context.WithCancel(context.Background())
+	x := &xdsBalancer{
+		ctx:             ctx,
+		cancel:          cancel,
+		buildOpts:       opts,
+		startupTimeout:  startupTimeout,
+		connStateMgr:    &connStateMgr{},
+		startup:         true,
+		grpcUpdate:      make(chan interface{}),
+		xdsClientUpdate: make(chan interface{}),
+		timer:           createDrainedTimer(), // initialize a timer that won't fire until reset
+	}
+	x.cc = &xdsClientConn{
+		updateState: x.connStateMgr.updateState,
+		ClientConn:  cc,
+	}
+	go x.run()
+	return x
+}
+
+func (b *xdsBalancerBuilder) Name() string {
+	return xdsName
+}
+
+// EdsBalancer defines the interface that edsBalancer must implement to communicate with xdsBalancer.
+type EdsBalancer interface {
+	balancer.Balancer
+	// HandleEDSResponse passes the received EDS message from the traffic director to the eds balancer.
+	HandleEDSResponse(edsResp *xdspb.ClusterLoadAssignment)
+	// HandleChildPolicy updates the eds balancer with the intra-cluster load balancing policy to use.
+	HandleChildPolicy(name string, config json.RawMessage)
+}
+
+// xdsBalancer manages the xdsClient and the actual balancer that does load balancing (either
+// edsBalancer, or the fallback LB).
+type xdsBalancer struct {
+	cc                balancer.ClientConn // *xdsClientConn
+	buildOpts         balancer.BuildOptions
+	startupTimeout    time.Duration
+	xdsStaleTimeout   *time.Duration
+	connStateMgr      *connStateMgr
+	ctx               context.Context
+	cancel            context.CancelFunc
+	startup           bool // startup indicates whether this xdsBalancer is in the startup stage.
+	inFallbackMonitor bool
+
+	// xdsBalancer continuously monitors the channels below, and handles events from them in sync.
+	grpcUpdate      chan interface{}
+	xdsClientUpdate chan interface{}
+	timer           *time.Timer
+	noSubConnAlert  <-chan struct{}
+
+	client           *client    // may change when passed a different service config
+	config           *xdsConfig // may change when passed a different service config
+	xdsLB            EdsBalancer
+	fallbackLB       balancer.Balancer
+	fallbackInitData *addressUpdate // may change when HandleResolvedAddrs is called
+}
+
+func (x *xdsBalancer) startNewXDSClient(u *xdsConfig) {
+	// If the xdsBalancer is in startup stage, then we need to apply the startup timeout for the first
+	// xdsClient to get a response from the traffic director.
+	if x.startup {
+		x.startFallbackMonitoring()
+	}
+
+	// Whenever the service config gives a new traffic director name, we need to create an xds client
+	// to connect to it. However, the previous xds client should not be closed until the new one
+	// successfully connects to the traffic director (i.e. gets an ADS response from it). Therefore,
+	// we make each new client responsible for closing its immediate predecessor. In this way,
+	// xdsBalancer does not need to implement complex synchronization to achieve the same purpose.
+	prevClient := x.client
+	// haveGotADS being true means this xdsClient has received an ADS response from the director in
+	// the past, which means it can close the previous client if it hasn't already, and it can now
+	// send lose-contact signals for fallback monitoring.
+	var haveGotADS bool
+
+	// Set up callbacks for the xds client.
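+	// The three callbacks below wire the client into the balancer: newADS forwards each parsed
+	// ADS response into xdsBalancer's serialized event loop, loseContact reports a broken stream
+	// so that fallback monitoring can start, and exitCleanup covers the case where this client is
+	// replaced before it ever received a response.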
+	newADS := func(ctx context.Context, resp proto.Message) error {
+		if !haveGotADS {
+			if prevClient != nil {
+				prevClient.close()
+			}
+			haveGotADS = true
+		}
+		return x.newADSResponse(ctx, resp)
+	}
+	loseContact := func(ctx context.Context) {
+		// The loseContact signal is only useful when the current xds client has received an ADS
+		// response before, and has not been closed by a later xds client.
+		if haveGotADS {
+			select {
+			case <-ctx.Done():
+				return
+			default:
+			}
+			x.loseContact(ctx)
+		}
+	}
+	exitCleanup := func() {
+		// Each xds client is responsible for closing its predecessor if there is one. There are two
+		// paths for an xds client to close its predecessor:
+		// 1. Once it receives its first ADS response.
+		// 2. It hasn't received its first ADS response yet, but its own successor has received an
+		//    ADS response (which triggers its exit). Therefore, it needs to close its predecessor
+		//    if it has one.
+		// Here the exitCleanup is for the 2nd path.
+		if !haveGotADS && prevClient != nil {
+			prevClient.close()
+		}
+	}
+	x.client = newXDSClient(u.BalancerName, x.cc.Target(), u.ChildPolicy == nil, x.buildOpts, newADS, loseContact, exitCleanup)
+	go x.client.run()
+}
+
+// run gets executed in a goroutine once xdsBalancer is created. It monitors updates from grpc,
+// xdsClient and the load balancer. It synchronizes the operations that happen inside xdsBalancer.
+// It exits when xdsBalancer is closed.
+func (x *xdsBalancer) run() {
+	for {
+		select {
+		case update := <-x.grpcUpdate:
+			x.handleGRPCUpdate(update)
+		case update := <-x.xdsClientUpdate:
+			x.handleXDSClientUpdate(update)
+		case <-x.timer.C: // x.timer.C will block if we are not in fallback monitoring stage.
+			x.switchFallback()
+		case <-x.noSubConnAlert: // x.noSubConnAlert will block if we are not in fallback monitoring stage.
+			x.switchFallback()
+		case <-x.ctx.Done():
+			if x.client != nil {
+				x.client.close()
+			}
+			if x.xdsLB != nil {
+				x.xdsLB.Close()
+			}
+			if x.fallbackLB != nil {
+				x.fallbackLB.Close()
+			}
+			return
+		}
+	}
+}
+
+func (x *xdsBalancer) handleGRPCUpdate(update interface{}) {
+	switch u := update.(type) {
+	case *addressUpdate:
+		if x.fallbackLB != nil {
+			x.fallbackLB.HandleResolvedAddrs(u.addrs, u.err)
+		}
+		x.fallbackInitData = u
+	case *subConnStateUpdate:
+		if x.xdsLB != nil {
+			x.xdsLB.HandleSubConnStateChange(u.sc, u.state)
+		}
+		if x.fallbackLB != nil {
+			x.fallbackLB.HandleSubConnStateChange(u.sc, u.state)
+		}
+	case *xdsConfig:
+		if x.config == nil {
+			// The first time we get a config, we just need to start the xdsClient.
+			x.startNewXDSClient(u)
+			x.config = u
+			return
+		}
+		// With a different BalancerName, we need to create a new xdsClient.
+		// If the current or previous ChildPolicy is nil, then we also need to recreate the
+		// xdsClient. This is because with a nil ChildPolicy the xdsClient will do a CDS request,
+		// while with a non-nil one it won't.
+		if u.BalancerName != x.config.BalancerName || (u.ChildPolicy == nil) != (x.config.ChildPolicy == nil) {
+			x.startNewXDSClient(u)
+		}
+		// We update the xdsLB with the new child policy if we got a different, non-nil one. The nil
+		// case will be handled when the CDS response gets processed; we will update xdsLB at that time.
+		if !reflect.DeepEqual(u.ChildPolicy, x.config.ChildPolicy) && u.ChildPolicy != nil && x.xdsLB != nil {
+			x.xdsLB.HandleChildPolicy(u.ChildPolicy.Name, u.ChildPolicy.Config)
+		}
+		if !reflect.DeepEqual(u.FallBackPolicy, x.config.FallBackPolicy) && x.fallbackLB != nil {
+			x.fallbackLB.Close()
+			x.startFallBackBalancer(u)
+		}
+		x.config = u
+	default:
+		// unreachable path
+		panic("wrong update type")
+	}
+}
+
+func (x *xdsBalancer) handleXDSClientUpdate(update interface{}) {
+	switch u := update.(type) {
+	case *cdsResp:
+		select {
+		case <-u.ctx.Done():
+			return
+		default:
+		}
+		x.cancelFallbackAndSwitchEDSBalancerIfNecessary()
+		// TODO: get the optional xds record stale timeout from the OutlierDetection message. If it
+		// doesn't exist, reset to 0.
+		// x.xdsStaleTimeout = u.OutlierDetection.TO_BE_DEFINED_AND_ADDED
+		x.xdsLB.HandleChildPolicy(u.resp.LbPolicy.String(), nil)
+	case *edsResp:
+		select {
+		case <-u.ctx.Done():
+			return
+		default:
+		}
+		x.cancelFallbackAndSwitchEDSBalancerIfNecessary()
+		x.xdsLB.HandleEDSResponse(u.resp)
+	case *loseContact:
+		select {
+		case <-u.ctx.Done():
+			return
+		default:
+		}
+		// If we are already doing fallback monitoring, we ignore a new loseContact signal.
+		if x.inFallbackMonitor {
+			return
+		}
+		x.inFallbackMonitor = true
+		x.startFallbackMonitoring()
+	default:
+		panic("unexpected xds client update type")
+	}
+}
+
+type connStateMgr struct {
+	mu       sync.Mutex
+	curState connectivity.State
+	notify   chan struct{}
+}
+
+func (c *connStateMgr) updateState(s connectivity.State) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	c.curState = s
+	if s != connectivity.Ready && c.notify != nil {
+		close(c.notify)
+		c.notify = nil
+	}
+}
+
+func (c *connStateMgr) notifyWhenNotReady() <-chan struct{} {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if c.curState != connectivity.Ready {
+		ch := make(chan struct{})
+		close(ch)
+		return ch
+	}
+	c.notify = make(chan struct{})
+	return c.notify
+}
+
+// xdsClientConn wraps the balancer.ClientConn passed in from grpc. The wrapper adds the ability
+// to get a notification when no subconn is in READY state.
+// TODO: once we have the change that keeps both edsbalancer and fallback balancer alive at the same
+// time, we need to make sure to synchronize updates from both entities on the ClientConn.
+type xdsClientConn struct {
+	updateState func(s connectivity.State)
+	balancer.ClientConn
+}
+
+func (w *xdsClientConn) UpdateBalancerState(s connectivity.State, p balancer.Picker) {
+	w.updateState(s)
+	w.ClientConn.UpdateBalancerState(s, p)
+}
+
+type addressUpdate struct {
+	addrs []resolver.Address
+	err   error
+}
+
+type subConnStateUpdate struct {
+	sc    balancer.SubConn
+	state connectivity.State
+}
+
+func (x *xdsBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
+	update := &subConnStateUpdate{
+		sc:    sc,
+		state: state,
+	}
+	select {
+	case x.grpcUpdate <- update:
+	case <-x.ctx.Done():
+	}
+}
+
+func (x *xdsBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
+	update := &addressUpdate{
+		addrs: addrs,
+		err:   err,
+	}
+	select {
+	case x.grpcUpdate <- update:
+	case <-x.ctx.Done():
+	}
+}
+
+// TODO: once the API is merged, check whether we need to change the function name/signature here.
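+// For illustration (the policy names here are hypothetical): a config in the shape this parser
+// accepts, with candidate policies listed in preference order (see xdsConfig.UnmarshalJSON below):
+//
+//	{
+//	  "BalancerName": "foo.bar",
+//	  "ChildPolicy": [{"some_unregistered_policy": {}}, {"round_robin": {}}],
+//	  "FallbackPolicy": [{"round_robin": {}}]
+//	}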
+func (x *xdsBalancer) HandleBalancerConfig(config json.RawMessage) error {
+	var cfg xdsConfig
+	if err := json.Unmarshal(config, &cfg); err != nil {
+		return errors.New("unable to unmarshal balancer config into xds config")
+	}
+
+	select {
+	case x.grpcUpdate <- &cfg:
+	case <-x.ctx.Done():
+	}
+	return nil
+}
+
+type cdsResp struct {
+	ctx  context.Context
+	resp *xdspb.Cluster
+}
+
+type edsResp struct {
+	ctx  context.Context
+	resp *xdspb.ClusterLoadAssignment
+}
+
+func (x *xdsBalancer) newADSResponse(ctx context.Context, resp proto.Message) error {
+	var update interface{}
+	switch u := resp.(type) {
+	case *xdspb.Cluster:
+		if u.GetName() != x.cc.Target() {
+			return fmt.Errorf("unmatched service name, got %s, want %s", u.GetName(), x.cc.Target())
+		}
+		if u.GetType() != xdspb.Cluster_EDS {
+			return fmt.Errorf("unexpected service discovery type, got %v, want %v", u.GetType(), xdspb.Cluster_EDS)
+		}
+		update = &cdsResp{ctx: ctx, resp: u}
+	case *xdspb.ClusterLoadAssignment:
+		// nothing to check
+		update = &edsResp{ctx: ctx, resp: u}
+	default:
+		grpclog.Warningf("xdsBalancer: got a response that's neither CDS nor EDS, type = %T", u)
+	}
+
+	select {
+	case x.xdsClientUpdate <- update:
+	case <-x.ctx.Done():
+	case <-ctx.Done():
+	}
+
+	return nil
+}
+
+type loseContact struct {
+	ctx context.Context
+}
+
+func (x *xdsBalancer) loseContact(ctx context.Context) {
+	select {
+	case x.xdsClientUpdate <- &loseContact{ctx: ctx}:
+	case <-x.ctx.Done():
+	case <-ctx.Done():
+	}
+}
+
+func (x *xdsBalancer) switchFallback() {
+	if x.xdsLB != nil {
+		x.xdsLB.Close()
+		x.xdsLB = nil
+	}
+	x.startFallBackBalancer(x.config)
+	x.cancelFallbackMonitoring()
+}
+
+// x.cancelFallbackAndSwitchEDSBalancerIfNecessary() will be a no-op if we have a working xds client.
+// It will cancel fallback monitoring if we are in the fallback monitoring stage.
+// If there's no running edsBalancer currently, it will create one and initialize it. Also, it will
+// shut down the fallback balancer if there's one running.
+func (x *xdsBalancer) cancelFallbackAndSwitchEDSBalancerIfNecessary() {
+	// An xDS update will cancel fallback monitoring if we are in the fallback monitoring stage.
+	x.cancelFallbackMonitoring()
+
+	// An xDS update will switch the balancer back to edsBalancer if we are in fallback.
+	if x.xdsLB == nil {
+		if x.fallbackLB != nil {
+			x.fallbackLB.Close()
+			x.fallbackLB = nil
+		}
+		x.xdsLB = newEDSBalancer(x.cc).(EdsBalancer)
+		if x.config.ChildPolicy != nil {
+			x.xdsLB.HandleChildPolicy(x.config.ChildPolicy.Name, x.config.ChildPolicy.Config)
+		}
+	}
+}
+
+func (x *xdsBalancer) startFallBackBalancer(c *xdsConfig) {
+	if c.FallBackPolicy == nil {
+		x.startFallBackBalancer(&xdsConfig{
+			FallBackPolicy: &loadBalancingConfig{
+				Name: "round_robin",
+			},
+		})
+		return
+	}
+	// builder will always be non-nil, since when parsing the JSON into xdsConfig we check whether
+	// the specified balancer is registered.
+	builder := balancer.Get(c.FallBackPolicy.Name)
+	x.fallbackLB = builder.Build(x.cc, x.buildOpts)
+	if x.fallbackInitData != nil {
+		// TODO: uncomment when HandleBalancerConfig API is merged.
+		//x.fallbackLB.HandleBalancerConfig(c.FallBackPolicy.Config)
+		x.fallbackLB.HandleResolvedAddrs(x.fallbackInitData.addrs, x.fallbackInitData.err)
+	}
+}
+
+// There are three ways that could lead to fallback:
+// 1. During startup (i.e. the first xds client is just created and attempts to contact the traffic
+//    director), fallback if it has not received any response from the director within the
+//    configured timeout.
+// 2. After the xds client loses contact with the remote, fallback if all connections to the
+//    backends are lost (i.e. not in state READY).
+// 3. After the xds client loses contact with the remote, fallback if the stale eds timeout has
+//    been configured through CDS and has expired.
+func (x *xdsBalancer) startFallbackMonitoring() {
+	if x.startup {
+		x.startup = false
+		x.timer.Reset(x.startupTimeout)
+		return
+	}
+
+	x.noSubConnAlert = x.connStateMgr.notifyWhenNotReady()
+	if x.xdsStaleTimeout != nil {
+		if !x.timer.Stop() {
+			<-x.timer.C
+		}
+		x.timer.Reset(*x.xdsStaleTimeout)
+	}
+}
+
+// There are two cases where fallback monitoring should be canceled:
+// 1. The xDS client returns a new ADS message.
+// 2. Fallback has been triggered.
+func (x *xdsBalancer) cancelFallbackMonitoring() {
+	if !x.timer.Stop() {
+		select {
+		case <-x.timer.C:
+			// In cases where some fallback condition happens along with the timeout but the
+			// timeout loses the race, we need to drain x.timer.C so that we don't trigger
+			// fallback again.
+		default:
+			// If the timer firing is what led us here, then there's nothing to drain from x.timer.C.
+		}
+	}
+	x.noSubConnAlert = nil
+	x.inFallbackMonitor = false
+}
+
+func (x *xdsBalancer) Close() {
+	x.cancel()
+}
+
+func createDrainedTimer() *time.Timer {
+	timer := time.NewTimer(0 * time.Millisecond)
+	// Make sure the timer channel blocks until the timer is reset.
+	if !timer.Stop() {
+		<-timer.C
+	}
+	return timer
+}
+
+type xdsConfig struct {
+	BalancerName   string
+	ChildPolicy    *loadBalancingConfig
+	FallBackPolicy *loadBalancingConfig
+}
+
+// When unmarshalling JSON into an xdsConfig, we iterate through the childPolicy/fallbackPolicy
+// lists and select the first LB policy that has been registered; it is stored in the returned
+// xdsConfig.
+func (p *xdsConfig) UnmarshalJSON(data []byte) error {
+	var val map[string]json.RawMessage
+	if err := json.Unmarshal(data, &val); err != nil {
+		return err
+	}
+	for k, v := range val {
+		switch k {
+		case "BalancerName":
+			if err := json.Unmarshal(v, &p.BalancerName); err != nil {
+				return err
+			}
+		case "ChildPolicy":
+			var lbcfgs []*loadBalancingConfig
+			if err := json.Unmarshal(v, &lbcfgs); err != nil {
+				return err
+			}
+			for _, lbcfg := range lbcfgs {
+				if balancer.Get(lbcfg.Name) != nil {
+					p.ChildPolicy = lbcfg
+					break
+				}
+			}
+		case "FallbackPolicy":
+			var lbcfgs []*loadBalancingConfig
+			if err := json.Unmarshal(v, &lbcfgs); err != nil {
+				return err
+			}
+			for _, lbcfg := range lbcfgs {
+				if balancer.Get(lbcfg.Name) != nil {
+					p.FallBackPolicy = lbcfg
+					break
+				}
+			}
+		}
+	}
+	return nil
+}
+
+func (p *xdsConfig) MarshalJSON() ([]byte, error) {
+	return nil, nil
+}
+
+type loadBalancingConfig struct {
+	Name   string
+	Config json.RawMessage
+}
+
+func (l *loadBalancingConfig) MarshalJSON() ([]byte, error) {
+	return nil, nil
+}
+
+func (l *loadBalancingConfig) UnmarshalJSON(data []byte) error {
+	var cfg map[string]json.RawMessage
+	if err := json.Unmarshal(data, &cfg); err != nil {
+		return err
+	}
+	for name, config := range cfg {
+		l.Name = name
+		l.Config = config
+	}
+	return nil
+}
diff --git a/balancer/xds/xds_client.go b/balancer/xds/xds_client.go
new file mode 100644
index 00000000..d18d9033
--- /dev/null
+++ b/balancer/xds/xds_client.go
@@ -0,0 +1,264 @@
+/*
+ *
+ * Copyright 2019 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package xds
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
+	xdscorepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
+	xdsdiscoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
+	"github.com/gogo/protobuf/proto"
+	"github.com/gogo/protobuf/types"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/balancer"
+	"google.golang.org/grpc/grpclog"
+	"google.golang.org/grpc/internal/backoff"
+	"google.golang.org/grpc/internal/channelz"
+)
+
+const (
+	grpcHostname     = "com.googleapis.trafficdirector.grpc_hostname"
+	cdsType          = "type.googleapis.com/envoy.api.v2.Cluster"
+	edsType          = "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment"
+	endpointRequired = "endpoints_required"
+)
+
+var (
+	defaultBackoffConfig = backoff.Exponential{
+		MaxDelay: 120 * time.Second,
+	}
+)
+
+// client is responsible for connecting to the specified traffic director, passing the received
+// ADS responses from the traffic director to the balancer, and sending a notification when
+// communication with the traffic director is lost.
+type client struct {
+	ctx          context.Context
+	cancel       context.CancelFunc
+	cli          xdsdiscoverypb.AggregatedDiscoveryServiceClient
+	opts         balancer.BuildOptions
+	balancerName string // the traffic director name
+	serviceName  string // the user dial target name
+	enableCDS    bool
+	newADS       func(ctx context.Context, resp proto.Message) error
+	loseContact  func(ctx context.Context)
+	cleanup      func()
+	backoff      backoff.Strategy
+
+	mu sync.Mutex
+	cc *grpc.ClientConn
+}
+
+func (c *client) run() {
+	c.dial()
+	c.makeADSCall()
+}
+
+func (c *client) close() {
+	c.cancel()
+	c.mu.Lock()
+	if c.cc != nil {
+		c.cc.Close()
+	}
+	c.mu.Unlock()
+	c.cleanup()
+}
+
+func (c *client) dial() {
+	var dopts []grpc.DialOption
+	if creds := c.opts.DialCreds; creds != nil {
+		if err := creds.OverrideServerName(c.balancerName); err == nil {
+			dopts = append(dopts, grpc.WithTransportCredentials(creds))
+		} else {
+			grpclog.Warningf("xds: failed to override the server name in the credentials: %v, using Insecure", err)
+			dopts = append(dopts, grpc.WithInsecure())
+		}
+	} else {
+		dopts = append(dopts, grpc.WithInsecure())
+	}
+	if c.opts.Dialer != nil {
+		dopts = append(dopts, grpc.WithContextDialer(c.opts.Dialer))
+	}
+	// Explicitly set pickfirst as the balancer.
+	dopts = append(dopts, grpc.WithBalancerName(grpc.PickFirstBalancerName))
+	if channelz.IsOn() {
+		dopts = append(dopts, grpc.WithChannelzParentID(c.opts.ChannelzParentID))
+	}
+
+	cc, err := grpc.DialContext(c.ctx, c.balancerName, dopts...)
+	// Since this is a non-blocking dial, if it fails, it must be due to some serious
+	// (non-network-related) error.
+	if err != nil {
+		grpclog.Fatalf("xds: failed to dial: %v", err)
+	}
+	c.mu.Lock()
+	select {
+	case <-c.ctx.Done():
+		cc.Close()
+	default:
+		// Only assign c.cc when the xds client has not been closed, to prevent a ClientConn leak.
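+		// Note: close() may run concurrently; holding c.mu around this check-and-assign is what
+		// guarantees that a ClientConn stored here will also be seen (and closed) by close().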
+		c.cc = cc
+	}
+	c.mu.Unlock()
+}
+
+func (c *client) newCDSRequest() *xdspb.DiscoveryRequest {
+	cdsReq := &xdspb.DiscoveryRequest{
+		Node: &xdscorepb.Node{
+			Metadata: &types.Struct{
+				Fields: map[string]*types.Value{
+					grpcHostname: {
+						Kind: &types.Value_StringValue{StringValue: c.serviceName},
+					},
+				},
+			},
+		},
+		TypeUrl: cdsType,
+	}
+	return cdsReq
+}
+
+func (c *client) newEDSRequest() *xdspb.DiscoveryRequest {
+	edsReq := &xdspb.DiscoveryRequest{
+		Node: &xdscorepb.Node{
+			Metadata: &types.Struct{
+				Fields: map[string]*types.Value{
+					endpointRequired: {
+						Kind: &types.Value_BoolValue{BoolValue: c.enableCDS},
+					},
+				},
+			},
+		},
+		ResourceNames: []string{c.serviceName},
+		TypeUrl:       edsType,
+	}
+	return edsReq
+}
+
+func (c *client) makeADSCall() {
+	c.cli = xdsdiscoverypb.NewAggregatedDiscoveryServiceClient(c.cc)
+	retryCount := 0
+	var doRetry bool
+
+	for {
+		select {
+		case <-c.ctx.Done():
+			return
+		default:
+		}
+
+		if doRetry {
+			backoffTimer := time.NewTimer(c.backoff.Backoff(retryCount))
+			select {
+			case <-backoffTimer.C:
+			case <-c.ctx.Done():
+				backoffTimer.Stop()
+				return
+			}
+			retryCount++
+		}
+
+		firstRespReceived := c.adsCallAttempt()
+		if firstRespReceived {
+			retryCount = 0
+			doRetry = false
+		} else {
+			doRetry = true
+		}
+		c.loseContact(c.ctx)
+	}
+}
+
+func (c *client) adsCallAttempt() (firstRespReceived bool) {
+	firstRespReceived = false
+	ctx, cancel := context.WithCancel(c.ctx)
+	defer cancel()
+	st, err := c.cli.StreamAggregatedResources(ctx, grpc.WaitForReady(true))
+	if err != nil {
+		grpclog.Infof("xds: failed to initiate the ADS streaming RPC due to %v", err)
+		return
+	}
+	if c.enableCDS {
+		if err := st.Send(c.newCDSRequest()); err != nil {
+			// The current stream is broken; start a new one.
+			return
+		}
+	}
+	if err := st.Send(c.newEDSRequest()); err != nil {
+		// The current stream is broken; start a new one.
+		return
+	}
+	expectCDS := c.enableCDS
+	for {
+		resp, err := st.Recv()
+		if err != nil {
+			// The current stream is broken; start a new one.
+			return
+		}
+		firstRespReceived = true
+		resources := resp.GetResources()
+		if len(resources) < 1 {
+			grpclog.Warning("xds: ADS response contains 0 resource info.")
+			// Start a new call, as the server misbehaved by sending an ADS response with 0 resources.
+			return
+		}
+		if resp.GetTypeUrl() == cdsType && !c.enableCDS {
+			grpclog.Warning("xds: received CDS response in custom plugin mode.")
+			// Start a new call, as we received a CDS response while in EDS-only mode.
+ return + } + var adsResp types.DynamicAny + if err := types.UnmarshalAny(&resources[0], &adsResp); err != nil { + grpclog.Warningf("xds: failed to unmarshal resources due to %v.", err) + return + } + switch adsResp.Message.(type) { + case *xdspb.Cluster: + expectCDS = false + case *xdspb.ClusterLoadAssignment: + if expectCDS { + grpclog.Warningf("xds: expecting CDS response, got EDS response instead.") + return + } + } + if err := c.newADS(c.ctx, adsResp.Message); err != nil { + grpclog.Warningf("xds: processing new ADS message failed due to %v.", err) + return + } + } +} +func newXDSClient(balancerName string, serviceName string, enableCDS bool, opts balancer.BuildOptions, newADS func(context.Context, proto.Message) error, loseContact func(ctx context.Context), exitCleanup func()) *client { + c := &client{ + balancerName: balancerName, + serviceName: serviceName, + enableCDS: enableCDS, + opts: opts, + newADS: newADS, + loseContact: loseContact, + cleanup: exitCleanup, + backoff: defaultBackoffConfig, + } + + c.ctx, c.cancel = context.WithCancel(context.Background()) + + return c +} diff --git a/balancer/xds/xds_client_test.go b/balancer/xds/xds_client_test.go new file mode 100644 index 00000000..37dc8797 --- /dev/null +++ b/balancer/xds/xds_client_test.go @@ -0,0 +1,465 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package xds + +import ( + "context" + "errors" + "io" + "net" + "testing" + "time" + + xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" + xdscorepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" + xdsendpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint" + xdsdiscoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" + "github.com/gogo/protobuf/proto" + "github.com/gogo/protobuf/types" + "google.golang.org/grpc" + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +var ( + testServiceName = "test/foo" + testCDSReq = &xdspb.DiscoveryRequest{ + Node: &xdscorepb.Node{ + Metadata: &types.Struct{ + Fields: map[string]*types.Value{ + grpcHostname: { + Kind: &types.Value_StringValue{StringValue: testServiceName}, + }, + }, + }, + }, + TypeUrl: cdsType, + } + testEDSReq = &xdspb.DiscoveryRequest{ + Node: &xdscorepb.Node{ + Metadata: &types.Struct{ + Fields: map[string]*types.Value{ + endpointRequired: { + Kind: &types.Value_BoolValue{BoolValue: true}, + }, + }, + }, + }, + ResourceNames: []string{testServiceName}, + TypeUrl: edsType, + } + testEDSReqWithoutEndpoints = &xdspb.DiscoveryRequest{ + Node: &xdscorepb.Node{ + Metadata: &types.Struct{ + Fields: map[string]*types.Value{ + endpointRequired: { + Kind: &types.Value_BoolValue{BoolValue: false}, + }, + }, + }, + }, + ResourceNames: []string{testServiceName}, + TypeUrl: edsType, + } + testCluster = &xdspb.Cluster{ + Name: testServiceName, + Type: xdspb.Cluster_EDS, + LbPolicy: xdspb.Cluster_ROUND_ROBIN, + } + marshaledCluster, _ = proto.Marshal(testCluster) + testCDSResp = &xdspb.DiscoveryResponse{ + Resources: []types.Any{ + { + TypeUrl: cdsType, + Value: marshaledCluster, + }, + }, + TypeUrl: cdsType, + } + testClusterLoadAssignment = &xdspb.ClusterLoadAssignment{ + ClusterName: testServiceName, + Endpoints: []xdsendpointpb.LocalityLbEndpoints{ + { + Locality: &xdscorepb.Locality{ + Region: "asia-east1", + Zone: "1", + SubZone: "sa", + }, + LbEndpoints: []xdsendpointpb.LbEndpoint{ + { + HostIdentifier: &xdsendpointpb.LbEndpoint_Endpoint{ + Endpoint: &xdsendpointpb.Endpoint{ + Address: &xdscorepb.Address{ + Address: &xdscorepb.Address_SocketAddress{ + SocketAddress: &xdscorepb.SocketAddress{ + Address: "1.1.1.1", + PortSpecifier: &xdscorepb.SocketAddress_PortValue{ + PortValue: 10001, + }, + ResolverName: "dns", + }, + }, + }, + HealthCheckConfig: nil, + }, + }, + Metadata: &xdscorepb.Metadata{ + FilterMetadata: map[string]*types.Struct{ + "xx.lb": { + Fields: map[string]*types.Value{ + "endpoint_name": { + Kind: &types.Value_StringValue{ + StringValue: "some.endpoint.name", + }, + }, + }, + }, + }, + }, + }, + }, + LoadBalancingWeight: &types.UInt32Value{ + Value: 1, + }, + Priority: 0, + }, + }, + } + marshaledClusterLoadAssignment, _ = proto.Marshal(testClusterLoadAssignment) + testEDSResp = &xdspb.DiscoveryResponse{ + Resources: []types.Any{ + { + TypeUrl: edsType, + Value: marshaledClusterLoadAssignment, + }, + }, + TypeUrl: edsType, + } + testClusterLoadAssignmentWithoutEndpoints = &xdspb.ClusterLoadAssignment{ + ClusterName: testServiceName, + Endpoints: []xdsendpointpb.LocalityLbEndpoints{ + { + Locality: &xdscorepb.Locality{ + SubZone: "sa", + }, + LoadBalancingWeight: &types.UInt32Value{ + Value: 128, + }, + Priority: 0, + }, + }, + Policy: nil, + } + marshaledClusterLoadAssignmentWithoutEndpoints, _ = proto.Marshal(testClusterLoadAssignmentWithoutEndpoints) + testEDSRespWithoutEndpoints = 
&xdspb.DiscoveryResponse{ + Resources: []types.Any{ + { + TypeUrl: edsType, + Value: marshaledClusterLoadAssignmentWithoutEndpoints, + }, + }, + TypeUrl: edsType, + } +) + +type testTrafficDirector struct { + reqChan chan *request + respChan chan *response +} + +type request struct { + req *xdspb.DiscoveryRequest + err error +} + +type response struct { + resp *xdspb.DiscoveryResponse + err error +} + +func (ttd *testTrafficDirector) StreamAggregatedResources(s xdsdiscoverypb.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error { + for { + req, err := s.Recv() + if err != nil { + ttd.reqChan <- &request{ + req: nil, + err: err, + } + if err == io.EOF { + return nil + } + return err + } + ttd.reqChan <- &request{ + req: req, + err: nil, + } + if req.TypeUrl == edsType { + break + } + } + + for { + select { + case resp := <-ttd.respChan: + if resp.err != nil { + return resp.err + } + if err := s.Send(resp.resp); err != nil { + return err + } + case <-s.Context().Done(): + return s.Context().Err() + } + } +} + +func (ttd *testTrafficDirector) IncrementalAggregatedResources(xdsdiscoverypb.AggregatedDiscoveryService_IncrementalAggregatedResourcesServer) error { + return status.Error(codes.Unimplemented, "") +} + +func (ttd *testTrafficDirector) sendResp(resp *response) { + ttd.respChan <- resp +} + +func (ttd *testTrafficDirector) getReq() *request { + return <-ttd.reqChan +} + +func newTestTrafficDirector() *testTrafficDirector { + return &testTrafficDirector{ + reqChan: make(chan *request, 10), + respChan: make(chan *response, 10), + } +} + +type testConfig struct { + doCDS bool + expectedRequests []*xdspb.DiscoveryRequest + responsesToSend []*xdspb.DiscoveryResponse + expectedADSResponses []proto.Message + adsErr error + svrErr error +} + +func setupServer(t *testing.T) (addr string, td *testTrafficDirector, cleanup func()) { + lis, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("listen failed due to: %v", err) + } + svr := grpc.NewServer() + td = newTestTrafficDirector() + xdsdiscoverypb.RegisterAggregatedDiscoveryServiceServer(svr, td) + go svr.Serve(lis) + return lis.Addr().String(), td, func() { + svr.Stop() + lis.Close() + } +} + +func (s) TestXdsClientResponseHandling(t *testing.T) { + for _, test := range []*testConfig{ + { + doCDS: true, + expectedRequests: []*xdspb.DiscoveryRequest{testCDSReq, testEDSReq}, + responsesToSend: []*xdspb.DiscoveryResponse{testCDSResp, testEDSResp}, + expectedADSResponses: []proto.Message{testCluster, testClusterLoadAssignment}, + }, + { + doCDS: false, + expectedRequests: []*xdspb.DiscoveryRequest{testEDSReqWithoutEndpoints}, + responsesToSend: []*xdspb.DiscoveryResponse{testEDSRespWithoutEndpoints}, + expectedADSResponses: []proto.Message{testClusterLoadAssignmentWithoutEndpoints}, + }, + } { + testXdsClientResponseHandling(t, test) + } +} + +func testXdsClientResponseHandling(t *testing.T, test *testConfig) { + addr, td, cleanup := setupServer(t) + defer cleanup() + adsChan := make(chan proto.Message, 10) + newADS := func(ctx context.Context, i proto.Message) error { + adsChan <- i + return nil + } + client := newXDSClient(addr, testServiceName, test.doCDS, balancer.BuildOptions{}, newADS, func(context.Context) {}, func() {}) + defer client.close() + go client.run() + + for _, expectedReq := range test.expectedRequests { + req := td.getReq() + if req.err != nil { + t.Fatalf("ads RPC failed with err: %v", req.err) + } + if !proto.Equal(req.req, expectedReq) { + t.Fatalf("got ADS request %T %v, expected: %T %v", 
req.req, req.req, expectedReq, expectedReq) + } + } + + for i, resp := range test.responsesToSend { + td.sendResp(&response{resp: resp}) + ads := <-adsChan + if !proto.Equal(ads, test.expectedADSResponses[i]) { + t.Fatalf("received unexpected ads response, got %v, want %v", ads, test.expectedADSResponses[i]) + } + } +} + +func (s) TestXdsClientLoseContact(t *testing.T) { + for _, test := range []*testConfig{ + { + doCDS: true, + responsesToSend: []*xdspb.DiscoveryResponse{}, + }, + { + doCDS: false, + responsesToSend: []*xdspb.DiscoveryResponse{testEDSRespWithoutEndpoints}, + }, + } { + testXdsClientLoseContactRemoteClose(t, test) + } + + for _, test := range []*testConfig{ + { + doCDS: false, + responsesToSend: []*xdspb.DiscoveryResponse{testCDSResp}, // CDS response when in custom mode. + }, + { + doCDS: true, + responsesToSend: []*xdspb.DiscoveryResponse{{}}, // response with 0 resources is an error case. + }, + { + doCDS: true, + responsesToSend: []*xdspb.DiscoveryResponse{testCDSResp}, + adsErr: errors.New("some ads parsing error from xdsBalancer"), + }, + } { + testXdsClientLoseContactADSRelatedErrorOccur(t, test) + } +} + +func testXdsClientLoseContactRemoteClose(t *testing.T, test *testConfig) { + addr, td, cleanup := setupServer(t) + defer cleanup() + adsChan := make(chan proto.Message, 10) + newADS := func(ctx context.Context, i proto.Message) error { + adsChan <- i + return nil + } + contactChan := make(chan *loseContact, 10) + loseContactFunc := func(context.Context) { + contactChan <- &loseContact{} + } + client := newXDSClient(addr, testServiceName, test.doCDS, balancer.BuildOptions{}, newADS, loseContactFunc, func() {}) + defer client.close() + go client.run() + + // make sure server side get the request (i.e stream created successfully on client side) + td.getReq() + + for _, resp := range test.responsesToSend { + td.sendResp(&response{resp: resp}) + // make sure client side receives it + <-adsChan + } + cleanup() + + select { + case <-contactChan: + case <-time.After(2 * time.Second): + t.Fatal("time out when expecting lost contact signal") + } +} + +func testXdsClientLoseContactADSRelatedErrorOccur(t *testing.T, test *testConfig) { + addr, td, cleanup := setupServer(t) + defer cleanup() + + adsChan := make(chan proto.Message, 10) + newADS := func(ctx context.Context, i proto.Message) error { + adsChan <- i + return test.adsErr + } + contactChan := make(chan *loseContact, 10) + loseContactFunc := func(context.Context) { + contactChan <- &loseContact{} + } + client := newXDSClient(addr, testServiceName, test.doCDS, balancer.BuildOptions{}, newADS, loseContactFunc, func() {}) + defer client.close() + go client.run() + + // make sure server side get the request (i.e stream created successfully on client side) + td.getReq() + + for _, resp := range test.responsesToSend { + td.sendResp(&response{resp: resp}) + } + + select { + case <-contactChan: + case <-time.After(2 * time.Second): + t.Fatal("time out when expecting lost contact signal") + } +} + +func (s) TestXdsClientExponentialRetry(t *testing.T) { + cfg := &testConfig{ + svrErr: status.Errorf(codes.Aborted, "abort the stream to trigger retry"), + } + addr, td, cleanup := setupServer(t) + defer cleanup() + + adsChan := make(chan proto.Message, 10) + newADS := func(ctx context.Context, i proto.Message) error { + adsChan <- i + return nil + } + contactChan := make(chan *loseContact, 10) + loseContactFunc := func(context.Context) { + contactChan <- &loseContact{} + } + client := newXDSClient(addr, testServiceName, cfg.doCDS, 
balancer.BuildOptions{}, newADS, loseContactFunc, func() {}) + defer client.close() + go client.run() + + var secondRetry, thirdRetry time.Time + for i := 0; i < 3; i++ { + // make sure server side get the request (i.e stream created successfully on client side) + td.getReq() + td.sendResp(&response{err: cfg.svrErr}) + + select { + case <-contactChan: + if i == 1 { + secondRetry = time.Now() + } + if i == 2 { + thirdRetry = time.Now() + } + case <-time.After(2 * time.Second): + t.Fatal("time out when expecting lost contact signal") + } + } + if thirdRetry.Sub(secondRetry) < 1*time.Second { + t.Fatalf("interval between second and third retry is %v, expected > 1s", thirdRetry.Sub(secondRetry)) + } +} diff --git a/balancer/xds/xds_test.go b/balancer/xds/xds_test.go new file mode 100644 index 00000000..b5eb04a4 --- /dev/null +++ b/balancer/xds/xds_test.go @@ -0,0 +1,706 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package xds + +import ( + "encoding/json" + "reflect" + "sync" + "testing" + "time" + + "google.golang.org/grpc/internal/grpctest" + "google.golang.org/grpc/internal/leakcheck" + + xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/xds/edsbalancer" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/resolver" +) + +var lbABuilder *balancerABuilder + +func init() { + lbABuilder = &balancerABuilder{} + balancer.Register(lbABuilder) + balancer.Register(&balancerBBuilder{}) +} + +type s struct{} + +func (s) Teardown(t *testing.T) { + leakcheck.Check(t) +} + +func Test(t *testing.T) { + grpctest.RunSubTests(t, s{}) +} + +type lbPolicy string + +const ( + fakeBalancerA lbPolicy = "fake_balancer_A" + fakeBalancerB lbPolicy = "fake_balancer_B" + fakeBalancerC lbPolicy = "fake_balancer_C" +) + +var ( + testBalancerNameFooBar = "foo.bar" + testBalancerConfigFooBar, _ = json.Marshal(&testBalancerConfig{ + BalancerName: testBalancerNameFooBar, + ChildPolicy: []lbPolicy{fakeBalancerA}, + FallbackPolicy: []lbPolicy{fakeBalancerA}, + }) + specialAddrForBalancerA = resolver.Address{Addr: "this.is.balancer.A"} + specialAddrForBalancerB = resolver.Address{Addr: "this.is.balancer.B"} + + // mu protects the access of latestFakeEdsBalancer + mu sync.Mutex + latestFakeEdsBalancer *fakeEDSBalancer +) + +type testBalancerConfig struct { + BalancerName string + ChildPolicy []lbPolicy + FallbackPolicy []lbPolicy +} + +func (l *lbPolicy) UnmarshalJSON(b []byte) error { + // no need to implement, not used. 
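+	// Only MarshalJSON below is exercised, when tests build config JSON to feed to
+	// HandleBalancerConfig; xdsConfig has its own UnmarshalJSON for the decode direction.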
+ return nil +} + +func (l lbPolicy) MarshalJSON() ([]byte, error) { + m := make(map[string]struct{}) + m[string(l)] = struct{}{} + return json.Marshal(m) +} + +type balancerABuilder struct { + mu sync.Mutex + lastBalancer *balancerA +} + +func (b *balancerABuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { + b.mu.Lock() + b.lastBalancer = &balancerA{cc: cc, subconnStateChange: make(chan *scStateChange, 10)} + b.mu.Unlock() + return b.lastBalancer +} + +func (b *balancerABuilder) Name() string { + return string(fakeBalancerA) +} + +func (b *balancerABuilder) getLastBalancer() *balancerA { + b.mu.Lock() + defer b.mu.Unlock() + return b.lastBalancer +} + +func (b *balancerABuilder) clearLastBalancer() { + b.mu.Lock() + defer b.mu.Unlock() + b.lastBalancer = nil +} + +type balancerBBuilder struct{} + +func (b *balancerBBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { + return &balancerB{cc: cc} +} + +func (*balancerBBuilder) Name() string { + return string(fakeBalancerB) +} + +type balancerA struct { + cc balancer.ClientConn + subconnStateChange chan *scStateChange +} + +func (b *balancerA) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) { + b.subconnStateChange <- &scStateChange{sc: sc, state: state} +} + +func (b *balancerA) HandleResolvedAddrs(addrs []resolver.Address, err error) { + _, _ = b.cc.NewSubConn(append(addrs, specialAddrForBalancerA), balancer.NewSubConnOptions{}) +} + +func (b *balancerA) Close() {} + +type balancerB struct { + cc balancer.ClientConn +} + +func (balancerB) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) { + panic("implement me") +} + +func (b *balancerB) HandleResolvedAddrs(addrs []resolver.Address, err error) { + _, _ = b.cc.NewSubConn(append(addrs, specialAddrForBalancerB), balancer.NewSubConnOptions{}) +} + +func (balancerB) Close() {} + +func newTestClientConn() *testClientConn { + return &testClientConn{ + newSubConns: make(chan []resolver.Address, 10), + } +} + +type testClientConn struct { + newSubConns chan []resolver.Address +} + +func (t *testClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { + t.newSubConns <- addrs + return nil, nil +} + +func (testClientConn) RemoveSubConn(balancer.SubConn) { +} + +func (testClientConn) UpdateBalancerState(s connectivity.State, p balancer.Picker) { +} + +func (testClientConn) ResolveNow(resolver.ResolveNowOption) {} + +func (testClientConn) Target() string { + return testServiceName +} + +type scStateChange struct { + sc balancer.SubConn + state connectivity.State +} + +type fakeEDSBalancer struct { + cc balancer.ClientConn + edsChan chan *xdspb.ClusterLoadAssignment + childPolicy chan *loadBalancingConfig + fallbackPolicy chan *loadBalancingConfig + subconnStateChange chan *scStateChange +} + +func (f *fakeEDSBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) { + f.subconnStateChange <- &scStateChange{sc: sc, state: state} +} + +func (f *fakeEDSBalancer) HandleResolvedAddrs([]resolver.Address, error) { + panic("implement me") +} + +func (f *fakeEDSBalancer) Close() { + mu.Lock() + defer mu.Unlock() + latestFakeEdsBalancer = nil +} + +func (f *fakeEDSBalancer) HandleEDSResponse(edsResp *xdspb.ClusterLoadAssignment) { + f.edsChan <- edsResp +} + +func (f *fakeEDSBalancer) HandleChildPolicy(name string, config json.RawMessage) { + f.childPolicy <- &loadBalancingConfig{ + Name: name, + Config: config, + } +} + 
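+// newFakeEDSBalancer mirrors the signature of edsbalancer.NewXDSBalancer (returning interface{})
+// so tests can swap it into the newEDSBalancer hook; xdsBalancer type-asserts the result to
+// EdsBalancer when it creates the balancer.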
+func newFakeEDSBalancer(cc balancer.ClientConn) interface{} { + lb := &fakeEDSBalancer{ + cc: cc, + edsChan: make(chan *xdspb.ClusterLoadAssignment, 10), + childPolicy: make(chan *loadBalancingConfig, 10), + fallbackPolicy: make(chan *loadBalancingConfig, 10), + subconnStateChange: make(chan *scStateChange, 10), + } + mu.Lock() + latestFakeEdsBalancer = lb + mu.Unlock() + return lb +} + +func getLatestEdsBalancer() *fakeEDSBalancer { + mu.Lock() + defer mu.Unlock() + return latestFakeEdsBalancer +} + +type fakeSubConn struct{} + +func (*fakeSubConn) UpdateAddresses([]resolver.Address) { + panic("implement me") +} + +func (*fakeSubConn) Connect() { + panic("implement me") +} + +func (s) TestXdsBalanceHandleResolvedAddrs(t *testing.T) { + startupTimeout = 500 * time.Millisecond + defer func() { startupTimeout = defaultTimeout }() + + builder := balancer.Get("xds") + cc := newTestClientConn() + lb, ok := builder.Build(cc, balancer.BuildOptions{}).(*xdsBalancer) + if !ok { + t.Fatalf("unable to type assert to *xdsBalancer") + } + defer lb.Close() + if err := lb.HandleBalancerConfig(json.RawMessage(testBalancerConfigFooBar)); err != nil { + t.Fatalf("failed to HandleBalancerConfig(%v), due to err: %v", string(testBalancerConfigFooBar), err) + } + addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}} + for i := 0; i < 3; i++ { + lb.HandleResolvedAddrs(addrs, nil) + select { + case nsc := <-cc.newSubConns: + if !reflect.DeepEqual(append(addrs, specialAddrForBalancerA), nsc) { + t.Fatalf("got new subconn address %v, want %v", nsc, append(addrs, specialAddrForBalancerA)) + } + case <-time.After(2 * time.Second): + t.Fatalf("timeout when geting new subconn result") + } + addrs = addrs[:2-i] + } +} + +func (s) TestXdsBalanceHandleBalancerConfigBalancerNameUpdate(t *testing.T) { + startupTimeout = 500 * time.Millisecond + newEDSBalancer = newFakeEDSBalancer + defer func() { + startupTimeout = defaultTimeout + newEDSBalancer = edsbalancer.NewXDSBalancer + }() + + builder := balancer.Get("xds") + cc := newTestClientConn() + lb, ok := builder.Build(cc, balancer.BuildOptions{}).(*xdsBalancer) + if !ok { + t.Fatalf("unable to type assert to *xdsBalancer") + } + defer lb.Close() + if err := lb.HandleBalancerConfig(json.RawMessage(testBalancerConfigFooBar)); err != nil { + t.Fatalf("failed to HandleBalancerConfig(%v), due to err: %v", string(testBalancerConfigFooBar), err) + } + addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}} + lb.HandleResolvedAddrs(addrs, nil) + + // verify fallback takes over + select { + case nsc := <-cc.newSubConns: + if !reflect.DeepEqual(append(addrs, specialAddrForBalancerA), nsc) { + t.Fatalf("got new subconn address %v, want %v", nsc, append(addrs, specialAddrForBalancerA)) + } + case <-time.After(2 * time.Second): + t.Fatalf("timeout when geting new subconn result") + } + + var cleanups []func() + defer func() { + for _, cleanup := range cleanups { + cleanup() + } + }() + // In the first iteration, an eds balancer takes over fallback balancer + // In the second iteration, a new xds client takes over previous one. 
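+	// Each iteration points the balancer at a fresh test server, so a new xds client has to be
+	// built, while the fakeEDSBalancer instance is expected to survive across iterations (see the
+	// comment on edsLB below).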
+ for i := 0; i < 2; i++ { + addr, td, cleanup := setupServer(t) + cleanups = append(cleanups, cleanup) + workingBalancerConfig, _ := json.Marshal(&testBalancerConfig{ + BalancerName: addr, + ChildPolicy: []lbPolicy{fakeBalancerA}, + FallbackPolicy: []lbPolicy{fakeBalancerA}, + }) + + if err := lb.HandleBalancerConfig(json.RawMessage(workingBalancerConfig)); err != nil { + t.Fatalf("failed to HandleBalancerConfig(%v), due to err: %v", string(workingBalancerConfig), err) + } + td.sendResp(&response{resp: testEDSRespWithoutEndpoints}) + + var j int + for j = 0; j < 10; j++ { + if edsLB := getLatestEdsBalancer(); edsLB != nil { // edsLB won't change between the two iterations + select { + case gotEDS := <-edsLB.edsChan: + if !reflect.DeepEqual(gotEDS, testClusterLoadAssignmentWithoutEndpoints) { + t.Fatalf("edsBalancer got eds: %v, want %v", gotEDS, testClusterLoadAssignmentWithoutEndpoints) + } + case <-time.After(time.Second): + t.Fatal("haven't got EDS update after 1s") + } + break + } + time.Sleep(100 * time.Millisecond) + } + if j == 10 { + t.Fatal("edsBalancer instance has not been created or updated after 1s") + } + } +} + +// switch child policy, lb stays the same +// cds->eds or eds -> cds, restart xdsClient, lb stays the same +func (s) TestXdsBalanceHandleBalancerConfigChildPolicyUpdate(t *testing.T) { + newEDSBalancer = newFakeEDSBalancer + defer func() { + newEDSBalancer = edsbalancer.NewXDSBalancer + }() + + builder := balancer.Get("xds") + cc := newTestClientConn() + lb, ok := builder.Build(cc, balancer.BuildOptions{}).(*xdsBalancer) + if !ok { + t.Fatalf("unable to type assert to *xdsBalancer") + } + defer lb.Close() + + var cleanups []func() + defer func() { + for _, cleanup := range cleanups { + cleanup() + } + }() + for _, test := range []struct { + cfg *testBalancerConfig + responseToSend *xdspb.DiscoveryResponse + expectedChildPolicy *loadBalancingConfig + }{ + { + cfg: &testBalancerConfig{ + ChildPolicy: []lbPolicy{fakeBalancerA}, + }, + responseToSend: testEDSRespWithoutEndpoints, + expectedChildPolicy: &loadBalancingConfig{ + Name: string(fakeBalancerA), + Config: json.RawMessage(`{}`), + }, + }, + { + cfg: &testBalancerConfig{ + ChildPolicy: []lbPolicy{fakeBalancerB}, + }, + expectedChildPolicy: &loadBalancingConfig{ + Name: string(fakeBalancerB), + Config: json.RawMessage(`{}`), + }, + }, + { + cfg: &testBalancerConfig{}, + responseToSend: testCDSResp, + expectedChildPolicy: &loadBalancingConfig{ + Name: "ROUND_ROBIN", + }, + }, + } { + addr, td, cleanup := setupServer(t) + cleanups = append(cleanups, cleanup) + test.cfg.BalancerName = addr + workingBalancerConfig, _ := json.Marshal(test.cfg) + + if err := lb.HandleBalancerConfig(json.RawMessage(workingBalancerConfig)); err != nil { + t.Fatalf("failed to HandleBalancerConfig(%v), due to err: %v", string(workingBalancerConfig), err) + } + if test.responseToSend != nil { + td.sendResp(&response{resp: test.responseToSend}) + } + var i int + for i = 0; i < 10; i++ { + if edsLB := getLatestEdsBalancer(); edsLB != nil { + select { + case childPolicy := <-edsLB.childPolicy: + if !reflect.DeepEqual(childPolicy, test.expectedChildPolicy) { + t.Fatalf("got childPolicy %v, want %v", childPolicy, test.expectedChildPolicy) + } + case <-time.After(time.Second): + t.Fatal("haven't got policy update after 1s") + } + break + } + time.Sleep(100 * time.Millisecond) + } + if i == 10 { + t.Fatal("edsBalancer instance has not been created or updated after 1s") + } + } +} + +// not in fallback mode, overwrite fallback info. 
+// in fallback mode, update config or switch balancer. +func (s) TestXdsBalanceHandleBalancerConfigFallbackUpdate(t *testing.T) { + newEDSBalancer = newFakeEDSBalancer + defer func() { + newEDSBalancer = edsbalancer.NewXDSBalancer + }() + builder := balancer.Get("xds") + cc := newTestClientConn() + lb, ok := builder.Build(cc, balancer.BuildOptions{}).(*xdsBalancer) + if !ok { + t.Fatalf("unable to type assert to *xdsBalancer") + } + defer lb.Close() + + addr, td, cleanup := setupServer(t) + + cfg := &testBalancerConfig{ + BalancerName: addr, + ChildPolicy: []lbPolicy{fakeBalancerA}, + FallbackPolicy: []lbPolicy{fakeBalancerA}, + } + workingBalancerConfig, _ := json.Marshal(cfg) + + if err := lb.HandleBalancerConfig(json.RawMessage(workingBalancerConfig)); err != nil { + t.Fatalf("failed to HandleBalancerConfig(%v), due to err: %v", string(workingBalancerConfig), err) + } + + cfg.FallbackPolicy = []lbPolicy{fakeBalancerB} + workingBalancerConfig, _ = json.Marshal(cfg) + + if err := lb.HandleBalancerConfig(json.RawMessage(workingBalancerConfig)); err != nil { + t.Fatalf("failed to HandleBalancerConfig(%v), due to err: %v", string(workingBalancerConfig), err) + } + + td.sendResp(&response{resp: testEDSRespWithoutEndpoints}) + + var i int + for i = 0; i < 10; i++ { + if edsLB := getLatestEdsBalancer(); edsLB != nil { + break + } + time.Sleep(100 * time.Millisecond) + } + if i == 10 { + t.Fatal("edsBalancer instance has not been created and assigned to lb.xdsLB after 1s") + } + + cleanup() + + addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}} + lb.HandleResolvedAddrs(addrs, nil) + + // verify fallback balancer B takes over + select { + case nsc := <-cc.newSubConns: + if !reflect.DeepEqual(append(addrs, specialAddrForBalancerB), nsc) { + t.Fatalf("got new subconn address %v, want %v", nsc, append(addrs, specialAddrForBalancerB)) + } + case <-time.After(5 * time.Second): + t.Fatalf("timeout when geting new subconn result") + } + + cfg.FallbackPolicy = []lbPolicy{fakeBalancerA} + workingBalancerConfig, _ = json.Marshal(cfg) + if err := lb.HandleBalancerConfig(json.RawMessage(workingBalancerConfig)); err != nil { + t.Fatalf("failed to HandleBalancerConfig(%v), due to err: %v", string(workingBalancerConfig), err) + } + + // verify fallback balancer A takes over + select { + case nsc := <-cc.newSubConns: + if !reflect.DeepEqual(append(addrs, specialAddrForBalancerA), nsc) { + t.Fatalf("got new subconn address %v, want %v", nsc, append(addrs, specialAddrForBalancerA)) + } + case <-time.After(2 * time.Second): + t.Fatalf("timeout when geting new subconn result") + } +} + +func (s) TestXdsBalancerHandlerSubConnStateChange(t *testing.T) { + newEDSBalancer = newFakeEDSBalancer + defer func() { + newEDSBalancer = edsbalancer.NewXDSBalancer + }() + builder := balancer.Get("xds") + cc := newTestClientConn() + lb, ok := builder.Build(cc, balancer.BuildOptions{}).(*xdsBalancer) + if !ok { + t.Fatalf("unable to type assert to *xdsBalancer") + } + defer lb.Close() + + addr, td, cleanup := setupServer(t) + defer cleanup() + cfg := &testBalancerConfig{ + BalancerName: addr, + ChildPolicy: []lbPolicy{fakeBalancerA}, + FallbackPolicy: []lbPolicy{fakeBalancerA}, + } + workingBalancerConfig, _ := json.Marshal(cfg) + + if err := lb.HandleBalancerConfig(json.RawMessage(workingBalancerConfig)); err != nil { + t.Fatalf("failed to HandleBalancerConfig(%v), due to err: %v", string(workingBalancerConfig), err) + } + + td.sendResp(&response{resp: 
testEDSRespWithoutEndpoints}) + + expectedScStateChange := &scStateChange{ + sc: &fakeSubConn{}, + state: connectivity.Ready, + } + + var i int + for i = 0; i < 10; i++ { + if edsLB := getLatestEdsBalancer(); edsLB != nil { + lb.HandleSubConnStateChange(expectedScStateChange.sc, expectedScStateChange.state) + select { + case scsc := <-edsLB.subconnStateChange: + if !reflect.DeepEqual(scsc, expectedScStateChange) { + t.Fatalf("got subconn state change %v, want %v", scsc, expectedScStateChange) + } + case <-time.After(time.Second): + t.Fatal("haven't got subconn state change after 1s") + } + break + } + time.Sleep(100 * time.Millisecond) + } + if i == 10 { + t.Fatal("edsBalancer instance has not been created and assigned to lb.xdsLB after 1s") + } + + // lbAbuilder has a per binary record what's the last balanceA created. We need to clear the record + // to make sure there's a new one created and get the pointer to it. + lbABuilder.clearLastBalancer() + cleanup() + + // switch to fallback + // fallback balancer A takes over + for i = 0; i < 10; i++ { + if fblb := lbABuilder.getLastBalancer(); fblb != nil { + lb.HandleSubConnStateChange(expectedScStateChange.sc, expectedScStateChange.state) + select { + case scsc := <-fblb.subconnStateChange: + if !reflect.DeepEqual(scsc, expectedScStateChange) { + t.Fatalf("got subconn state change %v, want %v", scsc, expectedScStateChange) + } + case <-time.After(time.Second): + t.Fatal("haven't got subconn state change after 1s") + } + break + } + time.Sleep(100 * time.Millisecond) + } + if i == 10 { + t.Fatal("balancerA instance has not been created after 1s") + } +} + +func (s) TestXdsBalancerFallbackSignalFromEdsBalancer(t *testing.T) { + newEDSBalancer = newFakeEDSBalancer + defer func() { + newEDSBalancer = edsbalancer.NewXDSBalancer + }() + builder := balancer.Get("xds") + cc := newTestClientConn() + lb, ok := builder.Build(cc, balancer.BuildOptions{}).(*xdsBalancer) + if !ok { + t.Fatalf("unable to type assert to *xdsBalancer") + } + defer lb.Close() + + addr, td, cleanup := setupServer(t) + defer cleanup() + cfg := &testBalancerConfig{ + BalancerName: addr, + ChildPolicy: []lbPolicy{fakeBalancerA}, + FallbackPolicy: []lbPolicy{fakeBalancerA}, + } + workingBalancerConfig, _ := json.Marshal(cfg) + + if err := lb.HandleBalancerConfig(json.RawMessage(workingBalancerConfig)); err != nil { + t.Fatalf("failed to HandleBalancerConfig(%v), due to err: %v", string(workingBalancerConfig), err) + } + + td.sendResp(&response{resp: testEDSRespWithoutEndpoints}) + + expectedScStateChange := &scStateChange{ + sc: &fakeSubConn{}, + state: connectivity.Ready, + } + + var i int + for i = 0; i < 10; i++ { + if edsLB := getLatestEdsBalancer(); edsLB != nil { + lb.HandleSubConnStateChange(expectedScStateChange.sc, expectedScStateChange.state) + select { + case scsc := <-edsLB.subconnStateChange: + if !reflect.DeepEqual(scsc, expectedScStateChange) { + t.Fatalf("got subconn state change %v, want %v", scsc, expectedScStateChange) + } + case <-time.After(time.Second): + t.Fatal("haven't got subconn state change after 1s") + } + break + } + time.Sleep(100 * time.Millisecond) + } + if i == 10 { + t.Fatal("edsBalancer instance has not been created and assigned to lb.xdsLB after 1s") + } + + // lbAbuilder has a per binary record what's the last balanceA created. We need to clear the record + // to make sure there's a new one created and get the pointer to it. 
+ lbABuilder.clearLastBalancer() + cleanup() + + // switch to fallback + // fallback balancer A takes over + for i = 0; i < 10; i++ { + if fblb := lbABuilder.getLastBalancer(); fblb != nil { + lb.HandleSubConnStateChange(expectedScStateChange.sc, expectedScStateChange.state) + select { + case scsc := <-fblb.subconnStateChange: + if !reflect.DeepEqual(scsc, expectedScStateChange) { + t.Fatalf("got subconn state change %v, want %v", scsc, expectedScStateChange) + } + case <-time.After(time.Second): + t.Fatal("haven't got subconn state change after 1s") + } + break + } + time.Sleep(100 * time.Millisecond) + } + if i == 10 { + t.Fatal("balancerA instance has not been created after 1s") + } +} + +func (s) TestXdsBalancerConfigParsingSelectingLBPolicy(t *testing.T) { + tesCfg := &testBalancerConfig{ + BalancerName: "fake.foo.bar", + ChildPolicy: []lbPolicy{fakeBalancerC, fakeBalancerA, fakeBalancerB}, // selects fakeBalancerA + FallbackPolicy: []lbPolicy{fakeBalancerC, fakeBalancerB, fakeBalancerA}, // selects fakeBalancerB + } + js, _ := json.Marshal(tesCfg) + var xdsCfg xdsConfig + if err := json.Unmarshal(js, &xdsCfg); err != nil { + t.Fatal("unable to unmarshal balancer config into xds config") + } + wantChildPolicy := &loadBalancingConfig{Name: string(fakeBalancerA), Config: json.RawMessage(`{}`)} + if !reflect.DeepEqual(xdsCfg.ChildPolicy, wantChildPolicy) { + t.Fatalf("got child policy %v, want %v", xdsCfg.ChildPolicy, wantChildPolicy) + } + wantFallbackPolicy := &loadBalancingConfig{Name: string(fakeBalancerB), Config: json.RawMessage(`{}`)} + if !reflect.DeepEqual(xdsCfg.FallBackPolicy, wantFallbackPolicy) { + t.Fatalf("got fallback policy %v, want %v", xdsCfg.FallBackPolicy, wantFallbackPolicy) + } +} diff --git a/go.mod b/go.mod index 04188077..ca771c41 100644 --- a/go.mod +++ b/go.mod @@ -4,9 +4,13 @@ require ( cloud.google.com/go v0.26.0 // indirect github.com/BurntSushi/toml v0.3.1 // indirect github.com/client9/misspell v0.3.4 + github.com/envoyproxy/go-control-plane v0.6.7 + github.com/gogo/googleapis v1.1.0 // indirect + github.com/gogo/protobuf v1.2.0 github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b github.com/golang/mock v1.1.1 github.com/golang/protobuf v1.2.0 + github.com/lyft/protoc-gen-validate v0.0.13 // indirect golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3 golang.org/x/net v0.0.0-20180826012351-8a410e7b638d golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be diff --git a/go.sum b/go.sum index a79939d9..a4dcc0b0 100644 --- a/go.sum +++ b/go.sum @@ -4,12 +4,20 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/client9/misspell v0.3.4 h1:ta993UF76GwbvJcIo3Y68y/M3WxlpEHPWIGDkJYwzJI= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/envoyproxy/go-control-plane v0.6.7 h1:fyr1xdpt6v/HvZL70++avYT0HggJQD6/dHgNk0+uJjM= +github.com/envoyproxy/go-control-plane v0.6.7/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g= +github.com/gogo/googleapis v1.1.0 h1:kFkMAZBNAn4j7K0GiZr8cRYzejq68VbheufiV3YuyFI= +github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s= +github.com/gogo/protobuf v1.2.0 h1:xU6/SpYbvkNYiptHJYEDRseDLvYE7wSqhYYNy0QSUzI= +github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= 
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/mock v1.1.1 h1:G5FRp8JnTd7RQH5kemVNlMeyXQAztQ3mOWV95KxsXH8= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/lyft/protoc-gen-validate v0.0.13 h1:KNt/RhmQTOLr7Aj8PsJ7mTronaFyx80mRTT9qF261dA= +github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3 h1:x/bBzNauLQAlE3fLku/xy92Y8QwKX5HZymrMz2IiKFc= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d h1:g9qWBGx4puODJTMVyoPrpoxPFgVGd+z1DZwjfRu4d0I=