xds: support ack/nack (#3227)

The client will send a request with version/nonce after receiving a
response, to ack/nack.

Ack versions for different xds types are independent.

Some other changes
- merge sendRequests to one shared function, with fields for version/nonce
- deleted enum for xds types, and always use const URL string
This commit is contained in:
Menghan Li
2019-12-12 13:39:56 -08:00
committed by GitHub
parent 032a3799b2
commit df18b5433b
11 changed files with 575 additions and 279 deletions

View File

@ -23,36 +23,15 @@ import (
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
"github.com/golang/protobuf/ptypes"
"google.golang.org/grpc/grpclog"
)
// newCDSRequest generates an CDS request proto for the provided clusterName,
// to be sent out on the wire.
func (v2c *v2Client) newCDSRequest(clusterName []string) *xdspb.DiscoveryRequest {
return &xdspb.DiscoveryRequest{
Node: v2c.nodeProto,
TypeUrl: clusterURL,
ResourceNames: clusterName,
}
}
// sendCDS sends an CDS request for provided clusterName on the provided
// stream.
func (v2c *v2Client) sendCDS(stream adsStream, clusterName []string) bool {
if err := stream.Send(v2c.newCDSRequest(clusterName)); err != nil {
grpclog.Warningf("xds: CDS request for resource %v failed: %v", clusterName, err)
return false
}
return true
}
// handleCDSResponse processes an CDS response received from the xDS server. On
// receipt of a good response, it also invokes the registered watcher callback.
func (v2c *v2Client) handleCDSResponse(resp *xdspb.DiscoveryResponse) error {
v2c.mu.Lock()
defer v2c.mu.Unlock()
wi := v2c.watchMap[cdsResource]
wi := v2c.watchMap[cdsURL]
if wi == nil {
return fmt.Errorf("xds: no CDS watcher found when handling CDS response: %+v", resp)
}

View File

@ -397,7 +397,7 @@ func TestCDSCaching(t *testing.T) {
},
// Push an empty CDS response. This should clear the cache.
{
responseToSend: &fakexds.Response{Resp: &xdspb.DiscoveryResponse{TypeUrl: clusterURL}},
responseToSend: &fakexds.Response{Resp: &xdspb.DiscoveryResponse{TypeUrl: cdsURL}},
wantOpErr: false,
wantCDSCache: map[string]CDSUpdate{},
wantWatchCallback: true,
@ -466,11 +466,11 @@ var (
badlyMarshaledCDSResponse = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
{
TypeUrl: clusterURL,
TypeUrl: cdsURL,
Value: []byte{1, 2, 3, 4},
},
},
TypeUrl: clusterURL,
TypeUrl: cdsURL,
}
goodCluster1 = &xdspb.Cluster{
Name: clusterName1,
@ -508,32 +508,32 @@ var (
goodCDSResponse1 = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
{
TypeUrl: clusterURL,
TypeUrl: cdsURL,
Value: marshaledCluster1,
},
},
TypeUrl: clusterURL,
TypeUrl: cdsURL,
}
goodCDSResponse2 = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
{
TypeUrl: clusterURL,
TypeUrl: cdsURL,
Value: marshaledCluster2,
},
},
TypeUrl: clusterURL,
TypeUrl: cdsURL,
}
cdsResponseWithMultipleResources = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
{
TypeUrl: clusterURL,
TypeUrl: cdsURL,
Value: marshaledCluster1,
},
{
TypeUrl: clusterURL,
TypeUrl: cdsURL,
Value: marshaledCluster2,
},
},
TypeUrl: clusterURL,
TypeUrl: cdsURL,
}
)

View File

