From ca3959a1b21a47ae53b2d5df691fb61d2f40a36c Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Mon, 20 Jul 2020 14:04:46 -0700 Subject: [PATCH] xdsrouting: handle route fields in xds_client (#3747) --- xds/internal/client/client_watchers_rds.go | 12 + .../client/client_watchers_service.go | 7 + .../client/client_watchers_service_test.go | 21 +- xds/internal/client/envconfig.go | 33 +++ xds/internal/client/v2client_rds.go | 131 +++++++++- xds/internal/client/v2client_rds_test.go | 225 +++++++++++++++++- 6 files changed, 416 insertions(+), 13 deletions(-) create mode 100644 xds/internal/client/envconfig.go diff --git a/xds/internal/client/client_watchers_rds.go b/xds/internal/client/client_watchers_rds.go index 4c8f5d42..91de7860 100644 --- a/xds/internal/client/client_watchers_rds.go +++ b/xds/internal/client/client_watchers_rds.go @@ -40,8 +40,20 @@ type HeaderMatcher struct { PresentMatch *bool `json:"presentMatch,omitempty"` } +// Route represents route with matchers and action. +type Route struct { + Path, Prefix, Regex *string + Headers []*HeaderMatcher + Fraction *uint32 + Action map[string]uint32 // action is weighted clusters. +} + type rdsUpdate struct { + // weightedCluster is only set when routing is disabled (env variable + // GRPC_XDS_EXPERIMENTAL_ROUTING is not true). weightedCluster map[string]uint32 + + routes []*Route } type rdsCallbackFunc func(rdsUpdate, error) diff --git a/xds/internal/client/client_watchers_service.go b/xds/internal/client/client_watchers_service.go index 1eea1f31..1cf4c0f9 100644 --- a/xds/internal/client/client_watchers_service.go +++ b/xds/internal/client/client_watchers_service.go @@ -27,7 +27,13 @@ import ( type ServiceUpdate struct { // WeightedCluster is a map from cluster names (CDS resource to watch) to // their weights. + // + // This field is only set when routing is disabled (env variable + // GRPC_XDS_EXPERIMENTAL_ROUTING is not true). WeightedCluster map[string]uint32 + + // Routes + Routes []*Route } // WatchService uses LDS and RDS to discover information about the provided @@ -121,6 +127,7 @@ func (w *serviceUpdateWatcher) handleRDSResp(update rdsUpdate, err error) { } w.serviceCb(ServiceUpdate{ WeightedCluster: update.weightedCluster, + Routes: update.routes, }, nil) } diff --git a/xds/internal/client/client_watchers_service_test.go b/xds/internal/client/client_watchers_service_test.go index 37d7fb45..4535285b 100644 --- a/xds/internal/client/client_watchers_service_test.go +++ b/xds/internal/client/client_watchers_service_test.go @@ -40,8 +40,7 @@ var serviceCmpOpts = []cmp.Option{cmp.AllowUnexported(serviceUpdateErr{}), cmpop // TestServiceWatch covers the cases: // - an update is received after a watch() -// - an update for another resource name (which doesn't trigger callback) -// - an upate is received after cancel() +// - an update with routes received func (s) TestServiceWatch(t *testing.T) { v2ClientCh, cleanup := overrideNewXDSV2Client() defer cleanup() @@ -77,6 +76,24 @@ func (s) TestServiceWatch(t *testing.T) { if u, err := serviceUpdateCh.Receive(); err != nil || !cmp.Equal(u, serviceUpdateErr{wantUpdate, nil}, serviceCmpOpts...) { t.Errorf("unexpected serviceUpdate: %v, error receiving from channel: %v", u, err) } + + wantUpdate2 := ServiceUpdate{ + Routes: []*Route{{ + Prefix: newStringP(""), + Action: map[string]uint32{testCDSName: 1}, + }}, + } + v2Client.r.newRDSUpdate(map[string]rdsUpdate{ + testRDSName: { + routes: []*Route{{ + Prefix: newStringP(""), + Action: map[string]uint32{testCDSName: 1}, + }}, + }, + }) + if u, err := serviceUpdateCh.Receive(); err != nil || !cmp.Equal(u, serviceUpdateErr{wantUpdate2, nil}, serviceCmpOpts...) { + t.Errorf("unexpected serviceUpdate: %v, error receiving from channel: %v", u, err) + } } // TestServiceWatchLDSUpdate covers the case that after first LDS and first RDS diff --git a/xds/internal/client/envconfig.go b/xds/internal/client/envconfig.go new file mode 100644 index 00000000..40f448e6 --- /dev/null +++ b/xds/internal/client/envconfig.go @@ -0,0 +1,33 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package client + +import ( + "os" + "strings" +) + +// TODO: there are multiple env variables, GRPC_XDS_BOOTSTRAP and +// GRPC_XDS_EXPERIMENTAL_V3_SUPPORT, and this. Move all env variables into a +// separate package. +const routingEnabledConfigStr = "GRPC_XDS_EXPERIMENTAL_ROUTING" + +// routing is enabled only if env variable is set to true. The default is false. +// We may flip the default later. +var routingEnabled = strings.EqualFold(os.Getenv(routingEnabledConfigStr), "true") diff --git a/xds/internal/client/v2client_rds.go b/xds/internal/client/v2client_rds.go index 3650c045..1fc9ac97 100644 --- a/xds/internal/client/v2client_rds.go +++ b/xds/internal/client/v2client_rds.go @@ -24,7 +24,9 @@ import ( xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" routepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/route" + typepb "github.com/envoyproxy/go-control-plane/envoy/type" "github.com/golang/protobuf/ptypes" + "google.golang.org/grpc/internal/grpclog" ) // handleRDSResponse processes an RDS response received from the xDS server. On @@ -48,7 +50,7 @@ func (v2c *v2Client) handleRDSResponse(resp *xdspb.DiscoveryResponse) error { v2c.logger.Infof("Resource with name: %v, type: %T, contains: %v. Picking routes for current watching hostname %v", rc.GetName(), rc, rc, v2c.hostname) // Use the hostname (resourceName for LDS) to find the routes. - u, err := generateRDSUpdateFromRouteConfiguration(rc, hostname) + u, err := generateRDSUpdateFromRouteConfiguration(rc, hostname, v2c.logger) if err != nil { return fmt.Errorf("xds: received invalid RouteConfiguration in RDS response: %+v with err: %v", rc, err) } @@ -76,7 +78,7 @@ func (v2c *v2Client) handleRDSResponse(resp *xdspb.DiscoveryResponse) error { // field must be empty and whose route field must be set. Inside that route // message, the cluster field will contain the clusterName or weighted clusters // we are looking for. -func generateRDSUpdateFromRouteConfiguration(rc *xdspb.RouteConfiguration, host string) (rdsUpdate, error) { +func generateRDSUpdateFromRouteConfiguration(rc *xdspb.RouteConfiguration, host string, logger *grpclog.PrefixLogger) (rdsUpdate, error) { // // Currently this returns "" on error, and the caller will return an error. // But the error doesn't contain details of why the response is invalid @@ -94,6 +96,16 @@ func generateRDSUpdateFromRouteConfiguration(rc *xdspb.RouteConfiguration, host // should be at least one default route. return rdsUpdate{}, fmt.Errorf("matched virtual host has no routes") } + + // Keep the old code path for routing disabled. + if routingEnabled { + routes, err := routesProtoToSlice(vh.Routes, logger) + if err != nil { + return rdsUpdate{}, fmt.Errorf("received route is invalid: %v", err) + } + return rdsUpdate{routes: routes}, nil + } + dr := vh.Routes[len(vh.Routes)-1] match := dr.GetMatch() if match == nil { @@ -108,12 +120,12 @@ func generateRDSUpdateFromRouteConfiguration(rc *xdspb.RouteConfiguration, host // valid. return rdsUpdate{}, fmt.Errorf("matched virtual host's default route set case-sensitive to false") } - route := dr.GetRoute() - if route == nil { + routeAction := dr.GetRoute() + if routeAction == nil { return rdsUpdate{}, fmt.Errorf("matched route is nil") } - if wc := route.GetWeightedClusters(); wc != nil { + if wc := routeAction.GetWeightedClusters(); wc != nil { m, err := weightedClustersProtoToMap(wc) if err != nil { return rdsUpdate{}, fmt.Errorf("matched weighted cluster is invalid: %v", err) @@ -129,7 +141,114 @@ func generateRDSUpdateFromRouteConfiguration(rc *xdspb.RouteConfiguration, host // and CDS. In case when the action changes between one cluster and multiple // clusters, changing top level policy means recreating TCP connection every // time. - return rdsUpdate{weightedCluster: map[string]uint32{route.GetCluster(): 1}}, nil + return rdsUpdate{weightedCluster: map[string]uint32{routeAction.GetCluster(): 1}}, nil +} + +func routesProtoToSlice(routes []*routepb.Route, logger *grpclog.PrefixLogger) ([]*Route, error) { + var routesRet []*Route + + for _, r := range routes { + match := r.GetMatch() + if match == nil { + return nil, fmt.Errorf("route %+v doesn't have a match", r) + } + + if len(match.GetQueryParameters()) != 0 { + // Ignore route with query parameters. + logger.Warningf("route %+v has query parameter matchers, the route will be ignored", r) + continue + } + + if caseSensitive := match.GetCaseSensitive(); caseSensitive != nil && !caseSensitive.Value { + return nil, fmt.Errorf("route %+v has case-sensitive false", r) + } + + pathSp := match.GetPathSpecifier() + if pathSp == nil { + return nil, fmt.Errorf("route %+v doesn't have a path specifier", r) + } + + var route Route + switch pt := pathSp.(type) { + case *routepb.RouteMatch_Prefix: + route.Prefix = &pt.Prefix + case *routepb.RouteMatch_Path: + route.Path = &pt.Path + case *routepb.RouteMatch_SafeRegex: + route.Regex = &pt.SafeRegex.Regex + case *routepb.RouteMatch_Regex: + return nil, fmt.Errorf("route %+v has Regex, expected SafeRegex instead", r) + default: + logger.Warningf("route %+v has an unrecognized path specifier: %+v", r, pt) + continue + } + + for _, h := range match.GetHeaders() { + var header HeaderMatcher + switch ht := h.GetHeaderMatchSpecifier().(type) { + case *routepb.HeaderMatcher_ExactMatch: + header.ExactMatch = &ht.ExactMatch + case *routepb.HeaderMatcher_SafeRegexMatch: + header.RegexMatch = &ht.SafeRegexMatch.Regex + case *routepb.HeaderMatcher_RangeMatch: + header.RangeMatch = &Int64Range{ + Start: ht.RangeMatch.Start, + End: ht.RangeMatch.End, + } + case *routepb.HeaderMatcher_PresentMatch: + header.PresentMatch = &ht.PresentMatch + case *routepb.HeaderMatcher_PrefixMatch: + header.PrefixMatch = &ht.PrefixMatch + case *routepb.HeaderMatcher_SuffixMatch: + header.SuffixMatch = &ht.SuffixMatch + case *routepb.HeaderMatcher_RegexMatch: + return nil, fmt.Errorf("route %+v has a header matcher with Regex, expected SafeRegex instead", r) + default: + logger.Warningf("route %+v has an unrecognized header matcher: %+v", r, ht) + continue + } + header.Name = h.GetName() + invert := h.GetInvertMatch() + header.InvertMatch = &invert + route.Headers = append(route.Headers, &header) + } + + if fr := match.GetRuntimeFraction(); fr != nil { + d := fr.GetDefaultValue() + n := d.GetNumerator() + switch d.GetDenominator() { + case typepb.FractionalPercent_HUNDRED: + n *= 10000 + case typepb.FractionalPercent_TEN_THOUSAND: + n *= 100 + case typepb.FractionalPercent_MILLION: + } + route.Fraction = &n + } + + clusters := make(map[string]uint32) + switch a := r.GetRoute().GetClusterSpecifier().(type) { + case *routepb.RouteAction_Cluster: + clusters[a.Cluster] = 1 + case *routepb.RouteAction_WeightedClusters: + wcs := a.WeightedClusters + var totalWeight uint32 + for _, c := range wcs.Clusters { + w := c.GetWeight().GetValue() + clusters[c.GetName()] = w + totalWeight += w + } + if totalWeight != wcs.GetTotalWeight().GetValue() { + return nil, fmt.Errorf("route %+v, action %+v, weights of clusters do not add up to total total weight, got: %v, want %v", r, a, wcs.GetTotalWeight().GetValue(), totalWeight) + } + case *routepb.RouteAction_ClusterHeader: + continue + } + + route.Action = clusters + routesRet = append(routesRet, &route) + } + return routesRet, nil } func weightedClustersProtoToMap(wc *routepb.WeightedCluster) (map[string]uint32, error) { diff --git a/xds/internal/client/v2client_rds_test.go b/xds/internal/client/v2client_rds_test.go index a099e32c..e3ec4e01 100644 --- a/xds/internal/client/v2client_rds_test.go +++ b/xds/internal/client/v2client_rds_test.go @@ -23,10 +23,13 @@ import ( "time" xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" + corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" routepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/route" + typepb "github.com/envoyproxy/go-control-plane/envoy/type" "github.com/golang/protobuf/proto" wrapperspb "github.com/golang/protobuf/ptypes/wrappers" "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "google.golang.org/grpc/xds/internal/testutils/fakeserver" ) @@ -220,7 +223,7 @@ func (s) TestRDSGenerateRDSUpdateFromRouteConfiguration(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - gotUpdate, gotError := generateRDSUpdateFromRouteConfiguration(test.rc, goodLDSTarget1) + gotUpdate, gotError := generateRDSUpdateFromRouteConfiguration(test.rc, goodLDSTarget1, nil) if !cmp.Equal(gotUpdate, test.wantUpdate, cmp.AllowUnexported(rdsUpdate{})) || (gotError != nil) != test.wantError { t.Errorf("generateRDSUpdateFromRouteConfiguration(%+v, %v) = %v, want %v", test.rc, goodLDSTarget1, gotUpdate, test.wantUpdate) } @@ -240,10 +243,59 @@ func doLDS(t *testing.T, v2c *v2Client, fakeServer *fakeserver.Server) { } } -// TestRDSHandleResponse starts a fake xDS server, makes a ClientConn to it, -// and creates a v2Client using it. Then, it registers an LDS and RDS watcher -// and tests different RDS responses. -func (s) TestRDSHandleResponse(t *testing.T) { +// TestRDSHandleResponseWithRoutingEnabled starts a fake xDS server, makes a +// ClientConn to it, and creates a v2Client using it. Then, it registers an LDS +// and RDS watcher and tests different RDS responses. +// +// Routing is protected by an env variable. This test sets it to true, so the +// new fields will be parsed. +func (s) TestRDSHandleResponseWithRoutingEnabled(t *testing.T) { + routingEnabled = true + defer func() { + routingEnabled = false + }() + tests := []struct { + name string + rdsResponse *xdspb.DiscoveryResponse + wantErr bool + wantUpdate *rdsUpdate + wantUpdateErr bool + }{ + // Response contains one good interesting RouteConfiguration. + { + name: "one-good-route-config", + rdsResponse: goodRDSResponse1, + wantErr: false, + wantUpdate: &rdsUpdate{ + // Instead of just weighted targets when routing is disabled, + // this result contains a route with perfix "", and action as + // weighted targets. + routes: []*Route{{ + Prefix: newStringP(""), + Action: map[string]uint32{goodClusterName1: 1}, + }}, + }, + wantUpdateErr: false, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + testWatchHandle(t, &watchHandleTestcase{ + typeURL: rdsURL, + resourceName: goodRouteName1, + responseToHandle: test.rdsResponse, + wantHandleErr: test.wantErr, + wantUpdate: test.wantUpdate, + wantUpdateErr: test.wantUpdateErr, + }) + }) + } +} + +// TestRDSHandleResponseWithRoutingDisabled starts a fake xDS server, makes a +// ClientConn to it, and creates a v2Client using it. Then, it registers an LDS +// and RDS watcher and tests different RDS responses. +func (s) TestRDSHandleResponseWithRoutingDisabled(t *testing.T) { tests := []struct { name string rdsResponse *xdspb.DiscoveryResponse @@ -516,3 +568,166 @@ func (s) TestWeightedClustersProtoToMap(t *testing.T) { }) } } + +func TestRoutesProtoToSlice(t *testing.T) { + tests := []struct { + name string + routes []*routepb.Route + wantRoutes []*Route + wantErr bool + }{ + { + name: "no path", + routes: []*routepb.Route{{ + Match: &routepb.RouteMatch{}, + }}, + wantErr: true, + }, + { + name: "path is regex instead of saferegex", + routes: []*routepb.Route{{ + Match: &routepb.RouteMatch{ + PathSpecifier: &routepb.RouteMatch_Regex{Regex: "*"}, + }, + }}, + wantErr: true, + }, + { + name: "header contains regex", + routes: []*routepb.Route{{ + Match: &routepb.RouteMatch{ + PathSpecifier: &routepb.RouteMatch_Prefix{Prefix: "/"}, + Headers: []*routepb.HeaderMatcher{{ + Name: "th", + HeaderMatchSpecifier: &routepb.HeaderMatcher_RegexMatch{ + RegexMatch: "*", + }, + }}, + }, + }}, + wantErr: true, + }, + { + name: "case_sensitive is false", + routes: []*routepb.Route{{ + Match: &routepb.RouteMatch{ + PathSpecifier: &routepb.RouteMatch_Prefix{Prefix: "/"}, + CaseSensitive: &wrapperspb.BoolValue{Value: false}, + }, + }}, + wantErr: true, + }, + { + name: "good", + routes: []*routepb.Route{ + { + Match: &routepb.RouteMatch{ + PathSpecifier: &routepb.RouteMatch_Prefix{Prefix: "/a/"}, + Headers: []*routepb.HeaderMatcher{ + { + Name: "th", + HeaderMatchSpecifier: &routepb.HeaderMatcher_PrefixMatch{ + PrefixMatch: "tv", + }, + InvertMatch: true, + }, + }, + RuntimeFraction: &corepb.RuntimeFractionalPercent{ + DefaultValue: &typepb.FractionalPercent{ + Numerator: 1, + Denominator: typepb.FractionalPercent_HUNDRED, + }, + }, + }, + Action: &routepb.Route_Route{ + Route: &routepb.RouteAction{ + ClusterSpecifier: &routepb.RouteAction_WeightedClusters{ + WeightedClusters: &routepb.WeightedCluster{ + Clusters: []*routepb.WeightedCluster_ClusterWeight{ + {Name: "B", Weight: &wrapperspb.UInt32Value{Value: 60}}, + {Name: "A", Weight: &wrapperspb.UInt32Value{Value: 40}}, + }, + TotalWeight: &wrapperspb.UInt32Value{Value: 100}, + }}}}, + }, + }, + wantRoutes: []*Route{{ + Prefix: newStringP("/a/"), + Headers: []*HeaderMatcher{ + { + Name: "th", + InvertMatch: newBoolP(true), + PrefixMatch: newStringP("tv"), + }, + }, + Fraction: newUInt32P(10000), + Action: map[string]uint32{"A": 40, "B": 60}, + }}, + wantErr: false, + }, + { + name: "query is ignored", + routes: []*routepb.Route{ + { + Match: &routepb.RouteMatch{ + PathSpecifier: &routepb.RouteMatch_Prefix{Prefix: "/a/"}, + }, + Action: &routepb.Route_Route{ + Route: &routepb.RouteAction{ + ClusterSpecifier: &routepb.RouteAction_WeightedClusters{ + WeightedClusters: &routepb.WeightedCluster{ + Clusters: []*routepb.WeightedCluster_ClusterWeight{ + {Name: "B", Weight: &wrapperspb.UInt32Value{Value: 60}}, + {Name: "A", Weight: &wrapperspb.UInt32Value{Value: 40}}, + }, + TotalWeight: &wrapperspb.UInt32Value{Value: 100}, + }}}}, + }, + { + Name: "with_query", + Match: &routepb.RouteMatch{ + PathSpecifier: &routepb.RouteMatch_Prefix{Prefix: "/b/"}, + QueryParameters: []*routepb.QueryParameterMatcher{{Name: "route_will_be_ignored"}}, + }, + }, + }, + // Only one route in the result, because the second one with query + // parameters is ignored. + wantRoutes: []*Route{{ + Prefix: newStringP("/a/"), + Action: map[string]uint32{"A": 40, "B": 60}, + }}, + wantErr: false, + }, + } + + cmpOpts := []cmp.Option{ + cmp.AllowUnexported(Route{}, HeaderMatcher{}, Int64Range{}), + cmpopts.EquateEmpty(), + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := routesProtoToSlice(tt.routes, nil) + if (err != nil) != tt.wantErr { + t.Errorf("routesProtoToSlice() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !cmp.Equal(got, tt.wantRoutes, cmpOpts...) { + t.Errorf("routesProtoToSlice() got = %v, want %v, diff: %v", got, tt.wantRoutes, cmp.Diff(got, tt.wantRoutes, cmpOpts...)) + } + }) + } +} + +func newStringP(s string) *string { + return &s +} + +func newUInt32P(i uint32) *uint32 { + return &i +} + +func newBoolP(b bool) *bool { + return &b +}