xds: Minor refactoring, prep for CDS LB policy (#3211)

This commit is contained in:
Easwar Swaminathan
2019-11-26 11:11:54 -08:00
committed by GitHub
parent d8e3da36ac
commit 7e7050be6a
5 changed files with 53 additions and 42 deletions

View File

@ -57,8 +57,8 @@ func (v2c *v2Client) handleCDSResponse(resp *xdspb.DiscoveryResponse) error {
return fmt.Errorf("xds: no CDS watcher found when handling CDS response: %+v", resp) return fmt.Errorf("xds: no CDS watcher found when handling CDS response: %+v", resp)
} }
var returnUpdate cdsUpdate var returnUpdate CDSUpdate
localCache := make(map[string]cdsUpdate) localCache := make(map[string]CDSUpdate)
for _, r := range resp.GetResources() { for _, r := range resp.GetResources() {
var resource ptypes.DynamicAny var resource ptypes.DynamicAny
if err := ptypes.UnmarshalAny(r, &resource); err != nil { if err := ptypes.UnmarshalAny(r, &resource); err != nil {
@ -75,8 +75,8 @@ func (v2c *v2Client) handleCDSResponse(resp *xdspb.DiscoveryResponse) error {
// If the Cluster message in the CDS response did not contain a // If the Cluster message in the CDS response did not contain a
// serviceName, we will just use the clusterName for EDS. // serviceName, we will just use the clusterName for EDS.
if update.serviceName == "" { if update.ServiceName == "" {
update.serviceName = cluster.GetName() update.ServiceName = cluster.GetName()
} }
localCache[cluster.GetName()] = update localCache[cluster.GetName()] = update
if cluster.GetName() == wi.target[0] { if cluster.GetName() == wi.target[0] {
@ -86,7 +86,7 @@ func (v2c *v2Client) handleCDSResponse(resp *xdspb.DiscoveryResponse) error {
v2c.cdsCache = localCache v2c.cdsCache = localCache
var err error var err error
if returnUpdate.serviceName == "" { if returnUpdate.ServiceName == "" {
err = fmt.Errorf("xds: CDS target %s not found in received response %+v", wi.target, resp) err = fmt.Errorf("xds: CDS target %s not found in received response %+v", wi.target, resp)
} }
wi.stopTimer() wi.stopTimer()
@ -94,8 +94,8 @@ func (v2c *v2Client) handleCDSResponse(resp *xdspb.DiscoveryResponse) error {
return nil return nil
} }
func validateCluster(cluster *xdspb.Cluster) (cdsUpdate, error) { func validateCluster(cluster *xdspb.Cluster) (CDSUpdate, error) {
emptyUpdate := cdsUpdate{serviceName: "", doLRS: false} emptyUpdate := CDSUpdate{ServiceName: "", EnableLRS: false}
switch { switch {
case cluster.GetType() != xdspb.Cluster_EDS: case cluster.GetType() != xdspb.Cluster_EDS:
return emptyUpdate, fmt.Errorf("xds: unexpected cluster type %v in response: %+v", cluster.GetType(), cluster) return emptyUpdate, fmt.Errorf("xds: unexpected cluster type %v in response: %+v", cluster.GetType(), cluster)
@ -105,8 +105,8 @@ func validateCluster(cluster *xdspb.Cluster) (cdsUpdate, error) {
return emptyUpdate, fmt.Errorf("xds: unexpected lbPolicy %v in response: %+v", cluster.GetLbPolicy(), cluster) return emptyUpdate, fmt.Errorf("xds: unexpected lbPolicy %v in response: %+v", cluster.GetLbPolicy(), cluster)
} }
return cdsUpdate{ return CDSUpdate{
serviceName: cluster.GetEdsClusterConfig().GetServiceName(), ServiceName: cluster.GetEdsClusterConfig().GetServiceName(),
doLRS: cluster.GetLrsServer().GetSelf() != nil, EnableLRS: cluster.GetLrsServer().GetSelf() != nil,
}, nil }, nil
} }

View File

@ -39,11 +39,11 @@ const (
serviceName2 = "bar-service" serviceName2 = "bar-service"
) )
func (v2c *v2Client) cloneCDSCacheForTesting() map[string]cdsUpdate { func (v2c *v2Client) cloneCDSCacheForTesting() map[string]CDSUpdate {
v2c.mu.Lock() v2c.mu.Lock()
defer v2c.mu.Unlock() defer v2c.mu.Unlock()
cloneCache := make(map[string]cdsUpdate) cloneCache := make(map[string]CDSUpdate)
for k, v := range v2c.cdsCache { for k, v := range v2c.cdsCache {
cloneCache[k] = v cloneCache[k] = v
} }
@ -51,11 +51,11 @@ func (v2c *v2Client) cloneCDSCacheForTesting() map[string]cdsUpdate {
} }
func TestValidateCluster(t *testing.T) { func TestValidateCluster(t *testing.T) {
emptyUpdate := cdsUpdate{serviceName: "", doLRS: false} emptyUpdate := CDSUpdate{ServiceName: "", EnableLRS: false}
tests := []struct { tests := []struct {
name string name string
cluster *xdspb.Cluster cluster *xdspb.Cluster
wantUpdate cdsUpdate wantUpdate CDSUpdate
wantErr bool wantErr bool
}{ }{
{ {
@ -138,12 +138,12 @@ func TestValidateCluster(t *testing.T) {
}, },
LbPolicy: xdspb.Cluster_ROUND_ROBIN, LbPolicy: xdspb.Cluster_ROUND_ROBIN,
}, },
wantUpdate: cdsUpdate{serviceName: serviceName1, doLRS: false}, wantUpdate: CDSUpdate{ServiceName: serviceName1, EnableLRS: false},
}, },
{ {
name: "happiest-case", name: "happiest-case",
cluster: goodCluster1, cluster: goodCluster1,
wantUpdate: cdsUpdate{serviceName: serviceName1, doLRS: true}, wantUpdate: CDSUpdate{ServiceName: serviceName1, EnableLRS: true},
}, },
} }
@ -177,7 +177,7 @@ func TestCDSHandleResponse(t *testing.T) {
name string name string
cdsResponse *xdspb.DiscoveryResponse cdsResponse *xdspb.DiscoveryResponse
wantErr bool wantErr bool
wantUpdate *cdsUpdate wantUpdate *CDSUpdate
wantUpdateErr bool wantUpdateErr bool
}{ }{
// Badly marshaled CDS response. // Badly marshaled CDS response.
@ -201,7 +201,7 @@ func TestCDSHandleResponse(t *testing.T) {
name: "no-cluster", name: "no-cluster",
cdsResponse: &xdspb.DiscoveryResponse{}, cdsResponse: &xdspb.DiscoveryResponse{},
wantErr: false, wantErr: false,
wantUpdate: &cdsUpdate{}, wantUpdate: &CDSUpdate{},
wantUpdateErr: true, wantUpdateErr: true,
}, },
// Response contains one good cluster we are not interested in. // Response contains one good cluster we are not interested in.
@ -209,7 +209,7 @@ func TestCDSHandleResponse(t *testing.T) {
name: "one-uninteresting-cluster", name: "one-uninteresting-cluster",
cdsResponse: goodCDSResponse2, cdsResponse: goodCDSResponse2,
wantErr: false, wantErr: false,
wantUpdate: &cdsUpdate{}, wantUpdate: &CDSUpdate{},
wantUpdateErr: true, wantUpdateErr: true,
}, },
// Response contains one cluster and it is good. // Response contains one cluster and it is good.
@ -217,18 +217,18 @@ func TestCDSHandleResponse(t *testing.T) {
name: "one-good-cluster", name: "one-good-cluster",
cdsResponse: goodCDSResponse1, cdsResponse: goodCDSResponse1,
wantErr: false, wantErr: false,
wantUpdate: &cdsUpdate{serviceName: serviceName1, doLRS: true}, wantUpdate: &CDSUpdate{ServiceName: serviceName1, EnableLRS: true},
wantUpdateErr: false, wantUpdateErr: false,
}, },
} }
for _, test := range tests { for _, test := range tests {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
gotUpdateCh := make(chan cdsUpdate, 1) gotUpdateCh := make(chan CDSUpdate, 1)
gotUpdateErrCh := make(chan error, 1) gotUpdateErrCh := make(chan error, 1)
// Register a watcher, to trigger the v2Client to send an CDS request. // Register a watcher, to trigger the v2Client to send an CDS request.
cancelWatch := v2c.watchCDS(clusterName1, func(u cdsUpdate, err error) { cancelWatch := v2c.watchCDS(clusterName1, func(u CDSUpdate, err error) {
t.Logf("in v2c.watchCDS callback, cdsUpdate: %+v, err: %v", u, err) t.Logf("in v2c.watchCDS callback, CDSUpdate: %+v, err: %v", u, err)
gotUpdateCh <- u gotUpdateCh <- u
gotUpdateErrCh <- err gotUpdateErrCh <- err
}) })
@ -295,7 +295,7 @@ type cdsTestOp struct {
// wantOpErr specfies whether the main operation should return an error. // wantOpErr specfies whether the main operation should return an error.
wantOpErr bool wantOpErr bool
// wantCDSCache is the expected rdsCache at the end of an operation. // wantCDSCache is the expected rdsCache at the end of an operation.
wantCDSCache map[string]cdsUpdate wantCDSCache map[string]CDSUpdate
// wantWatchCallback specifies if the watch callback should be invoked. // wantWatchCallback specifies if the watch callback should be invoked.
wantWatchCallback bool wantWatchCallback bool
} }
@ -322,8 +322,8 @@ func testCDSCaching(t *testing.T, cdsTestOps []cdsTestOp, errCh chan error) {
// Register a watcher if required, and use a channel to signal the // Register a watcher if required, and use a channel to signal the
// successful invocation of the callback. // successful invocation of the callback.
if cdsTestOp.target != "" { if cdsTestOp.target != "" {
v2c.watchCDS(cdsTestOp.target, func(u cdsUpdate, err error) { v2c.watchCDS(cdsTestOp.target, func(u CDSUpdate, err error) {
t.Logf("Received callback with cdsUpdate {%+v} and error {%v}", u, err) t.Logf("Received callback with CDSUpdate {%+v} and error {%v}", u, err)
callbackCh <- struct{}{} callbackCh <- struct{}{}
}) })
t.Logf("Registered a watcher for CDS target: %v...", cdsTestOp.target) t.Logf("Registered a watcher for CDS target: %v...", cdsTestOp.target)
@ -369,7 +369,7 @@ func TestCDSCaching(t *testing.T) {
{ {
target: clusterName1, target: clusterName1,
responseToSend: &fakexds.Response{Resp: goodCDSResponse1}, responseToSend: &fakexds.Response{Resp: goodCDSResponse1},
wantCDSCache: map[string]cdsUpdate{ wantCDSCache: map[string]CDSUpdate{
clusterName1: {serviceName1, true}, clusterName1: {serviceName1, true},
}, },
wantWatchCallback: true, wantWatchCallback: true,
@ -378,7 +378,7 @@ func TestCDSCaching(t *testing.T) {
// one received in the previous response). This should be cached. // one received in the previous response). This should be cached.
{ {
responseToSend: &fakexds.Response{Resp: cdsResponseWithMultipleResources}, responseToSend: &fakexds.Response{Resp: cdsResponseWithMultipleResources},
wantCDSCache: map[string]cdsUpdate{ wantCDSCache: map[string]CDSUpdate{
clusterName1: {serviceName1, true}, clusterName1: {serviceName1, true},
clusterName2: {serviceName2, false}, clusterName2: {serviceName2, false},
}, },
@ -389,7 +389,7 @@ func TestCDSCaching(t *testing.T) {
// callback to be invoked with the new serviceName. // callback to be invoked with the new serviceName.
{ {
target: clusterName2, target: clusterName2,
wantCDSCache: map[string]cdsUpdate{ wantCDSCache: map[string]CDSUpdate{
clusterName1: {serviceName1, true}, clusterName1: {serviceName1, true},
clusterName2: {serviceName2, false}, clusterName2: {serviceName2, false},
}, },
@ -399,7 +399,7 @@ func TestCDSCaching(t *testing.T) {
{ {
responseToSend: &fakexds.Response{Resp: &xdspb.DiscoveryResponse{TypeUrl: clusterURL}}, responseToSend: &fakexds.Response{Resp: &xdspb.DiscoveryResponse{TypeUrl: clusterURL}},
wantOpErr: false, wantOpErr: false,
wantCDSCache: map[string]cdsUpdate{}, wantCDSCache: map[string]CDSUpdate{},
wantWatchCallback: true, wantWatchCallback: true,
}, },
} }
@ -438,10 +438,10 @@ func TestCDSWatchExpiryTimer(t *testing.T) {
t.Log("Started xds v2Client...") t.Log("Started xds v2Client...")
cdsCallbackCh := make(chan error, 1) cdsCallbackCh := make(chan error, 1)
v2c.watchCDS(clusterName1, func(u cdsUpdate, err error) { v2c.watchCDS(clusterName1, func(u CDSUpdate, err error) {
t.Logf("Received callback with cdsUpdate {%+v} and error {%v}", u, err) t.Logf("Received callback with CDSUpdate {%+v} and error {%v}", u, err)
if u.serviceName != "" { if u.ServiceName != "" {
cdsCallbackCh <- fmt.Errorf("received serviceName %v in cdsCallback, wanted empty string", u.serviceName) cdsCallbackCh <- fmt.Errorf("received serviceName %v in cdsCallback, wanted empty string", u.ServiceName)
} }
if err == nil { if err == nil {
cdsCallbackCh <- errors.New("received nil error in cdsCallback") cdsCallbackCh <- errors.New("received nil error in cdsCallback")

View File

@ -154,6 +154,12 @@ func (c *Client) WatchService(serviceName string, callback func(ServiceUpdate, e
} }
} }
// WatchCluster uses CDS to discover information about the provided
// clusterName.
func (c *Client) WatchCluster(clusterName string, cdsCb func(CDSUpdate, error)) (cancel func()) {
return c.v2c.watchCDS(clusterName, cdsCb)
}
// WatchEDS watches the ghost. // WatchEDS watches the ghost.
func (c *Client) WatchEDS(clusterName string, edsCb func(*EDSUpdate, error)) (cancel func()) { func (c *Client) WatchEDS(clusterName string, edsCb func(*EDSUpdate, error)) (cancel func()) {
return c.v2c.watchEDS(clusterName, edsCb) return c.v2c.watchEDS(clusterName, edsCb)

View File

@ -86,10 +86,15 @@ type rdsUpdate struct {
} }
type rdsCallback func(rdsUpdate, error) type rdsCallback func(rdsUpdate, error)
type cdsUpdate struct { // CDSUpdate contains information from a received CDS response, which is of
serviceName string // interest to the registered CDS watcher.
doLRS bool type CDSUpdate struct {
// ServiceName is the service name corresponding to the clusterName which
// is being watched for through CDS.
ServiceName string
// EnableLRS indicates whether or not load should be reported through LRS.
EnableLRS bool
} }
type cdsCallback func(cdsUpdate, error) type cdsCallback func(CDSUpdate, error)
type edsCallback func(*EDSUpdate, error) type edsCallback func(*EDSUpdate, error)

View File

@ -74,7 +74,7 @@ type v2Client struct {
// unrequested resources. // unrequested resources.
// https://github.com/envoyproxy/envoy/blob/master/api/xds_protocol.rst#resource-hints // https://github.com/envoyproxy/envoy/blob/master/api/xds_protocol.rst#resource-hints
rdsCache map[string]string rdsCache map[string]string
// rdsCache maintains a mapping of {clusterName --> cdsUpdate} from // rdsCache maintains a mapping of {clusterName --> CDSUpdate} from
// validated cluster configurations received in CDS responses. We cache all // validated cluster configurations received in CDS responses. We cache all
// valid cluster configurations, whether or not we are interested in them // valid cluster configurations, whether or not we are interested in them
// when we received them (because we could become interested in them in the // when we received them (because we could become interested in them in the
@ -83,7 +83,7 @@ type v2Client struct {
// resource_names field. As per the latest spec, the server should resend // resource_names field. As per the latest spec, the server should resend
// the response when the request changes, even if it had sent the same // the response when the request changes, even if it had sent the same
// resource earlier (when not asked for). Protected by the above mutex. // resource earlier (when not asked for). Protected by the above mutex.
cdsCache map[string]cdsUpdate cdsCache map[string]CDSUpdate
} }
// newV2Client creates a new v2Client initialized with the passed arguments. // newV2Client creates a new v2Client initialized with the passed arguments.
@ -95,7 +95,7 @@ func newV2Client(cc *grpc.ClientConn, nodeProto *corepb.Node, backoff func(int)
watchCh: buffer.NewUnbounded(), watchCh: buffer.NewUnbounded(),
watchMap: make(map[resourceType]*watchInfo), watchMap: make(map[resourceType]*watchInfo),
rdsCache: make(map[string]string), rdsCache: make(map[string]string),
cdsCache: make(map[string]cdsUpdate), cdsCache: make(map[string]CDSUpdate),
} }
v2c.ctx, v2c.cancelCtx = context.WithCancel(context.Background()) v2c.ctx, v2c.cancelCtx = context.WithCancel(context.Background())
@ -416,7 +416,7 @@ func (v2c *v2Client) checkCacheAndUpdateWatchMap(wi *watchInfo) {
} }
wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() { wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() {
v2c.mu.Lock() v2c.mu.Lock()
wi.callback.(cdsCallback)(cdsUpdate{}, fmt.Errorf("xds: CDS target %s not found", wi.target)) wi.callback.(cdsCallback)(CDSUpdate{}, fmt.Errorf("xds: CDS target %s not found", wi.target))
v2c.mu.Unlock() v2c.mu.Unlock()
}) })
case edsResource: case edsResource: