xdsrouting: remove env variable for routing (#3754)
This commit is contained in:
@ -195,6 +195,10 @@ func NewConfig() (*Config, error) {
|
||||
// 2. Environment variable "GRPC_XDS_EXPERIMENTAL_V3_SUPPORT" is set to
|
||||
// true.
|
||||
// The default value of the enum type "version.TransportAPI" is v2.
|
||||
//
|
||||
// TODO: there are multiple env variables, GRPC_XDS_BOOTSTRAP and
|
||||
// GRPC_XDS_EXPERIMENTAL_V3_SUPPORT. Move all env variables into a separate
|
||||
// package.
|
||||
if v3Env := os.Getenv(v3SupportEnv); v3Env == "true" {
|
||||
if serverSupportsV3 {
|
||||
config.TransportAPI = version.TransportV3
|
||||
|
@ -49,10 +49,6 @@ type Route struct {
|
||||
}
|
||||
|
||||
type rdsUpdate struct {
|
||||
// weightedCluster is only set when routing is disabled (env variable
|
||||
// GRPC_XDS_EXPERIMENTAL_ROUTING is not true).
|
||||
weightedCluster map[string]uint32
|
||||
|
||||
routes []*Route
|
||||
}
|
||||
type rdsCallbackFunc func(rdsUpdate, error)
|
||||
|
@ -54,7 +54,7 @@ func (s) TestRDSWatch(t *testing.T) {
|
||||
t.Fatalf("want new watch to start, got error %v", err)
|
||||
}
|
||||
|
||||
wantUpdate := rdsUpdate{weightedCluster: map[string]uint32{testCDSName: 1}}
|
||||
wantUpdate := rdsUpdate{routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}
|
||||
v2Client.r.newRDSUpdate(map[string]rdsUpdate{
|
||||
testRDSName: wantUpdate,
|
||||
})
|
||||
@ -113,7 +113,7 @@ func (s) TestRDSTwoWatchSameResourceName(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
wantUpdate := rdsUpdate{weightedCluster: map[string]uint32{testCDSName: 1}}
|
||||
wantUpdate := rdsUpdate{routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}
|
||||
v2Client.r.newRDSUpdate(map[string]rdsUpdate{
|
||||
testRDSName: wantUpdate,
|
||||
})
|
||||
@ -179,8 +179,8 @@ func (s) TestRDSThreeWatchDifferentResourceName(t *testing.T) {
|
||||
t.Fatalf("want new watch to start, got error %v", err)
|
||||
}
|
||||
|
||||
wantUpdate1 := rdsUpdate{weightedCluster: map[string]uint32{testCDSName + "1": 1}}
|
||||
wantUpdate2 := rdsUpdate{weightedCluster: map[string]uint32{testCDSName + "2": 1}}
|
||||
wantUpdate1 := rdsUpdate{routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "1": 1}}}}
|
||||
wantUpdate2 := rdsUpdate{routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "2": 1}}}}
|
||||
v2Client.r.newRDSUpdate(map[string]rdsUpdate{
|
||||
testRDSName + "1": wantUpdate1,
|
||||
testRDSName + "2": wantUpdate2,
|
||||
@ -219,7 +219,7 @@ func (s) TestRDSWatchAfterCache(t *testing.T) {
|
||||
t.Fatalf("want new watch to start, got error %v", err)
|
||||
}
|
||||
|
||||
wantUpdate := rdsUpdate{weightedCluster: map[string]uint32{testCDSName: 1}}
|
||||
wantUpdate := rdsUpdate{routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}
|
||||
v2Client.r.newRDSUpdate(map[string]rdsUpdate{
|
||||
testRDSName: wantUpdate,
|
||||
})
|
||||
|
@ -25,14 +25,7 @@ import (
|
||||
|
||||
// ServiceUpdate contains update about the service.
|
||||
type ServiceUpdate struct {
|
||||
// WeightedCluster is a map from cluster names (CDS resource to watch) to
|
||||
// their weights.
|
||||
//
|
||||
// This field is only set when routing is disabled (env variable
|
||||
// GRPC_XDS_EXPERIMENTAL_ROUTING is not true).
|
||||
WeightedCluster map[string]uint32
|
||||
|
||||
// Routes
|
||||
// Routes contain matchers+actions to route RPCs.
|
||||
Routes []*Route
|
||||
}
|
||||
|
||||
@ -126,8 +119,7 @@ func (w *serviceUpdateWatcher) handleRDSResp(update rdsUpdate, err error) {
|
||||
return
|
||||
}
|
||||
w.serviceCb(ServiceUpdate{
|
||||
WeightedCluster: update.weightedCluster,
|
||||
Routes: update.routes,
|
||||
Routes: update.routes,
|
||||
}, nil)
|
||||
}
|
||||
|
||||
|
@ -58,7 +58,7 @@ func (s) TestServiceWatch(t *testing.T) {
|
||||
serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
|
||||
})
|
||||
|
||||
wantUpdate := ServiceUpdate{WeightedCluster: map[string]uint32{testCDSName: 1}}
|
||||
wantUpdate := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}
|
||||
|
||||
if _, err := v2Client.addWatches[ldsURL].Receive(); err != nil {
|
||||
t.Fatalf("want new watch to start, got error %v", err)
|
||||
@ -70,7 +70,7 @@ func (s) TestServiceWatch(t *testing.T) {
|
||||
t.Fatalf("want new watch to start, got error %v", err)
|
||||
}
|
||||
v2Client.r.newRDSUpdate(map[string]rdsUpdate{
|
||||
testRDSName: {weightedCluster: map[string]uint32{testCDSName: 1}},
|
||||
testRDSName: {routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}},
|
||||
})
|
||||
|
||||
if u, err := serviceUpdateCh.Receive(); err != nil || !cmp.Equal(u, serviceUpdateErr{wantUpdate, nil}, serviceCmpOpts...) {
|
||||
@ -116,7 +116,7 @@ func (s) TestServiceWatchLDSUpdate(t *testing.T) {
|
||||
serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
|
||||
})
|
||||
|
||||
wantUpdate := ServiceUpdate{WeightedCluster: map[string]uint32{testCDSName: 1}}
|
||||
wantUpdate := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}
|
||||
|
||||
if _, err := v2Client.addWatches[ldsURL].Receive(); err != nil {
|
||||
t.Fatalf("want new watch to start, got error %v", err)
|
||||
@ -128,7 +128,7 @@ func (s) TestServiceWatchLDSUpdate(t *testing.T) {
|
||||
t.Fatalf("want new watch to start, got error %v", err)
|
||||
}
|
||||
v2Client.r.newRDSUpdate(map[string]rdsUpdate{
|
||||
testRDSName: {weightedCluster: map[string]uint32{testCDSName: 1}},
|
||||
testRDSName: {routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}},
|
||||
})
|
||||
|
||||
if u, err := serviceUpdateCh.Receive(); err != nil || !cmp.Equal(u, serviceUpdateErr{wantUpdate, nil}, serviceCmpOpts...) {
|
||||
@ -145,17 +145,17 @@ func (s) TestServiceWatchLDSUpdate(t *testing.T) {
|
||||
|
||||
// Another update for the old name.
|
||||
v2Client.r.newRDSUpdate(map[string]rdsUpdate{
|
||||
testRDSName: {weightedCluster: map[string]uint32{testCDSName: 1}},
|
||||
testRDSName: {routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}},
|
||||
})
|
||||
|
||||
if u, err := serviceUpdateCh.Receive(); err != testutils.ErrRecvTimeout {
|
||||
t.Errorf("unexpected serviceUpdate: %v, %v, want channel recv timeout", u, err)
|
||||
}
|
||||
|
||||
wantUpdate2 := ServiceUpdate{WeightedCluster: map[string]uint32{testCDSName + "2": 1}}
|
||||
wantUpdate2 := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "2": 1}}}}
|
||||
// RDS update for the new name.
|
||||
v2Client.r.newRDSUpdate(map[string]rdsUpdate{
|
||||
testRDSName + "2": {weightedCluster: map[string]uint32{testCDSName + "2": 1}},
|
||||
testRDSName + "2": {routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "2": 1}}}},
|
||||
})
|
||||
|
||||
if u, err := serviceUpdateCh.Receive(); err != nil || !cmp.Equal(u, serviceUpdateErr{wantUpdate2, nil}, serviceCmpOpts...) {
|
||||
@ -183,7 +183,7 @@ func (s) TestServiceWatchSecond(t *testing.T) {
|
||||
serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
|
||||
})
|
||||
|
||||
wantUpdate := ServiceUpdate{WeightedCluster: map[string]uint32{testCDSName: 1}}
|
||||
wantUpdate := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}
|
||||
|
||||
if _, err := v2Client.addWatches[ldsURL].Receive(); err != nil {
|
||||
t.Fatalf("want new watch to start, got error %v", err)
|
||||
@ -195,7 +195,7 @@ func (s) TestServiceWatchSecond(t *testing.T) {
|
||||
t.Fatalf("want new watch to start, got error %v", err)
|
||||
}
|
||||
v2Client.r.newRDSUpdate(map[string]rdsUpdate{
|
||||
testRDSName: {weightedCluster: map[string]uint32{testCDSName: 1}},
|
||||
testRDSName: {routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}},
|
||||
})
|
||||
|
||||
if u, err := serviceUpdateCh.Receive(); err != nil || !cmp.Equal(u, serviceUpdateErr{wantUpdate, nil}, serviceCmpOpts...) {
|
||||
@ -226,7 +226,7 @@ func (s) TestServiceWatchSecond(t *testing.T) {
|
||||
testLDSName: {routeName: testRDSName},
|
||||
})
|
||||
v2Client.r.newRDSUpdate(map[string]rdsUpdate{
|
||||
testRDSName: {weightedCluster: map[string]uint32{testCDSName: 1}},
|
||||
testRDSName: {routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}},
|
||||
})
|
||||
|
||||
if u, err := serviceUpdateCh.Receive(); err != nil || !cmp.Equal(u, serviceUpdateErr{wantUpdate, nil}, serviceCmpOpts...) {
|
||||
@ -263,8 +263,8 @@ func (s) TestServiceWatchWithNoResponseFromServer(t *testing.T) {
|
||||
|
||||
callbackCh := testutils.NewChannel()
|
||||
cancelWatch := xdsClient.WatchService(goodLDSTarget1, func(su ServiceUpdate, err error) {
|
||||
if su.WeightedCluster != nil {
|
||||
callbackCh.Send(fmt.Errorf("got WeightedCluster: %+v, want nil", su.WeightedCluster))
|
||||
if su.Routes != nil {
|
||||
callbackCh.Send(fmt.Errorf("got WeightedCluster: %+v, want nil", su.Routes))
|
||||
return
|
||||
}
|
||||
if err == nil {
|
||||
@ -307,8 +307,8 @@ func (s) TestServiceWatchEmptyRDS(t *testing.T) {
|
||||
|
||||
callbackCh := testutils.NewChannel()
|
||||
cancelWatch := xdsClient.WatchService(goodLDSTarget1, func(su ServiceUpdate, err error) {
|
||||
if su.WeightedCluster != nil {
|
||||
callbackCh.Send(fmt.Errorf("got WeightedCluster: %+v, want nil", su.WeightedCluster))
|
||||
if su.Routes != nil {
|
||||
callbackCh.Send(fmt.Errorf("got WeightedCluster: %+v, want nil", su.Routes))
|
||||
return
|
||||
}
|
||||
if err == nil {
|
||||
@ -394,7 +394,7 @@ func (s) TestServiceNotCancelRDSOnSameLDSUpdate(t *testing.T) {
|
||||
serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
|
||||
})
|
||||
|
||||
wantUpdate := ServiceUpdate{WeightedCluster: map[string]uint32{testCDSName: 1}}
|
||||
wantUpdate := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}
|
||||
|
||||
if _, err := v2Client.addWatches[ldsURL].Receive(); err != nil {
|
||||
t.Fatalf("want new watch to start, got error %v", err)
|
||||
@ -406,7 +406,7 @@ func (s) TestServiceNotCancelRDSOnSameLDSUpdate(t *testing.T) {
|
||||
t.Fatalf("want new watch to start, got error %v", err)
|
||||
}
|
||||
v2Client.r.newRDSUpdate(map[string]rdsUpdate{
|
||||
testRDSName: {weightedCluster: map[string]uint32{testCDSName: 1}},
|
||||
testRDSName: {routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}},
|
||||
})
|
||||
|
||||
if u, err := serviceUpdateCh.Receive(); err != nil || !cmp.Equal(u, serviceUpdateErr{wantUpdate, nil}, serviceCmpOpts...) {
|
||||
@ -445,7 +445,7 @@ func (s) TestServiceResourceRemoved(t *testing.T) {
|
||||
serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
|
||||
})
|
||||
|
||||
wantUpdate := ServiceUpdate{WeightedCluster: map[string]uint32{testCDSName: 1}}
|
||||
wantUpdate := ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}}
|
||||
|
||||
if _, err := v2Client.addWatches[ldsURL].Receive(); err != nil {
|
||||
t.Fatalf("want new watch to start, got error %v", err)
|
||||
@ -457,7 +457,7 @@ func (s) TestServiceResourceRemoved(t *testing.T) {
|
||||
t.Fatalf("want new watch to start, got error %v", err)
|
||||
}
|
||||
v2Client.r.newRDSUpdate(map[string]rdsUpdate{
|
||||
testRDSName: {weightedCluster: map[string]uint32{testCDSName: 1}},
|
||||
testRDSName: {routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName: 1}}}},
|
||||
})
|
||||
|
||||
if u, err := serviceUpdateCh.Receive(); err != nil || !cmp.Equal(u, serviceUpdateErr{wantUpdate, nil}, serviceCmpOpts...) {
|
||||
@ -477,7 +477,7 @@ func (s) TestServiceResourceRemoved(t *testing.T) {
|
||||
// Send RDS update for the removed LDS resource, expect no updates to
|
||||
// callback, because RDS should be canceled.
|
||||
v2Client.r.newRDSUpdate(map[string]rdsUpdate{
|
||||
testRDSName: {weightedCluster: map[string]uint32{testCDSName + "new": 1}},
|
||||
testRDSName: {routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "new": 1}}}},
|
||||
})
|
||||
if u, err := serviceUpdateCh.Receive(); err != testutils.ErrRecvTimeout {
|
||||
t.Errorf("unexpected serviceUpdate: %v, want receiving from channel timeout", u)
|
||||
@ -497,9 +497,9 @@ func (s) TestServiceResourceRemoved(t *testing.T) {
|
||||
}
|
||||
|
||||
v2Client.r.newRDSUpdate(map[string]rdsUpdate{
|
||||
testRDSName: {weightedCluster: map[string]uint32{testCDSName + "new2": 1}},
|
||||
testRDSName: {routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "new2": 1}}}},
|
||||
})
|
||||
if u, err := serviceUpdateCh.Receive(); err != nil || !cmp.Equal(u, serviceUpdateErr{ServiceUpdate{WeightedCluster: map[string]uint32{testCDSName + "new2": 1}}, nil}, serviceCmpOpts...) {
|
||||
if u, err := serviceUpdateCh.Receive(); err != nil || !cmp.Equal(u, serviceUpdateErr{ServiceUpdate{Routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{testCDSName + "new2": 1}}}}, nil}, serviceCmpOpts...) {
|
||||
t.Errorf("unexpected serviceUpdate: %v, error receiving from channel: %v", u, err)
|
||||
}
|
||||
}
|
||||
|
@ -1,33 +0,0 @@
|
||||
/*
|
||||
*
|
||||
* Copyright 2020 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package client
|
||||
|
||||
import (
|
||||
"os"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// TODO: there are multiple env variables, GRPC_XDS_BOOTSTRAP and
|
||||
// GRPC_XDS_EXPERIMENTAL_V3_SUPPORT, and this. Move all env variables into a
|
||||
// separate package.
|
||||
const routingEnabledConfigStr = "GRPC_XDS_EXPERIMENTAL_ROUTING"
|
||||
|
||||
// routing is enabled only if env variable is set to true. The default is false.
|
||||
// We may flip the default later.
|
||||
var routingEnabled = strings.EqualFold(os.Getenv(routingEnabledConfigStr), "true")
|
@ -97,51 +97,11 @@ func generateRDSUpdateFromRouteConfiguration(rc *xdspb.RouteConfiguration, host
|
||||
return rdsUpdate{}, fmt.Errorf("matched virtual host has no routes")
|
||||
}
|
||||
|
||||
// Keep the old code path for routing disabled.
|
||||
if routingEnabled {
|
||||
routes, err := routesProtoToSlice(vh.Routes, logger)
|
||||
if err != nil {
|
||||
return rdsUpdate{}, fmt.Errorf("received route is invalid: %v", err)
|
||||
}
|
||||
return rdsUpdate{routes: routes}, nil
|
||||
routes, err := routesProtoToSlice(vh.Routes, logger)
|
||||
if err != nil {
|
||||
return rdsUpdate{}, fmt.Errorf("received route is invalid: %v", err)
|
||||
}
|
||||
|
||||
dr := vh.Routes[len(vh.Routes)-1]
|
||||
match := dr.GetMatch()
|
||||
if match == nil {
|
||||
return rdsUpdate{}, fmt.Errorf("matched virtual host's default route doesn't have a match")
|
||||
}
|
||||
if prefix := match.GetPrefix(); prefix != "" && prefix != "/" {
|
||||
// The matched virtual host is invalid. Match is not "" or "/".
|
||||
return rdsUpdate{}, fmt.Errorf("matched virtual host's default route is %v, want Prefix empty string or /", match)
|
||||
}
|
||||
if caseSensitive := match.GetCaseSensitive(); caseSensitive != nil && !caseSensitive.Value {
|
||||
// The case sensitive is set to false. Not set or set to true are both
|
||||
// valid.
|
||||
return rdsUpdate{}, fmt.Errorf("matched virtual host's default route set case-sensitive to false")
|
||||
}
|
||||
routeAction := dr.GetRoute()
|
||||
if routeAction == nil {
|
||||
return rdsUpdate{}, fmt.Errorf("matched route is nil")
|
||||
}
|
||||
|
||||
if wc := routeAction.GetWeightedClusters(); wc != nil {
|
||||
m, err := weightedClustersProtoToMap(wc)
|
||||
if err != nil {
|
||||
return rdsUpdate{}, fmt.Errorf("matched weighted cluster is invalid: %v", err)
|
||||
}
|
||||
return rdsUpdate{weightedCluster: m}, nil
|
||||
}
|
||||
|
||||
// When there's just one cluster, we set weightedCluster to map with one
|
||||
// entry. This mean we will build a weighted_target balancer even if there's
|
||||
// just one cluster.
|
||||
//
|
||||
// Otherwise, we will need to switch the top policy between weighted_target
|
||||
// and CDS. In case when the action changes between one cluster and multiple
|
||||
// clusters, changing top level policy means recreating TCP connection every
|
||||
// time.
|
||||
return rdsUpdate{weightedCluster: map[string]uint32{routeAction.GetCluster(): 1}}, nil
|
||||
return rdsUpdate{routes: routes}, nil
|
||||
}
|
||||
|
||||
func routesProtoToSlice(routes []*routepb.Route, logger *grpclog.PrefixLogger) ([]*Route, error) {
|
||||
|
@ -158,7 +158,7 @@ func (s) TestRDSGenerateRDSUpdateFromRouteConfiguration(t *testing.T) {
|
||||
{
|
||||
name: "good-route-config-with-empty-string-route",
|
||||
rc: goodRouteConfig1,
|
||||
wantUpdate: rdsUpdate{weightedCluster: map[string]uint32{goodClusterName1: 1}},
|
||||
wantUpdate: rdsUpdate{routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{goodClusterName1: 1}}}},
|
||||
},
|
||||
{
|
||||
// default route's match is not empty string, but "/".
|
||||
@ -173,7 +173,7 @@ func (s) TestRDSGenerateRDSUpdateFromRouteConfiguration(t *testing.T) {
|
||||
Route: &routepb.RouteAction{
|
||||
ClusterSpecifier: &routepb.RouteAction_Cluster{Cluster: goodClusterName1},
|
||||
}}}}}}},
|
||||
wantUpdate: rdsUpdate{weightedCluster: map[string]uint32{goodClusterName1: 1}},
|
||||
wantUpdate: rdsUpdate{routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{goodClusterName1: 1}}}},
|
||||
},
|
||||
|
||||
{
|
||||
@ -217,7 +217,7 @@ func (s) TestRDSGenerateRDSUpdateFromRouteConfiguration(t *testing.T) {
|
||||
},
|
||||
TotalWeight: &wrapperspb.UInt32Value{Value: 10},
|
||||
}}}}}}}}},
|
||||
wantUpdate: rdsUpdate{weightedCluster: map[string]uint32{"a": 2, "b": 3, "c": 5}},
|
||||
wantUpdate: rdsUpdate{routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{"a": 2, "b": 3, "c": 5}}}},
|
||||
},
|
||||
}
|
||||
|
||||
@ -243,59 +243,10 @@ func doLDS(t *testing.T, v2c *v2Client, fakeServer *fakeserver.Server) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestRDSHandleResponseWithRoutingEnabled starts a fake xDS server, makes a
|
||||
// ClientConn to it, and creates a v2Client using it. Then, it registers an LDS
|
||||
// and RDS watcher and tests different RDS responses.
|
||||
//
|
||||
// Routing is protected by an env variable. This test sets it to true, so the
|
||||
// new fields will be parsed.
|
||||
func (s) TestRDSHandleResponseWithRoutingEnabled(t *testing.T) {
|
||||
routingEnabled = true
|
||||
defer func() {
|
||||
routingEnabled = false
|
||||
}()
|
||||
tests := []struct {
|
||||
name string
|
||||
rdsResponse *xdspb.DiscoveryResponse
|
||||
wantErr bool
|
||||
wantUpdate *rdsUpdate
|
||||
wantUpdateErr bool
|
||||
}{
|
||||
// Response contains one good interesting RouteConfiguration.
|
||||
{
|
||||
name: "one-good-route-config",
|
||||
rdsResponse: goodRDSResponse1,
|
||||
wantErr: false,
|
||||
wantUpdate: &rdsUpdate{
|
||||
// Instead of just weighted targets when routing is disabled,
|
||||
// this result contains a route with perfix "", and action as
|
||||
// weighted targets.
|
||||
routes: []*Route{{
|
||||
Prefix: newStringP(""),
|
||||
Action: map[string]uint32{goodClusterName1: 1},
|
||||
}},
|
||||
},
|
||||
wantUpdateErr: false,
|
||||
},
|
||||
}
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
testWatchHandle(t, &watchHandleTestcase{
|
||||
typeURL: rdsURL,
|
||||
resourceName: goodRouteName1,
|
||||
responseToHandle: test.rdsResponse,
|
||||
wantHandleErr: test.wantErr,
|
||||
wantUpdate: test.wantUpdate,
|
||||
wantUpdateErr: test.wantUpdateErr,
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestRDSHandleResponseWithRoutingDisabled starts a fake xDS server, makes a
|
||||
// ClientConn to it, and creates a v2Client using it. Then, it registers an LDS
|
||||
// and RDS watcher and tests different RDS responses.
|
||||
func (s) TestRDSHandleResponseWithRoutingDisabled(t *testing.T) {
|
||||
// TestRDSHandleResponseWithRouting starts a fake xDS server, makes a ClientConn
|
||||
// to it, and creates a v2Client using it. Then, it registers an LDS and RDS
|
||||
// watcher and tests different RDS responses.
|
||||
func (s) TestRDSHandleResponseWithRouting(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
rdsResponse *xdspb.DiscoveryResponse
|
||||
@ -342,7 +293,22 @@ func (s) TestRDSHandleResponseWithRoutingDisabled(t *testing.T) {
|
||||
name: "one-good-route-config",
|
||||
rdsResponse: goodRDSResponse1,
|
||||
wantErr: false,
|
||||
wantUpdate: &rdsUpdate{weightedCluster: map[string]uint32{goodClusterName1: 1}},
|
||||
wantUpdate: &rdsUpdate{routes: []*Route{{Prefix: newStringP(""), Action: map[string]uint32{goodClusterName1: 1}}}},
|
||||
wantUpdateErr: false,
|
||||
},
|
||||
{
|
||||
name: "one-good-route-config with routes",
|
||||
rdsResponse: goodRDSResponse1,
|
||||
wantErr: false,
|
||||
wantUpdate: &rdsUpdate{
|
||||
// Instead of just weighted targets when routing is disabled,
|
||||
// this result contains a route with perfix "", and action as
|
||||
// weighted targets.
|
||||
routes: []*Route{{
|
||||
Prefix: newStringP(""),
|
||||
Action: map[string]uint32{goodClusterName1: 1},
|
||||
}},
|
||||
},
|
||||
wantUpdateErr: false,
|
||||
},
|
||||
}
|
||||
|
@ -137,23 +137,6 @@ func weightedClusterToBalancerConfig(wc map[string]uint32) balancerConfig {
|
||||
return bc
|
||||
}
|
||||
|
||||
func weightedClusterToJSON(wc map[string]uint32) (string, error) {
|
||||
sc := serviceConfig{
|
||||
LoadBalancingConfig: weightedClusterToBalancerConfig(wc),
|
||||
}
|
||||
bs, err := json.Marshal(sc)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to marshal json: %v", err)
|
||||
}
|
||||
return string(bs), nil
|
||||
}
|
||||
|
||||
func (r *xdsResolver) serviceUpdateToJSON(su xdsclient.ServiceUpdate) (string, error) {
|
||||
// If WeightedClusters is set, routing is disabled (by env variable). Use
|
||||
// weighted target only.
|
||||
if su.WeightedCluster != nil {
|
||||
return weightedClusterToJSON(su.WeightedCluster)
|
||||
}
|
||||
|
||||
return r.routesToJSON(su.Routes)
|
||||
}
|
||||
|
@ -32,31 +32,44 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
testCluster1 = "test-cluster-1"
|
||||
testClusterOnlyJSON = `{"loadBalancingConfig":[{
|
||||
"weighted_target_experimental": {
|
||||
"targets": { "test-cluster-1" : { "weight":1, "childPolicy":[{"cds_experimental":{"cluster":"test-cluster-1"}}] } }
|
||||
}
|
||||
}]}`
|
||||
testCluster1 = "test-cluster-1"
|
||||
testOneClusterOnlyJSON = `{"loadBalancingConfig":[{
|
||||
"xds_routing_experimental":{
|
||||
"action":{
|
||||
"test-cluster-1_0":{
|
||||
"childPolicy":[{
|
||||
"weighted_target_experimental":{
|
||||
"targets":{
|
||||
"test-cluster-1":{
|
||||
"weight":1,
|
||||
"childPolicy":[{"cds_experimental":{"cluster":"test-cluster-1"}}]
|
||||
}
|
||||
}}}]
|
||||
}
|
||||
},
|
||||
"route":[{"prefix":"","action":"test-cluster-1_0"}]
|
||||
}}]}`
|
||||
testWeightedCDSJSON = `{"loadBalancingConfig":[{
|
||||
"weighted_target_experimental": {
|
||||
"targets": {
|
||||
"cluster_1" : {
|
||||
"weight":75,
|
||||
"childPolicy":[{"cds_experimental":{"cluster":"cluster_1"}}]
|
||||
},
|
||||
"cluster_2" : {
|
||||
"weight":25,
|
||||
"childPolicy":[{"cds_experimental":{"cluster":"cluster_2"}}]
|
||||
}
|
||||
}
|
||||
}
|
||||
}]}`
|
||||
testWeightedCDSNoChildJSON = `{"loadBalancingConfig":[{
|
||||
"weighted_target_experimental": {
|
||||
"targets": {}
|
||||
}
|
||||
}]}`
|
||||
"xds_routing_experimental":{
|
||||
"action":{
|
||||
"cluster_1_cluster_2_1":{
|
||||
"childPolicy":[{
|
||||
"weighted_target_experimental":{
|
||||
"targets":{
|
||||
"cluster_1":{
|
||||
"weight":75,
|
||||
"childPolicy":[{"cds_experimental":{"cluster":"cluster_1"}}]
|
||||
},
|
||||
"cluster_2":{
|
||||
"weight":25,
|
||||
"childPolicy":[{"cds_experimental":{"cluster":"cluster_2"}}]
|
||||
}
|
||||
}}}]
|
||||
}
|
||||
},
|
||||
"route":[{"prefix":"","action":"cluster_1_cluster_2_1"}]
|
||||
}}]}`
|
||||
|
||||
testRoutingJSON = `{"loadBalancingConfig":[{
|
||||
"xds_routing_experimental": {
|
||||
"action":{
|
||||
@ -178,51 +191,6 @@ const (
|
||||
`
|
||||
)
|
||||
|
||||
func TestWeightedClusterToJSON(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
wc map[string]uint32
|
||||
wantJSON string // wantJSON is not to be compared verbatim.
|
||||
}{
|
||||
{
|
||||
name: "one cluster only",
|
||||
wc: map[string]uint32{testCluster1: 1},
|
||||
wantJSON: testClusterOnlyJSON,
|
||||
},
|
||||
{
|
||||
name: "empty weighted clusters",
|
||||
wc: nil,
|
||||
wantJSON: testWeightedCDSNoChildJSON,
|
||||
},
|
||||
{
|
||||
name: "weighted clusters",
|
||||
wc: map[string]uint32{
|
||||
"cluster_1": 75,
|
||||
"cluster_2": 25,
|
||||
},
|
||||
wantJSON: testWeightedCDSJSON,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
gotJSON, err := weightedClusterToJSON(tt.wc)
|
||||
if err != nil {
|
||||
t.Errorf("serviceUpdateToJSON returned error: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
gotParsed := internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(gotJSON)
|
||||
wantParsed := internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(tt.wantJSON)
|
||||
|
||||
if !internal.EqualServiceConfigForTesting(gotParsed.Config, wantParsed.Config) {
|
||||
t.Errorf("serviceUpdateToJSON() = %v, want %v", gotJSON, tt.wantJSON)
|
||||
t.Error("gotParsed: ", cmp.Diff(nil, gotParsed))
|
||||
t.Error("wantParsed: ", cmp.Diff(nil, wantParsed))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestRoutesToJSON(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
@ -348,15 +316,6 @@ func TestServiceUpdateToJSON(t *testing.T) {
|
||||
wantJSON string
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "weighted clusters",
|
||||
su: client.ServiceUpdate{WeightedCluster: map[string]uint32{
|
||||
"cluster_1": 75,
|
||||
"cluster_2": 25,
|
||||
}},
|
||||
wantJSON: testWeightedCDSJSON,
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "routing",
|
||||
su: client.ServiceUpdate{
|
||||
|
@ -272,7 +272,7 @@ func TestXDSResolverWatchCallbackAfterClose(t *testing.T) {
|
||||
// Call the watchAPI callback after closing the resolver, and make sure no
|
||||
// update is triggerred on the ClientConn.
|
||||
xdsR.Close()
|
||||
xdsC.InvokeWatchServiceCallback(xdsclient.ServiceUpdate{WeightedCluster: map[string]uint32{cluster: 1}}, nil)
|
||||
xdsC.InvokeWatchServiceCallback(xdsclient.ServiceUpdate{Routes: []*client.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}}}, nil)
|
||||
if gotVal, gotErr := tcc.stateCh.Receive(); gotErr != testutils.ErrRecvTimeout {
|
||||
t.Fatalf("ClientConn.UpdateState called after xdsResolver is closed: %v", gotVal)
|
||||
}
|
||||
@ -316,20 +316,21 @@ func TestXDSResolverGoodServiceUpdate(t *testing.T) {
|
||||
}()
|
||||
|
||||
waitForWatchService(t, xdsC, targetStr)
|
||||
defer replaceRandNumGenerator(0)()
|
||||
|
||||
for _, tt := range []struct {
|
||||
su client.ServiceUpdate
|
||||
wantJSON string
|
||||
}{
|
||||
{
|
||||
su: client.ServiceUpdate{WeightedCluster: map[string]uint32{testCluster1: 1}},
|
||||
wantJSON: testClusterOnlyJSON,
|
||||
su: client.ServiceUpdate{Routes: []*client.Route{{Prefix: newStringP(""), Action: map[string]uint32{testCluster1: 1}}}},
|
||||
wantJSON: testOneClusterOnlyJSON,
|
||||
},
|
||||
{
|
||||
su: client.ServiceUpdate{WeightedCluster: map[string]uint32{
|
||||
su: client.ServiceUpdate{Routes: []*client.Route{{Prefix: newStringP(""), Action: map[string]uint32{
|
||||
"cluster_1": 75,
|
||||
"cluster_2": 25,
|
||||
}},
|
||||
}}}},
|
||||
wantJSON: testWeightedCDSJSON,
|
||||
},
|
||||
} {
|
||||
@ -382,7 +383,7 @@ func TestXDSResolverGoodUpdateAfterError(t *testing.T) {
|
||||
|
||||
// Invoke the watchAPI callback with a good service update and wait for the
|
||||
// UpdateState method to be called on the ClientConn.
|
||||
xdsC.InvokeWatchServiceCallback(xdsclient.ServiceUpdate{WeightedCluster: map[string]uint32{cluster: 1}}, nil)
|
||||
xdsC.InvokeWatchServiceCallback(xdsclient.ServiceUpdate{Routes: []*client.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}}}, nil)
|
||||
gotState, err := tcc.stateCh.Receive()
|
||||
if err != nil {
|
||||
t.Fatalf("ClientConn.UpdateState returned error: %v", err)
|
||||
|
Reference in New Issue
Block a user