xds: replace eds proto message with xdsclient defined struct in eds balancer (#3174)

The xds client will parse the EDS response, and give the parse result to eds balancer, so the balancer doesn't need to deal with proto directly.

Also moved `ClusterLoadAssignmentBuilder` to another pacakge to be shared by tests in different packages.
This commit is contained in:
Menghan Li
2019-11-14 10:11:01 -08:00
committed by GitHub
parent 0b3a5cc266
commit 448c8c628c
11 changed files with 505 additions and 283 deletions

View File

@ -20,16 +20,10 @@ package edsbalancer
import (
"context"
"encoding/json"
"net"
"reflect"
"strconv"
"sync"
"time"
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
typepb "github.com/envoyproxy/go-control-plane/envoy/type"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/balancer/weightedroundrobin"
@ -40,6 +34,7 @@ import (
"google.golang.org/grpc/status"
"google.golang.org/grpc/xds/internal"
"google.golang.org/grpc/xds/internal/balancer/lrs"
xdsclient "google.golang.org/grpc/xds/internal/client"
)
// TODO: make this a environment variable?
@ -154,26 +149,17 @@ func (xdsB *EDSBalancer) HandleChildPolicy(name string, config json.RawMessage)
// updateDrops compares new drop policies with the old. If they are different,
// it updates the drop policies and send ClientConn an updated picker.
func (xdsB *EDSBalancer) updateDrops(dropPolicies []*xdspb.ClusterLoadAssignment_Policy_DropOverload) {
func (xdsB *EDSBalancer) updateDrops(dropPolicies []xdsclient.OverloadDropConfig) {
var (
newDrops []*dropper
dropsChanged bool
)
for i, dropPolicy := range dropPolicies {
percentage := dropPolicy.GetDropPercentage()
var (
numerator = percentage.GetNumerator()
denominator uint32
numerator = dropPolicy.Numerator
denominator = dropPolicy.Denominator
)
switch percentage.GetDenominator() {
case typepb.FractionalPercent_HUNDRED:
denominator = 100
case typepb.FractionalPercent_TEN_THOUSAND:
denominator = 10000
case typepb.FractionalPercent_MILLION:
denominator = 1000000
}
newDrops = append(newDrops, newDropper(numerator, denominator, dropPolicy.GetCategory()))
newDrops = append(newDrops, newDropper(numerator, denominator, dropPolicy.Category))
// The following reading xdsB.drops doesn't need mutex because it can only
// be updated by the code following.
@ -203,7 +189,7 @@ func (xdsB *EDSBalancer) updateDrops(dropPolicies []*xdspb.ClusterLoadAssignment
// SubConns. It also handles drops.
//
// HandleChildPolicy and HandleEDSResponse must be called by the same goroutine.
func (xdsB *EDSBalancer) HandleEDSResponse(edsResp *xdspb.ClusterLoadAssignment) {
func (xdsB *EDSBalancer) HandleEDSResponse(edsResp *xdsclient.EDSUpdate) {
// TODO: Unhandled fields from EDS response:
// - edsResp.GetPolicy().GetOverprovisioningFactor()
// - locality.GetPriority()
@ -213,7 +199,7 @@ func (xdsB *EDSBalancer) HandleEDSResponse(edsResp *xdspb.ClusterLoadAssignment)
// - socketAddress.GetNamedPort(), socketAddress.GetResolverName()
// - resolve endpoint's name with another resolver
xdsB.updateDrops(edsResp.GetPolicy().GetDropOverloads())
xdsB.updateDrops(edsResp.Drops)
// Filter out all localities with weight 0.
//
@ -224,12 +210,12 @@ func (xdsB *EDSBalancer) HandleEDSResponse(edsResp *xdspb.ClusterLoadAssignment)
//
// In the future, we should look at the config in CDS response and decide
// whether locality weight matters.
newLocalitiesWithPriority := make(map[priorityType][]*endpointpb.LocalityLbEndpoints)
for _, locality := range edsResp.Endpoints {
if locality.GetLoadBalancingWeight().GetValue() == 0 {
newLocalitiesWithPriority := make(map[priorityType][]xdsclient.Locality)
for _, locality := range edsResp.Localities {
if locality.Weight == 0 {
continue
}
priority := newPriorityType(locality.GetPriority())
priority := newPriorityType(locality.Priority)
newLocalitiesWithPriority[priority] = append(newLocalitiesWithPriority[priority], locality)
}
@ -281,7 +267,7 @@ func (xdsB *EDSBalancer) HandleEDSResponse(edsResp *xdspb.ClusterLoadAssignment)
}
}
func (xdsB *EDSBalancer) handleEDSResponsePerPriority(bgwc *balancerGroupWithConfig, newLocalities []*endpointpb.LocalityLbEndpoints) {
func (xdsB *EDSBalancer) handleEDSResponsePerPriority(bgwc *balancerGroupWithConfig, newLocalities []xdsclient.Locality) {
// newLocalitiesSet contains all names of localities in the new EDS response
// for the same priority. It's used to delete localities that are removed in
// the new EDS response.
@ -289,37 +275,26 @@ func (xdsB *EDSBalancer) handleEDSResponsePerPriority(bgwc *balancerGroupWithCon
for _, locality := range newLocalities {
// One balancer for each locality.
l := locality.GetLocality()
if l == nil {
grpclog.Warningf("xds: received LocalityLbEndpoints with <nil> Locality")
continue
}
lid := internal.Locality{
Region: l.Region,
Zone: l.Zone,
SubZone: l.SubZone,
}
lid := locality.ID
newLocalitiesSet[lid] = struct{}{}
newWeight := locality.GetLoadBalancingWeight().GetValue()
newWeight := locality.Weight
var newAddrs []resolver.Address
for _, lbEndpoint := range locality.GetLbEndpoints() {
for _, lbEndpoint := range locality.Endpoints {
// Filter out all "unhealthy" endpoints (unknown and
// healthy are both considered to be healthy:
// https://www.envoyproxy.io/docs/envoy/latest/api-v2/api/v2/core/health_check.proto#envoy-api-enum-core-healthstatus).
if lbEndpoint.GetHealthStatus() != corepb.HealthStatus_HEALTHY &&
lbEndpoint.GetHealthStatus() != corepb.HealthStatus_UNKNOWN {
if lbEndpoint.HealthStatus != xdsclient.EndpointHealthStatusHealthy &&
lbEndpoint.HealthStatus != xdsclient.EndpointHealthStatusUnknown {
continue
}
socketAddress := lbEndpoint.GetEndpoint().GetAddress().GetSocketAddress()
address := resolver.Address{
Addr: net.JoinHostPort(socketAddress.GetAddress(), strconv.Itoa(int(socketAddress.GetPortValue()))),
Addr: lbEndpoint.Address,
}
if xdsB.subBalancerBuilder.Name() == weightedroundrobin.Name &&
lbEndpoint.GetLoadBalancingWeight().GetValue() != 0 {
if xdsB.subBalancerBuilder.Name() == weightedroundrobin.Name && lbEndpoint.Weight != 0 {
address.Metadata = &weightedroundrobin.AddrInfo{
Weight: lbEndpoint.GetLoadBalancingWeight().GetValue(),
Weight: lbEndpoint.Weight,
}
}
newAddrs = append(newAddrs, address)

View File

@ -19,24 +19,19 @@ package edsbalancer
import (
"context"
"fmt"
"net"
"reflect"
"sort"
"strconv"
"testing"
"time"
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
typepb "github.com/envoyproxy/go-control-plane/envoy/type"
typespb "github.com/golang/protobuf/ptypes/wrappers"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal"
xdsclient "google.golang.org/grpc/xds/internal/client"
)
var (
@ -51,81 +46,6 @@ func init() {
}
}
type clusterLoadAssignmentBuilder struct {
v *xdspb.ClusterLoadAssignment
}
func newClusterLoadAssignmentBuilder(clusterName string, dropPercents []uint32) *clusterLoadAssignmentBuilder {
var drops []*xdspb.ClusterLoadAssignment_Policy_DropOverload
for i, d := range dropPercents {
drops = append(drops, &xdspb.ClusterLoadAssignment_Policy_DropOverload{
Category: fmt.Sprintf("test-drop-%d", i),
DropPercentage: &typepb.FractionalPercent{
Numerator: d,
Denominator: typepb.FractionalPercent_HUNDRED,
},
})
}
return &clusterLoadAssignmentBuilder{
v: &xdspb.ClusterLoadAssignment{
ClusterName: clusterName,
Policy: &xdspb.ClusterLoadAssignment_Policy{
DropOverloads: drops,
},
},
}
}
type addLocalityOptions struct {
health []corepb.HealthStatus
}
func (clab *clusterLoadAssignmentBuilder) addLocality(subzone string, weight uint32, priority uint32, addrsWithPort []string, opts *addLocalityOptions) {
var lbEndPoints []*endpointpb.LbEndpoint
for i, a := range addrsWithPort {
host, portStr, err := net.SplitHostPort(a)
if err != nil {
panic("failed to split " + a)
}
port, err := strconv.Atoi(portStr)
if err != nil {
panic("failed to atoi " + portStr)
}
lbe := &endpointpb.LbEndpoint{
HostIdentifier: &endpointpb.LbEndpoint_Endpoint{
Endpoint: &endpointpb.Endpoint{
Address: &corepb.Address{
Address: &corepb.Address_SocketAddress{
SocketAddress: &corepb.SocketAddress{
Protocol: corepb.SocketAddress_TCP,
Address: host,
PortSpecifier: &corepb.SocketAddress_PortValue{
PortValue: uint32(port)}}}}}},
}
if opts != nil && i < len(opts.health) {
lbe.HealthStatus = opts.health[i]
}
lbEndPoints = append(lbEndPoints, lbe)
}
clab.v.Endpoints = append(clab.v.Endpoints, &endpointpb.LocalityLbEndpoints{
Locality: &corepb.Locality{
Region: "",
Zone: "",
SubZone: subzone,
},
LbEndpoints: lbEndPoints,
LoadBalancingWeight: &typespb.UInt32Value{Value: weight},
Priority: priority,
})
}
func (clab *clusterLoadAssignmentBuilder) build() *xdspb.ClusterLoadAssignment {
return clab.v
}
// One locality
// - add backend
// - remove backend
@ -136,9 +56,9 @@ func TestEDS_OneLocality(t *testing.T) {
edsb := NewXDSBalancer(cc, nil)
// One locality with one backend.
clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.addLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
edsb.HandleEDSResponse(clab1.build())
clab1 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build()))
sc1 := <-cc.newSubConnCh
edsb.HandleSubConnStateChange(sc1, connectivity.Connecting)
@ -154,9 +74,9 @@ func TestEDS_OneLocality(t *testing.T) {
}
// The same locality, add one more backend.
clab2 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab2.addLocality(testSubZones[0], 1, 0, testEndpointAddrs[:2], nil)
edsb.HandleEDSResponse(clab2.build())
clab2 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab2.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:2], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab2.Build()))
sc2 := <-cc.newSubConnCh
edsb.HandleSubConnStateChange(sc2, connectivity.Connecting)
@ -173,9 +93,9 @@ func TestEDS_OneLocality(t *testing.T) {
}
// The same locality, delete first backend.
clab3 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab3.addLocality(testSubZones[0], 1, 0, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(clab3.build())
clab3 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab3.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab3.Build()))
scToRemove := <-cc.removeSubConnCh
if !reflect.DeepEqual(scToRemove, sc1) {
@ -193,9 +113,9 @@ func TestEDS_OneLocality(t *testing.T) {
}
// The same locality, replace backend.
clab4 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab4.addLocality(testSubZones[0], 1, 0, testEndpointAddrs[2:3], nil)
edsb.HandleEDSResponse(clab4.build())
clab4 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab4.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[2:3], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab4.Build()))
sc3 := <-cc.newSubConnCh
edsb.HandleSubConnStateChange(sc3, connectivity.Connecting)
@ -216,9 +136,9 @@ func TestEDS_OneLocality(t *testing.T) {
}
// The same locality, different drop rate, dropping 50%.
clab5 := newClusterLoadAssignmentBuilder(testClusterNames[0], []uint32{50})
clab5.addLocality(testSubZones[0], 1, 0, testEndpointAddrs[2:3], nil)
edsb.HandleEDSResponse(clab5.build())
clab5 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], []uint32{50})
clab5.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[2:3], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab5.Build()))
// Picks with drops.
p5 := <-cc.newPickerCh
@ -245,9 +165,9 @@ func TestEDS_TwoLocalities(t *testing.T) {
edsb := NewXDSBalancer(cc, nil)
// Two localities, each with one backend.
clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.addLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
edsb.HandleEDSResponse(clab1.build())
clab1 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build()))
sc1 := <-cc.newSubConnCh
edsb.HandleSubConnStateChange(sc1, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc1, connectivity.Ready)
@ -255,8 +175,8 @@ func TestEDS_TwoLocalities(t *testing.T) {
// Add the second locality later to make sure sc2 belongs to the second
// locality. Otherwise the test is flaky because of a map is used in EDS to
// keep localities.
clab1.addLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(clab1.build())
clab1.AddLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build()))
sc2 := <-cc.newSubConnCh
edsb.HandleSubConnStateChange(sc2, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc2, connectivity.Ready)
@ -272,11 +192,11 @@ func TestEDS_TwoLocalities(t *testing.T) {
}
// Add another locality, with one backend.
clab2 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab2.addLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab2.addLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil)
clab2.addLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:3], nil)
edsb.HandleEDSResponse(clab2.build())
clab2 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab2.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab2.AddLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil)
clab2.AddLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:3], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab2.Build()))
sc3 := <-cc.newSubConnCh
edsb.HandleSubConnStateChange(sc3, connectivity.Connecting)
@ -293,10 +213,10 @@ func TestEDS_TwoLocalities(t *testing.T) {
}
// Remove first locality.
clab3 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab3.addLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil)
clab3.addLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:3], nil)
edsb.HandleEDSResponse(clab3.build())
clab3 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab3.AddLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil)
clab3.AddLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:3], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab3.Build()))
scToRemove := <-cc.removeSubConnCh
if !reflect.DeepEqual(scToRemove, sc1) {
@ -315,10 +235,10 @@ func TestEDS_TwoLocalities(t *testing.T) {
}
// Add a backend to the last locality.
clab4 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab4.addLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil)
clab4.addLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:4], nil)
edsb.HandleEDSResponse(clab4.build())
clab4 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab4.AddLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil)
clab4.AddLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:4], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab4.Build()))
sc4 := <-cc.newSubConnCh
edsb.HandleSubConnStateChange(sc4, connectivity.Connecting)
@ -338,10 +258,10 @@ func TestEDS_TwoLocalities(t *testing.T) {
}
// Change weight of the locality[1].
clab5 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab5.addLocality(testSubZones[1], 2, 0, testEndpointAddrs[1:2], nil)
clab5.addLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:4], nil)
edsb.HandleEDSResponse(clab5.build())
clab5 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab5.AddLocality(testSubZones[1], 2, 0, testEndpointAddrs[1:2], nil)
clab5.AddLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:4], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab5.Build()))
// Test pick with two subconns different locality weight.
p5 := <-cc.newPickerCh
@ -357,10 +277,10 @@ func TestEDS_TwoLocalities(t *testing.T) {
}
// Change weight of the locality[1] to 0, it should never be picked.
clab6 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab6.addLocality(testSubZones[1], 0, 0, testEndpointAddrs[1:2], nil)
clab6.addLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:4], nil)
edsb.HandleEDSResponse(clab6.build())
clab6 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab6.AddLocality(testSubZones[1], 0, 0, testEndpointAddrs[1:2], nil)
clab6.AddLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:4], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab6.Build()))
// Changing weight of locality[1] to 0 caused it to be removed. It's subconn
// should also be removed.
@ -393,9 +313,9 @@ func TestEDS_EndpointsHealth(t *testing.T) {
edsb := NewXDSBalancer(cc, nil)
// Two localities, each 3 backend, one Healthy, one Unhealthy, one Unknown.
clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.addLocality(testSubZones[0], 1, 0, testEndpointAddrs[:6], &addLocalityOptions{
health: []corepb.HealthStatus{
clab1 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:6], &xdsclient.AddLocalityOptions{
Health: []corepb.HealthStatus{
corepb.HealthStatus_HEALTHY,
corepb.HealthStatus_UNHEALTHY,
corepb.HealthStatus_UNKNOWN,
@ -404,8 +324,8 @@ func TestEDS_EndpointsHealth(t *testing.T) {
corepb.HealthStatus_DEGRADED,
},
})
clab1.addLocality(testSubZones[1], 1, 0, testEndpointAddrs[6:12], &addLocalityOptions{
health: []corepb.HealthStatus{
clab1.AddLocality(testSubZones[1], 1, 0, testEndpointAddrs[6:12], &xdsclient.AddLocalityOptions{
Health: []corepb.HealthStatus{
corepb.HealthStatus_HEALTHY,
corepb.HealthStatus_UNHEALTHY,
corepb.HealthStatus_UNKNOWN,
@ -414,7 +334,7 @@ func TestEDS_EndpointsHealth(t *testing.T) {
corepb.HealthStatus_DEGRADED,
},
})
edsb.HandleEDSResponse(clab1.build())
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build()))
var (
readySCs []balancer.SubConn
@ -527,10 +447,10 @@ func TestEDS_UpdateSubBalancerName(t *testing.T) {
edsb.HandleChildPolicy("test-const-balancer", nil)
// Two localities, each with one backend.
clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.addLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab1.addLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(clab1.build())
clab1 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab1.AddLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build()))
for i := 0; i < 2; i++ {
sc := <-cc.newSubConnCh
@ -691,9 +611,9 @@ func TestEDS_LoadReport(t *testing.T) {
backendToBalancerID := make(map[balancer.SubConn]internal.Locality)
// Two localities, each with one backend.
clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.addLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
edsb.HandleEDSResponse(clab1.build())
clab1 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build()))
sc1 := <-cc.newSubConnCh
edsb.HandleSubConnStateChange(sc1, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc1, connectivity.Ready)
@ -704,8 +624,8 @@ func TestEDS_LoadReport(t *testing.T) {
// Add the second locality later to make sure sc2 belongs to the second
// locality. Otherwise the test is flaky because of a map is used in EDS to
// keep localities.
clab1.addLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(clab1.build())
clab1.AddLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build()))
sc2 := <-cc.newSubConnCh
edsb.HandleSubConnStateChange(sc2, connectivity.Connecting)
edsb.HandleSubConnStateChange(sc2, connectivity.Ready)

View File

@ -25,6 +25,7 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
xdsclient "google.golang.org/grpc/xds/internal/client"
)
// When a high priority is ready, adding/removing lower locality doesn't cause
@ -36,10 +37,10 @@ func TestEDSPriority_HighPriorityReady(t *testing.T) {
edsb := NewXDSBalancer(cc, nil)
// Two localities, with priorities [0, 1], each with one backend.
clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.addLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab1.addLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(clab1.build())
clab1 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build()))
addrs1 := <-cc.newSubConnAddrsCh
if got, want := addrs1[0].Addr, testEndpointAddrs[0]; got != want {
@ -63,11 +64,11 @@ func TestEDSPriority_HighPriorityReady(t *testing.T) {
}
// Add p2, it shouldn't cause any udpates.
clab2 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab2.addLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab2.addLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
clab2.addLocality(testSubZones[2], 1, 2, testEndpointAddrs[2:3], nil)
edsb.HandleEDSResponse(clab2.build())
clab2 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab2.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab2.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
clab2.AddLocality(testSubZones[2], 1, 2, testEndpointAddrs[2:3], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab2.Build()))
select {
case <-cc.newPickerCh:
@ -80,10 +81,10 @@ func TestEDSPriority_HighPriorityReady(t *testing.T) {
}
// Remove p2, no updates.
clab3 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab3.addLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab3.addLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(clab3.build())
clab3 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab3.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab3.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab3.Build()))
select {
case <-cc.newPickerCh:
@ -105,10 +106,10 @@ func TestEDSPriority_SwitchPriority(t *testing.T) {
edsb := NewXDSBalancer(cc, nil)
// Two localities, with priorities [0, 1], each with one backend.
clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.addLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab1.addLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(clab1.build())
clab1 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build()))
addrs0 := <-cc.newSubConnAddrsCh
if got, want := addrs0[0].Addr, testEndpointAddrs[0]; got != want {
@ -151,11 +152,11 @@ func TestEDSPriority_SwitchPriority(t *testing.T) {
}
// Add p2, it shouldn't cause any udpates.
clab2 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab2.addLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab2.addLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
clab2.addLocality(testSubZones[2], 1, 2, testEndpointAddrs[2:3], nil)
edsb.HandleEDSResponse(clab2.build())
clab2 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab2.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab2.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
clab2.AddLocality(testSubZones[2], 1, 2, testEndpointAddrs[2:3], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab2.Build()))
select {
case <-cc.newPickerCh:
@ -187,10 +188,10 @@ func TestEDSPriority_SwitchPriority(t *testing.T) {
}
// Remove 2, use 1.
clab3 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab3.addLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab3.addLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(clab3.build())
clab3 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab3.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab3.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab3.Build()))
// p2 SubConns are removed.
scToRemove := <-cc.removeSubConnCh
@ -215,10 +216,10 @@ func TestEDSPriority_HigherDownWhileAddingLower(t *testing.T) {
edsb := NewXDSBalancer(cc, nil)
// Two localities, with different priorities, each with one backend.
clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.addLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab1.addLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(clab1.build())
clab1 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build()))
addrs0 := <-cc.newSubConnAddrsCh
if got, want := addrs0[0].Addr, testEndpointAddrs[0]; got != want {
@ -245,11 +246,11 @@ func TestEDSPriority_HigherDownWhileAddingLower(t *testing.T) {
}
// Add p2, it should create a new SubConn.
clab2 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab2.addLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab2.addLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
clab2.addLocality(testSubZones[2], 1, 2, testEndpointAddrs[2:3], nil)
edsb.HandleEDSResponse(clab2.build())
clab2 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab2.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab2.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
clab2.AddLocality(testSubZones[2], 1, 2, testEndpointAddrs[2:3], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab2.Build()))
addrs2 := <-cc.newSubConnAddrsCh
if got, want := addrs2[0].Addr, testEndpointAddrs[2]; got != want {
@ -279,11 +280,11 @@ func TestEDSPriority_HigherReadyCloseAllLower(t *testing.T) {
edsb := NewXDSBalancer(cc, nil)
// Two localities, with priorities [0,1,2], each with one backend.
clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.addLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab1.addLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
clab1.addLocality(testSubZones[2], 1, 2, testEndpointAddrs[2:3], nil)
edsb.HandleEDSResponse(clab1.build())
clab1 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
clab1.AddLocality(testSubZones[2], 1, 2, testEndpointAddrs[2:3], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build()))
addrs0 := <-cc.newSubConnAddrsCh
if got, want := addrs0[0].Addr, testEndpointAddrs[0]; got != want {
@ -358,10 +359,10 @@ func TestEDSPriority_InitTimeout(t *testing.T) {
edsb := NewXDSBalancer(cc, nil)
// Two localities, with different priorities, each with one backend.
clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.addLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab1.addLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(clab1.build())
clab1 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build()))
addrs0 := <-cc.newSubConnAddrsCh
if got, want := addrs0[0].Addr, testEndpointAddrs[0]; got != want {
@ -407,10 +408,10 @@ func TestEDSPriority_MultipleLocalities(t *testing.T) {
edsb := NewXDSBalancer(cc, nil)
// Two localities, with different priorities, each with one backend.
clab0 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab0.addLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab0.addLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(clab0.build())
clab0 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab0.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab0.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab0.Build()))
addrs0 := <-cc.newSubConnAddrsCh
if got, want := addrs0[0].Addr, testEndpointAddrs[0]; got != want {
@ -472,12 +473,12 @@ func TestEDSPriority_MultipleLocalities(t *testing.T) {
}
// Add two localities, with two priorities, with one backend.
clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.addLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab1.addLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
clab1.addLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:3], nil)
clab1.addLocality(testSubZones[3], 1, 1, testEndpointAddrs[3:4], nil)
edsb.HandleEDSResponse(clab1.build())
clab1 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
clab1.AddLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:3], nil)
clab1.AddLocality(testSubZones[3], 1, 1, testEndpointAddrs[3:4], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build()))
addrs2 := <-cc.newSubConnAddrsCh
if got, want := addrs2[0].Addr, testEndpointAddrs[2]; got != want {
@ -534,10 +535,10 @@ func TestEDSPriority_RemovesAllLocalities(t *testing.T) {
edsb := NewXDSBalancer(cc, nil)
// Two localities, with different priorities, each with one backend.
clab0 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab0.addLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab0.addLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(clab0.build())
clab0 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab0.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil)
clab0.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab0.Build()))
addrs0 := <-cc.newSubConnAddrsCh
if got, want := addrs0[0].Addr, testEndpointAddrs[0]; got != want {
@ -559,8 +560,8 @@ func TestEDSPriority_RemovesAllLocalities(t *testing.T) {
}
// Remove all priorities.
clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
edsb.HandleEDSResponse(clab1.build())
clab1 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build()))
// p0 subconn should be removed.
scToRemove := <-cc.removeSubConnCh
@ -577,10 +578,10 @@ func TestEDSPriority_RemovesAllLocalities(t *testing.T) {
}
// Re-add two localities, with previous priorities, but different backends.
clab2 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab2.addLocality(testSubZones[0], 1, 0, testEndpointAddrs[2:3], nil)
clab2.addLocality(testSubZones[1], 1, 1, testEndpointAddrs[3:4], nil)
edsb.HandleEDSResponse(clab2.build())
clab2 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab2.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[2:3], nil)
clab2.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[3:4], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab2.Build()))
addrs01 := <-cc.newSubConnAddrsCh
if got, want := addrs01[0].Addr, testEndpointAddrs[2]; got != want {
@ -612,9 +613,9 @@ func TestEDSPriority_RemovesAllLocalities(t *testing.T) {
}
// Remove p1 from EDS, to fallback to p0.
clab3 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab3.addLocality(testSubZones[0], 1, 0, testEndpointAddrs[2:3], nil)
edsb.HandleEDSResponse(clab3.build())
clab3 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil)
clab3.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[2:3], nil)
edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab3.Build()))
// p1 subconn should be removed.
scToRemove1 := <-cc.removeSubConnCh

