xds: eds balancer handleEDSUpdate cleanup (#3639)

Author:    Menghan Li
Date:      2020-05-27 09:55:12 -07:00
Committer: GitHub
Parent:    5cd45224f8
Commit:    e786c2dfb2
5 changed files with 40 additions and 65 deletions
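
In short: before this change, the xds client wrapper reported results through two separate callbacks, `newEDSUpdate func(xdsclient.EndpointsUpdate) error` and `loseContact func()`, and the eds balancer demultiplexed them with a type switch on a `chan interface{}`. After this change the wrapper forwards a single `(xdsclient.EndpointsUpdate, error)` pair, which the balancer wraps in a new `edsUpdate` struct and sends on a typed channel. The sketch below shows that plumbing in isolation; the type and field names (`endpointsUpdate`, `balancerSketch`) are simplified stand-ins, not the real grpc-go types.

// Sketch of the update-plumbing pattern this commit moves to: the watch
// callback delivers (response, error) as one message on a typed channel,
// and the run goroutine handles both in a single place.
package main

import (
	"context"
	"errors"
	"fmt"
)

type endpointsUpdate struct{ localities []string }

// edsUpdate mirrors the struct added in this commit: a response and an
// error travel together instead of using a separate loseContact signal.
type edsUpdate struct {
	resp endpointsUpdate
	err  error
}

type balancerSketch struct {
	ctx             context.Context
	xdsClientUpdate chan *edsUpdate
}

// handleEDSUpdate plays the role of the watch callback: it forwards the
// pair, giving up if the balancer is shutting down.
func (b *balancerSketch) handleEDSUpdate(resp endpointsUpdate, err error) {
	select {
	case b.xdsClientUpdate <- &edsUpdate{resp: resp, err: err}:
	case <-b.ctx.Done():
	}
}

// run drains the channel; the error is inspected first, mirroring the new
// handleXDSClientUpdate.
func (b *balancerSketch) run(done chan<- struct{}) {
	defer close(done)
	for {
		select {
		case u := <-b.xdsClientUpdate:
			if u.err != nil {
				fmt.Println("watch error:", u.err)
				continue
			}
			fmt.Println("endpoints:", u.resp.localities)
		case <-b.ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	b := &balancerSketch{ctx: ctx, xdsClientUpdate: make(chan *edsUpdate)}
	done := make(chan struct{})
	go b.run(done)

	b.handleEDSUpdate(endpointsUpdate{localities: []string{"us-east-1a"}}, nil)
	b.handleEDSUpdate(endpointsUpdate{}, errors.New("EDS watch error"))
	cancel()
	<-done
}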


@@ -23,7 +23,6 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
-	"time"
 
 	"github.com/google/go-cmp/cmp"
 	"google.golang.org/grpc/balancer"
@@ -37,8 +36,7 @@ import (
 )
 
 const (
-	defaultTimeout = 10 * time.Second
-	edsName        = "eds_experimental"
+	edsName = "eds_experimental"
 )
 
 var (
@@ -62,13 +60,13 @@ func (b *edsBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOp
 		cc:                cc,
 		buildOpts:         opts,
 		grpcUpdate:        make(chan interface{}),
-		xdsClientUpdate:   make(chan interface{}),
+		xdsClientUpdate:   make(chan *edsUpdate),
 		childPolicyUpdate: buffer.NewUnbounded(),
 	}
 	loadStore := lrs.NewStore()
 	x.logger = grpclog.NewPrefixLogger(loggingPrefix(x))
 	x.edsImpl = newEDSBalancer(x.cc, x.enqueueChildBalancerState, loadStore, x.logger)
-	x.client = newXDSClientWrapper(x.handleEDSUpdate, x.loseContact, x.buildOpts, loadStore, x.logger)
+	x.client = newXDSClientWrapper(x.handleEDSUpdate, x.buildOpts, loadStore, x.logger)
 	x.logger.Infof("Created")
 	go x.run()
 	return x
@@ -117,7 +115,7 @@ type edsBalancer struct {
 	// edsBalancer continuously monitor the channels below, and will handle events from them in sync.
 	grpcUpdate        chan interface{}
-	xdsClientUpdate   chan interface{}
+	xdsClientUpdate   chan *edsUpdate
 	childPolicyUpdate *buffer.Unbounded
 
 	client *xdsclientWrapper // may change when passed a different service config
@@ -140,12 +138,8 @@ func (x *edsBalancer) run() {
 			u := update.(*balancerStateWithPriority)
 			x.edsImpl.updateState(u.priority, u.s)
 		case <-x.ctx.Done():
-			if x.client != nil {
-				x.client.close()
-			}
-			if x.edsImpl != nil {
-				x.edsImpl.close()
-			}
+			x.client.close()
+			x.edsImpl.close()
 			return
 		}
 	}
@@ -154,9 +148,7 @@ func (x *edsBalancer) run() {
 func (x *edsBalancer) handleGRPCUpdate(update interface{}) {
 	switch u := update.(type) {
 	case *subConnStateUpdate:
-		if x.edsImpl != nil {
-			x.edsImpl.handleSubConnStateChange(u.sc, u.state.ConnectivityState)
-		}
+		x.edsImpl.handleSubConnStateChange(u.sc, u.state.ConnectivityState)
 	case *balancer.ClientConnState:
 		x.logger.Infof("Receive update from resolver, balancer config: %+v", u.BalancerConfig)
 		cfg, _ := u.BalancerConfig.(*EDSConfig)
@@ -174,7 +166,7 @@ func (x *edsBalancer) handleGRPCUpdate(update interface{}) {
 		// We will update the edsImpl with the new child policy, if we got a
 		// different one.
-		if x.edsImpl != nil && !cmp.Equal(cfg.ChildPolicy, x.config.ChildPolicy) {
+		if !cmp.Equal(cfg.ChildPolicy, x.config.ChildPolicy) {
 			if cfg.ChildPolicy != nil {
 				x.edsImpl.handleChildPolicy(cfg.ChildPolicy.Name, cfg.ChildPolicy.Config)
 			} else {
@@ -189,17 +181,13 @@ func (x *edsBalancer) handleGRPCUpdate(update interface{}) {
 	}
 }
 
-func (x *edsBalancer) handleXDSClientUpdate(update interface{}) {
-	switch u := update.(type) {
-	// TODO: this func should accept (xdsclient.EndpointsUpdate, error), and process
-	// the error, instead of having a separate loseContact signal.
-	case xdsclient.EndpointsUpdate:
-		x.edsImpl.handleEDSResponse(u)
-	case *loseContact:
-		// loseContact can be useful for going into fallback.
-	default:
-		panic("unexpected xds client update type")
+func (x *edsBalancer) handleXDSClientUpdate(update *edsUpdate) {
+	if update.err != nil {
+		// TODO: handle errors from EDS callback. E.g. if CDS resource is
+		// removed, the EDS watch should be canceled.
+		return
 	}
+	x.edsImpl.handleEDSResponse(update.resp)
 }
 
 type subConnStateUpdate struct {
@@ -233,24 +221,14 @@ func (x *edsBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
 	return nil
 }
 
-func (x *edsBalancer) handleEDSUpdate(resp xdsclient.EndpointsUpdate) error {
-	// TODO: this function should take (resp, error), and send them together on
-	// the channel. There doesn't need to be a separate `loseContact` function.
-	select {
-	case x.xdsClientUpdate <- resp:
-	case <-x.ctx.Done():
-	}
-
-	return nil
+type edsUpdate struct {
+	resp xdsclient.EndpointsUpdate
+	err  error
 }
 
-type loseContact struct {
-}
-
-// TODO: delete loseContact when handleEDSUpdate takes (resp, error).
-func (x *edsBalancer) loseContact() {
+func (x *edsBalancer) handleEDSUpdate(resp xdsclient.EndpointsUpdate, err error) {
 	select {
-	case x.xdsClientUpdate <- &loseContact{}:
+	case x.xdsClientUpdate <- &edsUpdate{resp: resp, err: err}:
 	case <-x.ctx.Done():
 	}
 }
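
One consequence of typing the channel: the old handleXDSClientUpdate needed a runtime type switch plus a panic for unexpected types, all of which disappears once xdsClientUpdate carries *edsUpdate and the compiler rejects wrong sends. A toy contrast between the untyped and typed versions (illustrative types only, not the grpc-go ones):

// With chan interface{} the receiver must type-switch and guard against
// unexpected values at runtime; with a typed channel the wrong send does
// not compile in the first place.
package main

import "fmt"

type edsUpdate struct {
	resp string
	err  error
}

func main() {
	untyped := make(chan interface{}, 1)
	untyped <- "oops: any value compiles" // only caught at runtime

	switch u := (<-untyped).(type) {
	case *edsUpdate:
		fmt.Println("update:", u.resp)
	default:
		fmt.Println("unexpected update type:", u) // was a panic in the old code
	}

	typed := make(chan *edsUpdate, 1)
	typed <- &edsUpdate{resp: "endpoints"}
	// typed <- "oops" // would not compile: wrong element type
	fmt.Println("update:", (<-typed).resp)
}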


@@ -412,7 +412,7 @@ func (s) TestEDS_UpdateSubBalancerName(t *testing.T) {
 	for i := 0; i < 5; i++ {
 		_, err := p0.Pick(balancer.PickInfo{})
 		if err != testutils.ErrTestConstPicker {
-			t.Fatalf("picker.Pick, got err %q, want err %q", err, testutils.ErrTestConstPicker)
+			t.Fatalf("picker.Pick, got err %+v, want err %+v", err, testutils.ErrTestConstPicker)
 		}
 	}
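
A likely motivation for the %q to %+v switch above (my reading; the commit does not say): err here can legitimately be nil, and fmt's %q verb quotes a non-nil error's message but renders a nil error as the noisy %!q(<nil>), while %+v prints both cases cleanly. A stdlib-only illustration:

package main

import (
	"errors"
	"fmt"
)

func main() {
	var nilErr error
	realErr := errors.New("test const picker error")

	// %q quotes an error's message, but chokes on a nil error:
	fmt.Printf("%q\n", realErr) // "test const picker error"
	fmt.Printf("%q\n", nilErr)  // %!q(<nil>)

	// %+v handles both cases cleanly:
	fmt.Printf("%+v\n", realErr) // test const picker error
	fmt.Printf("%+v\n", nilErr)  // <nil>
}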


@@ -108,10 +108,12 @@ func (f *fakeEDSBalancer) handleChildPolicy(name string, config json.RawMessage)
 	f.childPolicy.Send(&loadBalancingConfig{Name: name, Config: config})
 }
 
-func (f *fakeEDSBalancer) close() {}
-
 func (f *fakeEDSBalancer) handleEDSResponse(edsResp xdsclient.EndpointsUpdate) {}
+
 func (f *fakeEDSBalancer) updateState(priority priorityType, s balancer.State) {}
+
+func (f *fakeEDSBalancer) close() {}
+
 func (f *fakeEDSBalancer) waitForChildPolicy(wantPolicy *loadBalancingConfig) error {
 	val, err := f.childPolicy.Receive()
 	if err != nil {


@@ -50,8 +50,7 @@ var (
 type xdsclientWrapper struct {
 	logger *grpclog.PrefixLogger
 
-	newEDSUpdate func(xdsclient.EndpointsUpdate) error
-	loseContact  func()
+	newEDSUpdate func(xdsclient.EndpointsUpdate, error)
 	bbo          balancer.BuildOptions
 	loadStore    lrs.Store
@@ -78,11 +77,10 @@ type xdsclientWrapper struct {
 //
 // The given callbacks won't be called until the underlying xds_client is
 // working and sends updates.
-func newXDSClientWrapper(newEDSUpdate func(xdsclient.EndpointsUpdate) error, loseContact func(), bbo balancer.BuildOptions, loadStore lrs.Store, logger *grpclog.PrefixLogger) *xdsclientWrapper {
+func newXDSClientWrapper(newEDSUpdate func(xdsclient.EndpointsUpdate, error), bbo balancer.BuildOptions, loadStore lrs.Store, logger *grpclog.PrefixLogger) *xdsclientWrapper {
 	return &xdsclientWrapper{
 		logger:       logger,
 		newEDSUpdate: newEDSUpdate,
-		loseContact:  loseContact,
 		bbo:          bbo,
 		loadStore:    loadStore,
 	}
@@ -188,16 +186,8 @@ func (c *xdsclientWrapper) startEndpointsWatch(nameToWatch string) {
 		c.cancelEndpointsWatch()
 	}
 	cancelEDSWatch := c.xdsclient.WatchEndpoints(c.edsServiceName, func(update xdsclient.EndpointsUpdate, err error) {
-		if err != nil {
-			// TODO: this should trigger a call to `c.loseContact`, when the
-			// error indicates "lose contact".
-			c.logger.Warningf("Watch error from xds-client %p: %v", c.xdsclient, err)
-			return
-		}
 		c.logger.Infof("Watch update from xds-client %p, content: %+v", c.xdsclient, update)
-		if err := c.newEDSUpdate(update); err != nil {
-			c.logger.Warningf("xds: processing new EDS update failed due to %v.", err)
-		}
+		c.newEDSUpdate(update, err)
 	})
 	c.logger.Infof("Watch started on resource name %v with xds-client %p", c.edsServiceName, c.xdsclient)
 	c.cancelEndpointsWatch = func() {
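
The wrapper-side effect of the new signature: the watch callback no longer triages errors locally (log and drop), it forwards the (update, err) pair untouched and leaves the policy decision to the balancer goroutine. A minimal sketch of that forwarding shape, with illustrative names (wrapper, endpointsUpdate) rather than the real grpc-go API:

package main

import (
	"errors"
	"fmt"
)

type endpointsUpdate struct{ clusterName string }

type wrapper struct {
	// newEDSUpdate matches the shape adopted in this commit: it takes the
	// update and the error together and returns nothing.
	newEDSUpdate func(endpointsUpdate, error)
}

// startWatch simulates registering a watch; the returned function plays
// the role of the xDS client invoking the callback.
func (w *wrapper) startWatch() func(endpointsUpdate, error) {
	return func(u endpointsUpdate, err error) {
		// Old code logged and swallowed err here; new code forwards as-is.
		w.newEDSUpdate(u, err)
	}
}

func main() {
	w := &wrapper{newEDSUpdate: func(u endpointsUpdate, err error) {
		if err != nil {
			fmt.Println("balancer sees error:", err)
			return
		}
		fmt.Println("balancer sees update for:", u.clusterName)
	}}
	cb := w.startWatch()
	cb(endpointsUpdate{clusterName: "cluster-a"}, nil)
	cb(endpointsUpdate{}, errors.New("resource removed"))
}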


@@ -64,7 +64,7 @@ func (s) TestClientWrapperWatchEDS(t *testing.T) {
 	}
 	defer cleanup()
 
-	cw := newXDSClientWrapper(nil, nil, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil, nil)
+	cw := newXDSClientWrapper(nil, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil, nil)
 	defer cw.close()
 
 	for _, test := range []struct {
@@ -159,12 +159,11 @@
 // edsBalancer with the received error.
 func (s) TestClientWrapperHandleUpdateError(t *testing.T) {
 	edsRespChan := testutils.NewChannel()
-	newEDS := func(update xdsclient.EndpointsUpdate) error {
-		edsRespChan.Send(update)
-		return nil
+	newEDS := func(update xdsclient.EndpointsUpdate, err error) {
+		edsRespChan.Send(&edsUpdate{resp: update, err: err})
 	}
 
-	cw := newXDSClientWrapper(newEDS, nil, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil, nil)
+	cw := newXDSClientWrapper(newEDS, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil, nil)
 	defer cw.close()
 
 	xdsC := fakeclient.NewClient()
@@ -176,14 +175,20 @@ func (s) TestClientWrapperHandleUpdateError(t *testing.T) {
 	if gotCluster != testEDSClusterName {
 		t.Fatalf("xdsClient.WatchEndpoints() called with cluster: %v, want %v", gotCluster, testEDSClusterName)
 	}
-	xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, errors.New("EDS watch callback error"))
+	watchErr := errors.New("EDS watch callback error")
+	xdsC.InvokeWatchEDSCallback(xdsclient.EndpointsUpdate{}, watchErr)
 
-	// The callback is called with an error, expect no update from edsRespChan.
-	//
-	// TODO: check for loseContact() when errors indicating "lose contact" are
-	// handled correctly.
-	if gotUpdate, gotErr := edsRespChan.Receive(); gotErr != testutils.ErrRecvTimeout {
-		t.Fatalf("edsBalancer got edsUpdate {%+v, %v}, when none was expected", gotUpdate, gotErr)
+	gotUpdate, err := edsRespChan.Receive()
+	if err != nil {
+		t.Fatalf("edsBalancer failed to get edsUpdate %v", err)
+	}
+	update := gotUpdate.(*edsUpdate)
+	if !cmp.Equal(update.resp, (xdsclient.EndpointsUpdate{})) || update.err != watchErr {
+		t.Fatalf("want update {nil, %v}, got %+v", watchErr, update)
 	}
 }
@@ -198,7 +203,7 @@ func (s) TestClientWrapperGetsXDSClientInAttributes(t *testing.T) {
 	}
 	defer func() { xdsclientNew = oldxdsclientNew }()
 
-	cw := newXDSClientWrapper(nil, nil, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil, nil)
+	cw := newXDSClientWrapper(nil, balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}, nil, nil)
 	defer cw.close()
 
 	// Verify that the eds watch is registered for the expected resource name.
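
The error-propagation test above follows a channel-capture pattern: the callback under test writes into a channel-backed helper, the test receives with a timeout, and the whole edsUpdate is compared. A stdlib-only sketch of the same pattern (grpc-go's internal testutils.Channel and go-cmp are replaced here with a buffered channel and reflect.DeepEqual):

package main

import (
	"errors"
	"fmt"
	"reflect"
	"time"
)

type endpointsUpdate struct{ localities []string }

type edsUpdate struct {
	resp endpointsUpdate
	err  error
}

// receiveWithTimeout mimics the Receive helper: it fails instead of
// blocking forever when the callback was never invoked.
func receiveWithTimeout(ch chan *edsUpdate, d time.Duration) (*edsUpdate, error) {
	select {
	case u := <-ch:
		return u, nil
	case <-time.After(d):
		return nil, errors.New("timed out waiting for update")
	}
}

func main() {
	respCh := make(chan *edsUpdate, 1)
	newEDS := func(u endpointsUpdate, err error) {
		respCh <- &edsUpdate{resp: u, err: err}
	}

	watchErr := errors.New("EDS watch callback error")
	newEDS(endpointsUpdate{}, watchErr) // simulate the watch callback firing

	got, err := receiveWithTimeout(respCh, time.Second)
	if err != nil {
		fmt.Println("FAIL:", err)
		return
	}
	if !reflect.DeepEqual(got.resp, endpointsUpdate{}) || got.err != watchErr {
		fmt.Printf("FAIL: want {%+v, %v}, got %+v\n", endpointsUpdate{}, watchErr, got)
		return
	}
	fmt.Println("PASS: error propagated with the update")
}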