diff --git a/xds/internal/client/cds.go b/xds/internal/client/cds.go index ae40e8ec..2449f3df 100644 --- a/xds/internal/client/cds.go +++ b/xds/internal/client/cds.go @@ -57,8 +57,8 @@ func (v2c *v2Client) handleCDSResponse(resp *xdspb.DiscoveryResponse) error { return fmt.Errorf("xds: no CDS watcher found when handling CDS response: %+v", resp) } - var returnUpdate cdsUpdate - localCache := make(map[string]cdsUpdate) + var returnUpdate CDSUpdate + localCache := make(map[string]CDSUpdate) for _, r := range resp.GetResources() { var resource ptypes.DynamicAny if err := ptypes.UnmarshalAny(r, &resource); err != nil { @@ -75,8 +75,8 @@ func (v2c *v2Client) handleCDSResponse(resp *xdspb.DiscoveryResponse) error { // If the Cluster message in the CDS response did not contain a // serviceName, we will just use the clusterName for EDS. - if update.serviceName == "" { - update.serviceName = cluster.GetName() + if update.ServiceName == "" { + update.ServiceName = cluster.GetName() } localCache[cluster.GetName()] = update if cluster.GetName() == wi.target[0] { @@ -86,7 +86,7 @@ func (v2c *v2Client) handleCDSResponse(resp *xdspb.DiscoveryResponse) error { v2c.cdsCache = localCache var err error - if returnUpdate.serviceName == "" { + if returnUpdate.ServiceName == "" { err = fmt.Errorf("xds: CDS target %s not found in received response %+v", wi.target, resp) } wi.stopTimer() @@ -94,8 +94,8 @@ func (v2c *v2Client) handleCDSResponse(resp *xdspb.DiscoveryResponse) error { return nil } -func validateCluster(cluster *xdspb.Cluster) (cdsUpdate, error) { - emptyUpdate := cdsUpdate{serviceName: "", doLRS: false} +func validateCluster(cluster *xdspb.Cluster) (CDSUpdate, error) { + emptyUpdate := CDSUpdate{ServiceName: "", EnableLRS: false} switch { case cluster.GetType() != xdspb.Cluster_EDS: return emptyUpdate, fmt.Errorf("xds: unexpected cluster type %v in response: %+v", cluster.GetType(), cluster) @@ -105,8 +105,8 @@ func validateCluster(cluster *xdspb.Cluster) (cdsUpdate, error) { return emptyUpdate, fmt.Errorf("xds: unexpected lbPolicy %v in response: %+v", cluster.GetLbPolicy(), cluster) } - return cdsUpdate{ - serviceName: cluster.GetEdsClusterConfig().GetServiceName(), - doLRS: cluster.GetLrsServer().GetSelf() != nil, + return CDSUpdate{ + ServiceName: cluster.GetEdsClusterConfig().GetServiceName(), + EnableLRS: cluster.GetLrsServer().GetSelf() != nil, }, nil } diff --git a/xds/internal/client/cds_test.go b/xds/internal/client/cds_test.go index 484b5b21..ab4655dd 100644 --- a/xds/internal/client/cds_test.go +++ b/xds/internal/client/cds_test.go @@ -39,11 +39,11 @@ const ( serviceName2 = "bar-service" ) -func (v2c *v2Client) cloneCDSCacheForTesting() map[string]cdsUpdate { +func (v2c *v2Client) cloneCDSCacheForTesting() map[string]CDSUpdate { v2c.mu.Lock() defer v2c.mu.Unlock() - cloneCache := make(map[string]cdsUpdate) + cloneCache := make(map[string]CDSUpdate) for k, v := range v2c.cdsCache { cloneCache[k] = v } @@ -51,11 +51,11 @@ func (v2c *v2Client) cloneCDSCacheForTesting() map[string]cdsUpdate { } func TestValidateCluster(t *testing.T) { - emptyUpdate := cdsUpdate{serviceName: "", doLRS: false} + emptyUpdate := CDSUpdate{ServiceName: "", EnableLRS: false} tests := []struct { name string cluster *xdspb.Cluster - wantUpdate cdsUpdate + wantUpdate CDSUpdate wantErr bool }{ { @@ -138,12 +138,12 @@ func TestValidateCluster(t *testing.T) { }, LbPolicy: xdspb.Cluster_ROUND_ROBIN, }, - wantUpdate: cdsUpdate{serviceName: serviceName1, doLRS: false}, + wantUpdate: CDSUpdate{ServiceName: serviceName1, EnableLRS: false}, }, { name: "happiest-case", cluster: goodCluster1, - wantUpdate: cdsUpdate{serviceName: serviceName1, doLRS: true}, + wantUpdate: CDSUpdate{ServiceName: serviceName1, EnableLRS: true}, }, } @@ -177,7 +177,7 @@ func TestCDSHandleResponse(t *testing.T) { name string cdsResponse *xdspb.DiscoveryResponse wantErr bool - wantUpdate *cdsUpdate + wantUpdate *CDSUpdate wantUpdateErr bool }{ // Badly marshaled CDS response. @@ -201,7 +201,7 @@ func TestCDSHandleResponse(t *testing.T) { name: "no-cluster", cdsResponse: &xdspb.DiscoveryResponse{}, wantErr: false, - wantUpdate: &cdsUpdate{}, + wantUpdate: &CDSUpdate{}, wantUpdateErr: true, }, // Response contains one good cluster we are not interested in. @@ -209,7 +209,7 @@ func TestCDSHandleResponse(t *testing.T) { name: "one-uninteresting-cluster", cdsResponse: goodCDSResponse2, wantErr: false, - wantUpdate: &cdsUpdate{}, + wantUpdate: &CDSUpdate{}, wantUpdateErr: true, }, // Response contains one cluster and it is good. @@ -217,18 +217,18 @@ func TestCDSHandleResponse(t *testing.T) { name: "one-good-cluster", cdsResponse: goodCDSResponse1, wantErr: false, - wantUpdate: &cdsUpdate{serviceName: serviceName1, doLRS: true}, + wantUpdate: &CDSUpdate{ServiceName: serviceName1, EnableLRS: true}, wantUpdateErr: false, }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - gotUpdateCh := make(chan cdsUpdate, 1) + gotUpdateCh := make(chan CDSUpdate, 1) gotUpdateErrCh := make(chan error, 1) // Register a watcher, to trigger the v2Client to send an CDS request. - cancelWatch := v2c.watchCDS(clusterName1, func(u cdsUpdate, err error) { - t.Logf("in v2c.watchCDS callback, cdsUpdate: %+v, err: %v", u, err) + cancelWatch := v2c.watchCDS(clusterName1, func(u CDSUpdate, err error) { + t.Logf("in v2c.watchCDS callback, CDSUpdate: %+v, err: %v", u, err) gotUpdateCh <- u gotUpdateErrCh <- err }) @@ -295,7 +295,7 @@ type cdsTestOp struct { // wantOpErr specfies whether the main operation should return an error. wantOpErr bool // wantCDSCache is the expected rdsCache at the end of an operation. - wantCDSCache map[string]cdsUpdate + wantCDSCache map[string]CDSUpdate // wantWatchCallback specifies if the watch callback should be invoked. wantWatchCallback bool } @@ -322,8 +322,8 @@ func testCDSCaching(t *testing.T, cdsTestOps []cdsTestOp, errCh chan error) { // Register a watcher if required, and use a channel to signal the // successful invocation of the callback. if cdsTestOp.target != "" { - v2c.watchCDS(cdsTestOp.target, func(u cdsUpdate, err error) { - t.Logf("Received callback with cdsUpdate {%+v} and error {%v}", u, err) + v2c.watchCDS(cdsTestOp.target, func(u CDSUpdate, err error) { + t.Logf("Received callback with CDSUpdate {%+v} and error {%v}", u, err) callbackCh <- struct{}{} }) t.Logf("Registered a watcher for CDS target: %v...", cdsTestOp.target) @@ -369,7 +369,7 @@ func TestCDSCaching(t *testing.T) { { target: clusterName1, responseToSend: &fakexds.Response{Resp: goodCDSResponse1}, - wantCDSCache: map[string]cdsUpdate{ + wantCDSCache: map[string]CDSUpdate{ clusterName1: {serviceName1, true}, }, wantWatchCallback: true, @@ -378,7 +378,7 @@ func TestCDSCaching(t *testing.T) { // one received in the previous response). This should be cached. { responseToSend: &fakexds.Response{Resp: cdsResponseWithMultipleResources}, - wantCDSCache: map[string]cdsUpdate{ + wantCDSCache: map[string]CDSUpdate{ clusterName1: {serviceName1, true}, clusterName2: {serviceName2, false}, }, @@ -389,7 +389,7 @@ func TestCDSCaching(t *testing.T) { // callback to be invoked with the new serviceName. { target: clusterName2, - wantCDSCache: map[string]cdsUpdate{ + wantCDSCache: map[string]CDSUpdate{ clusterName1: {serviceName1, true}, clusterName2: {serviceName2, false}, }, @@ -399,7 +399,7 @@ func TestCDSCaching(t *testing.T) { { responseToSend: &fakexds.Response{Resp: &xdspb.DiscoveryResponse{TypeUrl: clusterURL}}, wantOpErr: false, - wantCDSCache: map[string]cdsUpdate{}, + wantCDSCache: map[string]CDSUpdate{}, wantWatchCallback: true, }, } @@ -438,10 +438,10 @@ func TestCDSWatchExpiryTimer(t *testing.T) { t.Log("Started xds v2Client...") cdsCallbackCh := make(chan error, 1) - v2c.watchCDS(clusterName1, func(u cdsUpdate, err error) { - t.Logf("Received callback with cdsUpdate {%+v} and error {%v}", u, err) - if u.serviceName != "" { - cdsCallbackCh <- fmt.Errorf("received serviceName %v in cdsCallback, wanted empty string", u.serviceName) + v2c.watchCDS(clusterName1, func(u CDSUpdate, err error) { + t.Logf("Received callback with CDSUpdate {%+v} and error {%v}", u, err) + if u.ServiceName != "" { + cdsCallbackCh <- fmt.Errorf("received serviceName %v in cdsCallback, wanted empty string", u.ServiceName) } if err == nil { cdsCallbackCh <- errors.New("received nil error in cdsCallback") diff --git a/xds/internal/client/client.go b/xds/internal/client/client.go index 5776fbdf..67efc447 100644 --- a/xds/internal/client/client.go +++ b/xds/internal/client/client.go @@ -154,6 +154,12 @@ func (c *Client) WatchService(serviceName string, callback func(ServiceUpdate, e } } +// WatchCluster uses CDS to discover information about the provided +// clusterName. +func (c *Client) WatchCluster(clusterName string, cdsCb func(CDSUpdate, error)) (cancel func()) { + return c.v2c.watchCDS(clusterName, cdsCb) +} + // WatchEDS watches the ghost. func (c *Client) WatchEDS(clusterName string, edsCb func(*EDSUpdate, error)) (cancel func()) { return c.v2c.watchEDS(clusterName, edsCb) diff --git a/xds/internal/client/types.go b/xds/internal/client/types.go index fed39ded..04e21749 100644 --- a/xds/internal/client/types.go +++ b/xds/internal/client/types.go @@ -86,10 +86,15 @@ type rdsUpdate struct { } type rdsCallback func(rdsUpdate, error) -type cdsUpdate struct { - serviceName string - doLRS bool +// CDSUpdate contains information from a received CDS response, which is of +// interest to the registered CDS watcher. +type CDSUpdate struct { + // ServiceName is the service name corresponding to the clusterName which + // is being watched for through CDS. + ServiceName string + // EnableLRS indicates whether or not load should be reported through LRS. + EnableLRS bool } -type cdsCallback func(cdsUpdate, error) +type cdsCallback func(CDSUpdate, error) type edsCallback func(*EDSUpdate, error) diff --git a/xds/internal/client/v2client.go b/xds/internal/client/v2client.go index b659db13..1baf614a 100644 --- a/xds/internal/client/v2client.go +++ b/xds/internal/client/v2client.go @@ -74,7 +74,7 @@ type v2Client struct { // unrequested resources. // https://github.com/envoyproxy/envoy/blob/master/api/xds_protocol.rst#resource-hints rdsCache map[string]string - // rdsCache maintains a mapping of {clusterName --> cdsUpdate} from + // rdsCache maintains a mapping of {clusterName --> CDSUpdate} from // validated cluster configurations received in CDS responses. We cache all // valid cluster configurations, whether or not we are interested in them // when we received them (because we could become interested in them in the @@ -83,7 +83,7 @@ type v2Client struct { // resource_names field. As per the latest spec, the server should resend // the response when the request changes, even if it had sent the same // resource earlier (when not asked for). Protected by the above mutex. - cdsCache map[string]cdsUpdate + cdsCache map[string]CDSUpdate } // newV2Client creates a new v2Client initialized with the passed arguments. @@ -95,7 +95,7 @@ func newV2Client(cc *grpc.ClientConn, nodeProto *corepb.Node, backoff func(int) watchCh: buffer.NewUnbounded(), watchMap: make(map[resourceType]*watchInfo), rdsCache: make(map[string]string), - cdsCache: make(map[string]cdsUpdate), + cdsCache: make(map[string]CDSUpdate), } v2c.ctx, v2c.cancelCtx = context.WithCancel(context.Background()) @@ -416,7 +416,7 @@ func (v2c *v2Client) checkCacheAndUpdateWatchMap(wi *watchInfo) { } wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() { v2c.mu.Lock() - wi.callback.(cdsCallback)(cdsUpdate{}, fmt.Errorf("xds: CDS target %s not found", wi.target)) + wi.callback.(cdsCallback)(CDSUpdate{}, fmt.Errorf("xds: CDS target %s not found", wi.target)) v2c.mu.Unlock() }) case edsResource: