From cba9cfed6435d24c8f654547997964c9d0fc1f04 Mon Sep 17 00:00:00 2001
From: lyuxuan <yuxuanli@google.com>
Date: Thu, 28 Feb 2019 15:12:40 -0800
Subject: [PATCH] xds Balancer: implementation (#2614)

---
 balancer/xds/edsbalancer/edsbalancer.go |  58 ++
 balancer/xds/xds.go                     | 586 +++++++++++++++++++-
 balancer/xds/xds_client.go              | 264 +++++++++
 balancer/xds/xds_client_test.go         | 465 ++++++++++++++++
 balancer/xds/xds_test.go                | 706 ++++++++++++++++++++++++
 go.mod                                  |   4 +
 go.sum                                  |   8 +
 7 files changed, 2089 insertions(+), 2 deletions(-)
 create mode 100644 balancer/xds/edsbalancer/edsbalancer.go
 create mode 100644 balancer/xds/xds_client.go
 create mode 100644 balancer/xds/xds_client_test.go
 create mode 100644 balancer/xds/xds_test.go

diff --git a/balancer/xds/edsbalancer/edsbalancer.go b/balancer/xds/edsbalancer/edsbalancer.go
new file mode 100644
index 00000000..6c1da917
--- /dev/null
+++ b/balancer/xds/edsbalancer/edsbalancer.go
@@ -0,0 +1,58 @@
+/*
+ *
+ * Copyright 2019 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package edsbalancer
+
+// TODO: this file is used as a placeholder. It should be deleted after the edsbalancer
+// implementation is merged.
+
+import (
+	"encoding/json"
+
+	v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2"
+	"google.golang.org/grpc/balancer"
+	"google.golang.org/grpc/connectivity"
+	"google.golang.org/grpc/resolver"
+)
+
+type dummyEdsBalancer struct{}
+
+func (d *dummyEdsBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
+	panic("implement me")
+}
+
+func (d *dummyEdsBalancer) HandleResolvedAddrs([]resolver.Address, error) {
+	panic("implement me")
+}
+
+func (d *dummyEdsBalancer) Close() {
+	panic("implement me")
+}
+
+func (d *dummyEdsBalancer) HandleEDSResponse(edsResp *v2.ClusterLoadAssignment) {
+	panic("implement me")
+}
+
+func (d *dummyEdsBalancer) HandleChildPolicy(name string, config json.RawMessage) {
+	panic("implement me")
+}
+
+// NewXDSBalancer creates an edsBalancer
+func NewXDSBalancer(cc balancer.ClientConn) interface{} {
+	return &dummyEdsBalancer{}
+}
diff --git a/balancer/xds/xds.go b/balancer/xds/xds.go
index 2bd77d48..53f4c6e5 100644
--- a/balancer/xds/xds.go
+++ b/balancer/xds/xds.go
@@ -1,4 +1,5 @@
 /*
+ *
  * Copyright 2019 gRPC authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
@@ -12,7 +13,588 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
+ *
  */
 
-// Package xds implements xds balancer.
-package xds // import "google.golang.org/grpc/balancer/xds"
+// Package xds implements a balancer that communicates with a remote balancer using the Envoy xDS
+// protocol.
+package xds
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"reflect"
+	"sync"
+	"time"
+
+	xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
+	"github.com/gogo/protobuf/proto"
+	"google.golang.org/grpc/balancer"
+	"google.golang.org/grpc/balancer/xds/edsbalancer"
+	"google.golang.org/grpc/connectivity"
+	"google.golang.org/grpc/grpclog"
+	"google.golang.org/grpc/resolver"
+)
+
+const (
+	defaultTimeout = 10 * time.Second
+	xdsName        = "xds"
+)
+
+var (
+	// This field is for testing purposes.
+	// TODO: if we later make startupTimeout configurable through BuildOptions (maybe?), we can
+	// remove this field and configure it through BuildOptions instead.
+	startupTimeout = defaultTimeout
+	newEDSBalancer = edsbalancer.NewXDSBalancer
+)
+
+func init() {
+	balancer.Register(newXDSBalancerBuilder())
+}
+
+type xdsBalancerBuilder struct{}
+
+func newXDSBalancerBuilder() balancer.Builder {
+	return &xdsBalancerBuilder{}
+}
+
+func (b *xdsBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
+	ctx, cancel := context.WithCancel(context.Background())
+	x := &xdsBalancer{
+		ctx:             ctx,
+		cancel:          cancel,
+		buildOpts:       opts,
+		startupTimeout:  startupTimeout,
+		connStateMgr:    &connStateMgr{},
+		startup:         true,
+		grpcUpdate:      make(chan interface{}),
+		xdsClientUpdate: make(chan interface{}),
+		timer:           createDrainedTimer(), // initializes a timer that won't fire until reset
+	}
+	x.cc = &xdsClientConn{
+		updateState: x.connStateMgr.updateState,
+		ClientConn:  cc,
+	}
+	go x.run()
+	return x
+}
+
+func (b *xdsBalancerBuilder) Name() string {
+	return xdsName
+}
+
+// EdsBalancer defines the interface that edsBalancer must implement to communicate with xdsBalancer.
+type EdsBalancer interface {
+	balancer.Balancer
+	// HandleEDSResponse passes the received EDS message from the traffic director to the eds balancer.
+	HandleEDSResponse(edsResp *xdspb.ClusterLoadAssignment)
+	// HandleChildPolicy updates the eds balancer with the intra-cluster load balancing policy to use.
+	HandleChildPolicy(name string, config json.RawMessage)
+}
+
+// xdsBalancer manages the xdsClient and the actual balancer that does load balancing (either
+// edsBalancer or the fallback LB).
+type xdsBalancer struct {
+	cc                balancer.ClientConn // *xdsClientConn
+	buildOpts         balancer.BuildOptions
+	startupTimeout    time.Duration
+	xdsStaleTimeout   *time.Duration
+	connStateMgr      *connStateMgr
+	ctx               context.Context
+	cancel            context.CancelFunc
+	startup           bool // startup indicates whether this xdsBalancer is in startup stage.
+	inFallbackMonitor bool
+
+	// xdsBalancer continuously monitors the channels below, and handles events from them in sync.
+	grpcUpdate      chan interface{}
+	xdsClientUpdate chan interface{}
+	timer           *time.Timer
+	noSubConnAlert  <-chan struct{}
+
+	client           *client    // may change when passed a different service config
+	config           *xdsConfig // may change when passed a different service config
+	xdsLB            EdsBalancer
+	fallbackLB       balancer.Balancer
+	fallbackInitData *addressUpdate // may change when HandleResolvedAddrs is called
+}
+
+func (x *xdsBalancer) startNewXDSClient(u *xdsConfig) {
+	// If the xdsBalancer is in startup stage, then we need to apply the startup timeout for the first
+	// xdsClient to get a response from the traffic director.
+	if x.startup {
+		x.startFallbackMonitoring()
+	}
+
+	// Whenever the service config gives a new traffic director name, we need to create an xds client
+	// to connect to it. However, the previous xds client should not be closed until the new one
+	// successfully connects to the traffic director (i.e. gets an ADS response from it). Therefore,
+	// we let each new client be responsible for closing its immediate predecessor. In this way,
+	// xdsBalancer does not need to implement complex synchronization to achieve the same purpose.
+	prevClient := x.client
+	// haveGotADS being true means this xdsClient has received an ADS response from the traffic
+	// director in the past, which means it can close the previous client if that hasn't happened
+	// yet, and it can now send the lose-contact signal for fallback monitoring.
+	var haveGotADS bool
+
+	// set up callbacks for the xds client.
+	newADS := func(ctx context.Context, resp proto.Message) error {
+		if !haveGotADS {
+			if prevClient != nil {
+				prevClient.close()
+			}
+			haveGotADS = true
+		}
+		return x.newADSResponse(ctx, resp)
+	}
+	loseContact := func(ctx context.Context) {
+		// The loseContact signal is only useful when the current xds client has received an ADS
+		// response before, and has not been closed by a later xds client.
+		if haveGotADS {
+			select {
+			case <-ctx.Done():
+				return
+			default:
+			}
+			x.loseContact(ctx)
+		}
+	}
+	exitCleanup := func() {
+		// Each xds client is responsible for closing its predecessor if there is one. There are two
+		// paths for an xds client to close its predecessor:
+		// 1. Once it receives its first ADS response.
+		// 2. It hasn't received its first ADS response yet, but its own successor has received one
+		//    (which triggers the exit of this client). In that case it still needs to close its
+		//    predecessor if it has one.
+		// exitCleanup here covers the 2nd path.
+		if !haveGotADS && prevClient != nil {
+			prevClient.close()
+		}
+	}
+	x.client = newXDSClient(u.BalancerName, x.cc.Target(), u.ChildPolicy == nil, x.buildOpts, newADS, loseContact, exitCleanup)
+	go x.client.run()
+}
+
+// run gets executed in a goroutine once xdsBalancer is created. It monitors updates from grpc,
+// xdsClient and load balancer. It synchronizes the operations that happen inside xdsBalancer. It
+// exits when xdsBalancer is closed.
+func (x *xdsBalancer) run() {
+	for {
+		select {
+		case update := <-x.grpcUpdate:
+			x.handleGRPCUpdate(update)
+		case update := <-x.xdsClientUpdate:
+			x.handleXDSClientUpdate(update)
+		case <-x.timer.C: // x.timer.C will block if we are not in fallback monitoring stage.
+			x.switchFallback()
+		case <-x.noSubConnAlert: // x.noSubConnAlert will block if we are not in fallback monitoring stage.
+			x.switchFallback()
+		case <-x.ctx.Done():
+			if x.client != nil {
+				x.client.close()
+			}
+			if x.xdsLB != nil {
+				x.xdsLB.Close()
+			}
+			if x.fallbackLB != nil {
+				x.fallbackLB.Close()
+			}
+			return
+		}
+	}
+}
+
+func (x *xdsBalancer) handleGRPCUpdate(update interface{}) {
+	switch u := update.(type) {
+	case *addressUpdate:
+		if x.fallbackLB != nil {
+			x.fallbackLB.HandleResolvedAddrs(u.addrs, u.err)
+		}
+		x.fallbackInitData = u
+	case *subConnStateUpdate:
+		if x.xdsLB != nil {
+			x.xdsLB.HandleSubConnStateChange(u.sc, u.state)
+		}
+		if x.fallbackLB != nil {
+			x.fallbackLB.HandleSubConnStateChange(u.sc, u.state)
+		}
+	case *xdsConfig:
+		if x.config == nil {
+			// The first time we get config, we just need to start the xdsClient.
+			x.startNewXDSClient(u)
+			x.config = u
+			return
+		}
+		// With a different BalancerName, we need to create a new xdsClient.
+		// If the current or previous ChildPolicy is nil, we also need to recreate the xdsClient.
+		// This is because with a nil ChildPolicy the xdsClient makes CDS requests, while with a
+		// non-nil one it doesn't.
+		if u.BalancerName != x.config.BalancerName || (u.ChildPolicy == nil) != (x.config.ChildPolicy == nil) {
+			x.startNewXDSClient(u)
+		}
+		// We will update the xdsLB with the new child policy, if we got a different one and it's not nil.
+		// The nil case will be handled when the CDS response is processed; we will update xdsLB at that time.
+		if !reflect.DeepEqual(u.ChildPolicy, x.config.ChildPolicy) && u.ChildPolicy != nil && x.xdsLB != nil {
+			x.xdsLB.HandleChildPolicy(u.ChildPolicy.Name, u.ChildPolicy.Config)
+		}
+		if !reflect.DeepEqual(u.FallBackPolicy, x.config.FallBackPolicy) && x.fallbackLB != nil {
+			x.fallbackLB.Close()
+			x.startFallBackBalancer(u)
+		}
+		x.config = u
+	default:
+		// unreachable path
+		panic("wrong update type")
+	}
+}
+
+func (x *xdsBalancer) handleXDSClientUpdate(update interface{}) {
+	switch u := update.(type) {
+	case *cdsResp:
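+		// Ignore the response if its originating xds client has already been closed (its ctx is done).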
+		select {
+		case <-u.ctx.Done():
+			return
+		default:
+		}
+		x.cancelFallbackAndSwitchEDSBalancerIfNecessary()
+		// TODO: get the optional xds record stale timeout from the OutlierDetection message. If it
+		// does not exist, reset to 0.
+		// x.xdsStaleTimeout = u.OutlierDetection.TO_BE_DEFINED_AND_ADDED
+		x.xdsLB.HandleChildPolicy(u.resp.LbPolicy.String(), nil)
+	case *edsResp:
+		select {
+		case <-u.ctx.Done():
+			return
+		default:
+		}
+		x.cancelFallbackAndSwitchEDSBalancerIfNecessary()
+		x.xdsLB.HandleEDSResponse(u.resp)
+	case *loseContact:
+		select {
+		case <-u.ctx.Done():
+			return
+		default:
+		}
+		// If we are already doing fallback monitoring, ignore the new loseContact signal.
+		if x.inFallbackMonitor {
+			return
+		}
+		x.inFallbackMonitor = true
+		x.startFallbackMonitoring()
+	default:
+		panic("unexpected xds client update type")
+	}
+}
+
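+// connStateMgr tracks the connectivity state reported through the wrapped ClientConn, and can
+// notify a watcher when no SubConn is in the READY state.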
+type connStateMgr struct {
+	mu       sync.Mutex
+	curState connectivity.State
+	notify   chan struct{}
+}
+
+func (c *connStateMgr) updateState(s connectivity.State) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	c.curState = s
+	if s != connectivity.Ready && c.notify != nil {
+		close(c.notify)
+		c.notify = nil
+	}
+}
+
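+// notifyWhenNotReady returns a channel that is closed once the connectivity state becomes anything
+// other than READY. If the state is already not READY, the returned channel is already closed.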
+func (c *connStateMgr) notifyWhenNotReady() <-chan struct{} {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if c.curState != connectivity.Ready {
+		ch := make(chan struct{})
+		close(ch)
+		return ch
+	}
+	c.notify = make(chan struct{})
+	return c.notify
+}
+
+// xdsClientConn wraps the balancer.ClientConn passed in from grpc. The wrapper adds the ability to
+// get notified when no SubConn is in the READY state.
+// TODO: once we have the change that keeps both edsbalancer and fallback balancer alive at the same
+// time, we need to make sure to synchronize updates from both entities on the ClientConn.
+type xdsClientConn struct {
+	updateState func(s connectivity.State)
+	balancer.ClientConn
+}
+
+func (w *xdsClientConn) UpdateBalancerState(s connectivity.State, p balancer.Picker) {
+	w.updateState(s)
+	w.ClientConn.UpdateBalancerState(s, p)
+}
+
+type addressUpdate struct {
+	addrs []resolver.Address
+	err   error
+}
+
+type subConnStateUpdate struct {
+	sc    balancer.SubConn
+	state connectivity.State
+}
+
+func (x *xdsBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
+	update := &subConnStateUpdate{
+		sc:    sc,
+		state: state,
+	}
+	select {
+	case x.grpcUpdate <- update:
+	case <-x.ctx.Done():
+	}
+}
+
+func (x *xdsBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
+	update := &addressUpdate{
+		addrs: addrs,
+		err:   err,
+	}
+	select {
+	case x.grpcUpdate <- update:
+	case <-x.ctx.Done():
+	}
+}
+
+// TODO: once the API is merged, check whether we need to change the function name/signature here.
+func (x *xdsBalancer) HandleBalancerConfig(config json.RawMessage) error {
+	var cfg xdsConfig
+	if err := json.Unmarshal(config, &cfg); err != nil {
+		return errors.New("unable to unmarshal balancer config into xds config")
+	}
+
+	select {
+	case x.grpcUpdate <- &cfg:
+	case <-x.ctx.Done():
+	}
+	return nil
+}
+
+type cdsResp struct {
+	ctx  context.Context
+	resp *xdspb.Cluster
+}
+
+type edsResp struct {
+	ctx  context.Context
+	resp *xdspb.ClusterLoadAssignment
+}
+
+func (x *xdsBalancer) newADSResponse(ctx context.Context, resp proto.Message) error {
+	var update interface{}
+	switch u := resp.(type) {
+	case *xdspb.Cluster:
+		if u.GetName() != x.cc.Target() {
+			return fmt.Errorf("unmatched service name, got %s, want %s", u.GetName(), x.cc.Target())
+		}
+		if u.GetType() != xdspb.Cluster_EDS {
+			return fmt.Errorf("unexpected service discovery type, got %v, want %v", u.GetType(), xdspb.Cluster_EDS)
+		}
+		update = &cdsResp{ctx: ctx, resp: u}
+	case *xdspb.ClusterLoadAssignment:
+		// nothing to check
+		update = &edsResp{ctx: ctx, resp: u}
+	default:
+		grpclog.Warningf("xdsBalancer: got a response that's neither CDS nor EDS, type = %T", u)
+	}
+
+	select {
+	case x.xdsClientUpdate <- update:
+	case <-x.ctx.Done():
+	case <-ctx.Done():
+	}
+
+	return nil
+}
+
+type loseContact struct {
+	ctx context.Context
+}
+
+func (x *xdsBalancer) loseContact(ctx context.Context) {
+	select {
+	case x.xdsClientUpdate <- &loseContact{ctx: ctx}:
+	case <-x.ctx.Done():
+	case <-ctx.Done():
+	}
+}
+
+func (x *xdsBalancer) switchFallback() {
+	if x.xdsLB != nil {
+		x.xdsLB.Close()
+		x.xdsLB = nil
+	}
+	x.startFallBackBalancer(x.config)
+	x.cancelFallbackMonitoring()
+}
+
+// cancelFallbackAndSwitchEDSBalancerIfNecessary is a no-op if we have a working xds client.
+// It cancels fallback monitoring if we are in the fallback monitoring stage.
+// If there's no running edsBalancer currently, it creates and initializes one. It also shuts down
+// the fallback balancer if one is running.
+func (x *xdsBalancer) cancelFallbackAndSwitchEDSBalancerIfNecessary() {
+	// xDS update will cancel fallback monitoring if we are in fallback monitoring stage.
+	x.cancelFallbackMonitoring()
+
+	// xDS update will switch balancer back to edsBalancer if we are in fallback.
+	if x.xdsLB == nil {
+		if x.fallbackLB != nil {
+			x.fallbackLB.Close()
+			x.fallbackLB = nil
+		}
+		x.xdsLB = newEDSBalancer(x.cc).(EdsBalancer)
+		if x.config.ChildPolicy != nil {
+			x.xdsLB.HandleChildPolicy(x.config.ChildPolicy.Name, x.config.ChildPolicy.Config)
+		}
+	}
+}
+
+func (x *xdsBalancer) startFallBackBalancer(c *xdsConfig) {
+	if c.FallBackPolicy == nil {
+		x.startFallBackBalancer(&xdsConfig{
+			FallBackPolicy: &loadBalancingConfig{
+				Name: "round_robin",
+			},
+		})
+		return
+	}
+	// builder will always be non-nil, since when parsing JSON into xdsConfig we check whether the
+	// specified balancer is registered.
+	builder := balancer.Get(c.FallBackPolicy.Name)
+	x.fallbackLB = builder.Build(x.cc, x.buildOpts)
+	if x.fallbackInitData != nil {
+		// TODO: uncomment when HandleBalancerConfig API is merged.
+		//x.fallbackLB.HandleBalancerConfig(c.FallBackPolicy.Config)
+		x.fallbackLB.HandleResolvedAddrs(x.fallbackInitData.addrs, x.fallbackInitData.err)
+	}
+}
+
+// There are three ways that could lead to fallback:
+// 1. During startup (i.e. the first xds client is just created and attempts to contact the traffic
+//    director), fallback if it has not received any response from the director within the configured
+//    timeout.
+// 2. After the xds client loses contact with the remote, fallback if all connections to the backends
+//    are lost (i.e. not in state READY).
+// 3. After the xds client loses contact with the remote, fallback if the stale eds timeout has been
+//    configured through CDS and has expired.
+func (x *xdsBalancer) startFallbackMonitoring() {
+	if x.startup {
+		x.startup = false
+		x.timer.Reset(x.startupTimeout)
+		return
+	}
+
+	x.noSubConnAlert = x.connStateMgr.notifyWhenNotReady()
+	if x.xdsStaleTimeout != nil {
+		if !x.timer.Stop() {
+			<-x.timer.C
+		}
+		x.timer.Reset(*x.xdsStaleTimeout)
+	}
+}
+
+// There are two cases where fallback monitoring should be canceled:
+// 1. xDS client returns a new ADS message.
+// 2. fallback has been triggered.
+func (x *xdsBalancer) cancelFallbackMonitoring() {
+	if !x.timer.Stop() {
+		select {
+		case <-x.timer.C:
+			// A fallback condition may fire at the same time as the timer; if the timer loses the
+			// race, we need to drain x.timer.C so that we don't trigger fallback again later.
+		default:
+			// If the timer firing is what led us here, the value has already been received in run(),
+			// so there is nothing to drain from x.timer.C.
+		}
+	}
+	x.noSubConnAlert = nil
+	x.inFallbackMonitor = false
+}
+
+func (x *xdsBalancer) Close() {
+	x.cancel()
+}
+
+func createDrainedTimer() *time.Timer {
+	timer := time.NewTimer(0 * time.Millisecond)
+	// make sure the timer channel blocks until the timer is reset.
+	if !timer.Stop() {
+		<-timer.C
+	}
+	return timer
+}
+
+type xdsConfig struct {
+	BalancerName   string
+	ChildPolicy    *loadBalancingConfig
+	FallBackPolicy *loadBalancingConfig
+}
+
+// When unmarshaling JSON into xdsConfig, we iterate through the ChildPolicy/FallbackPolicy lists
+// and select the first LB policy that has been registered; that policy is stored in the returned
+// xdsConfig.
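+// For example, the JSON form this config is parsed from looks like the following (the names here
+// are illustrative):
+//
+//	{
+//	  "BalancerName": "foo.bar.com",
+//	  "ChildPolicy": [{"round_robin": {}}],
+//	  "FallbackPolicy": [{"round_robin": {}}]
+//	}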
+func (p *xdsConfig) UnmarshalJSON(data []byte) error {
+	var val map[string]json.RawMessage
+	if err := json.Unmarshal(data, &val); err != nil {
+		return err
+	}
+	for k, v := range val {
+		switch k {
+		case "BalancerName":
+			if err := json.Unmarshal(v, &p.BalancerName); err != nil {
+				return err
+			}
+		case "ChildPolicy":
+			var lbcfgs []*loadBalancingConfig
+			if err := json.Unmarshal(v, &lbcfgs); err != nil {
+				return err
+			}
+			for _, lbcfg := range lbcfgs {
+				if balancer.Get(lbcfg.Name) != nil {
+					p.ChildPolicy = lbcfg
+					break
+				}
+			}
+		case "FallbackPolicy":
+			var lbcfgs []*loadBalancingConfig
+			if err := json.Unmarshal(v, &lbcfgs); err != nil {
+				return err
+			}
+			for _, lbcfg := range lbcfgs {
+				if balancer.Get(lbcfg.Name) != nil {
+					p.FallBackPolicy = lbcfg
+					break
+				}
+			}
+		}
+	}
+	return nil
+}
+
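+// MarshalJSON is intentionally a stub: within this package, xdsConfig is only ever unmarshaled
+// from JSON, never marshaled back.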
+func (p *xdsConfig) MarshalJSON() ([]byte, error) {
+	return nil, nil
+}
+
+type loadBalancingConfig struct {
+	Name   string
+	Config json.RawMessage
+}
+
+func (l *loadBalancingConfig) MarshalJSON() ([]byte, error) {
+	return nil, nil
+}
+
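+// UnmarshalJSON parses a config of the form {"policyName": <config>}. If the map contains more
+// than one entry, an arbitrary one wins, since map iteration order is unspecified.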
+func (l *loadBalancingConfig) UnmarshalJSON(data []byte) error {
+	var cfg map[string]json.RawMessage
+	if err := json.Unmarshal(data, &cfg); err != nil {
+		return err
+	}
+	for name, config := range cfg {
+		l.Name = name
+		l.Config = config
+	}
+	return nil
+}
diff --git a/balancer/xds/xds_client.go b/balancer/xds/xds_client.go
new file mode 100644
index 00000000..d18d9033
--- /dev/null
+++ b/balancer/xds/xds_client.go
@@ -0,0 +1,264 @@
+/*
+ *
+ * Copyright 2019 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package xds
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
+	xdscorepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
+	xdsdiscoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
+	"github.com/gogo/protobuf/proto"
+	"github.com/gogo/protobuf/types"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/balancer"
+	"google.golang.org/grpc/grpclog"
+	"google.golang.org/grpc/internal/backoff"
+	"google.golang.org/grpc/internal/channelz"
+)
+
+const (
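+	// grpcHostname is the node metadata key under which the client's dial target is sent to the
+	// traffic director in CDS requests; endpointRequired is the key that tells the director (in
+	// EDS requests) whether this client runs in CDS mode.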
+	grpcHostname     = "com.googleapis.trafficdirector.grpc_hostname"
+	cdsType          = "type.googleapis.com/envoy.api.v2.Cluster"
+	edsType          = "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment"
+	endpointRequired = "endpoints_required"
+)
+
+var (
+	defaultBackoffConfig = backoff.Exponential{
+		MaxDelay: 120 * time.Second,
+	}
+)
+
+// client is responsible for connecting to the specified traffic director, passing the received ADS
+// responses back (via the newADS callback), and sending a notification when communication with the
+// traffic director is lost.
+type client struct {
+	ctx          context.Context
+	cancel       context.CancelFunc
+	cli          xdsdiscoverypb.AggregatedDiscoveryServiceClient
+	opts         balancer.BuildOptions
+	balancerName string // the traffic director name
+	serviceName  string // the user dial target name
+	enableCDS    bool
+	newADS       func(ctx context.Context, resp proto.Message) error
+	loseContact  func(ctx context.Context)
+	cleanup      func()
+	backoff      backoff.Strategy
+
+	mu sync.Mutex
+	cc *grpc.ClientConn
+}
+
+func (c *client) run() {
+	c.dial()
+	c.makeADSCall()
+}
+
+func (c *client) close() {
+	c.cancel()
+	c.mu.Lock()
+	if c.cc != nil {
+		c.cc.Close()
+	}
+	c.mu.Unlock()
+	c.cleanup()
+}
+
+func (c *client) dial() {
+	var dopts []grpc.DialOption
+	if creds := c.opts.DialCreds; creds != nil {
+		if err := creds.OverrideServerName(c.balancerName); err == nil {
+			dopts = append(dopts, grpc.WithTransportCredentials(creds))
+		} else {
+			grpclog.Warningf("xds: failed to override the server name in the credentials: %v, using Insecure", err)
+			dopts = append(dopts, grpc.WithInsecure())
+		}
+	} else {
+		dopts = append(dopts, grpc.WithInsecure())
+	}
+	if c.opts.Dialer != nil {
+		dopts = append(dopts, grpc.WithContextDialer(c.opts.Dialer))
+	}
+	// Explicitly set pickfirst as the balancer.
+	dopts = append(dopts, grpc.WithBalancerName(grpc.PickFirstBalancerName))
+	if channelz.IsOn() {
+		dopts = append(dopts, grpc.WithChannelzParentID(c.opts.ChannelzParentID))
+	}
+
+	cc, err := grpc.DialContext(c.ctx, c.balancerName, dopts...)
+	// Since this is a non-blocking dial, if it fails it must be due to some serious (non
+	// network-related) error.
+	if err != nil {
+		grpclog.Fatalf("xds: failed to dial: %v", err)
+	}
+	c.mu.Lock()
+	select {
+	case <-c.ctx.Done():
+		cc.Close()
+	default:
+		// only assign c.cc when the xds client has not been closed, to prevent a ClientConn leak.
+		c.cc = cc
+	}
+	c.mu.Unlock()
+}
+
+func (c *client) newCDSRequest() *xdspb.DiscoveryRequest {
+	cdsReq := &xdspb.DiscoveryRequest{
+		Node: &xdscorepb.Node{
+			Metadata: &types.Struct{
+				Fields: map[string]*types.Value{
+					grpcHostname: {
+						Kind: &types.Value_StringValue{StringValue: c.serviceName},
+					},
+				},
+			},
+		},
+		TypeUrl: cdsType,
+	}
+	return cdsReq
+}
+
+func (c *client) newEDSRequest() *xdspb.DiscoveryRequest {
+	edsReq := &xdspb.DiscoveryRequest{
+		Node: &xdscorepb.Node{
+			Metadata: &types.Struct{
+				Fields: map[string]*types.Value{
+					endpointRequired: {
+						Kind: &types.Value_BoolValue{BoolValue: c.enableCDS},
+					},
+				},
+			},
+		},
+		ResourceNames: []string{c.serviceName},
+		TypeUrl:       edsType,
+	}
+	return edsReq
+}
+
+func (c *client) makeADSCall() {
+	c.cli = xdsdiscoverypb.NewAggregatedDiscoveryServiceClient(c.cc)
+	retryCount := 0
+	var doRetry bool
+
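+	// Each iteration of this loop is one attempt at the ADS stream. Backoff is only applied (and
+	// retryCount grown) when the previous attempt ended without receiving a single response.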
+	for {
+		select {
+		case <-c.ctx.Done():
+			return
+		default:
+		}
+
+		if doRetry {
+			backoffTimer := time.NewTimer(c.backoff.Backoff(retryCount))
+			select {
+			case <-backoffTimer.C:
+			case <-c.ctx.Done():
+				backoffTimer.Stop()
+				return
+			}
+			retryCount++
+		}
+
+		firstRespReceived := c.adsCallAttempt()
+		if firstRespReceived {
+			retryCount = 0
+			doRetry = false
+		} else {
+			doRetry = true
+		}
+		c.loseContact(c.ctx)
+	}
+}
+
+func (c *client) adsCallAttempt() (firstRespReceived bool) {
+	firstRespReceived = false
+	ctx, cancel := context.WithCancel(c.ctx)
+	defer cancel()
+	st, err := c.cli.StreamAggregatedResources(ctx, grpc.WaitForReady(true))
+	if err != nil {
+		grpclog.Infof("xds: failed to initial ADS streaming RPC due to %v", err)
+		return
+	}
+	if c.enableCDS {
+		if err := st.Send(c.newCDSRequest()); err != nil {
+			// current stream is broken, start a new one.
+			return
+		}
+	}
+	if err := st.Send(c.newEDSRequest()); err != nil {
+		// current stream is broken, start a new one.
+		return
+	}
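+	// When CDS is enabled, the first response on this stream must be a CDS response; expectCDS is
+	// flipped to false once one arrives.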
+	expectCDS := c.enableCDS
+	for {
+		resp, err := st.Recv()
+		if err != nil {
+			// current stream is broken, start a new one.
+			return
+		}
+		firstRespReceived = true
+		resources := resp.GetResources()
+		if len(resources) < 1 {
+			grpclog.Warning("xds: ADS response contains 0 resource info.")
+			// start a new call, as the server misbehaves by sending an ADS response with 0 resource info.
+			return
+		}
+		if resp.GetTypeUrl() == cdsType && !c.enableCDS {
+			grpclog.Warning("xds: received CDS response in custom plugin mode.")
+			// start a new call as we receive CDS response when in EDS-only mode.
+			return
+		}
+		var adsResp types.DynamicAny
+		if err := types.UnmarshalAny(&resources[0], &adsResp); err != nil {
+			grpclog.Warningf("xds: failed to unmarshal resources due to %v.", err)
+			return
+		}
+		switch adsResp.Message.(type) {
+		case *xdspb.Cluster:
+			expectCDS = false
+		case *xdspb.ClusterLoadAssignment:
+			if expectCDS {
+				grpclog.Warningf("xds: expecting CDS response, got EDS response instead.")
+				return
+			}
+		}
+		if err := c.newADS(c.ctx, adsResp.Message); err != nil {
+			grpclog.Warningf("xds: processing new ADS message failed due to %v.", err)
+			return
+		}
+	}
+}
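+
+// newXDSClient creates a client that connects to the traffic director named by balancerName on
+// behalf of serviceName, and reports events through the newADS, loseContact, and exitCleanup
+// callbacks.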
+func newXDSClient(balancerName string, serviceName string, enableCDS bool, opts balancer.BuildOptions, newADS func(context.Context, proto.Message) error, loseContact func(ctx context.Context), exitCleanup func()) *client {
+	c := &client{
+		balancerName: balancerName,
+		serviceName:  serviceName,
+		enableCDS:    enableCDS,
+		opts:         opts,
+		newADS:       newADS,
+		loseContact:  loseContact,
+		cleanup:      exitCleanup,
+		backoff:      defaultBackoffConfig,
+	}
+
+	c.ctx, c.cancel = context.WithCancel(context.Background())
+
+	return c
+}
diff --git a/balancer/xds/xds_client_test.go b/balancer/xds/xds_client_test.go
new file mode 100644
index 00000000..37dc8797
--- /dev/null
+++ b/balancer/xds/xds_client_test.go
@@ -0,0 +1,465 @@
+/*
+ *
+ * Copyright 2019 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package xds
+
+import (
+	"context"
+	"errors"
+	"io"
+	"net"
+	"testing"
+	"time"
+
+	xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
+	xdscorepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
+	xdsendpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
+	xdsdiscoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
+	"github.com/gogo/protobuf/proto"
+	"github.com/gogo/protobuf/types"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/balancer"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+var (
+	testServiceName = "test/foo"
+	testCDSReq      = &xdspb.DiscoveryRequest{
+		Node: &xdscorepb.Node{
+			Metadata: &types.Struct{
+				Fields: map[string]*types.Value{
+					grpcHostname: {
+						Kind: &types.Value_StringValue{StringValue: testServiceName},
+					},
+				},
+			},
+		},
+		TypeUrl: cdsType,
+	}
+	testEDSReq = &xdspb.DiscoveryRequest{
+		Node: &xdscorepb.Node{
+			Metadata: &types.Struct{
+				Fields: map[string]*types.Value{
+					endpointRequired: {
+						Kind: &types.Value_BoolValue{BoolValue: true},
+					},
+				},
+			},
+		},
+		ResourceNames: []string{testServiceName},
+		TypeUrl:       edsType,
+	}
+	testEDSReqWithoutEndpoints = &xdspb.DiscoveryRequest{
+		Node: &xdscorepb.Node{
+			Metadata: &types.Struct{
+				Fields: map[string]*types.Value{
+					endpointRequired: {
+						Kind: &types.Value_BoolValue{BoolValue: false},
+					},
+				},
+			},
+		},
+		ResourceNames: []string{testServiceName},
+		TypeUrl:       edsType,
+	}
+	testCluster = &xdspb.Cluster{
+		Name:     testServiceName,
+		Type:     xdspb.Cluster_EDS,
+		LbPolicy: xdspb.Cluster_ROUND_ROBIN,
+	}
+	marshaledCluster, _ = proto.Marshal(testCluster)
+	testCDSResp         = &xdspb.DiscoveryResponse{
+		Resources: []types.Any{
+			{
+				TypeUrl: cdsType,
+				Value:   marshaledCluster,
+			},
+		},
+		TypeUrl: cdsType,
+	}
+	testClusterLoadAssignment = &xdspb.ClusterLoadAssignment{
+		ClusterName: testServiceName,
+		Endpoints: []xdsendpointpb.LocalityLbEndpoints{
+			{
+				Locality: &xdscorepb.Locality{
+					Region:  "asia-east1",
+					Zone:    "1",
+					SubZone: "sa",
+				},
+				LbEndpoints: []xdsendpointpb.LbEndpoint{
+					{
+						HostIdentifier: &xdsendpointpb.LbEndpoint_Endpoint{
+							Endpoint: &xdsendpointpb.Endpoint{
+								Address: &xdscorepb.Address{
+									Address: &xdscorepb.Address_SocketAddress{
+										SocketAddress: &xdscorepb.SocketAddress{
+											Address: "1.1.1.1",
+											PortSpecifier: &xdscorepb.SocketAddress_PortValue{
+												PortValue: 10001,
+											},
+											ResolverName: "dns",
+										},
+									},
+								},
+								HealthCheckConfig: nil,
+							},
+						},
+						Metadata: &xdscorepb.Metadata{
+							FilterMetadata: map[string]*types.Struct{
+								"xx.lb": {
+									Fields: map[string]*types.Value{
+										"endpoint_name": {
+											Kind: &types.Value_StringValue{
+												StringValue: "some.endpoint.name",
+											},
+										},
+									},
+								},
+							},
+						},
+					},
+				},
+				LoadBalancingWeight: &types.UInt32Value{
+					Value: 1,
+				},
+				Priority: 0,
+			},
+		},
+	}
+	marshaledClusterLoadAssignment, _ = proto.Marshal(testClusterLoadAssignment)
+	testEDSResp                       = &xdspb.DiscoveryResponse{
+		Resources: []types.Any{
+			{
+				TypeUrl: edsType,
+				Value:   marshaledClusterLoadAssignment,
+			},
+		},
+		TypeUrl: edsType,
+	}
+	testClusterLoadAssignmentWithoutEndpoints = &xdspb.ClusterLoadAssignment{
+		ClusterName: testServiceName,
+		Endpoints: []xdsendpointpb.LocalityLbEndpoints{
+			{
+				Locality: &xdscorepb.Locality{
+					SubZone: "sa",
+				},
+				LoadBalancingWeight: &types.UInt32Value{
+					Value: 128,
+				},
+				Priority: 0,
+			},
+		},
+		Policy: nil,
+	}
+	marshaledClusterLoadAssignmentWithoutEndpoints, _ = proto.Marshal(testClusterLoadAssignmentWithoutEndpoints)
+	testEDSRespWithoutEndpoints                       = &xdspb.DiscoveryResponse{
+		Resources: []types.Any{
+			{
+				TypeUrl: edsType,
+				Value:   marshaledClusterLoadAssignmentWithoutEndpoints,
+			},
+		},
+		TypeUrl: edsType,
+	}
+)
+
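+// testTrafficDirector is a fake AggregatedDiscoveryService server. It records incoming requests on
+// reqChan and sends back whatever is pushed onto respChan.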
+type testTrafficDirector struct {
+	reqChan  chan *request
+	respChan chan *response
+}
+
+type request struct {
+	req *xdspb.DiscoveryRequest
+	err error
+}
+
+type response struct {
+	resp *xdspb.DiscoveryResponse
+	err  error
+}
+
+func (ttd *testTrafficDirector) StreamAggregatedResources(s xdsdiscoverypb.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
+	for {
+		req, err := s.Recv()
+		if err != nil {
+			ttd.reqChan <- &request{
+				req: nil,
+				err: err,
+			}
+			if err == io.EOF {
+				return nil
+			}
+			return err
+		}
+		ttd.reqChan <- &request{
+			req: req,
+			err: nil,
+		}
+		if req.TypeUrl == edsType {
+			break
+		}
+	}
+
+	for {
+		select {
+		case resp := <-ttd.respChan:
+			if resp.err != nil {
+				return resp.err
+			}
+			if err := s.Send(resp.resp); err != nil {
+				return err
+			}
+		case <-s.Context().Done():
+			return s.Context().Err()
+		}
+	}
+}
+
+func (ttd *testTrafficDirector) IncrementalAggregatedResources(xdsdiscoverypb.AggregatedDiscoveryService_IncrementalAggregatedResourcesServer) error {
+	return status.Error(codes.Unimplemented, "")
+}
+
+func (ttd *testTrafficDirector) sendResp(resp *response) {
+	ttd.respChan <- resp
+}
+
+func (ttd *testTrafficDirector) getReq() *request {
+	return <-ttd.reqChan
+}
+
+func newTestTrafficDirector() *testTrafficDirector {
+	return &testTrafficDirector{
+		reqChan:  make(chan *request, 10),
+		respChan: make(chan *response, 10),
+	}
+}
+
+type testConfig struct {
+	doCDS                bool
+	expectedRequests     []*xdspb.DiscoveryRequest
+	responsesToSend      []*xdspb.DiscoveryResponse
+	expectedADSResponses []proto.Message
+	adsErr               error
+	svrErr               error
+}
+
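+// setupServer starts a testTrafficDirector on an ephemeral localhost port. It returns the server's
+// address and a cleanup function that stops it.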
+func setupServer(t *testing.T) (addr string, td *testTrafficDirector, cleanup func()) {
+	lis, err := net.Listen("tcp", "localhost:0")
+	if err != nil {
+		t.Fatalf("listen failed due to: %v", err)
+	}
+	svr := grpc.NewServer()
+	td = newTestTrafficDirector()
+	xdsdiscoverypb.RegisterAggregatedDiscoveryServiceServer(svr, td)
+	go svr.Serve(lis)
+	return lis.Addr().String(), td, func() {
+		svr.Stop()
+		lis.Close()
+	}
+}
+
+func (s) TestXdsClientResponseHandling(t *testing.T) {
+	for _, test := range []*testConfig{
+		{
+			doCDS:                true,
+			expectedRequests:     []*xdspb.DiscoveryRequest{testCDSReq, testEDSReq},
+			responsesToSend:      []*xdspb.DiscoveryResponse{testCDSResp, testEDSResp},
+			expectedADSResponses: []proto.Message{testCluster, testClusterLoadAssignment},
+		},
+		{
+			doCDS:                false,
+			expectedRequests:     []*xdspb.DiscoveryRequest{testEDSReqWithoutEndpoints},
+			responsesToSend:      []*xdspb.DiscoveryResponse{testEDSRespWithoutEndpoints},
+			expectedADSResponses: []proto.Message{testClusterLoadAssignmentWithoutEndpoints},
+		},
+	} {
+		testXdsClientResponseHandling(t, test)
+	}
+}
+
+func testXdsClientResponseHandling(t *testing.T, test *testConfig) {
+	addr, td, cleanup := setupServer(t)
+	defer cleanup()
+	adsChan := make(chan proto.Message, 10)
+	newADS := func(ctx context.Context, i proto.Message) error {
+		adsChan <- i
+		return nil
+	}
+	client := newXDSClient(addr, testServiceName, test.doCDS, balancer.BuildOptions{}, newADS, func(context.Context) {}, func() {})
+	defer client.close()
+	go client.run()
+
+	for _, expectedReq := range test.expectedRequests {
+		req := td.getReq()
+		if req.err != nil {
+			t.Fatalf("ads RPC failed with err: %v", req.err)
+		}
+		if !proto.Equal(req.req, expectedReq) {
+			t.Fatalf("got ADS request %T %v, expected: %T %v", req.req, req.req, expectedReq, expectedReq)
+		}
+	}
+
+	for i, resp := range test.responsesToSend {
+		td.sendResp(&response{resp: resp})
+		ads := <-adsChan
+		if !proto.Equal(ads, test.expectedADSResponses[i]) {
+			t.Fatalf("received unexpected ads response, got %v, want %v", ads, test.expectedADSResponses[i])
+		}
+	}
+}
+
+func (s) TestXdsClientLoseContact(t *testing.T) {
+	for _, test := range []*testConfig{
+		{
+			doCDS:           true,
+			responsesToSend: []*xdspb.DiscoveryResponse{},
+		},
+		{
+			doCDS:           false,
+			responsesToSend: []*xdspb.DiscoveryResponse{testEDSRespWithoutEndpoints},
+		},
+	} {
+		testXdsClientLoseContactRemoteClose(t, test)
+	}
+
+	for _, test := range []*testConfig{
+		{
+			doCDS:           false,
+			responsesToSend: []*xdspb.DiscoveryResponse{testCDSResp}, // CDS response when in custom mode.
+		},
+		{
+			doCDS:           true,
+			responsesToSend: []*xdspb.DiscoveryResponse{{}}, // response with 0 resources is an error case.
+		},
+		{
+			doCDS:           true,
+			responsesToSend: []*xdspb.DiscoveryResponse{testCDSResp},
+			adsErr:          errors.New("some ads parsing error from xdsBalancer"),
+		},
+	} {
+		testXdsClientLoseContactADSRelatedErrorOccur(t, test)
+	}
+}
+
+func testXdsClientLoseContactRemoteClose(t *testing.T, test *testConfig) {
+	addr, td, cleanup := setupServer(t)
+	defer cleanup()
+	adsChan := make(chan proto.Message, 10)
+	newADS := func(ctx context.Context, i proto.Message) error {
+		adsChan <- i
+		return nil
+	}
+	contactChan := make(chan *loseContact, 10)
+	loseContactFunc := func(context.Context) {
+		contactChan <- &loseContact{}
+	}
+	client := newXDSClient(addr, testServiceName, test.doCDS, balancer.BuildOptions{}, newADS, loseContactFunc, func() {})
+	defer client.close()
+	go client.run()
+
+	// make sure the server side gets the request (i.e. the stream was created successfully on the client side)
+	td.getReq()
+
+	for _, resp := range test.responsesToSend {
+		td.sendResp(&response{resp: resp})
+		// make sure the client side receives it
+		<-adsChan
+	}
+	cleanup()
+
+	select {
+	case <-contactChan:
+	case <-time.After(2 * time.Second):
+		t.Fatal("time out when expecting lost contact signal")
+	}
+}
+
+func testXdsClientLoseContactADSRelatedErrorOccur(t *testing.T, test *testConfig) {
+	addr, td, cleanup := setupServer(t)
+	defer cleanup()
+
+	adsChan := make(chan proto.Message, 10)
+	newADS := func(ctx context.Context, i proto.Message) error {
+		adsChan <- i
+		return test.adsErr
+	}
+	contactChan := make(chan *loseContact, 10)
+	loseContactFunc := func(context.Context) {
+		contactChan <- &loseContact{}
+	}
+	client := newXDSClient(addr, testServiceName, test.doCDS, balancer.BuildOptions{}, newADS, loseContactFunc, func() {})
+	defer client.close()
+	go client.run()
+
+	// make sure the server side gets the request (i.e. the stream was created successfully on the client side)
+	td.getReq()
+
+	for _, resp := range test.responsesToSend {
+		td.sendResp(&response{resp: resp})
+	}
+
+	select {
+	case <-contactChan:
+	case <-time.After(2 * time.Second):
+		t.Fatal("time out when expecting lost contact signal")
+	}
+}
+
+func (s) TestXdsClientExponentialRetry(t *testing.T) {
+	cfg := &testConfig{
+		svrErr: status.Errorf(codes.Aborted, "abort the stream to trigger retry"),
+	}
+	addr, td, cleanup := setupServer(t)
+	defer cleanup()
+
+	adsChan := make(chan proto.Message, 10)
+	newADS := func(ctx context.Context, i proto.Message) error {
+		adsChan <- i
+		return nil
+	}
+	contactChan := make(chan *loseContact, 10)
+	loseContactFunc := func(context.Context) {
+		contactChan <- &loseContact{}
+	}
+	client := newXDSClient(addr, testServiceName, cfg.doCDS, balancer.BuildOptions{}, newADS, loseContactFunc, func() {})
+	defer client.close()
+	go client.run()
+
+	var secondRetry, thirdRetry time.Time
+	for i := 0; i < 3; i++ {
+		// make sure the server side gets the request (i.e. the stream was created successfully on the client side)
+		td.getReq()
+		td.sendResp(&response{err: cfg.svrErr})
+
+		select {
+		case <-contactChan:
+			if i == 1 {
+				secondRetry = time.Now()
+			}
+			if i == 2 {
+				thirdRetry = time.Now()
+			}
+		case <-time.After(2 * time.Second):
+			t.Fatal("time out when expecting lost contact signal")
+		}
+	}
+	if thirdRetry.Sub(secondRetry) < 1*time.Second {
+		t.Fatalf("interval between second and third retry is %v, expected > 1s", thirdRetry.Sub(secondRetry))
+	}
+}
diff --git a/balancer/xds/xds_test.go b/balancer/xds/xds_test.go
new file mode 100644
index 00000000..b5eb04a4
--- /dev/null
+++ b/balancer/xds/xds_test.go
@@ -0,0 +1,706 @@
+/*
+ *
+ * Copyright 2019 gRPC authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package xds
+
+import (
+	"encoding/json"
+	"reflect"
+	"sync"
+	"testing"
+	"time"
+
+	"google.golang.org/grpc/internal/grpctest"
+	"google.golang.org/grpc/internal/leakcheck"
+
+	xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
+	"google.golang.org/grpc/balancer"
+	"google.golang.org/grpc/balancer/xds/edsbalancer"
+	"google.golang.org/grpc/connectivity"
+	"google.golang.org/grpc/resolver"
+)
+
+var lbABuilder *balancerABuilder
+
+func init() {
+	lbABuilder = &balancerABuilder{}
+	balancer.Register(lbABuilder)
+	balancer.Register(&balancerBBuilder{})
+}
+
+type s struct{}
+
+func (s) Teardown(t *testing.T) {
+	leakcheck.Check(t)
+}
+
+func Test(t *testing.T) {
+	grpctest.RunSubTests(t, s{})
+}
+
+type lbPolicy string
+
+const (
+	fakeBalancerA lbPolicy = "fake_balancer_A"
+	fakeBalancerB lbPolicy = "fake_balancer_B"
+	fakeBalancerC lbPolicy = "fake_balancer_C"
+)
+
+var (
+	testBalancerNameFooBar      = "foo.bar"
+	testBalancerConfigFooBar, _ = json.Marshal(&testBalancerConfig{
+		BalancerName:   testBalancerNameFooBar,
+		ChildPolicy:    []lbPolicy{fakeBalancerA},
+		FallbackPolicy: []lbPolicy{fakeBalancerA},
+	})
+	specialAddrForBalancerA = resolver.Address{Addr: "this.is.balancer.A"}
+	specialAddrForBalancerB = resolver.Address{Addr: "this.is.balancer.B"}
+
+	// mu protects the access of latestFakeEdsBalancer
+	mu                    sync.Mutex
+	latestFakeEdsBalancer *fakeEDSBalancer
+)
+
+type testBalancerConfig struct {
+	BalancerName   string
+	ChildPolicy    []lbPolicy
+	FallbackPolicy []lbPolicy
+}
+
+func (l *lbPolicy) UnmarshalJSON(b []byte) error {
+	// no need to implement, not used.
+	return nil
+}
+
+func (l lbPolicy) MarshalJSON() ([]byte, error) {
+	m := make(map[string]struct{})
+	m[string(l)] = struct{}{}
+	return json.Marshal(m)
+}
+
+type balancerABuilder struct {
+	mu           sync.Mutex
+	lastBalancer *balancerA
+}
+
+func (b *balancerABuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
+	b.mu.Lock()
+	b.lastBalancer = &balancerA{cc: cc, subconnStateChange: make(chan *scStateChange, 10)}
+	b.mu.Unlock()
+	return b.lastBalancer
+}
+
+func (b *balancerABuilder) Name() string {
+	return string(fakeBalancerA)
+}
+
+func (b *balancerABuilder) getLastBalancer() *balancerA {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	return b.lastBalancer
+}
+
+func (b *balancerABuilder) clearLastBalancer() {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	b.lastBalancer = nil
+}
+
+type balancerBBuilder struct{}
+
+func (b *balancerBBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
+	return &balancerB{cc: cc}
+}
+
+func (*balancerBBuilder) Name() string {
+	return string(fakeBalancerB)
+}
+
+type balancerA struct {
+	cc                 balancer.ClientConn
+	subconnStateChange chan *scStateChange
+}
+
+func (b *balancerA) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
+	b.subconnStateChange <- &scStateChange{sc: sc, state: state}
+}
+
+func (b *balancerA) HandleResolvedAddrs(addrs []resolver.Address, err error) {
+	_, _ = b.cc.NewSubConn(append(addrs, specialAddrForBalancerA), balancer.NewSubConnOptions{})
+}
+
+func (b *balancerA) Close() {}
+
+type balancerB struct {
+	cc balancer.ClientConn
+}
+
+func (balancerB) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
+	panic("implement me")
+}
+
+func (b *balancerB) HandleResolvedAddrs(addrs []resolver.Address, err error) {
+	_, _ = b.cc.NewSubConn(append(addrs, specialAddrForBalancerB), balancer.NewSubConnOptions{})
+}
+
+func (balancerB) Close() {}
+
+func newTestClientConn() *testClientConn {
+	return &testClientConn{
+		newSubConns: make(chan []resolver.Address, 10),
+	}
+}
+
+type testClientConn struct {
+	newSubConns chan []resolver.Address
+}
+
+func (t *testClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
+	t.newSubConns <- addrs
+	return nil, nil
+}
+
+func (testClientConn) RemoveSubConn(balancer.SubConn) {
+}
+
+func (testClientConn) UpdateBalancerState(s connectivity.State, p balancer.Picker) {
+}
+
+func (testClientConn) ResolveNow(resolver.ResolveNowOption) {}
+
+func (testClientConn) Target() string {
+	return testServiceName
+}
+
+type scStateChange struct {
+	sc    balancer.SubConn
+	state connectivity.State
+}
+
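+// fakeEDSBalancer is a stub EdsBalancer that forwards every update it receives onto channels, so
+// tests can assert on them.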
+type fakeEDSBalancer struct {
+	cc                 balancer.ClientConn
+	edsChan            chan *xdspb.ClusterLoadAssignment
+	childPolicy        chan *loadBalancingConfig
+	fallbackPolicy     chan *loadBalancingConfig
+	subconnStateChange chan *scStateChange
+}
+
+func (f *fakeEDSBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
+	f.subconnStateChange <- &scStateChange{sc: sc, state: state}
+}
+
+func (f *fakeEDSBalancer) HandleResolvedAddrs([]resolver.Address, error) {
+	panic("implement me")
+}
+
+func (f *fakeEDSBalancer) Close() {
+	mu.Lock()
+	defer mu.Unlock()
+	latestFakeEdsBalancer = nil
+}
+
+func (f *fakeEDSBalancer) HandleEDSResponse(edsResp *xdspb.ClusterLoadAssignment) {
+	f.edsChan <- edsResp
+}
+
+func (f *fakeEDSBalancer) HandleChildPolicy(name string, config json.RawMessage) {
+	f.childPolicy <- &loadBalancingConfig{
+		Name:   name,
+		Config: config,
+	}
+}
+
+func newFakeEDSBalancer(cc balancer.ClientConn) interface{} {
+	lb := &fakeEDSBalancer{
+		cc:                 cc,
+		edsChan:            make(chan *xdspb.ClusterLoadAssignment, 10),
+		childPolicy:        make(chan *loadBalancingConfig, 10),
+		fallbackPolicy:     make(chan *loadBalancingConfig, 10),
+		subconnStateChange: make(chan *scStateChange, 10),
+	}
+	mu.Lock()
+	latestFakeEdsBalancer = lb
+	mu.Unlock()
+	return lb
+}
+
+func getLatestEdsBalancer() *fakeEDSBalancer {
+	mu.Lock()
+	defer mu.Unlock()
+	return latestFakeEdsBalancer
+}
+
+type fakeSubConn struct{}
+
+func (*fakeSubConn) UpdateAddresses([]resolver.Address) {
+	panic("implement me")
+}
+
+func (*fakeSubConn) Connect() {
+	panic("implement me")
+}
+
+func (s) TestXdsBalanceHandleResolvedAddrs(t *testing.T) {
+	startupTimeout = 500 * time.Millisecond
+	defer func() { startupTimeout = defaultTimeout }()
+
+	builder := balancer.Get("xds")
+	cc := newTestClientConn()
+	lb, ok := builder.Build(cc, balancer.BuildOptions{}).(*xdsBalancer)
+	if !ok {
+		t.Fatalf("unable to type assert to *xdsBalancer")
+	}
+	defer lb.Close()
+	if err := lb.HandleBalancerConfig(json.RawMessage(testBalancerConfigFooBar)); err != nil {
+		t.Fatalf("failed to HandleBalancerConfig(%v), due to err: %v", string(testBalancerConfigFooBar), err)
+	}
+	addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}}
+	for i := 0; i < 3; i++ {
+		lb.HandleResolvedAddrs(addrs, nil)
+		select {
+		case nsc := <-cc.newSubConns:
+			if !reflect.DeepEqual(append(addrs, specialAddrForBalancerA), nsc) {
+				t.Fatalf("got new subconn address %v, want %v", nsc, append(addrs, specialAddrForBalancerA))
+			}
+		case <-time.After(2 * time.Second):
+			t.Fatalf("timeout when geting new subconn result")
+		}
+		addrs = addrs[:2-i]
+	}
+}
+
+func (s) TestXdsBalanceHandleBalancerConfigBalancerNameUpdate(t *testing.T) {
+	startupTimeout = 500 * time.Millisecond
+	newEDSBalancer = newFakeEDSBalancer
+	defer func() {
+		startupTimeout = defaultTimeout
+		newEDSBalancer = edsbalancer.NewXDSBalancer
+	}()
+
+	builder := balancer.Get("xds")
+	cc := newTestClientConn()
+	lb, ok := builder.Build(cc, balancer.BuildOptions{}).(*xdsBalancer)
+	if !ok {
+		t.Fatalf("unable to type assert to *xdsBalancer")
+	}
+	defer lb.Close()
+	if err := lb.HandleBalancerConfig(json.RawMessage(testBalancerConfigFooBar)); err != nil {
+		t.Fatalf("failed to HandleBalancerConfig(%v), due to err: %v", string(testBalancerConfigFooBar), err)
+	}
+	addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}}
+	lb.HandleResolvedAddrs(addrs, nil)
+
+	// verify fallback takes over
+	select {
+	case nsc := <-cc.newSubConns:
+		if !reflect.DeepEqual(append(addrs, specialAddrForBalancerA), nsc) {
+			t.Fatalf("got new subconn address %v, want %v", nsc, append(addrs, specialAddrForBalancerA))
+		}
+	case <-time.After(2 * time.Second):
+		t.Fatalf("timeout when geting new subconn result")
+	}
+
+	var cleanups []func()
+	defer func() {
+		for _, cleanup := range cleanups {
+			cleanup()
+		}
+	}()
+	// In the first iteration, an eds balancer takes over from the fallback balancer.
+	// In the second iteration, a new xds client takes over from the previous one.
+	for i := 0; i < 2; i++ {
+		addr, td, cleanup := setupServer(t)
+		cleanups = append(cleanups, cleanup)
+		workingBalancerConfig, _ := json.Marshal(&testBalancerConfig{
+			BalancerName:   addr,
+			ChildPolicy:    []lbPolicy{fakeBalancerA},
+			FallbackPolicy: []lbPolicy{fakeBalancerA},
+		})
+
+		if err := lb.HandleBalancerConfig(json.RawMessage(workingBalancerConfig)); err != nil {
+			t.Fatalf("failed to HandleBalancerConfig(%v), due to err: %v", string(workingBalancerConfig), err)
+		}
+		td.sendResp(&response{resp: testEDSRespWithoutEndpoints})
+
+		var j int
+		for j = 0; j < 10; j++ {
+			if edsLB := getLatestEdsBalancer(); edsLB != nil { // edsLB won't change between the two iterations
+				select {
+				case gotEDS := <-edsLB.edsChan:
+					if !reflect.DeepEqual(gotEDS, testClusterLoadAssignmentWithoutEndpoints) {
+						t.Fatalf("edsBalancer got eds: %v, want %v", gotEDS, testClusterLoadAssignmentWithoutEndpoints)
+					}
+				case <-time.After(time.Second):
+					t.Fatal("haven't got EDS update after 1s")
+				}
+				break
+			}
+			time.Sleep(100 * time.Millisecond)
+		}
+		if j == 10 {
+			t.Fatal("edsBalancer instance has not been created or updated after 1s")
+		}
+	}
+}
+
+// Tests that switching the child policy keeps the same eds balancer, and that switching cds->eds
+// or eds->cds restarts the xdsClient while keeping the same eds balancer.
+func (s) TestXdsBalanceHandleBalancerConfigChildPolicyUpdate(t *testing.T) {
+	newEDSBalancer = newFakeEDSBalancer
+	defer func() {
+		newEDSBalancer = edsbalancer.NewXDSBalancer
+	}()
+
+	builder := balancer.Get("xds")
+	cc := newTestClientConn()
+	lb, ok := builder.Build(cc, balancer.BuildOptions{}).(*xdsBalancer)
+	if !ok {
+		t.Fatalf("unable to type assert to *xdsBalancer")
+	}
+	defer lb.Close()
+
+	var cleanups []func()
+	defer func() {
+		for _, cleanup := range cleanups {
+			cleanup()
+		}
+	}()
+	for _, test := range []struct {
+		cfg                 *testBalancerConfig
+		responseToSend      *xdspb.DiscoveryResponse
+		expectedChildPolicy *loadBalancingConfig
+	}{
+		{
+			cfg: &testBalancerConfig{
+				ChildPolicy: []lbPolicy{fakeBalancerA},
+			},
+			responseToSend: testEDSRespWithoutEndpoints,
+			expectedChildPolicy: &loadBalancingConfig{
+				Name:   string(fakeBalancerA),
+				Config: json.RawMessage(`{}`),
+			},
+		},
+		{
+			cfg: &testBalancerConfig{
+				ChildPolicy: []lbPolicy{fakeBalancerB},
+			},
+			expectedChildPolicy: &loadBalancingConfig{
+				Name:   string(fakeBalancerB),
+				Config: json.RawMessage(`{}`),
+			},
+		},
+		{
+			cfg:            &testBalancerConfig{},
+			responseToSend: testCDSResp,
+			expectedChildPolicy: &loadBalancingConfig{
+				Name: "ROUND_ROBIN",
+			},
+		},
+	} {
+		addr, td, cleanup := setupServer(t)
+		cleanups = append(cleanups, cleanup)
+		test.cfg.BalancerName = addr
+		workingBalancerConfig, _ := json.Marshal(test.cfg)
+
+		if err := lb.HandleBalancerConfig(json.RawMessage(workingBalancerConfig)); err != nil {
+			t.Fatalf("failed to HandleBalancerConfig(%v), due to err: %v", string(workingBalancerConfig), err)
+		}
+		if test.responseToSend != nil {
+			td.sendResp(&response{resp: test.responseToSend})
+		}
+		var i int
+		for i = 0; i < 10; i++ {
+			if edsLB := getLatestEdsBalancer(); edsLB != nil {
+				select {
+				case childPolicy := <-edsLB.childPolicy:
+					if !reflect.DeepEqual(childPolicy, test.expectedChildPolicy) {
+						t.Fatalf("got childPolicy %v, want %v", childPolicy, test.expectedChildPolicy)
+					}
+				case <-time.After(time.Second):
+					t.Fatal("haven't got policy update after 1s")
+				}
+				break
+			}
+			time.Sleep(100 * time.Millisecond)
+		}
+		if i == 10 {
+			t.Fatal("edsBalancer instance has not been created or updated after 1s")
+		}
+	}
+}
+
+// Tests that when not in fallback mode, a config update overwrites the stored fallback info, and
+// that when in fallback mode, it updates the config of, or switches, the fallback balancer.
+func (s) TestXdsBalanceHandleBalancerConfigFallbackUpdate(t *testing.T) {
+	newEDSBalancer = newFakeEDSBalancer
+	defer func() {
+		newEDSBalancer = edsbalancer.NewXDSBalancer
+	}()
+	builder := balancer.Get("xds")
+	cc := newTestClientConn()
+	lb, ok := builder.Build(cc, balancer.BuildOptions{}).(*xdsBalancer)
+	if !ok {
+		t.Fatalf("unable to type assert to *xdsBalancer")
+	}
+	defer lb.Close()
+
+	addr, td, cleanup := setupServer(t)
+
+	cfg := &testBalancerConfig{
+		BalancerName:   addr,
+		ChildPolicy:    []lbPolicy{fakeBalancerA},
+		FallbackPolicy: []lbPolicy{fakeBalancerA},
+	}
+	workingBalancerConfig, _ := json.Marshal(cfg)
+
+	if err := lb.HandleBalancerConfig(json.RawMessage(workingBalancerConfig)); err != nil {
+		t.Fatalf("failed to HandleBalancerConfig(%v), due to err: %v", string(workingBalancerConfig), err)
+	}
+
+	cfg.FallbackPolicy = []lbPolicy{fakeBalancerB}
+	workingBalancerConfig, _ = json.Marshal(cfg)
+
+	if err := lb.HandleBalancerConfig(json.RawMessage(workingBalancerConfig)); err != nil {
+		t.Fatalf("failed to HandleBalancerConfig(%v), due to err: %v", string(workingBalancerConfig), err)
+	}
+
+	td.sendResp(&response{resp: testEDSRespWithoutEndpoints})
+
+	var i int
+	for i = 0; i < 10; i++ {
+		if edsLB := getLatestEdsBalancer(); edsLB != nil {
+			break
+		}
+		time.Sleep(100 * time.Millisecond)
+	}
+	if i == 10 {
+		t.Fatal("edsBalancer instance has not been created and assigned to lb.xdsLB after 1s")
+	}
+
+	cleanup()
+
+	addrs := []resolver.Address{{Addr: "1.1.1.1:10001"}, {Addr: "2.2.2.2:10002"}, {Addr: "3.3.3.3:10003"}}
+	lb.HandleResolvedAddrs(addrs, nil)
+
+	// verify fallback balancer B takes over
+	select {
+	case nsc := <-cc.newSubConns:
+		if !reflect.DeepEqual(append(addrs, specialAddrForBalancerB), nsc) {
+			t.Fatalf("got new subconn address %v, want %v", nsc, append(addrs, specialAddrForBalancerB))
+		}
+	case <-time.After(5 * time.Second):
+		t.Fatalf("timeout when geting new subconn result")
+	}
+
+	cfg.FallbackPolicy = []lbPolicy{fakeBalancerA}
+	workingBalancerConfig, _ = json.Marshal(cfg)
+	if err := lb.HandleBalancerConfig(json.RawMessage(workingBalancerConfig)); err != nil {
+		t.Fatalf("failed to HandleBalancerConfig(%v), due to err: %v", string(workingBalancerConfig), err)
+	}
+
+	// verify fallback balancer A takes over
+	select {
+	case nsc := <-cc.newSubConns:
+		if !reflect.DeepEqual(append(addrs, specialAddrForBalancerA), nsc) {
+			t.Fatalf("got new subconn address %v, want %v", nsc, append(addrs, specialAddrForBalancerA))
+		}
+	case <-time.After(2 * time.Second):
+		t.Fatalf("timeout when geting new subconn result")
+	}
+}
+
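+// pickFallbackAction is a self-contained illustration of the rule the test
+// above exercises; it is not the code in xds.go, and the returned action
+// strings are invented for the example. Outside fallback mode a new fallback
+// policy only overwrites the stored fallback config; inside fallback mode a
+// changed policy name means switching child balancers, while the same name
+// means a plain config update.
+func pickFallbackAction(inFallback bool, oldName, newName string) string {
+	switch {
+	case !inFallback:
+		return "overwrite stored fallback config"
+	case oldName != newName:
+		return "close " + oldName + " and build " + newName
+	default:
+		return "forward updated config to the running fallback balancer"
+	}
+}
+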
+func (s) TestXdsBalancerHandlerSubConnStateChange(t *testing.T) {
+	newEDSBalancer = newFakeEDSBalancer
+	defer func() {
+		newEDSBalancer = edsbalancer.NewXDSBalancer
+	}()
+	builder := balancer.Get("xds")
+	cc := newTestClientConn()
+	lb, ok := builder.Build(cc, balancer.BuildOptions{}).(*xdsBalancer)
+	if !ok {
+		t.Fatalf("unable to type assert to *xdsBalancer")
+	}
+	defer lb.Close()
+
+	addr, td, cleanup := setupServer(t)
+	defer cleanup()
+	cfg := &testBalancerConfig{
+		BalancerName:   addr,
+		ChildPolicy:    []lbPolicy{fakeBalancerA},
+		FallbackPolicy: []lbPolicy{fakeBalancerA},
+	}
+	workingBalancerConfig, _ := json.Marshal(cfg)
+
+	if err := lb.HandleBalancerConfig(json.RawMessage(workingBalancerConfig)); err != nil {
+		t.Fatalf("failed to HandleBalancerConfig(%v), due to err: %v", string(workingBalancerConfig), err)
+	}
+
+	td.sendResp(&response{resp: testEDSRespWithoutEndpoints})
+
+	expectedScStateChange := &scStateChange{
+		sc:    &fakeSubConn{},
+		state: connectivity.Ready,
+	}
+
+	var i int
+	for i = 0; i < 10; i++ {
+		if edsLB := getLatestEdsBalancer(); edsLB != nil {
+			lb.HandleSubConnStateChange(expectedScStateChange.sc, expectedScStateChange.state)
+			select {
+			case scsc := <-edsLB.subconnStateChange:
+				if !reflect.DeepEqual(scsc, expectedScStateChange) {
+					t.Fatalf("got subconn state change %v, want %v", scsc, expectedScStateChange)
+				}
+			case <-time.After(time.Second):
+				t.Fatal("haven't got subconn state change after 1s")
+			}
+			break
+		}
+		time.Sleep(100 * time.Millisecond)
+	}
+	if i == 10 {
+		t.Fatal("edsBalancer instance has not been created and assigned to lb.xdsLB after 1s")
+	}
+
+	// lbABuilder keeps a per-binary record of the last balancerA created. Clear
+	// the record so that a new one is created, and so we can get a pointer to it.
+	lbABuilder.clearLastBalancer()
+	cleanup()
+
+	// switch to fallback
+	// fallback balancer A takes over
+	for i = 0; i < 10; i++ {
+		if fblb := lbABuilder.getLastBalancer(); fblb != nil {
+			lb.HandleSubConnStateChange(expectedScStateChange.sc, expectedScStateChange.state)
+			select {
+			case scsc := <-fblb.subconnStateChange:
+				if !reflect.DeepEqual(scsc, expectedScStateChange) {
+					t.Fatalf("got subconn state change %v, want %v", scsc, expectedScStateChange)
+				}
+			case <-time.After(time.Second):
+				t.Fatal("haven't got subconn state change after 1s")
+			}
+			break
+		}
+		time.Sleep(100 * time.Millisecond)
+	}
+	if i == 10 {
+		t.Fatal("balancerA instance has not been created after 1s")
+	}
+}
+
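+// scForwarder is a minimal sketch, invented for illustration, of the routing
+// behavior the test above checks: subconn state changes are delivered to
+// whichever child balancer is currently active, the eds balancer normally and
+// the fallback balancer once fallback kicks in. The real xdsBalancer keeps
+// equivalent state internally; this type is not used by the tests.
+type scForwarder struct {
+	eds, fallback balancer.Balancer
+	inFallback    bool
+}
+
+func (f *scForwarder) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
+	if f.inFallback {
+		f.fallback.HandleSubConnStateChange(sc, s)
+		return
+	}
+	f.eds.HandleSubConnStateChange(sc, s)
+}
+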
+func (s) TestXdsBalancerFallbackSignalFromEdsBalancer(t *testing.T) {
+	newEDSBalancer = newFakeEDSBalancer
+	defer func() {
+		newEDSBalancer = edsbalancer.NewXDSBalancer
+	}()
+	builder := balancer.Get("xds")
+	cc := newTestClientConn()
+	lb, ok := builder.Build(cc, balancer.BuildOptions{}).(*xdsBalancer)
+	if !ok {
+		t.Fatalf("unable to type assert to *xdsBalancer")
+	}
+	defer lb.Close()
+
+	addr, td, cleanup := setupServer(t)
+	defer cleanup()
+	cfg := &testBalancerConfig{
+		BalancerName:   addr,
+		ChildPolicy:    []lbPolicy{fakeBalancerA},
+		FallbackPolicy: []lbPolicy{fakeBalancerA},
+	}
+	workingBalancerConfig, _ := json.Marshal(cfg)
+
+	if err := lb.HandleBalancerConfig(json.RawMessage(workingBalancerConfig)); err != nil {
+		t.Fatalf("failed to HandleBalancerConfig(%v), due to err: %v", string(workingBalancerConfig), err)
+	}
+
+	td.sendResp(&response{resp: testEDSRespWithoutEndpoints})
+
+	expectedScStateChange := &scStateChange{
+		sc:    &fakeSubConn{},
+		state: connectivity.Ready,
+	}
+
+	var i int
+	for i = 0; i < 10; i++ {
+		if edsLB := getLatestEdsBalancer(); edsLB != nil {
+			lb.HandleSubConnStateChange(expectedScStateChange.sc, expectedScStateChange.state)
+			select {
+			case scsc := <-edsLB.subconnStateChange:
+				if !reflect.DeepEqual(scsc, expectedScStateChange) {
+					t.Fatalf("got subconn state change %v, want %v", scsc, expectedScStateChange)
+				}
+			case <-time.After(time.Second):
+				t.Fatal("haven't got subconn state change after 1s")
+			}
+			break
+		}
+		time.Sleep(100 * time.Millisecond)
+	}
+	if i == 10 {
+		t.Fatal("edsBalancer instance has not been created and assigned to lb.xdsLB after 1s")
+	}
+
+	// lbABuilder keeps a per-binary record of the last balancerA created. Clear
+	// the record so that a new one is created, and so we can get a pointer to it.
+	lbABuilder.clearLastBalancer()
+	cleanup()
+
+	// switch to fallback
+	// fallback balancer A takes over
+	for i = 0; i < 10; i++ {
+		if fblb := lbABuilder.getLastBalancer(); fblb != nil {
+			lb.HandleSubConnStateChange(expectedScStateChange.sc, expectedScStateChange.state)
+			select {
+			case scsc := <-fblb.subconnStateChange:
+				if !reflect.DeepEqual(scsc, expectedScStateChange) {
+					t.Fatalf("got subconn state change %v, want %v", scsc, expectedScStateChange)
+				}
+			case <-time.After(time.Second):
+				t.Fatal("haven't got subconn state change after 1s")
+			}
+			break
+		}
+		time.Sleep(100 * time.Millisecond)
+	}
+	if i == 10 {
+		t.Fatal("balancerA instance has not been created after 1s")
+	}
+}
+
+func (s) TestXdsBalancerConfigParsingSelectingLBPolicy(t *testing.T) {
+	tesCfg := &testBalancerConfig{
+		BalancerName:   "fake.foo.bar",
+		ChildPolicy:    []lbPolicy{fakeBalancerC, fakeBalancerA, fakeBalancerB}, // selects fakeBalancerA
+		FallbackPolicy: []lbPolicy{fakeBalancerC, fakeBalancerB, fakeBalancerA}, // selects fakeBalancerB
+	}
+	js, _ := json.Marshal(testCfg)
+	var xdsCfg xdsConfig
+	if err := json.Unmarshal(js, &xdsCfg); err != nil {
+		t.Fatal("unable to unmarshal balancer config into xds config")
+	}
+	wantChildPolicy := &loadBalancingConfig{Name: string(fakeBalancerA), Config: json.RawMessage(`{}`)}
+	if !reflect.DeepEqual(xdsCfg.ChildPolicy, wantChildPolicy) {
+		t.Fatalf("got child policy %v, want %v", xdsCfg.ChildPolicy, wantChildPolicy)
+	}
+	wantFallbackPolicy := &loadBalancingConfig{Name: string(fakeBalancerB), Config: json.RawMessage(`{}`)}
+	if !reflect.DeepEqual(xdsCfg.FallBackPolicy, wantFallbackPolicy) {
+		t.Fatalf("got fallback policy %v, want %v", xdsCfg.FallBackPolicy, wantFallbackPolicy)
+	}
+}
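+
+// pickFirstSupported illustrates the selection rule the test above relies on:
+// walk the candidate policies in order and pick the first one whose balancer
+// builder is registered. This is a sketch of the rule only; the actual
+// selection happens while the config is unmarshaled in xds.go.
+func pickFirstSupported(candidates []lbPolicy) (string, bool) {
+	for _, p := range candidates {
+		if balancer.Get(string(p)) != nil {
+			return string(p), true
+		}
+	}
+	return "", false
+}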
diff --git a/go.mod b/go.mod
index 04188077..ca771c41 100644
--- a/go.mod
+++ b/go.mod
@@ -4,9 +4,13 @@ require (
 	cloud.google.com/go v0.26.0 // indirect
 	github.com/BurntSushi/toml v0.3.1 // indirect
 	github.com/client9/misspell v0.3.4
+	github.com/envoyproxy/go-control-plane v0.6.7
+	github.com/gogo/googleapis v1.1.0 // indirect
+	github.com/gogo/protobuf v1.2.0
 	github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
 	github.com/golang/mock v1.1.1
 	github.com/golang/protobuf v1.2.0
+	github.com/lyft/protoc-gen-validate v0.0.13 // indirect
 	golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3
 	golang.org/x/net v0.0.0-20180826012351-8a410e7b638d
 	golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be
diff --git a/go.sum b/go.sum
index a79939d9..a4dcc0b0 100644
--- a/go.sum
+++ b/go.sum
@@ -4,12 +4,20 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
 github.com/client9/misspell v0.3.4 h1:ta993UF76GwbvJcIo3Y68y/M3WxlpEHPWIGDkJYwzJI=
 github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
+github.com/envoyproxy/go-control-plane v0.6.7 h1:fyr1xdpt6v/HvZL70++avYT0HggJQD6/dHgNk0+uJjM=
+github.com/envoyproxy/go-control-plane v0.6.7/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g=
+github.com/gogo/googleapis v1.1.0 h1:kFkMAZBNAn4j7K0GiZr8cRYzejq68VbheufiV3YuyFI=
+github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s=
+github.com/gogo/protobuf v1.2.0 h1:xU6/SpYbvkNYiptHJYEDRseDLvYE7wSqhYYNy0QSUzI=
+github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
 github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
 github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
 github.com/golang/mock v1.1.1 h1:G5FRp8JnTd7RQH5kemVNlMeyXQAztQ3mOWV95KxsXH8=
 github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
 github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
 github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/lyft/protoc-gen-validate v0.0.13 h1:KNt/RhmQTOLr7Aj8PsJ7mTronaFyx80mRTT9qF261dA=
+github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=
 golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3 h1:x/bBzNauLQAlE3fLku/xy92Y8QwKX5HZymrMz2IiKFc=
 golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
 golang.org/x/net v0.0.0-20180826012351-8a410e7b638d h1:g9qWBGx4puODJTMVyoPrpoxPFgVGd+z1DZwjfRu4d0I=