From a9601d93f498eec9b35b3e148162d79d8aa60602 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Fri, 3 Apr 2020 13:09:57 -0700 Subject: [PATCH] xds: update nonce even if the ACK/NACK is not sent on wire (#3497) This can happen when the watch is canceled while the response is on wire. Also, tag ACK/NACK with the stream so nonce for a new stream doesn't get updated by a ACK from the previous stream. --- xds/internal/client/types.go | 6 +- xds/internal/client/v2client.go | 29 ++++-- xds/internal/client/v2client_ack_test.go | 111 +++++++++++++++++++++++ 3 files changed, 137 insertions(+), 9 deletions(-) diff --git a/xds/internal/client/types.go b/xds/internal/client/types.go index be9a366f..84a446d0 100644 --- a/xds/internal/client/types.go +++ b/xds/internal/client/types.go @@ -72,8 +72,12 @@ func (wi *watchInfo) stopTimer() { type ackInfo struct { typeURL string - version string // Nack if version is an empty string. + version string // NACK if version is an empty string. nonce string + // ACK/NACK are tagged with the stream it's for. When the stream is down, + // all the ACK/NACK for this stream will be dropped, and the version/nonce + // won't be updated. + stream adsStream } type ldsUpdate struct { diff --git a/xds/internal/client/v2client.go b/xds/internal/client/v2client.go index faaa0e19..53ac82ab 100644 --- a/xds/internal/client/v2client.go +++ b/xds/internal/client/v2client.go @@ -255,11 +255,25 @@ func (v2c *v2Client) processWatchInfo(t *watchInfo) (target []string, typeURL, v // processAckInfo pulls the fields needed by the ack request from a ackInfo. // // If no active watch is found for this ack, it returns false for send. -func (v2c *v2Client) processAckInfo(t *ackInfo) (target []string, typeURL, version, nonce string, send bool) { +func (v2c *v2Client) processAckInfo(t *ackInfo, stream adsStream) (target []string, typeURL, version, nonce string, send bool) { + if t.stream != stream { + // If ACK's stream isn't the current sending stream, this means the ACK + // was pushed to queue before the old stream broke, and a new stream has + // been started since. Return immediately here so we don't update the + // nonce for the new stream. + return + } typeURL = t.typeURL v2c.mu.Lock() defer v2c.mu.Unlock() + + // Update the nonce no matter if we are going to send the ACK request on + // wire. We may not send the request if the watch is canceled. But the nonce + // needs to be updated so the next request will have the right nonce. + nonce = t.nonce + v2c.nonceMap[typeURL] = nonce + wi, ok := v2c.watchMap[typeURL] if !ok { // We don't send the request ack if there's no active watch (this can be @@ -267,13 +281,10 @@ func (v2c *v2Client) processAckInfo(t *ackInfo) (target []string, typeURL, versi // canceled while the ackInfo is in queue), because there's no resource // name. And if we send a request with empty resource name list, the // server may treat it as a wild card and send us everything. - return // This returns all zero values, and false for send. + return nil, "", "", "", false } send = true - version = t.version - nonce = t.nonce - target = wi.target if version == "" { // This is a nack, get the previous acked version. version = v2c.versionMap[typeURL] @@ -283,8 +294,8 @@ func (v2c *v2Client) processAckInfo(t *ackInfo) (target []string, typeURL, versi } else { v2c.versionMap[typeURL] = version } - v2c.nonceMap[typeURL] = nonce - return + target = wi.target + return target, typeURL, version, nonce, send } // send is a separate goroutine for sending watch requests on the xds stream. @@ -327,7 +338,7 @@ func (v2c *v2Client) send() { case *watchInfo: target, typeURL, version, nonce, send = v2c.processWatchInfo(t) case *ackInfo: - target, typeURL, version, nonce, send = v2c.processAckInfo(t) + target, typeURL, version, nonce, send = v2c.processAckInfo(t, stream) } if !send { continue @@ -381,6 +392,7 @@ func (v2c *v2Client) recv(stream adsStream) bool { typeURL: typeURL, version: "", nonce: resp.GetNonce(), + stream: stream, }) v2c.logger.Warningf("Sending NACK for response type: %v, version: %v, nonce: %v, reason: %v", typeURL, resp.GetVersionInfo(), resp.GetNonce(), respHandleErr) continue @@ -389,6 +401,7 @@ func (v2c *v2Client) recv(stream adsStream) bool { typeURL: typeURL, version: resp.GetVersionInfo(), nonce: resp.GetNonce(), + stream: stream, }) v2c.logger.Infof("Sending ACK for response type: %v, version: %v, nonce: %v", typeURL, resp.GetVersionInfo(), resp.GetNonce()) success = true diff --git a/xds/internal/client/v2client_ack_test.go b/xds/internal/client/v2client_ack_test.go index e2e7f1b4..1fce1f5e 100644 --- a/xds/internal/client/v2client_ack_test.go +++ b/xds/internal/client/v2client_ack_test.go @@ -98,6 +98,10 @@ func startXDS(t *testing.T, xdsname string, v2c *v2Client, reqChan *testutils.Ch // // It also waits and checks that the ack request contains the given version, and // the generated nonce. +// +// TODO: make this and other helper function either consistently return error, +// and fatal() in the test code, or all call t.Fatal(), and mark them as +// helper(). func sendGoodResp(t *testing.T, xdsname string, fakeServer *fakeserver.Server, version int, goodResp *xdspb.DiscoveryResponse, wantReq *xdspb.DiscoveryRequest, callbackCh *testutils.Channel) (nonce string) { nonce = sendXDSRespWithVersion(fakeServer.XDSResponseChan, goodResp, version) t.Logf("Good %s response pushed to fakeServer...", xdsname) @@ -263,3 +267,110 @@ func (s) TestV2ClientAckNackAfterNewWatch(t *testing.T) { sendGoodResp(t, "LDS", fakeServer, versionLDS, goodLDSResponse1, goodLDSRequest, cbLDS) versionLDS++ } + +// TestV2ClientAckNewWatchAfterCancel verifies the new request for a new watch +// after the previous watch is canceled, has the right version. +func (s) TestV2ClientAckNewWatchAfterCancel(t *testing.T) { + var versionCDS = 3000 + + fakeServer, cc, cleanup := startServerAndGetCC(t) + defer cleanup() + v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }, nil) + defer v2c.close() + t.Log("Started xds v2Client...") + + // Start a CDS watch. + callbackCh := testutils.NewChannel() + cancel := v2c.watchCDS(goodClusterName1, func(u CDSUpdate, err error) { + t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", "CDS", u, err) + callbackCh.Send(struct{}{}) + }) + if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, "", ""); err != nil { + t.Fatal(err) + } + t.Logf("FakeServer received %s request...", "CDS") + + // Send a good CDS response, this function waits for the ACK with the right + // version. + nonce := sendGoodResp(t, "CDS", fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, callbackCh) + + // Cancel the CDS watch, and start a new one. The new watch should have the + // version from the response above. + cancel() + v2c.watchCDS(goodClusterName1, func(u CDSUpdate, err error) { + t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", "CDS", u, err) + callbackCh.Send(struct{}{}) + }) + if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, strconv.Itoa(versionCDS), nonce); err != nil { + t.Fatalf("Failed to receive %s request: %v", "CDS", err) + } + versionCDS++ + + // Send a bad response with the next version. + sendBadResp(t, "CDS", fakeServer, versionCDS, goodCDSRequest) + versionCDS++ + + // send another good response, and check for ack, with the new version. + sendGoodResp(t, "CDS", fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, callbackCh) + versionCDS++ +} + +// TestV2ClientAckCancelResponseRace verifies if the response and ACK request +// race with cancel (which means the ACK request will not be sent on wire, +// because there's no active watch), the nonce will still be updated, and the +// new request with the new watch will have the correct nonce. +func (s) TestV2ClientAckCancelResponseRace(t *testing.T) { + var versionCDS = 3000 + + fakeServer, cc, cleanup := startServerAndGetCC(t) + defer cleanup() + v2c := newV2Client(cc, goodNodeProto, func(int) time.Duration { return 0 }, nil) + defer v2c.close() + t.Log("Started xds v2Client...") + + // Start a CDS watch. + callbackCh := testutils.NewChannel() + cancel := v2c.watchCDS(goodClusterName1, func(u CDSUpdate, err error) { + t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", "CDS", u, err) + callbackCh.Send(struct{}{}) + }) + if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, "", ""); err != nil { + t.Fatalf("Failed to receive %s request: %v", "CDS", err) + } + t.Logf("FakeServer received %s request...", "CDS") + + // send another good response, and check for ack, with the new version. + sendGoodResp(t, "CDS", fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, callbackCh) + versionCDS++ + + // Cancel the watch before the next response is sent. This mimics the case + // watch is canceled while response is on wire. + cancel() + + // Send a good response. + nonce := sendXDSRespWithVersion(fakeServer.XDSResponseChan, goodCDSResponse1, versionCDS) + t.Logf("Good %s response pushed to fakeServer...", "CDS") + + // Expect no ACK because watch was canceled. + if req, err := fakeServer.XDSRequestChan.Receive(); err != testutils.ErrRecvTimeout { + t.Fatalf("Got unexpected xds request after watch is canceled: %v", req) + } + + // Start a new watch. The new watch should have the nonce from the response + // above, and version from the first good response. + v2c.watchCDS(goodClusterName1, func(u CDSUpdate, err error) { + t.Logf("Received %s callback with ldsUpdate {%+v} and error {%v}", "CDS", u, err) + callbackCh.Send(struct{}{}) + }) + if err := compareXDSRequest(fakeServer.XDSRequestChan, goodCDSRequest, strconv.Itoa(versionCDS-1), nonce); err != nil { + t.Fatalf("Failed to receive %s request: %v", "CDS", err) + } + + // Send a bad response with the next version. + sendBadResp(t, "CDS", fakeServer, versionCDS, goodCDSRequest) + versionCDS++ + + // send another good response, and check for ack, with the new version. + sendGoodResp(t, "CDS", fakeServer, versionCDS, goodCDSResponse1, goodCDSRequest, callbackCh) + versionCDS++ +}