xds: resolver test cleanup (#3233)

This PR also adds a new testutils directory with a fake client and a
channel which supports a timed receive operation. In follow up PRs, we
can move other common test stuff like the fake server etc to this
directory and cleanup more tests.
This commit is contained in:
Easwar Swaminathan
2019-12-10 11:20:06 -08:00
committed by GitHub
parent a4e9d9553d
commit 11399004a9
3 changed files with 207 additions and 115 deletions

View File

@ -24,13 +24,16 @@ import (
"fmt"
"net"
"testing"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
xdsinternal "google.golang.org/grpc/xds/internal"
_ "google.golang.org/grpc/xds/internal/balancer/cdsbalancer" // To parse LB config
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/client/bootstrap"
"google.golang.org/grpc/xds/internal/testutils"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
)
@ -55,62 +58,26 @@ var (
// event through a channel.
type testClientConn struct {
resolver.ClientConn
stateCh chan struct{}
errorCh chan struct{}
gotState resolver.State
stateCh *testutils.Channel
errorCh *testutils.Channel
}
func (t *testClientConn) UpdateState(s resolver.State) {
t.gotState = s
t.stateCh <- struct{}{}
t.stateCh.Send(s)
}
func (t *testClientConn) ReportError(err error) {
t.errorCh <- struct{}{}
t.errorCh.Send(err)
}
func (*testClientConn) ParseServiceConfig(_ string) *serviceconfig.ParseResult {
return &serviceconfig.ParseResult{}
// TODO: Uncomment this once we have a "experimental_cds" balancer.
// return internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(jsonSC)
func (t *testClientConn) ParseServiceConfig(jsonSC string) *serviceconfig.ParseResult {
return internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(jsonSC)
}
func newTestClientConn() *testClientConn {
return &testClientConn{
stateCh: make(chan struct{}, 1),
errorCh: make(chan struct{}, 1),
}
}
// fakeXDSClient is a fake implementation of the xdsClientInterface interface.
// All it does is to store the parameters received in the watch call and
// signals various events by sending on channels.
type fakeXDSClient struct {
gotTarget string
gotCallback func(xdsclient.ServiceUpdate, error)
suCh chan struct{}
cancel chan struct{}
done chan struct{}
}
func (f *fakeXDSClient) WatchService(target string, callback func(xdsclient.ServiceUpdate, error)) func() {
f.gotTarget = target
f.gotCallback = callback
f.suCh <- struct{}{}
return func() {
f.cancel <- struct{}{}
}
}
func (f *fakeXDSClient) Close() {
f.done <- struct{}{}
}
func newFakeXDSClient() *fakeXDSClient {
return &fakeXDSClient{
suCh: make(chan struct{}, 1),
cancel: make(chan struct{}, 1),
done: make(chan struct{}, 1),
stateCh: testutils.NewChannel(),
errorCh: testutils.NewChannel(),
}
}
@ -130,7 +97,7 @@ func getXDSClientMakerFunc(wantOpts xdsclient.Options) func(xdsclient.Options) (
if len(gotOpts.DialOpts) != len(wantOpts.DialOpts) {
return nil, fmt.Errorf("got len(DialOpts): %v, want: %v", len(gotOpts.DialOpts), len(wantOpts.DialOpts))
}
return newFakeXDSClient(), nil
return testutils.NewXDSClient(), nil
}
}
@ -237,6 +204,8 @@ type setupOpts struct {
}
func testSetup(t *testing.T, opts setupOpts) (*xdsResolver, *testClientConn, func()) {
t.Helper()
oldConfigMaker := newXDSConfig
newXDSConfig = func() *bootstrap.Config { return opts.config }
oldClientMaker := newXDSClient
@ -259,107 +228,93 @@ func testSetup(t *testing.T, opts setupOpts) (*xdsResolver, *testClientConn, fun
return r.(*xdsResolver), tcc, cancel
}
// waitForWatchService waits for the WatchService method to be called on the
// xdsClient within a reasonable amount of time, and also verifies that the
// watch is called with the expected target.
func waitForWatchService(t *testing.T, xdsC *testutils.XDSClient, wantTarget string) {
t.Helper()
gotTarget, err := xdsC.WaitForWatchService()
if err != nil {
t.Fatalf("xdsClient.WatchService failed with error: %v", err)
}
if gotTarget != wantTarget {
t.Fatalf("xdsClient.WatchService() called with target: %v, want %v", gotTarget, wantTarget)
}
}
// TestXDSResolverWatchCallbackAfterClose tests the case where a service update
// from the underlying xdsClient is received after the resolver is closed.
func TestXDSResolverWatchCallbackAfterClose(t *testing.T) {
fakeXDSClient := newFakeXDSClient()
xdsC := testutils.NewXDSClient()
xdsR, tcc, cancel := testSetup(t, setupOpts{
config: &validConfig,
xdsClientFunc: func(_ xdsclient.Options) (xdsClientInterface, error) { return fakeXDSClient, nil },
xdsClientFunc: func(_ xdsclient.Options) (xdsClientInterface, error) { return xdsC, nil },
})
defer cancel()
// Wait for the WatchService method to be called on the xdsClient.
<-fakeXDSClient.suCh
if fakeXDSClient.gotTarget != targetStr {
t.Fatalf("xdsClient.WatchService() called with target: %v, want %v", fakeXDSClient.gotTarget, targetStr)
}
waitForWatchService(t, xdsC, targetStr)
// Call the watchAPI callback after closing the resolver.
// Call the watchAPI callback after closing the resolver, and make sure no
// update is triggerred on the ClientConn.
xdsR.Close()
fakeXDSClient.gotCallback(xdsclient.ServiceUpdate{Cluster: cluster}, nil)
timer := time.NewTimer(1 * time.Second)
select {
case <-timer.C:
case <-tcc.stateCh:
timer.Stop()
t.Fatalf("ClientConn.UpdateState called after xdsResolver is closed")
xdsC.InvokeWatchServiceCb(cluster, nil)
if gotVal, gotErr := tcc.stateCh.Receive(); gotErr != testutils.ErrRecvTimeout {
t.Fatalf("ClientConn.UpdateState called after xdsResolver is closed: %v", gotVal)
}
}
// TestXDSResolverBadServiceUpdate tests the case the xdsClient returns a bad
// service update.
func TestXDSResolverBadServiceUpdate(t *testing.T) {
fakeXDSClient := newFakeXDSClient()
xdsC := testutils.NewXDSClient()
xdsR, tcc, cancel := testSetup(t, setupOpts{
config: &validConfig,
xdsClientFunc: func(_ xdsclient.Options) (xdsClientInterface, error) { return fakeXDSClient, nil },
xdsClientFunc: func(_ xdsclient.Options) (xdsClientInterface, error) { return xdsC, nil },
})
defer cancel()
defer xdsR.Close()
defer func() {
cancel()
xdsR.Close()
}()
// Wait for the WatchService method to be called on the xdsClient.
timer := time.NewTimer(1 * time.Second)
select {
case <-timer.C:
t.Fatal("Timeout when waiting for WatchService to be called on xdsClient")
case <-fakeXDSClient.suCh:
if !timer.Stop() {
<-timer.C
}
}
if fakeXDSClient.gotTarget != targetStr {
t.Fatalf("xdsClient.WatchService() called with target: %v, want %v", fakeXDSClient.gotTarget, targetStr)
}
waitForWatchService(t, xdsC, targetStr)
// Invoke the watchAPI callback with a bad service update.
fakeXDSClient.gotCallback(xdsclient.ServiceUpdate{Cluster: ""}, errors.New("bad serviceupdate"))
// Wait for the ReportError method to be called on the ClientConn.
timer = time.NewTimer(1 * time.Second)
select {
case <-timer.C:
t.Fatal("Timeout when waiting for UpdateState to be called on the ClientConn")
case <-tcc.errorCh:
timer.Stop()
// Invoke the watchAPI callback with a bad service update and wait for the
// ReportError method to be called on the ClientConn.
suErr := errors.New("bad serviceupdate")
xdsC.InvokeWatchServiceCb("", suErr)
if gotErrVal, gotErr := tcc.errorCh.Receive(); gotErr != nil || gotErrVal != suErr {
t.Fatalf("ClientConn.ReportError() received %v, want %v", gotErrVal, suErr)
}
}
// TestXDSResolverGoodServiceUpdate tests the happy case where the resolver
// gets a good service update from the xdsClient.
func TestXDSResolverGoodServiceUpdate(t *testing.T) {
fakeXDSClient := newFakeXDSClient()
xdsC := testutils.NewXDSClient()
xdsR, tcc, cancel := testSetup(t, setupOpts{
config: &validConfig,
xdsClientFunc: func(_ xdsclient.Options) (xdsClientInterface, error) { return fakeXDSClient, nil },
xdsClientFunc: func(_ xdsclient.Options) (xdsClientInterface, error) { return xdsC, nil },
})
defer cancel()
defer xdsR.Close()
defer func() {
cancel()
xdsR.Close()
}()
// Wait for the WatchService method to be called on the xdsClient.
timer := time.NewTimer(1 * time.Second)
select {
case <-timer.C:
t.Fatal("Timeout when waiting for WatchService to be called on xdsClient")
case <-fakeXDSClient.suCh:
if !timer.Stop() {
<-timer.C
}
waitForWatchService(t, xdsC, targetStr)
// Invoke the watchAPI callback with a good service update and wait for the
// UpdateState method to be called on the ClientConn.
xdsC.InvokeWatchServiceCb(cluster, nil)
gotState, err := tcc.stateCh.Receive()
if err != nil {
t.Fatalf("ClientConn.UpdateState returned error: %v", err)
}
if fakeXDSClient.gotTarget != targetStr {
t.Fatalf("xdsClient.WatchService() called with target: %v, want %v", fakeXDSClient.gotTarget, targetStr)
rState := gotState.(resolver.State)
if gotClient := rState.Attributes.Value(xdsinternal.XDSClientID); gotClient != xdsC {
t.Fatalf("ClientConn.UpdateState got xdsClient: %v, want %v", gotClient, xdsC)
}
// Invoke the watchAPI callback with a good service update.
fakeXDSClient.gotCallback(xdsclient.ServiceUpdate{Cluster: cluster}, nil)
// Wait for the UpdateState method to be called on the ClientConn.
timer = time.NewTimer(1 * time.Second)
select {
case <-timer.C:
t.Fatal("Timeout when waiting for ReportError to be called on the ClientConn")
case <-tcc.stateCh:
timer.Stop()
if err := rState.ServiceConfig.Err; err != nil {
t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err)
}
}

View File

@ -0,0 +1,70 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Package testutils provides multiple utility types, for use in xds tests.
package testutils
import (
"errors"
"time"
)
// ErrRecvTimeout is an error to indicate that a receive operation on the
// channel timed out.
var ErrRecvTimeout = errors.New("timed out when waiting for value on channel")
const (
// DefaultChanRecvTimeout is the default timeout for receive operations on the
// underlying channel.
DefaultChanRecvTimeout = 1 * time.Second
// DefaultChanBufferSize is the default buffer size of the underlying channel.
DefaultChanBufferSize = 1
)
// Channel wraps a generic channel and provides a timed receive operation.
type Channel struct {
ch chan interface{}
}
// Send sends value on the underlying channel.
func (cwt *Channel) Send(value interface{}) {
cwt.ch <- value
}
// TimedReceive returns the value received on the underlying channel, or
// ErrRecvTimeout if timeout amount of time elapsed.
func (cwt *Channel) TimedReceive(timeout time.Duration) (interface{}, error) {
timer := time.NewTimer(timeout)
select {
case <-timer.C:
return nil, ErrRecvTimeout
case got := <-cwt.ch:
timer.Stop()
return got, nil
}
}
// Receive returns the value received on the underlying channel, or
// ErrRecvTimeout if DefaultChanRecvTimeout amount of time elapses.
func (cwt *Channel) Receive() (interface{}, error) {
return cwt.TimedReceive(DefaultChanRecvTimeout)
}
// NewChannel returns a new Channel.
func NewChannel() *Channel {
return &Channel{ch: make(chan interface{}, DefaultChanBufferSize)}
}

View File

@ -0,0 +1,67 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package testutils
import (
xdsclient "google.golang.org/grpc/xds/internal/client"
)
// XDSClient is a fake implementation of an xds client. It exposes a bunch of
// channels to signal the occurrence of various events.
type XDSClient struct {
serviceCb func(xdsclient.ServiceUpdate, error)
suWatchCh *Channel
closeCh *Channel
suCancelCh *Channel
}
// WatchService registers a LDS/RDS watch.
func (xdsC *XDSClient) WatchService(target string, callback func(xdsclient.ServiceUpdate, error)) func() {
xdsC.serviceCb = callback
xdsC.suWatchCh.Send(target)
return func() {
xdsC.suCancelCh.Send(nil)
}
}
// WaitForWatchService waits for WatchService to be invoked on this client
// within a reasonable timeout.
func (xdsC *XDSClient) WaitForWatchService() (string, error) {
val, err := xdsC.suWatchCh.Receive()
return val.(string), err
}
// InvokeWatchServiceCb invokes the registered service watch callback.
func (xdsC *XDSClient) InvokeWatchServiceCb(cluster string, err error) {
xdsC.serviceCb(xdsclient.ServiceUpdate{Cluster: cluster}, err)
}
// Close closes the xds client.
func (xdsC *XDSClient) Close() {
xdsC.closeCh.Send(nil)
}
// NewXDSClient returns a new fake xds client.
func NewXDSClient() *XDSClient {
return &XDSClient{
suWatchCh: NewChannel(),
closeCh: NewChannel(),
suCancelCh: NewChannel(),
}
}