xdsrouting: resolver to generate service config with routes, and pick routing balancer (#3751)
This commit is contained in:
@ -22,12 +22,14 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
|
||||
wrapperspb "github.com/golang/protobuf/ptypes/wrappers"
|
||||
xdsclient "google.golang.org/grpc/xds/internal/client"
|
||||
)
|
||||
|
||||
const (
|
||||
cdsName = "cds_experimental"
|
||||
weightedTargetName = "weighted_target_experimental"
|
||||
xdsRoutingName = "xds_routing_experimental"
|
||||
)
|
||||
|
||||
type serviceConfig struct {
|
||||
@ -53,24 +55,91 @@ type cdsBalancerConfig struct {
|
||||
Cluster string `json:"cluster"`
|
||||
}
|
||||
|
||||
func serviceUpdateToJSON(su xdsclient.ServiceUpdate) (string, error) {
|
||||
// Even if WeightedCluster has only one entry, we still use weighted_target
|
||||
// as top level balancer, to avoid switching top policy between CDS and
|
||||
// weighted_target, causing TCP connection to be recreated.
|
||||
targets := make(map[string]cdsWithWeight)
|
||||
for name, weight := range su.WeightedCluster {
|
||||
targets[name] = cdsWithWeight{
|
||||
Weight: weight,
|
||||
ChildPolicy: newBalancerConfig(cdsName, cdsBalancerConfig{Cluster: name}),
|
||||
type route struct {
|
||||
Path *string `json:"path,omitempty"`
|
||||
Prefix *string `json:"prefix,omitempty"`
|
||||
Regex *string `json:"regex,omitempty"`
|
||||
Headers []*xdsclient.HeaderMatcher `json:"headers,omitempty"`
|
||||
Fraction *wrapperspb.UInt32Value `json:"matchFraction,omitempty"`
|
||||
Action string `json:"action"`
|
||||
}
|
||||
|
||||
type xdsActionConfig struct {
|
||||
ChildPolicy balancerConfig `json:"childPolicy"`
|
||||
}
|
||||
|
||||
type xdsRoutingBalancerConfig struct {
|
||||
Action map[string]xdsActionConfig `json:"action"`
|
||||
Route []*route `json:"route"`
|
||||
}
|
||||
|
||||
func (r *xdsResolver) routesToJSON(routes []*xdsclient.Route) (string, error) {
|
||||
r.updateActions(newActionsFromRoutes(routes))
|
||||
|
||||
// Generate routes.
|
||||
var rts []*route
|
||||
for _, rt := range routes {
|
||||
t := &route{
|
||||
Path: rt.Path,
|
||||
Prefix: rt.Prefix,
|
||||
Regex: rt.Regex,
|
||||
Headers: rt.Headers,
|
||||
}
|
||||
|
||||
if f := rt.Fraction; f != nil {
|
||||
t.Fraction = &wrapperspb.UInt32Value{Value: *f}
|
||||
}
|
||||
|
||||
t.Action = r.getActionAssignedName(rt.Action)
|
||||
rts = append(rts, t)
|
||||
}
|
||||
|
||||
// Generate actions.
|
||||
action := make(map[string]xdsActionConfig)
|
||||
for _, act := range r.actions {
|
||||
action[act.assignedName] = xdsActionConfig{
|
||||
ChildPolicy: weightedClusterToBalancerConfig(act.clustersWithWeights),
|
||||
}
|
||||
}
|
||||
|
||||
sc := serviceConfig{
|
||||
LoadBalancingConfig: newBalancerConfig(
|
||||
xdsRoutingName, xdsRoutingBalancerConfig{
|
||||
Route: rts,
|
||||
Action: action,
|
||||
},
|
||||
),
|
||||
}
|
||||
|
||||
bs, err := json.Marshal(sc)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to marshal json: %v", err)
|
||||
}
|
||||
return string(bs), nil
|
||||
}
|
||||
|
||||
func weightedClusterToBalancerConfig(wc map[string]uint32) balancerConfig {
|
||||
// Even if WeightedCluster has only one entry, we still use weighted_target
|
||||
// as top level balancer, to avoid switching top policy between CDS and
|
||||
// weighted_target, causing TCP connection to be recreated.
|
||||
targets := make(map[string]cdsWithWeight)
|
||||
for name, weight := range wc {
|
||||
targets[name] = cdsWithWeight{
|
||||
Weight: weight,
|
||||
ChildPolicy: newBalancerConfig(cdsName, cdsBalancerConfig{Cluster: name}),
|
||||
}
|
||||
}
|
||||
bc := newBalancerConfig(
|
||||
weightedTargetName, weightedCDSBalancerConfig{
|
||||
Targets: targets,
|
||||
},
|
||||
),
|
||||
)
|
||||
return bc
|
||||
}
|
||||
|
||||
func weightedClusterToJSON(wc map[string]uint32) (string, error) {
|
||||
sc := serviceConfig{
|
||||
LoadBalancingConfig: weightedClusterToBalancerConfig(wc),
|
||||
}
|
||||
bs, err := json.Marshal(sc)
|
||||
if err != nil {
|
||||
@ -78,3 +147,13 @@ func serviceUpdateToJSON(su xdsclient.ServiceUpdate) (string, error) {
|
||||
}
|
||||
return string(bs), nil
|
||||
}
|
||||
|
||||
func (r *xdsResolver) serviceUpdateToJSON(su xdsclient.ServiceUpdate) (string, error) {
|
||||
// If WeightedClusters is set, routing is disabled (by env variable). Use
|
||||
// weighted target only.
|
||||
if su.WeightedCluster != nil {
|
||||
return weightedClusterToJSON(su.WeightedCluster)
|
||||
}
|
||||
|
||||
return r.routesToJSON(su.Routes)
|
||||
}
|
||||
|
186
xds/internal/resolver/serviceconfig_action.go
Normal file
186
xds/internal/resolver/serviceconfig_action.go
Normal file
@ -0,0 +1,186 @@
|
||||
/*
|
||||
*
|
||||
* Copyright 2020 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package resolver
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"sort"
|
||||
"strconv"
|
||||
|
||||
"google.golang.org/grpc/internal/grpcrand"
|
||||
xdsclient "google.golang.org/grpc/xds/internal/client"
|
||||
)
|
||||
|
||||
type actionWithAssignedName struct {
|
||||
// cluster:weight, "A":40, "B":60
|
||||
clustersWithWeights map[string]uint32
|
||||
// clusterNames, without weights, sorted and hashed, "A_B_"
|
||||
clusterNames string
|
||||
// The assigned name, clusters plus a random number, "A_B_1"
|
||||
assignedName string
|
||||
// randomNumber is the number appended to assignedName.
|
||||
randomNumber int64
|
||||
}
|
||||
|
||||
// newActionsFromRoutes gets actions from the routes, and turns them into a map
|
||||
// keyed by the hash of the clusters.
|
||||
//
|
||||
// In the returned map, all actions don't have assignedName. The assignedName
|
||||
// will be filled in after comparing the new actions with the existing actions,
|
||||
// so when a new and old action only diff in weights, the new action can reuse
|
||||
// the old action's name.
|
||||
//
|
||||
// from
|
||||
// {B:60, A:40}, {A:30, B:70}, {B:90, C:10}
|
||||
//
|
||||
// to
|
||||
// A40_B60_: {{A:40, B:60}, "A_B_", ""}
|
||||
// A30_B70_: {{A:30, B:70}, "A_B_", ""}
|
||||
// B90_C10_: {{B:90, C:10}, "B_C_", ""}
|
||||
func newActionsFromRoutes(routes []*xdsclient.Route) map[string]actionWithAssignedName {
|
||||
newActions := make(map[string]actionWithAssignedName)
|
||||
for _, route := range routes {
|
||||
var clusterNames []string
|
||||
for n := range route.Action {
|
||||
clusterNames = append(clusterNames, n)
|
||||
}
|
||||
|
||||
// Sort names to be consistent.
|
||||
sort.Strings(clusterNames)
|
||||
clustersOnly := ""
|
||||
clustersWithWeight := ""
|
||||
for _, c := range clusterNames {
|
||||
// Generates A_B_
|
||||
clustersOnly = clustersOnly + c + "_"
|
||||
// Generates A40_B60_
|
||||
clustersWithWeight = clustersWithWeight + c + strconv.FormatUint(uint64(route.Action[c]), 10) + "_"
|
||||
}
|
||||
|
||||
if _, ok := newActions[clustersWithWeight]; !ok {
|
||||
newActions[clustersWithWeight] = actionWithAssignedName{
|
||||
clustersWithWeights: route.Action,
|
||||
clusterNames: clustersOnly,
|
||||
}
|
||||
}
|
||||
}
|
||||
return newActions
|
||||
}
|
||||
|
||||
// updateActions takes a new map of actions, and updates the existing action map in the resolver.
|
||||
//
|
||||
// In the old map, all actions have assignedName set.
|
||||
// In the new map, all actions have no assignedName.
|
||||
//
|
||||
// After the update, the action map is updated to have all actions from the new
|
||||
// map, with assignedName:
|
||||
// - if the new action exists in old, get the old name
|
||||
// - if the new action doesn't exist in old
|
||||
// - if there is an old action that will be removed, and has the same set of
|
||||
// clusters, reuse the old action's name
|
||||
// - otherwise, generate a new name
|
||||
func (r *xdsResolver) updateActions(newActions map[string]actionWithAssignedName) {
|
||||
if r.actions == nil {
|
||||
r.actions = make(map[string]actionWithAssignedName)
|
||||
}
|
||||
|
||||
// Delete actions from existingActions if they are not in newActions. Keep
|
||||
// the removed actions in a map, with key as clusterNames without weights,
|
||||
// so their assigned names can be reused.
|
||||
existingActions := r.actions
|
||||
actionsRemoved := make(map[string][]string)
|
||||
for actionHash, act := range existingActions {
|
||||
if _, ok := newActions[actionHash]; !ok {
|
||||
actionsRemoved[act.clusterNames] = append(actionsRemoved[act.clusterNames], act.assignedName)
|
||||
delete(existingActions, actionHash)
|
||||
}
|
||||
}
|
||||
|
||||
// Find actions in newActions but not in oldActions. Add them, and try to
|
||||
// reuse assigned names from actionsRemoved.
|
||||
if r.usedActionNameRandomNumber == nil {
|
||||
r.usedActionNameRandomNumber = make(map[int64]bool)
|
||||
}
|
||||
for actionHash, act := range newActions {
|
||||
if _, ok := existingActions[actionHash]; !ok {
|
||||
if assignedNamed, ok := actionsRemoved[act.clusterNames]; ok {
|
||||
// Reuse the first assigned name from actionsRemoved.
|
||||
act.assignedName = assignedNamed[0]
|
||||
// If there are more names to reuse after this, update the slice
|
||||
// in the map. Otherwise, remove the entry from the map.
|
||||
if len(assignedNamed) > 1 {
|
||||
actionsRemoved[act.clusterNames] = assignedNamed[1:]
|
||||
} else {
|
||||
delete(actionsRemoved, act.clusterNames)
|
||||
}
|
||||
existingActions[actionHash] = act
|
||||
continue
|
||||
}
|
||||
// Generate a new name.
|
||||
act.randomNumber = r.nextAssignedNameRandomNumber()
|
||||
act.assignedName = fmt.Sprintf("%s%d", act.clusterNames, act.randomNumber)
|
||||
existingActions[actionHash] = act
|
||||
}
|
||||
}
|
||||
|
||||
// Delete entry from nextIndex if all actions with the clusters are removed.
|
||||
remainingRandomNumbers := make(map[int64]bool)
|
||||
for _, act := range existingActions {
|
||||
remainingRandomNumbers[act.randomNumber] = true
|
||||
}
|
||||
r.usedActionNameRandomNumber = remainingRandomNumbers
|
||||
}
|
||||
|
||||
var grpcrandInt63n = grpcrand.Int63n
|
||||
|
||||
func (r *xdsResolver) nextAssignedNameRandomNumber() int64 {
|
||||
for {
|
||||
t := grpcrandInt63n(math.MaxInt32)
|
||||
if !r.usedActionNameRandomNumber[t] {
|
||||
return t
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// getActionAssignedName hashes the clusters from the action, and find the
|
||||
// assigned action name. The assigned action names are kept in r.actions, with
|
||||
// the clusters name hash as map key.
|
||||
//
|
||||
// The assigned action name is not simply the hash. For example, the hash can be
|
||||
// "A40_B60_", but the assigned name can be "A_B_0". It's this way so the action
|
||||
// can be reused if only weights are changing.
|
||||
func (r *xdsResolver) getActionAssignedName(action map[string]uint32) string {
|
||||
var clusterNames []string
|
||||
for n := range action {
|
||||
clusterNames = append(clusterNames, n)
|
||||
}
|
||||
// Hash cluster names. Sort names to be consistent.
|
||||
sort.Strings(clusterNames)
|
||||
clustersWithWeight := ""
|
||||
for _, c := range clusterNames {
|
||||
// Generates hash "A40_B60_".
|
||||
clustersWithWeight = clustersWithWeight + c + strconv.FormatUint(uint64(action[c]), 10) + "_"
|
||||
}
|
||||
// Look in r.actions for the assigned action name.
|
||||
if act, ok := r.actions[clustersWithWeight]; ok {
|
||||
return act.assignedName
|
||||
}
|
||||
r.logger.Warningf("no assigned name found for action %v", action)
|
||||
return ""
|
||||
}
|
356
xds/internal/resolver/serviceconfig_action_test.go
Normal file
356
xds/internal/resolver/serviceconfig_action_test.go
Normal file
@ -0,0 +1,356 @@
|
||||
/*
|
||||
*
|
||||
* Copyright 2020 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package resolver
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
xdsclient "google.golang.org/grpc/xds/internal/client"
|
||||
)
|
||||
|
||||
func TestNewActionsFromRoutes(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
routes []*xdsclient.Route
|
||||
want map[string]actionWithAssignedName
|
||||
}{
|
||||
{
|
||||
name: "temp",
|
||||
routes: []*xdsclient.Route{
|
||||
{Action: map[string]uint32{"B": 60, "A": 40}},
|
||||
{Action: map[string]uint32{"A": 30, "B": 70}},
|
||||
{Action: map[string]uint32{"B": 90, "C": 10}},
|
||||
},
|
||||
want: map[string]actionWithAssignedName{
|
||||
"A40_B60_": {map[string]uint32{"A": 40, "B": 60}, "A_B_", "", 0},
|
||||
"A30_B70_": {map[string]uint32{"A": 30, "B": 70}, "A_B_", "", 0},
|
||||
"B90_C10_": {map[string]uint32{"B": 90, "C": 10}, "B_C_", "", 0},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
cmpOpts := []cmp.Option{cmp.AllowUnexported(actionWithAssignedName{})}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
if got := newActionsFromRoutes(tt.routes); !cmp.Equal(got, tt.want, cmpOpts...) {
|
||||
t.Errorf("newActionsFromRoutes() got unexpected result, diff %v", cmp.Diff(got, tt.want, cmpOpts...))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestRemoveOrReuseName(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
oldActions map[string]actionWithAssignedName
|
||||
oldRandNums map[int64]bool
|
||||
newActions map[string]actionWithAssignedName
|
||||
wantActions map[string]actionWithAssignedName
|
||||
wantRandNums map[int64]bool
|
||||
}{
|
||||
{
|
||||
name: "add same cluster",
|
||||
oldActions: map[string]actionWithAssignedName{
|
||||
"a20_b30_c50_": {
|
||||
clustersWithWeights: map[string]uint32{"a": 20, "b": 30, "c": 50},
|
||||
clusterNames: "a_b_c_",
|
||||
assignedName: "a_b_c_0",
|
||||
randomNumber: 0,
|
||||
},
|
||||
},
|
||||
oldRandNums: map[int64]bool{
|
||||
0: true,
|
||||
},
|
||||
newActions: map[string]actionWithAssignedName{
|
||||
"a20_b30_c50_": {
|
||||
clustersWithWeights: map[string]uint32{"a": 20, "b": 30, "c": 50},
|
||||
clusterNames: "a_b_c_",
|
||||
},
|
||||
"a10_b50_c40_": {
|
||||
clustersWithWeights: map[string]uint32{"a": 10, "b": 50, "c": 40},
|
||||
clusterNames: "a_b_c_",
|
||||
},
|
||||
},
|
||||
wantActions: map[string]actionWithAssignedName{
|
||||
"a20_b30_c50_": {
|
||||
clustersWithWeights: map[string]uint32{"a": 20, "b": 30, "c": 50},
|
||||
clusterNames: "a_b_c_",
|
||||
assignedName: "a_b_c_0",
|
||||
randomNumber: 0,
|
||||
},
|
||||
"a10_b50_c40_": {
|
||||
clustersWithWeights: map[string]uint32{"a": 10, "b": 50, "c": 40},
|
||||
clusterNames: "a_b_c_",
|
||||
assignedName: "a_b_c_1000",
|
||||
randomNumber: 1000,
|
||||
},
|
||||
},
|
||||
wantRandNums: map[int64]bool{
|
||||
0: true,
|
||||
1000: true,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "delete same cluster",
|
||||
oldActions: map[string]actionWithAssignedName{
|
||||
"a20_b30_c50_": {
|
||||
clustersWithWeights: map[string]uint32{"a": 20, "b": 30, "c": 50},
|
||||
clusterNames: "a_b_c_",
|
||||
assignedName: "a_b_c_0",
|
||||
randomNumber: 0,
|
||||
},
|
||||
"a10_b50_c40_": {
|
||||
clustersWithWeights: map[string]uint32{"a": 10, "b": 50, "c": 40},
|
||||
clusterNames: "a_b_c_",
|
||||
assignedName: "a_b_c_1",
|
||||
randomNumber: 1,
|
||||
},
|
||||
},
|
||||
oldRandNums: map[int64]bool{
|
||||
0: true,
|
||||
1: true,
|
||||
},
|
||||
newActions: map[string]actionWithAssignedName{
|
||||
"a20_b30_c50_": {
|
||||
clustersWithWeights: map[string]uint32{"a": 20, "b": 30, "c": 50},
|
||||
clusterNames: "a_b_c_",
|
||||
},
|
||||
},
|
||||
wantActions: map[string]actionWithAssignedName{
|
||||
"a20_b30_c50_": {
|
||||
clustersWithWeights: map[string]uint32{"a": 20, "b": 30, "c": 50},
|
||||
clusterNames: "a_b_c_",
|
||||
assignedName: "a_b_c_0",
|
||||
randomNumber: 0,
|
||||
},
|
||||
},
|
||||
wantRandNums: map[int64]bool{
|
||||
0: true,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "add new clusters",
|
||||
oldActions: map[string]actionWithAssignedName{
|
||||
"a20_b30_c50_": {
|
||||
clustersWithWeights: map[string]uint32{"a": 20, "b": 30, "c": 50},
|
||||
clusterNames: "a_b_c_",
|
||||
assignedName: "a_b_c_0",
|
||||
randomNumber: 0,
|
||||
},
|
||||
},
|
||||
oldRandNums: map[int64]bool{
|
||||
0: true,
|
||||
},
|
||||
newActions: map[string]actionWithAssignedName{
|
||||
"a20_b30_c50_": {
|
||||
clustersWithWeights: map[string]uint32{"a": 20, "b": 30, "c": 50},
|
||||
clusterNames: "a_b_c_",
|
||||
},
|
||||
"a50_b50_": {
|
||||
clustersWithWeights: map[string]uint32{"a": 50, "b": 50},
|
||||
clusterNames: "a_b_",
|
||||
},
|
||||
},
|
||||
wantActions: map[string]actionWithAssignedName{
|
||||
"a20_b30_c50_": {
|
||||
clustersWithWeights: map[string]uint32{"a": 20, "b": 30, "c": 50},
|
||||
clusterNames: "a_b_c_",
|
||||
assignedName: "a_b_c_0",
|
||||
randomNumber: 0,
|
||||
},
|
||||
"a50_b50_": {
|
||||
clustersWithWeights: map[string]uint32{"a": 50, "b": 50},
|
||||
clusterNames: "a_b_",
|
||||
assignedName: "a_b_1000",
|
||||
randomNumber: 1000,
|
||||
},
|
||||
},
|
||||
wantRandNums: map[int64]bool{
|
||||
0: true,
|
||||
1000: true,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "reuse",
|
||||
oldActions: map[string]actionWithAssignedName{
|
||||
"a20_b30_c50_": {
|
||||
clustersWithWeights: map[string]uint32{"a": 20, "b": 30, "c": 50},
|
||||
clusterNames: "a_b_c_",
|
||||
assignedName: "a_b_c_0",
|
||||
randomNumber: 0,
|
||||
},
|
||||
},
|
||||
oldRandNums: map[int64]bool{
|
||||
0: true,
|
||||
},
|
||||
newActions: map[string]actionWithAssignedName{
|
||||
"a10_b50_c40_": {
|
||||
clustersWithWeights: map[string]uint32{"a": 10, "b": 50, "c": 40},
|
||||
clusterNames: "a_b_c_",
|
||||
},
|
||||
},
|
||||
wantActions: map[string]actionWithAssignedName{
|
||||
"a10_b50_c40_": {
|
||||
clustersWithWeights: map[string]uint32{"a": 10, "b": 50, "c": 40},
|
||||
clusterNames: "a_b_c_",
|
||||
assignedName: "a_b_c_0",
|
||||
randomNumber: 0,
|
||||
},
|
||||
},
|
||||
wantRandNums: map[int64]bool{
|
||||
0: true,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "add and reuse",
|
||||
oldActions: map[string]actionWithAssignedName{
|
||||
"a20_b30_c50_": {
|
||||
clustersWithWeights: map[string]uint32{"a": 20, "b": 30, "c": 50},
|
||||
clusterNames: "a_b_c_",
|
||||
assignedName: "a_b_c_0",
|
||||
randomNumber: 0,
|
||||
},
|
||||
"a10_b50_c40_": {
|
||||
clustersWithWeights: map[string]uint32{"a": 10, "b": 50, "c": 40},
|
||||
clusterNames: "a_b_c_",
|
||||
assignedName: "a_b_c_1",
|
||||
randomNumber: 1,
|
||||
},
|
||||
"a50_b50_": {
|
||||
clustersWithWeights: map[string]uint32{"a": 50, "b": 50},
|
||||
clusterNames: "a_b_",
|
||||
assignedName: "a_b_2",
|
||||
randomNumber: 2,
|
||||
},
|
||||
},
|
||||
oldRandNums: map[int64]bool{
|
||||
0: true,
|
||||
1: true,
|
||||
2: true,
|
||||
},
|
||||
newActions: map[string]actionWithAssignedName{
|
||||
"a10_b50_c40_": {
|
||||
clustersWithWeights: map[string]uint32{"a": 10, "b": 50, "c": 40},
|
||||
clusterNames: "a_b_c_",
|
||||
},
|
||||
"a30_b30_c40_": {
|
||||
clustersWithWeights: map[string]uint32{"a": 30, "b": 30, "c": 40},
|
||||
clusterNames: "a_b_c_",
|
||||
},
|
||||
"c50_d50_": {
|
||||
clustersWithWeights: map[string]uint32{"c": 50, "d": 50},
|
||||
clusterNames: "c_d_",
|
||||
},
|
||||
},
|
||||
wantActions: map[string]actionWithAssignedName{
|
||||
"a10_b50_c40_": {
|
||||
clustersWithWeights: map[string]uint32{"a": 10, "b": 50, "c": 40},
|
||||
clusterNames: "a_b_c_",
|
||||
assignedName: "a_b_c_1",
|
||||
randomNumber: 1,
|
||||
},
|
||||
"a30_b30_c40_": {
|
||||
clustersWithWeights: map[string]uint32{"a": 30, "b": 30, "c": 40},
|
||||
clusterNames: "a_b_c_",
|
||||
assignedName: "a_b_c_0",
|
||||
randomNumber: 0,
|
||||
},
|
||||
"c50_d50_": {
|
||||
clustersWithWeights: map[string]uint32{"c": 50, "d": 50},
|
||||
clusterNames: "c_d_",
|
||||
assignedName: "c_d_1000",
|
||||
randomNumber: 1000,
|
||||
},
|
||||
},
|
||||
wantRandNums: map[int64]bool{
|
||||
0: true,
|
||||
1: true,
|
||||
1000: true,
|
||||
},
|
||||
},
|
||||
}
|
||||
cmpOpts := []cmp.Option{cmp.AllowUnexported(actionWithAssignedName{})}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
defer replaceRandNumGenerator(1000)()
|
||||
r := &xdsResolver{
|
||||
actions: tt.oldActions,
|
||||
usedActionNameRandomNumber: tt.oldRandNums,
|
||||
}
|
||||
r.updateActions(tt.newActions)
|
||||
if !cmp.Equal(r.actions, tt.wantActions, cmpOpts...) {
|
||||
t.Errorf("removeOrReuseName() got unexpected actions, diff %v", cmp.Diff(r.actions, tt.wantActions, cmpOpts...))
|
||||
}
|
||||
if !cmp.Equal(r.usedActionNameRandomNumber, tt.wantRandNums) {
|
||||
t.Errorf("removeOrReuseName() got unexpected nextIndex, diff %v", cmp.Diff(r.usedActionNameRandomNumber, tt.wantRandNums))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetActionAssignedName(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
actions map[string]actionWithAssignedName
|
||||
action map[string]uint32
|
||||
want string
|
||||
}{
|
||||
{
|
||||
name: "good",
|
||||
actions: map[string]actionWithAssignedName{
|
||||
"a20_b30_c50_": {
|
||||
clustersWithWeights: map[string]uint32{"a": 20, "b": 30, "c": 50},
|
||||
clusterNames: "a_b_c_",
|
||||
assignedName: "a_b_c_0",
|
||||
},
|
||||
},
|
||||
action: map[string]uint32{"a": 20, "b": 30, "c": 50},
|
||||
want: "a_b_c_0",
|
||||
},
|
||||
{
|
||||
name: "two",
|
||||
actions: map[string]actionWithAssignedName{
|
||||
"a20_b30_c50_": {
|
||||
clustersWithWeights: map[string]uint32{"a": 20, "b": 30, "c": 50},
|
||||
clusterNames: "a_b_c_",
|
||||
assignedName: "a_b_c_0",
|
||||
},
|
||||
"c50_d50_": {
|
||||
clustersWithWeights: map[string]uint32{"c": 50, "d": 50},
|
||||
clusterNames: "c_d_",
|
||||
assignedName: "c_d_0",
|
||||
},
|
||||
},
|
||||
action: map[string]uint32{"c": 50, "d": 50},
|
||||
want: "c_d_0",
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
r := &xdsResolver{
|
||||
actions: tt.actions,
|
||||
}
|
||||
if got := r.getActionAssignedName(tt.action); got != tt.want {
|
||||
t.Errorf("getActionAssignedName() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
@ -23,9 +23,12 @@ import (
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"google.golang.org/grpc/internal"
|
||||
"google.golang.org/grpc/internal/grpcrand"
|
||||
"google.golang.org/grpc/serviceconfig"
|
||||
_ "google.golang.org/grpc/xds/internal/balancer/weightedtarget"
|
||||
_ "google.golang.org/grpc/xds/internal/balancer/xdsrouting"
|
||||
"google.golang.org/grpc/xds/internal/client"
|
||||
xdsclient "google.golang.org/grpc/xds/internal/client"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -54,36 +57,155 @@ const (
|
||||
"targets": {}
|
||||
}
|
||||
}]}`
|
||||
testRoutingJSON = `{"loadBalancingConfig":[{
|
||||
"xds_routing_experimental": {
|
||||
"action":{
|
||||
"cluster_1_cluster_2_0":{
|
||||
"childPolicy":[{
|
||||
"weighted_target_experimental": {
|
||||
"targets": {
|
||||
"cluster_1" : {
|
||||
"weight":75,
|
||||
"childPolicy":[{"cds_experimental":{"cluster":"cluster_1"}}]
|
||||
},
|
||||
"cluster_2" : {
|
||||
"weight":25,
|
||||
"childPolicy":[{"cds_experimental":{"cluster":"cluster_2"}}]
|
||||
}
|
||||
}
|
||||
}
|
||||
}]
|
||||
}
|
||||
},
|
||||
|
||||
"route":[{
|
||||
"path":"/service_1/method_1",
|
||||
"action":"cluster_1_cluster_2_0"
|
||||
}]
|
||||
}
|
||||
}]}
|
||||
`
|
||||
testRoutingAllMatchersJSON = `{"loadBalancingConfig":[{
|
||||
"xds_routing_experimental": {
|
||||
"action":{
|
||||
"cluster_1_0":{
|
||||
"childPolicy":[{
|
||||
"weighted_target_experimental": {
|
||||
"targets": {
|
||||
"cluster_1" : {
|
||||
"weight":1,
|
||||
"childPolicy":[{"cds_experimental":{"cluster":"cluster_1"}}]
|
||||
}
|
||||
}
|
||||
}
|
||||
}]
|
||||
},
|
||||
"cluster_2_0":{
|
||||
"childPolicy":[{
|
||||
"weighted_target_experimental": {
|
||||
"targets": {
|
||||
"cluster_2" : {
|
||||
"weight":1,
|
||||
"childPolicy":[{"cds_experimental":{"cluster":"cluster_2"}}]
|
||||
}
|
||||
}
|
||||
}
|
||||
}]
|
||||
},
|
||||
"cluster_3_0":{
|
||||
"childPolicy":[{
|
||||
"weighted_target_experimental": {
|
||||
"targets": {
|
||||
"cluster_3" : {
|
||||
"weight":1,
|
||||
"childPolicy":[{"cds_experimental":{"cluster":"cluster_3"}}]
|
||||
}
|
||||
}
|
||||
}
|
||||
}]
|
||||
}
|
||||
},
|
||||
|
||||
"route":[{
|
||||
"path":"/service_1/method_1",
|
||||
"action":"cluster_1_0"
|
||||
},
|
||||
{
|
||||
"prefix":"/service_2/method_1",
|
||||
"action":"cluster_1_0"
|
||||
},
|
||||
{
|
||||
"regex":"^/service_2/method_3$",
|
||||
"action":"cluster_1_0"
|
||||
},
|
||||
{
|
||||
"prefix":"",
|
||||
"headers":[{"name":"header-1", "exactMatch":"value-1", "invertMatch":true}],
|
||||
"action":"cluster_2_0"
|
||||
},
|
||||
{
|
||||
"prefix":"",
|
||||
"headers":[{"name":"header-1", "regexMatch":"^value-1$"}],
|
||||
"action":"cluster_2_0"
|
||||
},
|
||||
{
|
||||
"prefix":"",
|
||||
"headers":[{"name":"header-1", "rangeMatch":{"start":-1, "end":7}}],
|
||||
"action":"cluster_3_0"
|
||||
},
|
||||
{
|
||||
"prefix":"",
|
||||
"headers":[{"name":"header-1", "presentMatch":true}],
|
||||
"action":"cluster_3_0"
|
||||
},
|
||||
{
|
||||
"prefix":"",
|
||||
"headers":[{"name":"header-1", "prefixMatch":"value-1"}],
|
||||
"action":"cluster_2_0"
|
||||
},
|
||||
{
|
||||
"prefix":"",
|
||||
"headers":[{"name":"header-1", "suffixMatch":"value-1"}],
|
||||
"action":"cluster_2_0"
|
||||
},
|
||||
{
|
||||
"prefix":"",
|
||||
"matchFraction":{"value": 31415},
|
||||
"action":"cluster_3_0"
|
||||
}]
|
||||
}
|
||||
}]}
|
||||
`
|
||||
)
|
||||
|
||||
func TestServiceUpdateToJSON(t *testing.T) {
|
||||
func TestWeightedClusterToJSON(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
su client.ServiceUpdate
|
||||
wc map[string]uint32
|
||||
wantJSON string // wantJSON is not to be compared verbatim.
|
||||
}{
|
||||
{
|
||||
name: "one cluster only",
|
||||
su: client.ServiceUpdate{WeightedCluster: map[string]uint32{testCluster1: 1}},
|
||||
wc: map[string]uint32{testCluster1: 1},
|
||||
wantJSON: testClusterOnlyJSON,
|
||||
},
|
||||
{
|
||||
name: "empty weighted clusters",
|
||||
su: client.ServiceUpdate{WeightedCluster: nil},
|
||||
wc: nil,
|
||||
wantJSON: testWeightedCDSNoChildJSON,
|
||||
},
|
||||
{
|
||||
name: "weighted clusters",
|
||||
su: client.ServiceUpdate{WeightedCluster: map[string]uint32{
|
||||
wc: map[string]uint32{
|
||||
"cluster_1": 75,
|
||||
"cluster_2": 25,
|
||||
}},
|
||||
},
|
||||
wantJSON: testWeightedCDSJSON,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
gotJSON, err := serviceUpdateToJSON(tt.su)
|
||||
gotJSON, err := weightedClusterToJSON(tt.wc)
|
||||
if err != nil {
|
||||
t.Errorf("serviceUpdateToJSON returned error: %v", err)
|
||||
return
|
||||
@ -100,3 +222,186 @@ func TestServiceUpdateToJSON(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestRoutesToJSON(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
routes []*xdsclient.Route
|
||||
wantJSON string
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "one route",
|
||||
routes: []*xdsclient.Route{{
|
||||
Path: newStringP("/service_1/method_1"),
|
||||
Action: map[string]uint32{"cluster_1": 75, "cluster_2": 25},
|
||||
}},
|
||||
wantJSON: testRoutingJSON,
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "all matchers",
|
||||
routes: []*xdsclient.Route{
|
||||
{
|
||||
Path: newStringP("/service_1/method_1"),
|
||||
Action: map[string]uint32{"cluster_1": 1},
|
||||
},
|
||||
{
|
||||
Prefix: newStringP("/service_2/method_1"),
|
||||
Action: map[string]uint32{"cluster_1": 1},
|
||||
},
|
||||
{
|
||||
Regex: newStringP("^/service_2/method_3$"),
|
||||
Action: map[string]uint32{"cluster_1": 1},
|
||||
},
|
||||
{
|
||||
Prefix: newStringP(""),
|
||||
Headers: []*xdsclient.HeaderMatcher{{
|
||||
Name: "header-1",
|
||||
InvertMatch: newBoolP(true),
|
||||
ExactMatch: newStringP("value-1"),
|
||||
}},
|
||||
Action: map[string]uint32{"cluster_2": 1},
|
||||
},
|
||||
{
|
||||
Prefix: newStringP(""),
|
||||
Headers: []*xdsclient.HeaderMatcher{{
|
||||
Name: "header-1",
|
||||
RegexMatch: newStringP("^value-1$"),
|
||||
}},
|
||||
Action: map[string]uint32{"cluster_2": 1},
|
||||
},
|
||||
{
|
||||
Prefix: newStringP(""),
|
||||
Headers: []*xdsclient.HeaderMatcher{{
|
||||
Name: "header-1",
|
||||
RangeMatch: &xdsclient.Int64Range{Start: -1, End: 7},
|
||||
}},
|
||||
Action: map[string]uint32{"cluster_3": 1},
|
||||
},
|
||||
{
|
||||
Prefix: newStringP(""),
|
||||
Headers: []*xdsclient.HeaderMatcher{{
|
||||
Name: "header-1",
|
||||
PresentMatch: newBoolP(true),
|
||||
}},
|
||||
Action: map[string]uint32{"cluster_3": 1},
|
||||
},
|
||||
{
|
||||
Prefix: newStringP(""),
|
||||
Headers: []*xdsclient.HeaderMatcher{{
|
||||
Name: "header-1",
|
||||
PrefixMatch: newStringP("value-1"),
|
||||
}},
|
||||
Action: map[string]uint32{"cluster_2": 1},
|
||||
},
|
||||
{
|
||||
Prefix: newStringP(""),
|
||||
Headers: []*xdsclient.HeaderMatcher{{
|
||||
Name: "header-1",
|
||||
SuffixMatch: newStringP("value-1"),
|
||||
}},
|
||||
Action: map[string]uint32{"cluster_2": 1},
|
||||
},
|
||||
{
|
||||
Prefix: newStringP(""),
|
||||
Fraction: newUint32P(31415),
|
||||
Action: map[string]uint32{"cluster_3": 1},
|
||||
},
|
||||
},
|
||||
wantJSON: testRoutingAllMatchersJSON,
|
||||
wantErr: false,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
// Note this random number function only generates 0. This is
|
||||
// because the test doesn't handle action update, and there's only
|
||||
// one action for each cluster bundle.
|
||||
//
|
||||
// This is necessary so the output is deterministic.
|
||||
grpcrandInt63n = func(int64) int64 { return 0 }
|
||||
defer func() { grpcrandInt63n = grpcrand.Int63n }()
|
||||
|
||||
gotJSON, err := (&xdsResolver{}).routesToJSON(tt.routes)
|
||||
if err != nil {
|
||||
t.Errorf("routesToJSON returned error: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
gotParsed := internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(gotJSON)
|
||||
wantParsed := internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(tt.wantJSON)
|
||||
|
||||
if !internal.EqualServiceConfigForTesting(gotParsed.Config, wantParsed.Config) {
|
||||
t.Errorf("serviceUpdateToJSON() = %v, want %v", gotJSON, tt.wantJSON)
|
||||
t.Error("gotParsed: ", cmp.Diff(nil, gotParsed))
|
||||
t.Error("wantParsed: ", cmp.Diff(nil, wantParsed))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestServiceUpdateToJSON(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
su client.ServiceUpdate
|
||||
wantJSON string
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
name: "weighted clusters",
|
||||
su: client.ServiceUpdate{WeightedCluster: map[string]uint32{
|
||||
"cluster_1": 75,
|
||||
"cluster_2": 25,
|
||||
}},
|
||||
wantJSON: testWeightedCDSJSON,
|
||||
wantErr: false,
|
||||
},
|
||||
{
|
||||
name: "routing",
|
||||
su: client.ServiceUpdate{
|
||||
Routes: []*xdsclient.Route{{
|
||||
Path: newStringP("/service_1/method_1"),
|
||||
Action: map[string]uint32{"cluster_1": 75, "cluster_2": 25},
|
||||
}},
|
||||
},
|
||||
wantJSON: testRoutingJSON,
|
||||
wantErr: false,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
defer replaceRandNumGenerator(0)()
|
||||
gotJSON, err := (&xdsResolver{}).serviceUpdateToJSON(tt.su)
|
||||
if err != nil {
|
||||
t.Errorf("serviceUpdateToJSON returned error: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
gotParsed := internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(gotJSON)
|
||||
wantParsed := internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(tt.wantJSON)
|
||||
|
||||
if !internal.EqualServiceConfigForTesting(gotParsed.Config, wantParsed.Config) {
|
||||
t.Errorf("serviceUpdateToJSON() = %v, want %v", gotJSON, tt.wantJSON)
|
||||
t.Error("gotParsed: ", cmp.Diff(nil, gotParsed))
|
||||
t.Error("wantParsed: ", cmp.Diff(nil, wantParsed))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Two updates to the same resolver, test that action names are reused.
|
||||
func TestServiceUpdateToJSON_TwoConfig_UpdateActions(t *testing.T) {
|
||||
}
|
||||
|
||||
func newStringP(s string) *string {
|
||||
return &s
|
||||
}
|
||||
|
||||
func newBoolP(b bool) *bool {
|
||||
return &b
|
||||
}
|
||||
|
||||
func newUint32P(i uint32) *uint32 {
|
||||
return &i
|
||||
}
|
||||
|
@ -159,6 +159,16 @@ type xdsResolver struct {
|
||||
updateCh chan suWithError
|
||||
// cancelWatch is the function to cancel the watcher.
|
||||
cancelWatch func()
|
||||
|
||||
// actions is a map from hash of weighted cluster, to the weighted cluster
|
||||
// map, and it's assigned name. E.g.
|
||||
// "A40_B60_": {{A:40, B:60}, "A_B_", "A_B_0"}
|
||||
// "A30_B70_": {{A:30, B:70}, "A_B_", "A_B_1"}
|
||||
// "B90_C10_": {{B:90, C:10}, "B_C_", "B_C_0"}
|
||||
actions map[string]actionWithAssignedName
|
||||
// usedActionNameRandomNumber contains random numbers that have been used in
|
||||
// assigned names, to avoid collision.
|
||||
usedActionNameRandomNumber map[int64]bool
|
||||
}
|
||||
|
||||
// run is a long running goroutine which blocks on receiving service updates
|
||||
@ -185,7 +195,7 @@ func (r *xdsResolver) run() {
|
||||
r.cc.ReportError(update.err)
|
||||
continue
|
||||
}
|
||||
sc, err := serviceUpdateToJSON(update.su)
|
||||
sc, err := r.serviceUpdateToJSON(update.su)
|
||||
if err != nil {
|
||||
r.logger.Warningf("failed to convert update to service config: %v", err)
|
||||
r.cc.ReportError(err)
|
||||
|
@ -28,6 +28,7 @@ import (
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/internal"
|
||||
"google.golang.org/grpc/internal/grpcrand"
|
||||
"google.golang.org/grpc/resolver"
|
||||
"google.golang.org/grpc/serviceconfig"
|
||||
xdsinternal "google.golang.org/grpc/xds/internal"
|
||||
@ -446,3 +447,15 @@ func TestXDSResolverResourceNotFoundError(t *testing.T) {
|
||||
t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err)
|
||||
}
|
||||
}
|
||||
|
||||
func replaceRandNumGenerator(start int64) func() {
|
||||
nextInt := start
|
||||
grpcrandInt63n = func(int64) (ret int64) {
|
||||
ret = nextInt
|
||||
nextInt++
|
||||
return
|
||||
}
|
||||
return func() {
|
||||
grpcrandInt63n = grpcrand.Int63n
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user