View File

@ -27,8 +27,6 @@ import (
"sync"
"time"
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
@ -36,6 +34,7 @@ import (
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/edsbalancer"
"google.golang.org/grpc/xds/internal/balancer/lrs"
xdsclient "google.golang.org/grpc/xds/internal/client"
)
const (
@ -101,7 +100,7 @@ func (b *xdsBalancerBuilder) ParseConfig(c json.RawMessage) (serviceconfig.LoadB
// It's implemented by the real eds balancer and a fake testing eds balancer.
type edsBalancerInterface interface {
// HandleEDSResponse passes the received EDS message from traffic director to eds balancer.
HandleEDSResponse(edsResp *xdspb.ClusterLoadAssignment)
HandleEDSResponse(edsResp *xdsclient.EDSUpdate)
// HandleChildPolicy updates the eds balancer the intra-cluster load balancing policy to use.
HandleChildPolicy(name string, config json.RawMessage)
// HandleSubConnStateChange handles state change for SubConn.
@ -158,7 +157,7 @@ func (x *xdsBalancer) startNewXDSClient(u *XDSConfig) {
var haveGotADS bool
// set up callbacks for the xds client.
newADS := func(ctx context.Context, resp proto.Message) error {
newADS := func(ctx context.Context, resp *xdsclient.EDSUpdate) error {
if !haveGotADS {
if prevClient != nil {
prevClient.close()
@ -410,21 +409,12 @@ func (x *xdsBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
type edsResp struct {
ctx context.Context
resp *xdspb.ClusterLoadAssignment
}
func (x *xdsBalancer) newADSResponse(ctx context.Context, resp proto.Message) error {
var update interface{}
switch u := resp.(type) {
case *xdspb.ClusterLoadAssignment:
// nothing to check
update = &edsResp{ctx: ctx, resp: u}
default:
grpclog.Warningf("xdsBalancer: got a response that's not EDS, type = %T", u)
resp *xdsclient.EDSUpdate
}
func (x *xdsBalancer) newADSResponse(ctx context.Context, resp *xdsclient.EDSUpdate) error {
select {
case x.xdsClientUpdate <- update:
case x.xdsClientUpdate <- &edsResp{ctx: ctx, resp: resp}:
case <-x.ctx.Done():
case <-ctx.Done():
}

View File

@ -26,7 +26,6 @@ import (
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
xdsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
@ -34,6 +33,7 @@ import (
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/xds/internal/balancer/lrs"
xdsclient "google.golang.org/grpc/xds/internal/client"
"google.golang.org/grpc/xds/internal/client/bootstrap"
)
@ -52,7 +52,7 @@ type client struct {
cli xdsgrpc.AggregatedDiscoveryServiceClient
dialer func(context.Context, string) (net.Conn, error)
channelzParentID int64
newADS func(ctx context.Context, resp proto.Message) error
newADS func(ctx context.Context, resp *xdsclient.EDSUpdate) error
loseContact func(ctx context.Context)
cleanup func()
backoff backoff.Strategy
@ -197,7 +197,12 @@ func (c *client) adsCallAttempt() (firstRespReceived bool) {
grpclog.Warningf("xds: failed to unmarshal resources due to %v.", err)
return
}
if err := c.newADS(c.ctx, adsResp.Message); err != nil {
parsed, err := xdsclient.ParseEDSRespProto(adsResp.Message.(*xdspb.ClusterLoadAssignment))
if err != nil {
grpclog.Warningf("xds: parsing new EDS message failed due to %v.", err)
return
}
if err := c.newADS(c.ctx, parsed); err != nil {
grpclog.Warningf("xds: processing new ADS message failed due to %v.", err)
return
}
@ -213,7 +218,7 @@ func (c *client) adsCallAttempt() (firstRespReceived bool) {
}
}
func newXDSClient(balancerName string, edsServiceName string, opts balancer.BuildOptions, loadStore lrs.Store, newADS func(context.Context, proto.Message) error, loseContact func(ctx context.Context), exitCleanup func()) *client {
func newXDSClient(balancerName string, edsServiceName string, opts balancer.BuildOptions, loadStore lrs.Store, newADS func(context.Context, *xdsclient.EDSUpdate) error, loseContact func(ctx context.Context), exitCleanup func()) *client {
c := &client{
serviceName: edsServiceName,
dialer: opts.Dialer,

View File

@ -35,11 +35,13 @@ import (
durationpb "github.com/golang/protobuf/ptypes/duration"
structpb "github.com/golang/protobuf/ptypes/struct"
wrpb "github.com/golang/protobuf/ptypes/wrappers"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/status"
xdsclient "google.golang.org/grpc/xds/internal/client"
)
var (
@ -233,8 +235,8 @@ func (s) TestXdsClientResponseHandling(t *testing.T) {
func testXdsClientResponseHandling(t *testing.T, test *testConfig) {
addr, td, _, cleanup := setupServer(t)
defer cleanup()
adsChan := make(chan proto.Message, 10)
newADS := func(ctx context.Context, i proto.Message) error {
adsChan := make(chan *xdsclient.EDSUpdate, 10)
newADS := func(ctx context.Context, i *xdsclient.EDSUpdate) error {
adsChan <- i
return nil
}
@ -255,7 +257,11 @@ func testXdsClientResponseHandling(t *testing.T, test *testConfig) {
for i, resp := range test.responsesToSend {
td.sendResp(&response{resp: resp})
ads := <-adsChan
if !proto.Equal(ads, test.expectedADSResponses[i]) {
want, err := xdsclient.ParseEDSRespProto(test.expectedADSResponses[i].(*xdspb.ClusterLoadAssignment))
if err != nil {
t.Fatalf("parsing wanted EDS response failed: %v", err)
}
if !cmp.Equal(ads, want) {
t.Fatalf("received unexpected ads response, got %v, want %v", ads, test.expectedADSResponses[i])
}
}
@ -286,8 +292,8 @@ func (s) TestXdsClientLoseContact(t *testing.T) {
func testXdsClientLoseContactRemoteClose(t *testing.T, test *testConfig) {
addr, td, _, cleanup := setupServer(t)
defer cleanup()
adsChan := make(chan proto.Message, 10)
newADS := func(ctx context.Context, i proto.Message) error {
adsChan := make(chan *xdsclient.EDSUpdate, 10)
newADS := func(ctx context.Context, i *xdsclient.EDSUpdate) error {
adsChan <- i
return nil
}
@ -320,8 +326,8 @@ func testXdsClientLoseContactADSRelatedErrorOccur(t *testing.T, test *testConfig
addr, td, _, cleanup := setupServer(t)
defer cleanup()
adsChan := make(chan proto.Message, 10)
newADS := func(ctx context.Context, i proto.Message) error {
adsChan := make(chan *xdsclient.EDSUpdate, 10)
newADS := func(ctx context.Context, i *xdsclient.EDSUpdate) error {
adsChan <- i
return test.adsErr
}
@ -354,8 +360,8 @@ func (s) TestXdsClientExponentialRetry(t *testing.T) {
addr, td, _, cleanup := setupServer(t)
defer cleanup()
adsChan := make(chan proto.Message, 10)
newADS := func(ctx context.Context, i proto.Message) error {
adsChan := make(chan *xdsclient.EDSUpdate, 10)
newADS := func(ctx context.Context, i *xdsclient.EDSUpdate) error {
adsChan <- i
return nil
}

View File

@ -28,7 +28,6 @@ import (
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
wrapperspb "github.com/golang/protobuf/ptypes/wrappers"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/balancer"
@ -39,6 +38,7 @@ import (
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/lrs"
xdsclient "google.golang.org/grpc/xds/internal/client"
)
var lbABuilder = &balancerABuilder{}
@ -178,7 +178,7 @@ type scStateChange struct {
type fakeEDSBalancer struct {
cc balancer.ClientConn
edsChan chan *xdspb.ClusterLoadAssignment
edsChan chan *xdsclient.EDSUpdate
childPolicy chan *loadBalancingConfig
fallbackPolicy chan *loadBalancingConfig
subconnStateChange chan *scStateChange
@ -195,7 +195,7 @@ func (f *fakeEDSBalancer) Close() {
latestFakeEdsBalancer = nil
}
func (f *fakeEDSBalancer) HandleEDSResponse(edsResp *xdspb.ClusterLoadAssignment) {
func (f *fakeEDSBalancer) HandleEDSResponse(edsResp *xdsclient.EDSUpdate) {
f.edsChan <- edsResp
}
@ -209,7 +209,7 @@ func (f *fakeEDSBalancer) HandleChildPolicy(name string, config json.RawMessage)
func newFakeEDSBalancer(cc balancer.ClientConn, loadStore lrs.Store) edsBalancerInterface {
lb := &fakeEDSBalancer{
cc: cc,
edsChan: make(chan *xdspb.ClusterLoadAssignment, 10),
edsChan: make(chan *xdsclient.EDSUpdate, 10),
childPolicy: make(chan *loadBalancingConfig, 10),
fallbackPolicy: make(chan *loadBalancingConfig, 10),
subconnStateChange: make(chan *scStateChange, 10),
@ -349,7 +349,11 @@ func (s) TestXdsBalanceHandleBalancerConfigBalancerNameUpdate(t *testing.T) {
if edsLB := getLatestEdsBalancer(); edsLB != nil { // edsLB won't change between the two iterations
select {
case gotEDS := <-edsLB.edsChan:
if !proto.Equal(gotEDS, testClusterLoadAssignment) {
want, err := xdsclient.ParseEDSRespProto(testClusterLoadAssignment)
if err != nil {
t.Fatalf("parsing wanted EDS response failed: %v", err)
}
if !cmp.Equal(gotEDS, want) {
t.Fatalf("edsBalancer got eds: %v, want %v", gotEDS, testClusterLoadAssignment)
}
case <-time.After(time.Second):

163
xds/internal/client/eds.go Normal file
View File

@ -0,0 +1,163 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package client
import (
"fmt"
"net"
"strconv"
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
typepb "github.com/envoyproxy/go-control-plane/envoy/type"
"google.golang.org/grpc/xds/internal"
)
// OverloadDropConfig contains the config to drop overloads.
type OverloadDropConfig struct {
Category string
Numerator uint32
Denominator uint32
}
// EndpointHealthStatus represents the health status of an endpoint.
type EndpointHealthStatus int32
const (
// EndpointHealthStatusUnknown represents HealthStatus UNKNOWN.
EndpointHealthStatusUnknown EndpointHealthStatus = iota
// EndpointHealthStatusHealthy represents HealthStatus HEALTHY.
EndpointHealthStatusHealthy
// EndpointHealthStatusUnhealthy represents HealthStatus UNHEALTHY.
EndpointHealthStatusUnhealthy
// EndpointHealthStatusDraining represents HealthStatus DRAINING.
EndpointHealthStatusDraining
// EndpointHealthStatusTimeout represents HealthStatus TIMEOUT.
EndpointHealthStatusTimeout
// EndpointHealthStatusDegraded represents HealthStatus DEGRADED.
EndpointHealthStatusDegraded
)
// Endpoint contains information of an endpoint.
type Endpoint struct {
Address string
HealthStatus EndpointHealthStatus
Weight uint32
}
// Locality contains information of a locality.
type Locality struct {
Endpoints []Endpoint
ID internal.Locality
Priority uint32
Weight uint32
}
// EDSUpdate contains an EDS update.
type EDSUpdate struct {
Drops []OverloadDropConfig
Localities []Locality
}
func parseAddress(socketAddress *corepb.SocketAddress) string {
return net.JoinHostPort(socketAddress.GetAddress(), strconv.Itoa(int(socketAddress.GetPortValue())))
}
func parseDropPolicy(dropPolicy *xdspb.ClusterLoadAssignment_Policy_DropOverload) OverloadDropConfig {
percentage := dropPolicy.GetDropPercentage()
var (
numerator = percentage.GetNumerator()
denominator uint32
)
switch percentage.GetDenominator() {
case typepb.FractionalPercent_HUNDRED:
denominator = 100
case typepb.FractionalPercent_TEN_THOUSAND:
denominator = 10000
case typepb.FractionalPercent_MILLION:
denominator = 1000000
}
return OverloadDropConfig{
Category: dropPolicy.GetCategory(),
Numerator: numerator,
Denominator: denominator,
}
}
func parseEndpoints(lbEndpoints []*endpointpb.LbEndpoint) []Endpoint {
endpoints := make([]Endpoint, 0, len(lbEndpoints))
for _, lbEndpoint := range lbEndpoints {
endpoints = append(endpoints, Endpoint{
HealthStatus: EndpointHealthStatus(lbEndpoint.GetHealthStatus()),
Address: parseAddress(lbEndpoint.GetEndpoint().GetAddress().GetSocketAddress()),
Weight: lbEndpoint.GetLoadBalancingWeight().GetValue(),
})
}
return endpoints
}
// ParseEDSRespProto turns EDS response proto message to EDSUpdate.
//
// This is temporarily exported to be used in eds balancer, before it switches
// to use xds client. TODO: unexport.
func ParseEDSRespProto(m *xdspb.ClusterLoadAssignment) (*EDSUpdate, error) {
ret := &EDSUpdate{}
for _, dropPolicy := range m.GetPolicy().GetDropOverloads() {
ret.Drops = append(ret.Drops, parseDropPolicy(dropPolicy))
}
priorities := make(map[uint32]struct{})
for _, locality := range m.Endpoints {
l := locality.GetLocality()
if l == nil {
return nil, fmt.Errorf("EDS response contains a locality without ID, locality: %+v", locality)
}
lid := internal.Locality{
Region: l.Region,
Zone: l.Zone,
SubZone: l.SubZone,
}
priority := locality.GetPriority()
priorities[priority] = struct{}{}
ret.Localities = append(ret.Localities, Locality{
ID: lid,
Endpoints: parseEndpoints(locality.GetLbEndpoints()),
Weight: locality.GetLoadBalancingWeight().GetValue(),
Priority: priority,
})
}
for i := 0; i < len(priorities); i++ {
if _, ok := priorities[uint32(i)]; !ok {
return nil, fmt.Errorf("priority %v missing (with different priorities %v received)", i, priorities)
}
}
return ret, nil
}
// ParseEDSRespProtoForTesting parses EDS response, and panic if parsing fails.
// This is used by EDS balancer tests.
//
// TODO: delete this. The EDS balancer should build an EDSUpdate directly,
// instead of building and parsing a proto message.
func ParseEDSRespProtoForTesting(m *xdspb.ClusterLoadAssignment) *EDSUpdate {
u, err := ParseEDSRespProto(m)
if err != nil {
panic(err.Error())
}
return u
}

View File

@ -0,0 +1,39 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package client
import (
"testing"
)
// Only error cases are tested, normal cases are covered because EDS balancer
// tests build an EDS responses and parses them.
// TODO: add more tests, with error cases and normal cases.
// Test that parsing fails if EDS response doesn't have all priorities.
// Priorities should range from 0 (highest) to N (lowest) without skipping
func TestParseEDSRespProtoPriorityError(t *testing.T) {
clab0 := NewClusterLoadAssignmentBuilder("test", nil)
clab0.AddLocality("locality-1", 1, 0, []string{"addr1:314"}, nil)
clab0.AddLocality("locality-2", 1, 2, []string{"addr2:159"}, nil)
_, err := ParseEDSRespProto(clab0.Build())
if err == nil {
t.Errorf("ParseEDSRespProto() error = %v, wantErr <non-nil>", err)
return
}
}

View File

@ -0,0 +1,117 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// All structs/functions in this file should be unexported. They are used in EDS
// balancer tests now, to generate test inputs. Eventually, EDS balancer tests
// should generate EDSUpdate directly, instead of generating and parsing the
// proto message.
// TODO: unexported everything in this file.
package client
import (
"fmt"
"net"
"strconv"
xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
typepb "github.com/envoyproxy/go-control-plane/envoy/type"
wrapperspb "github.com/golang/protobuf/ptypes/wrappers"
)
// ClusterLoadAssignmentBuilder builds a ClusterLoadAssignment, aka EDS
// response.
type ClusterLoadAssignmentBuilder struct {
v *xdspb.ClusterLoadAssignment
}
// NewClusterLoadAssignmentBuilder creates a ClusterLoadAssignmentBuilder.
func NewClusterLoadAssignmentBuilder(clusterName string, dropPercents []uint32) *ClusterLoadAssignmentBuilder {
var drops []*xdspb.ClusterLoadAssignment_Policy_DropOverload
for i, d := range dropPercents {
drops = append(drops, &xdspb.ClusterLoadAssignment_Policy_DropOverload{
Category: fmt.Sprintf("test-drop-%d", i),
DropPercentage: &typepb.FractionalPercent{
Numerator: d,
Denominator: typepb.FractionalPercent_HUNDRED,
},
})
}
return &ClusterLoadAssignmentBuilder{
v: &xdspb.ClusterLoadAssignment{
ClusterName: clusterName,
Policy: &xdspb.ClusterLoadAssignment_Policy{
DropOverloads: drops,
},
},
}
}
// AddLocalityOptions contains options when adding locality to the builder.
type AddLocalityOptions struct {
Health []corepb.HealthStatus
}
// AddLocality adds a locality to the builder.
func (clab *ClusterLoadAssignmentBuilder) AddLocality(subzone string, weight uint32, priority uint32, addrsWithPort []string, opts *AddLocalityOptions) {
var lbEndPoints []*endpointpb.LbEndpoint
for i, a := range addrsWithPort {
host, portStr, err := net.SplitHostPort(a)
if err != nil {
panic("failed to split " + a)
}
port, err := strconv.Atoi(portStr)
if err != nil {
panic("failed to atoi " + portStr)
}
lbe := &endpointpb.LbEndpoint{
HostIdentifier: &endpointpb.LbEndpoint_Endpoint{
Endpoint: &endpointpb.Endpoint{
Address: &corepb.Address{
Address: &corepb.Address_SocketAddress{
SocketAddress: &corepb.SocketAddress{
Protocol: corepb.SocketAddress_TCP,
Address: host,
PortSpecifier: &corepb.SocketAddress_PortValue{
PortValue: uint32(port)}}}}}},
}
if opts != nil && i < len(opts.Health) {
lbe.HealthStatus = opts.Health[i]
}
lbEndPoints = append(lbEndPoints, lbe)
}
clab.v.Endpoints = append(clab.v.Endpoints, &endpointpb.LocalityLbEndpoints{
Locality: &corepb.Locality{
Region: "",
Zone: "",
SubZone: subzone,
},
LbEndpoints: lbEndPoints,
LoadBalancingWeight: &wrapperspb.UInt32Value{Value: weight},
Priority: priority,
})
}
// Build builds ClusterLoadAssignment.
func (clab *ClusterLoadAssignmentBuilder) Build() *xdspb.ClusterLoadAssignment {
return clab.v
}

View File

@ -30,6 +30,8 @@ import (
//
// This struct should only be used as map keys. Use the proto message directly
// in all other places.
//
// TODO: rename to LocalityID.
type Locality struct {
Region string
Zone string