@ -93,7 +93,11 @@ func TestNew(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
if _, err := New(test.opts); (err != nil) != test.wantErr {
c, err := New(test.opts)
if err == nil {
defer c.Close()
}
if (err != nil) != test.wantErr {
t.Fatalf("New(%+v) = %v, wantErr: %v", test.opts, err, test.wantErr)
}
})
@ -260,6 +264,7 @@ func TestWatchServiceWithClientClose(t *testing.T) {
if err != nil {
t.Fatalf("New returned error: %v", err)
}
defer xdsClient.Close()
t.Log("Created an xdsClient...")
callbackCh := make(chan error, 1)

View File

@ -164,30 +164,11 @@ func ParseEDSRespProtoForTesting(m *xdspb.ClusterLoadAssignment) *EDSUpdate {
return u
}
// newEDSRequest generates an EDS request proto for the provided clusterName, to
// be sent out on the wire.
func (v2c *v2Client) newEDSRequest(clusterName []string) *xdspb.DiscoveryRequest {
return &xdspb.DiscoveryRequest{
Node: v2c.nodeProto,
TypeUrl: endpointURL,
ResourceNames: clusterName,
}
}
// sendEDS sends an EDS request for provided clusterName on the provided stream.
func (v2c *v2Client) sendEDS(stream adsStream, clusterName []string) bool {
if err := stream.Send(v2c.newEDSRequest(clusterName)); err != nil {
grpclog.Warningf("xds: EDS request for resource %v failed: %v", clusterName, err)
return false
}
return true
}
func (v2c *v2Client) handleEDSResponse(resp *xdspb.DiscoveryResponse) error {
v2c.mu.Lock()
defer v2c.mu.Unlock()
wi := v2c.watchMap[edsResource]
wi := v2c.watchMap[edsURL]
if wi == nil {
return fmt.Errorf("xds: no EDS watcher found when handling EDS response: %+v", resp)
}

View File

@ -120,11 +120,11 @@ var (
badlyMarshaledEDSResponse = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
{
TypeUrl: endpointURL,
TypeUrl: edsURL,
Value: []byte{1, 2, 3, 4},
},
},
TypeUrl: endpointURL,
TypeUrl: edsURL,
}
badResourceTypeInEDSResponse = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
@ -133,7 +133,7 @@ var (
Value: marshaledConnMgr1,
},
},
TypeUrl: endpointURL,
TypeUrl: edsURL,
}
goodEDSResponse1 = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
@ -145,7 +145,7 @@ var (
return a
}(),
},
TypeUrl: endpointURL,
TypeUrl: edsURL,
}
goodEDSResponse2 = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
@ -157,7 +157,7 @@ var (
return a
}(),
},
TypeUrl: endpointURL,
TypeUrl: edsURL,
}
)
@ -283,6 +283,7 @@ func TestEDSHandleResponseWithoutWatch(t *testing.T) {
sCleanup()
}()
v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
defer v2c.close()
if v2c.handleEDSResponse(goodEDSResponse1) == nil {
t.Fatal("v2c.handleEDSResponse() succeeded, should have failed")

View File

@ -21,39 +21,18 @@ package client
import (
"fmt"
"github.com/golang/protobuf/ptypes"
"google.golang.org/grpc/grpclog"
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
httppb "github.com/envoyproxy/go-control-plane/envoy/config/filter/network/http_connection_manager/v2"
"github.com/golang/protobuf/ptypes"
)
// newLDSRequest generates an LDS request proto for the provided target, to be
// sent out on the wire.
func (v2c *v2Client) newLDSRequest(target []string) *xdspb.DiscoveryRequest {
return &xdspb.DiscoveryRequest{
Node: v2c.nodeProto,
TypeUrl: listenerURL,
ResourceNames: target,
}
}
// sendLDS sends an LDS request for provided target on the provided stream.
func (v2c *v2Client) sendLDS(stream adsStream, target []string) bool {
if err := stream.Send(v2c.newLDSRequest(target)); err != nil {
grpclog.Warningf("xds: LDS request for resource %v failed: %v", target, err)
return false
}
return true
}
// handleLDSResponse processes an LDS response received from the xDS server. On
// receipt of a good response, it also invokes the registered watcher callback.
func (v2c *v2Client) handleLDSResponse(resp *xdspb.DiscoveryResponse) error {
v2c.mu.Lock()
defer v2c.mu.Unlock()
wi := v2c.watchMap[ldsResource]
wi := v2c.watchMap[ldsURL]
if wi == nil {
return fmt.Errorf("xds: no LDS watcher found when handling LDS response: %+v", resp)
}

View File

@ -23,31 +23,10 @@ import (
"net"
"strings"
"github.com/golang/protobuf/ptypes"
"google.golang.org/grpc/grpclog"
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
"github.com/golang/protobuf/ptypes"
)
// newRDSRequest generates an RDS request proto for the provided routeName, to
// be sent out on the wire.
func (v2c *v2Client) newRDSRequest(routeName []string) *xdspb.DiscoveryRequest {
return &xdspb.DiscoveryRequest{
Node: v2c.nodeProto,
TypeUrl: routeURL,
ResourceNames: routeName,
}
}
// sendRDS sends an RDS request for provided routeName on the provided stream.
func (v2c *v2Client) sendRDS(stream adsStream, routeName []string) bool {
if err := stream.Send(v2c.newRDSRequest(routeName)); err != nil {
grpclog.Warningf("xds: RDS request for resource %v failed: %v", routeName, err)
return false
}
return true
}
// handleRDSResponse processes an RDS response received from the xDS server. On
// receipt of a good response, it caches validated resources and also invokes
// the registered watcher callback.
@ -55,12 +34,12 @@ func (v2c *v2Client) handleRDSResponse(resp *xdspb.DiscoveryResponse) error {
v2c.mu.Lock()
defer v2c.mu.Unlock()
if v2c.watchMap[ldsResource] == nil {
if v2c.watchMap[ldsURL] == nil {
return fmt.Errorf("xds: unexpected RDS response when no LDS watcher is registered: %+v", resp)
}
target := v2c.watchMap[ldsResource].target[0]
target := v2c.watchMap[ldsURL].target[0]
wi := v2c.watchMap[rdsResource]
wi := v2c.watchMap[rdsURL]
if wi == nil {
return fmt.Errorf("xds: no RDS watcher found when handling RDS response: %+v", resp)
}

View File

@ -27,20 +27,10 @@ import (
type adsStream adsgrpc.AggregatedDiscoveryService_StreamAggregatedResourcesClient
const (
listenerURL = "type.googleapis.com/envoy.api.v2.Listener"
routeURL = "type.googleapis.com/envoy.api.v2.RouteConfiguration"
clusterURL = "type.googleapis.com/envoy.api.v2.Cluster"
endpointURL = "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment"
)
// resourceType is an enum to represent the different xDS resources.
type resourceType int
const (
ldsResource resourceType = iota
rdsResource
cdsResource
edsResource
ldsURL = "type.googleapis.com/envoy.api.v2.Listener"
rdsURL = "type.googleapis.com/envoy.api.v2.RouteConfiguration"
cdsURL = "type.googleapis.com/envoy.api.v2.Cluster"
edsURL = "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment"
)
// watchState is an enum to represent the state of a watch call.
@ -54,9 +44,10 @@ const (
// watchInfo holds all the information about a watch call.
type watchInfo struct {
wType resourceType
target []string
state watchState
typeURL string
target []string
state watchState
callback interface{}
expiryTimer *time.Timer
}
@ -76,6 +67,12 @@ func (wi *watchInfo) stopTimer() {
}
}
type ackInfo struct {
typeURL string
version string // Nack if version is an empty string.
nonce string
}
type ldsUpdate struct {
routeName string
}

View File

@ -28,6 +28,7 @@ import (
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/buffer"
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
adsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
)
@ -51,9 +52,9 @@ type v2Client struct {
nodeProto *corepb.Node
backoff func(int) time.Duration
// watchCh in the channel onto which watchInfo objects are pushed by the
// sendCh in the channel onto which watchInfo objects are pushed by the
// watch API, and it is read and acted upon by the send() goroutine.
watchCh *buffer.Unbounded
sendCh *buffer.Unbounded
mu sync.Mutex
// Message specific watch infos, protected by the above mutex. These are
@ -62,7 +63,12 @@ type v2Client struct {
// messages. When the user of this client object cancels a watch call,
// these are set to nil. All accesses to the map protected and any value
// inside the map should be protected with the above mutex.
watchMap map[resourceType]*watchInfo
watchMap map[string]*watchInfo
// ackMap contains the version that was acked (the version in the ack
// request that was sent on wire). The key is typeURL, the value is the
// version string, becaues the versions for different resource types
// should be independent.
ackMap map[string]string
// rdsCache maintains a mapping of {routeConfigName --> clusterName} from
// validated route configurations received in RDS responses. We cache all
// valid route configurations, whether or not we are interested in them
@ -92,8 +98,9 @@ func newV2Client(cc *grpc.ClientConn, nodeProto *corepb.Node, backoff func(int)
cc: cc,
nodeProto: nodeProto,
backoff: backoff,
watchCh: buffer.NewUnbounded(),
watchMap: make(map[resourceType]*watchInfo),
sendCh: buffer.NewUnbounded(),
watchMap: make(map[string]*watchInfo),
ackMap: make(map[string]string),
rdsCache: make(map[string]string),
cdsCache: make(map[string]CDSUpdate),
}
@ -153,6 +160,31 @@ func (v2c *v2Client) run() {
}
}
// sendRequest sends a request for provided typeURL and resource on the provided
// stream.
//
// version is the ack version to be sent with the request
// - If this is the new request (not an ack/nack), version will be an empty
// string
// - If this is an ack, version will be the version from the response
// - If this is a nack, version will be the previous acked version (from
// ackMap). If there was no ack before, it will be an empty string
func (v2c *v2Client) sendRequest(stream adsStream, resourceNames []string, typeURL, version, nonce string) bool {
req := &xdspb.DiscoveryRequest{
Node: v2c.nodeProto,
TypeUrl: typeURL,
ResourceNames: resourceNames,
VersionInfo: version,
ResponseNonce: nonce,
// TODO: populate ErrorDetails for nack.
}
if err := stream.Send(req); err != nil {
grpclog.Warningf("xds: request (type %s) for resource %v failed: %v", typeURL, resourceNames, err)
return false
}
return true
}
// sendExisting sends out xDS requests for registered watchers when recovering
// from a broken stream.
//
@ -164,30 +196,80 @@ func (v2c *v2Client) sendExisting(stream adsStream) bool {
v2c.mu.Lock()
defer v2c.mu.Unlock()
for wType, wi := range v2c.watchMap {
switch wType {
case ldsResource:
if !v2c.sendLDS(stream, wi.target) {
return false
}
case rdsResource:
if !v2c.sendRDS(stream, wi.target) {
return false
}
case cdsResource:
if !v2c.sendCDS(stream, wi.target) {
return false
}
case edsResource:
if !v2c.sendEDS(stream, wi.target) {
return false
}
// Reset the ack versions when the stream restarts.
v2c.ackMap = make(map[string]string)
for typeURL, wi := range v2c.watchMap {
if !v2c.sendRequest(stream, wi.target, typeURL, "", "") {
return false
}
}
return true
}
// processWatchInfo pulls the fields needed by the request from a watchInfo.
//
// It also calls callback with cached response, and updates the watch map in
// v2c.
//
// If the watch was already canceled, it returns false for send
func (v2c *v2Client) processWatchInfo(t *watchInfo) (target []string, typeURL, version, nonce string, send bool) {
v2c.mu.Lock()
defer v2c.mu.Unlock()
if t.state == watchCancelled {
return // This returns all zero values, and false for send.
}
t.state = watchStarted
send = true
typeURL = t.typeURL
target = t.target
v2c.checkCacheAndUpdateWatchMap(t)
// TODO: if watch is called again with the same resource names,
// there's no need to send another request.
//
// TODO: should we reset version (for ack) when a new watch is
// started? Or do this only if the resource names are different
// (so we send a new request)?
return
}
// processAckInfo pulls the fields needed by the ack request from a ackInfo.
//
// If no active watch is found for this ack, it returns false for send.
func (v2c *v2Client) processAckInfo(t *ackInfo) (target []string, typeURL, version, nonce string, send bool) {
typeURL = t.typeURL
v2c.mu.Lock()
defer v2c.mu.Unlock()
wi, ok := v2c.watchMap[typeURL]
if !ok {
// We don't send the request ack if there's no active watch (this can be
// either the server sends responses before any request, or the watch is
// canceled while the ackInfo is in queue), because there's no resource
// name. And if we send a request with empty resource name list, the
// server may treat it as a wild card and send us everything.
grpclog.Warningf("xds: ack (type %s) not sent because there's no active watch for the type", typeURL)
return // This returns all zero values, and false for send.
}
send = true
version = t.version
nonce = t.nonce
target = wi.target
if version == "" {
// This is a nack, get the previous acked version.
version = v2c.ackMap[typeURL]
// version will still be an empty string if typeURL isn't
// found in ackMap, this can happen if there wasn't any ack
// before.
} else {
v2c.ackMap[typeURL] = version
}
return
}
// send reads watch infos from update channel and sends out actual xDS requests
// on the provided ADS stream.
func (v2c *v2Client) send(stream adsStream, done chan struct{}) {
@ -199,36 +281,25 @@ func (v2c *v2Client) send(stream adsStream, done chan struct{}) {
select {
case <-v2c.ctx.Done():
return
case u := <-v2c.watchCh.Get():
v2c.watchCh.Load()
wi := u.(*watchInfo)
v2c.mu.Lock()
if wi.state == watchCancelled {
v2c.mu.Unlock()
case u := <-v2c.sendCh.Get():
v2c.sendCh.Load()
var (
target []string
typeURL, version, nonce string
send bool
)
switch t := u.(type) {
case *watchInfo:
target, typeURL, version, nonce, send = v2c.processWatchInfo(t)
case *ackInfo:
target, typeURL, version, nonce, send = v2c.processAckInfo(t)
}
if !send {
continue
}
wi.state = watchStarted
target := wi.target
v2c.checkCacheAndUpdateWatchMap(wi)
v2c.mu.Unlock()
switch wi.wType {
case ldsResource:
if !v2c.sendLDS(stream, target) {
return
}
case rdsResource:
if !v2c.sendRDS(stream, target) {
return
}
case cdsResource:
if !v2c.sendCDS(stream, target) {
return
}
case edsResource:
if !v2c.sendEDS(stream, target) {
return
}
if !v2c.sendRequest(stream, target, typeURL, version, nonce) {
return
}
case <-done:
return
@ -247,30 +318,36 @@ func (v2c *v2Client) recv(stream adsStream) bool {
grpclog.Warningf("xds: ADS stream recv failed: %v", err)
return success
}
var respHandleErr error
switch resp.GetTypeUrl() {
case listenerURL:
if err := v2c.handleLDSResponse(resp); err != nil {
grpclog.Warningf("xds: LDS response handler failed: %v", err)
return success
}
case routeURL:
if err := v2c.handleRDSResponse(resp); err != nil {
grpclog.Warningf("xds: RDS response handler failed: %v", err)
return success
}
case clusterURL:
if err := v2c.handleCDSResponse(resp); err != nil {
grpclog.Warningf("xds: CDS response handler failed: %v", err)
return success
}
case endpointURL:
if err := v2c.handleEDSResponse(resp); err != nil {
grpclog.Warningf("xds: EDS response handler failed: %v", err)
return success
}
case ldsURL:
respHandleErr = v2c.handleLDSResponse(resp)
case rdsURL:
respHandleErr = v2c.handleRDSResponse(resp)
case cdsURL:
respHandleErr = v2c.handleCDSResponse(resp)
case edsURL:
respHandleErr = v2c.handleEDSResponse(resp)
default:
grpclog.Warningf("xds: unknown response URL type: %v", resp.GetTypeUrl())
continue
}
typeURL := resp.GetTypeUrl()
if respHandleErr != nil {
grpclog.Warningf("xds: response (type %s) handler failed: %v", typeURL, respHandleErr)
v2c.sendCh.Put(&ackInfo{
typeURL: typeURL,
version: "",
nonce: resp.GetNonce(),
})
continue
}
v2c.sendCh.Put(&ackInfo{
typeURL: typeURL,
version: resp.GetVersionInfo(),
nonce: resp.GetNonce(),
})
success = true
}
}
@ -282,18 +359,11 @@ func (v2c *v2Client) recv(stream adsStream) bool {
// The provided callback should not block or perform any expensive operations
// or call other methods of the v2Client object.
func (v2c *v2Client) watchLDS(target string, ldsCb ldsCallback) (cancel func()) {
wi := &watchInfo{wType: ldsResource, target: []string{target}, callback: ldsCb}
v2c.watchCh.Put(wi)
return func() {
v2c.mu.Lock()
defer v2c.mu.Unlock()
if wi.state == watchEnqueued {
wi.state = watchCancelled
return
}
v2c.watchMap[ldsResource].cancel()
delete(v2c.watchMap, ldsResource)
}
return v2c.watch(&watchInfo{
typeURL: ldsURL,
target: []string{target},
callback: ldsCb,
})
}
// watchRDS registers an RDS watcher for the provided routeName. Updates
@ -303,21 +373,14 @@ func (v2c *v2Client) watchLDS(target string, ldsCb ldsCallback) (cancel func())
// The provided callback should not block or perform any expensive operations
// or call other methods of the v2Client object.
func (v2c *v2Client) watchRDS(routeName string, rdsCb rdsCallback) (cancel func()) {
wi := &watchInfo{wType: rdsResource, target: []string{routeName}, callback: rdsCb}
v2c.watchCh.Put(wi)
return func() {
v2c.mu.Lock()
defer v2c.mu.Unlock()
if wi.state == watchEnqueued {
wi.state = watchCancelled
return
}
v2c.watchMap[rdsResource].cancel()
delete(v2c.watchMap, rdsResource)
// TODO: Once a registered RDS watch is cancelled, we should send an
// RDS request with no resources. This will let the server know that we
// are no longer interested in this resource.
}
return v2c.watch(&watchInfo{
typeURL: rdsURL,
target: []string{routeName},
callback: rdsCb,
})
// TODO: Once a registered RDS watch is cancelled, we should send an RDS
// request with no resources. This will let the server know that we are no
// longer interested in this resource.
}
// watchCDS registers an CDS watcher for the provided clusterName. Updates
@ -327,18 +390,11 @@ func (v2c *v2Client) watchRDS(routeName string, rdsCb rdsCallback) (cancel func(
// The provided callback should not block or perform any expensive operations
// or call other methods of the v2Client object.
func (v2c *v2Client) watchCDS(clusterName string, cdsCb cdsCallback) (cancel func()) {
wi := &watchInfo{wType: cdsResource, target: []string{clusterName}, callback: cdsCb}
v2c.watchCh.Put(wi)
return func() {
v2c.mu.Lock()
defer v2c.mu.Unlock()
if wi.state == watchEnqueued {
wi.state = watchCancelled
return
}
v2c.watchMap[cdsResource].cancel()
delete(v2c.watchMap, cdsResource)
}
return v2c.watch(&watchInfo{
typeURL: cdsURL,
target: []string{clusterName},
callback: cdsCb,
})
}
// watchEDS registers an EDS watcher for the provided clusterName. Updates
@ -348,8 +404,18 @@ func (v2c *v2Client) watchCDS(clusterName string, cdsCb cdsCallback) (cancel fun
// The provided callback should not block or perform any expensive operations
// or call other methods of the v2Client object.
func (v2c *v2Client) watchEDS(clusterName string, edsCb edsCallback) (cancel func()) {
wi := &watchInfo{wType: edsResource, target: []string{clusterName}, callback: edsCb}
v2c.watchCh.Put(wi)
return v2c.watch(&watchInfo{
typeURL: edsURL,
target: []string{clusterName},
callback: edsCb,
})
// TODO: Once a registered EDS watch is cancelled, we should send an EDS
// request with no resources. This will let the server know that we are no
// longer interested in this resource.
}
func (v2c *v2Client) watch(wi *watchInfo) (cancel func()) {
v2c.sendCh.Put(wi)
return func() {
v2c.mu.Lock()
defer v2c.mu.Unlock()
@ -357,11 +423,9 @@ func (v2c *v2Client) watchEDS(clusterName string, edsCb edsCallback) (cancel fun
wi.state = watchCancelled
return
}
v2c.watchMap[edsResource].cancel()
delete(v2c.watchMap, edsResource)
// TODO: Once a registered EDS watch is cancelled, we should send an
// EDS request with no resources. This will let the server know that we
// are no longer interested in this resource.
v2c.watchMap[wi.typeURL].cancel()
delete(v2c.watchMap, wi.typeURL)
// TODO: should we reset ack version string when cancelling the watch?
}
}
@ -372,25 +436,25 @@ func (v2c *v2Client) watchEDS(clusterName string, edsCb edsCallback) (cancel fun
//
// Caller should hold v2c.mu
func (v2c *v2Client) checkCacheAndUpdateWatchMap(wi *watchInfo) {
if existing := v2c.watchMap[wi.wType]; existing != nil {
if existing := v2c.watchMap[wi.typeURL]; existing != nil {
existing.cancel()
}
v2c.watchMap[wi.wType] = wi
switch wi.wType {
v2c.watchMap[wi.typeURL] = wi
switch wi.typeURL {
// We need to grab the lock inside of the expiryTimer's afterFunc because
// we need to access the watchInfo, which is stored in the watchMap.
case ldsResource:
case ldsURL:
wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() {
v2c.mu.Lock()
wi.callback.(ldsCallback)(ldsUpdate{}, fmt.Errorf("xds: LDS target %s not found", wi.target))
v2c.mu.Unlock()
})
case rdsResource:
case rdsURL:
routeName := wi.target[0]
if cluster := v2c.rdsCache[routeName]; cluster != "" {
var err error
if v2c.watchMap[ldsResource] == nil {
if v2c.watchMap[ldsURL] == nil {
cluster = ""
err = fmt.Errorf("xds: no LDS watcher found when handling RDS watch for route {%v} from cache", routeName)
}
@ -404,11 +468,11 @@ func (v2c *v2Client) checkCacheAndUpdateWatchMap(wi *watchInfo) {
wi.callback.(rdsCallback)(rdsUpdate{}, fmt.Errorf("xds: RDS target %s not found", wi.target))
v2c.mu.Unlock()
})
case cdsResource:
case cdsURL:
clusterName := wi.target[0]
if update, ok := v2c.cdsCache[clusterName]; ok {
var err error
if v2c.watchMap[cdsResource] == nil {
if v2c.watchMap[cdsURL] == nil {
err = fmt.Errorf("xds: no CDS watcher found when handling CDS watch for cluster {%v} from cache", clusterName)
}
wi.callback.(cdsCallback)(update, err)
@ -419,7 +483,7 @@ func (v2c *v2Client) checkCacheAndUpdateWatchMap(wi *watchInfo) {
wi.callback.(cdsCallback)(CDSUpdate{}, fmt.Errorf("xds: CDS target %s not found", wi.target))
v2c.mu.Unlock()
})
case edsResource:
case edsURL:
wi.expiryTimer = time.AfterFunc(defaultWatchExpiryTimeout, func() {
v2c.mu.Lock()
wi.callback.(edsCallback)(nil, fmt.Errorf("xds: EDS target %s not found", wi.target))

View File

@ -0,0 +1,294 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package client
import (
"fmt"
"strconv"
"testing"
"time"
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
"github.com/golang/protobuf/proto"
anypb "github.com/golang/protobuf/ptypes/any"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/xds/internal/client/fakexds"
)
func emptyChanRecvWithTimeout(ch <-chan struct{}, d time.Duration) error {
timer := time.NewTimer(d)
select {
case <-timer.C:
return fmt.Errorf("timeout")
case <-ch:
timer.Stop()
return nil
}
}
func requestChanRecvWithTimeout(ch <-chan *fakexds.Request, d time.Duration) (*fakexds.Request, error) {
timer := time.NewTimer(d)
select {
case <-timer.C:
return nil, fmt.Errorf("timeout waiting for request")
case r := <-ch:
timer.Stop()
return r, nil
}
}
// compareXDSRequest reads requests from channel, compare it with want.
func compareXDSRequest(ch <-chan *fakexds.Request, d time.Duration, want *xdspb.DiscoveryRequest, version, nonce string) error {
r, err := requestChanRecvWithTimeout(ch, d)
if err != nil {
return err
}
if r.Err != nil {
return fmt.Errorf("unexpected error from request: %v", r.Err)
}
wantClone := proto.Clone(want).(*xdspb.DiscoveryRequest)
wantClone.VersionInfo = version
wantClone.ResponseNonce = nonce
if !cmp.Equal(r.Req, wantClone, cmp.Comparer(proto.Equal)) {
return fmt.Errorf("received request different from want, diff: %s", cmp.Diff(r.Req, wantClone))
}
return nil
}
func sendXDSRespWithVersion(ch chan<- *fakexds.Response, respWithoutVersion *xdspb.DiscoveryResponse, version int) (nonce string) {
respToSend := proto.Clone(respWithoutVersion).(*xdspb.DiscoveryResponse)
respToSend.VersionInfo = strconv.Itoa(version)
nonce = strconv.Itoa(int(time.Now().UnixNano()))
respToSend.Nonce = nonce
ch <- &fakexds.Response{Resp: respToSend}
return
}
// TestV2ClientAck verifies that valid responses are acked, and invalid ones are
// nacked.
//
// This test also verifies the version for different types are independent.
func TestV2ClientAck(t *testing.T) {
var (
versionLDS = 1000
versionRDS = 2000
versionCDS = 3000
versionEDS = 4000
)
fakeServer, sCleanup := fakexds.StartServer(t)
client, cCleanup := fakeServer.GetClientConn(t)
defer func() {
cCleanup()
sCleanup()
}()
v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
defer v2c.close()
t.Log("Started xds v2Client...")
// Start the watch, send a good response, and check for ack.
cbLDS := startXDS(t, "LDS", v2c, fakeServer, goodLDSRequest)
sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS)
versionLDS++
cbRDS := startXDS(t, "RDS", v2c, fakeServer, goodRDSRequest)
sendGoodResp(t, "RDS", fakeServer, versionRDS, goodRDSResponse1, goodRDSRequest, cbRDS)
versionRDS++
cbCDS := startXDS(t, "CDS", v2c, fakeServer, goodCDSRequest)
sendGoodResp(t, "CDS", fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, cbCDS)
versionCDS++
cbEDS := startXDS(t, "EDS", v2c, fakeServer, goodEDSRequest)
sendGoodResp(t, "EDS", fakeServer, versionEDS, goodEDSResponse1, goodEDSRequest, cbEDS)
versionEDS++
// Send a bad response, and check for nack.
sendBadResp(t, "LDS", fakeServer, versionLDS, goodLDSRequest)
versionLDS++
sendBadResp(t, "RDS", fakeServer, versionRDS, goodRDSRequest)
versionRDS++
sendBadResp(t, "CDS", fakeServer, versionCDS, goodCDSRequest)
versionCDS++
sendBadResp(t, "EDS", fakeServer, versionEDS, goodEDSRequest)
versionEDS++
// send another good response, and check for ack, with the new version.
sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS)
versionLDS++
sendGoodResp(t, "RDS", fakeServer, versionRDS, goodRDSResponse1, goodRDSRequest, cbRDS)
versionRDS++
sendGoodResp(t, "CDS", fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, cbCDS)
versionCDS++
sendGoodResp(t, "EDS", fakeServer, versionEDS, goodEDSResponse1, goodEDSRequest, cbEDS)
versionEDS++
}
// startXDS calls watch to send the first request. It then sends a good response
// and checks for ack.
func startXDS(t *testing.T, xdsname string, v2c *v2Client, fakeServer *fakexds.Server, goodReq *xdspb.DiscoveryRequest) <-chan struct{} {
callbackCh := make(chan struct{}, 1)
switch xdsname {
case "LDS":
v2c.watchLDS(goodLDSTarget1, func(u ldsUpdate, err error) {
t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", xdsname, u, err)
callbackCh <- struct{}{}
})
case "RDS":
v2c.watchRDS(goodRouteName1, func(u rdsUpdate, err error) {
t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", xdsname, u, err)
callbackCh <- struct{}{}
})
case "CDS":
v2c.watchCDS(goodClusterName1, func(u CDSUpdate, err error) {
t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", xdsname, u, err)
callbackCh <- struct{}{}
})
case "EDS":
v2c.watchEDS(goodEDSName, func(u *EDSUpdate, err error) {
t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", xdsname, u, err)
callbackCh <- struct{}{}
})
}
if err := compareXDSRequest(fakeServer.RequestChan, defaultTestTimeout, goodReq, "", ""); err != nil {
t.Fatalf("Failed to receive %s request: %v", xdsname, err)
}
t.Logf("FakeServer received %s request...", xdsname)
return callbackCh
}
// sendGoodResp sends the good response, with the given version, and a random
// nonce.
//
// It also waits and checks that the ack request contains the given version, and
// the generated nonce.
func sendGoodResp(t *testing.T, xdsname string, fakeServer *fakexds.Server, version int, goodResp *xdspb.DiscoveryResponse, wantReq *xdspb.DiscoveryRequest, callbackCh <-chan struct{}) {
nonce := sendXDSRespWithVersion(fakeServer.ResponseChan, goodResp, version)
t.Logf("Good %s response pushed to fakeServer...", xdsname)
if err := compareXDSRequest(fakeServer.RequestChan, defaultTestTimeout, wantReq, strconv.Itoa(version), nonce); err != nil {
t.Errorf("Failed to receive %s request: %v", xdsname, err)
}
t.Logf("Good %s response acked", xdsname)
if err := emptyChanRecvWithTimeout(callbackCh, defaultTestTimeout); err != nil {
t.Errorf("Timeout when expecting %s update", xdsname)
}
t.Logf("Good %s response callback executed", xdsname)
}
// sendBadResp sends a bad response with the given version. This response will
// be nacked, so we expect a request with the previous version (version-1).
//
// But the nonce in request should be the new nonce.
func sendBadResp(t *testing.T, xdsname string, fakeServer *fakexds.Server, version int, wantReq *xdspb.DiscoveryRequest) {
var typeURL string
switch xdsname {
case "LDS":
typeURL = ldsURL
case "RDS":
typeURL = rdsURL
case "CDS":
typeURL = cdsURL
case "EDS":
typeURL = edsURL
}
nonce := sendXDSRespWithVersion(fakeServer.ResponseChan, &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{{}},
TypeUrl: typeURL,
}, version)
t.Logf("Bad %s response pushed to fakeServer...", xdsname)
if err := compareXDSRequest(fakeServer.RequestChan, defaultTestTimeout, wantReq, strconv.Itoa(version-1), nonce); err != nil {
t.Errorf("Failed to receive %s request: %v", xdsname, err)
}
t.Logf("Bad %s response nacked", xdsname)
}
// Test when the first response is invalid, and is nacked, the nack requests
// should have an empty version string.
func TestV2ClientAckFirstIsNack(t *testing.T) {
var versionLDS = 1000
fakeServer, sCleanup := fakexds.StartServer(t)
client, cCleanup := fakeServer.GetClientConn(t)
defer func() {
cCleanup()
sCleanup()
}()
v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
defer v2c.close()
t.Log("Started xds v2Client...")
// Start the watch, send a good response, and check for ack.
cbLDS := startXDS(t, "LDS", v2c, fakeServer, goodLDSRequest)
nonce := sendXDSRespWithVersion(fakeServer.ResponseChan, &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{{}},
TypeUrl: ldsURL,
}, versionLDS)
t.Logf("Bad response pushed to fakeServer...")
// The expected version string is an empty string, because this is the first
// response, and it's nacked (so there's no previous ack version).
if err := compareXDSRequest(fakeServer.RequestChan, defaultTestTimeout, goodLDSRequest, "", nonce); err != nil {
t.Errorf("Failed to receive request: %v", err)
}
t.Logf("Bad response nacked")
versionLDS++
sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS)
versionLDS++
}
// Test when a nack is sent after a new watch, we nack with the previous acked
// version (instead of resetting to empty string).
func TestV2ClientAckNackAfterNewWatch(t *testing.T) {
var versionLDS = 1000
fakeServer, sCleanup := fakexds.StartServer(t)
client, cCleanup := fakeServer.GetClientConn(t)
defer func() {
cCleanup()
sCleanup()
}()
v2c := newV2Client(client, goodNodeProto, func(int) time.Duration { return 0 })
defer v2c.close()
t.Log("Started xds v2Client...")
// Start the watch, send a good response, and check for ack.
cbLDS := startXDS(t, "LDS", v2c, fakeServer, goodLDSRequest)
sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS)
versionLDS++
// Start a new watch.
cbLDS = startXDS(t, "LDS", v2c, fakeServer, goodLDSRequest)
// This is an invalid response after the new watch.
nonce := sendXDSRespWithVersion(fakeServer.ResponseChan, &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{{}},
TypeUrl: ldsURL,
}, versionLDS)
t.Logf("Bad response pushed to fakeServer...")
// The expected version string is the previous acked version.
if err := compareXDSRequest(fakeServer.RequestChan, defaultTestTimeout, goodLDSRequest, strconv.Itoa(versionLDS-1), nonce); err != nil {
t.Errorf("Failed to receive request: %v", err)
}
t.Logf("Bad response nacked")
versionLDS++
sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS)
versionLDS++
}

View File

@ -64,9 +64,24 @@ var (
}
goodLDSRequest = &xdspb.DiscoveryRequest{
Node: goodNodeProto,
TypeUrl: listenerURL,
TypeUrl: ldsURL,
ResourceNames: []string{goodLDSTarget1},
}
goodRDSRequest = &xdspb.DiscoveryRequest{
Node: goodNodeProto,
TypeUrl: rdsURL,
ResourceNames: []string{goodRouteName1},
}
goodCDSRequest = &xdspb.DiscoveryRequest{
Node: goodNodeProto,
TypeUrl: cdsURL,
ResourceNames: []string{goodClusterName1},
}
goodEDSRequest = &xdspb.DiscoveryRequest{
Node: goodNodeProto,
TypeUrl: edsURL,
ResourceNames: []string{goodEDSName},
}
goodHTTPConnManager1 = &httppb.HttpConnectionManager{
RouteSpecifier: &httppb.HttpConnectionManager_Rds{
Rds: &httppb.Rds{
@ -130,7 +145,7 @@ var (
Name: goodLDSTarget1,
ApiListener: &listenerpb.ApiListener{
ApiListener: &anypb.Any{
TypeUrl: listenerURL,
TypeUrl: ldsURL,
Value: marshaledListener1,
},
},
@ -156,30 +171,30 @@ var (
goodLDSResponse1 = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
{
TypeUrl: listenerURL,
TypeUrl: ldsURL,
Value: marshaledListener1,
},
},
TypeUrl: listenerURL,
TypeUrl: ldsURL,
}
goodLDSResponse2 = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
{
TypeUrl: listenerURL,
TypeUrl: ldsURL,
Value: marshaledListener2,
},
},
TypeUrl: listenerURL,
TypeUrl: ldsURL,
}
emptyLDSResponse = &xdspb.DiscoveryResponse{TypeUrl: listenerURL}
emptyLDSResponse = &xdspb.DiscoveryResponse{TypeUrl: ldsURL}
badlyMarshaledLDSResponse = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
{
TypeUrl: listenerURL,
TypeUrl: ldsURL,
Value: []byte{1, 2, 3, 4},
},
},
TypeUrl: listenerURL,
TypeUrl: ldsURL,
}
badResourceTypeInLDSResponse = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
@ -188,55 +203,55 @@ var (
Value: marshaledConnMgr1,
},
},
TypeUrl: listenerURL,
TypeUrl: ldsURL,
}
ldsResponseWithMultipleResources = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
{
TypeUrl: listenerURL,
TypeUrl: ldsURL,
Value: marshaledListener2,
},
{
TypeUrl: listenerURL,
TypeUrl: ldsURL,
Value: marshaledListener1,
},
},
TypeUrl: listenerURL,
TypeUrl: ldsURL,
}
noAPIListenerLDSResponse = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
{
TypeUrl: listenerURL,
TypeUrl: ldsURL,
Value: marshaledNoAPIListener,
},
},
TypeUrl: listenerURL,
TypeUrl: ldsURL,
}
goodBadUglyLDSResponse = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
{
TypeUrl: listenerURL,
TypeUrl: ldsURL,
Value: marshaledListener2,
},
{
TypeUrl: listenerURL,
TypeUrl: ldsURL,
Value: marshaledListener1,
},
{
TypeUrl: listenerURL,
TypeUrl: ldsURL,
Value: badlyMarshaledAPIListener2,
},
},
TypeUrl: listenerURL,
TypeUrl: ldsURL,
}
badlyMarshaledRDSResponse = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
{
TypeUrl: routeURL,
TypeUrl: rdsURL,
Value: []byte{1, 2, 3, 4},
},
},
TypeUrl: routeURL,
TypeUrl: rdsURL,
}
badResourceTypeInRDSResponse = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
@ -245,7 +260,7 @@ var (
Value: marshaledConnMgr1,
},
},
TypeUrl: routeURL,
TypeUrl: rdsURL,
}
emptyRouteConfig = &xdspb.RouteConfiguration{}
marshaledEmptyRouteConfig, _ = proto.Marshal(emptyRouteConfig)
@ -255,11 +270,11 @@ var (
noVirtualHostsInRDSResponse = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
{
TypeUrl: routeURL,
TypeUrl: rdsURL,
Value: marshaledEmptyRouteConfig,
},
},
TypeUrl: routeURL,
TypeUrl: rdsURL,
}
goodRouteConfig1 = &xdspb.RouteConfiguration{
Name: goodRouteName1,
@ -346,29 +361,29 @@ var (
goodRDSResponse1 = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
{
TypeUrl: routeURL,
TypeUrl: rdsURL,
Value: marshaledGoodRouteConfig1,
},
},
TypeUrl: routeURL,
TypeUrl: rdsURL,
}
goodRDSResponse2 = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
{
TypeUrl: routeURL,
TypeUrl: rdsURL,
Value: marshaledGoodRouteConfig2,
},
},
TypeUrl: routeURL,
TypeUrl: rdsURL,
}
uninterestingRDSResponse = &xdspb.DiscoveryResponse{
Resources: []*anypb.Any{
{
TypeUrl: routeURL,
TypeUrl: rdsURL,
Value: marshaledUninterestingRouteConfig,
},
},
TypeUrl: routeURL,
TypeUrl: rdsURL,
}
)
@ -448,6 +463,8 @@ func TestV2ClientRetriesAfterBrokenStream(t *testing.T) {
case <-callbackCh:
timer.Stop()
}
// Read the ack, so the next request is sent after stream re-creation.
<-fakeServer.RequestChan
fakeServer.ResponseChan <- &fakexds.Response{Err: errors.New("RPC error")}
t.Log("Bad LDS response pushed to fakeServer...")