diff --git a/xds/internal/balancer/edsbalancer/edsbalancer.go b/xds/internal/balancer/edsbalancer/edsbalancer.go index 7b734a05..a13bcc54 100644 --- a/xds/internal/balancer/edsbalancer/edsbalancer.go +++ b/xds/internal/balancer/edsbalancer/edsbalancer.go @@ -20,16 +20,10 @@ package edsbalancer import ( "context" "encoding/json" - "net" "reflect" - "strconv" "sync" "time" - xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" - corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" - endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint" - typepb "github.com/envoyproxy/go-control-plane/envoy/type" "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/roundrobin" "google.golang.org/grpc/balancer/weightedroundrobin" @@ -40,6 +34,7 @@ import ( "google.golang.org/grpc/status" "google.golang.org/grpc/xds/internal" "google.golang.org/grpc/xds/internal/balancer/lrs" + xdsclient "google.golang.org/grpc/xds/internal/client" ) // TODO: make this a environment variable? @@ -154,26 +149,17 @@ func (xdsB *EDSBalancer) HandleChildPolicy(name string, config json.RawMessage) // updateDrops compares new drop policies with the old. If they are different, // it updates the drop policies and send ClientConn an updated picker. -func (xdsB *EDSBalancer) updateDrops(dropPolicies []*xdspb.ClusterLoadAssignment_Policy_DropOverload) { +func (xdsB *EDSBalancer) updateDrops(dropPolicies []xdsclient.OverloadDropConfig) { var ( newDrops []*dropper dropsChanged bool ) for i, dropPolicy := range dropPolicies { - percentage := dropPolicy.GetDropPercentage() var ( - numerator = percentage.GetNumerator() - denominator uint32 + numerator = dropPolicy.Numerator + denominator = dropPolicy.Denominator ) - switch percentage.GetDenominator() { - case typepb.FractionalPercent_HUNDRED: - denominator = 100 - case typepb.FractionalPercent_TEN_THOUSAND: - denominator = 10000 - case typepb.FractionalPercent_MILLION: - denominator = 1000000 - } - newDrops = append(newDrops, newDropper(numerator, denominator, dropPolicy.GetCategory())) + newDrops = append(newDrops, newDropper(numerator, denominator, dropPolicy.Category)) // The following reading xdsB.drops doesn't need mutex because it can only // be updated by the code following. @@ -203,7 +189,7 @@ func (xdsB *EDSBalancer) updateDrops(dropPolicies []*xdspb.ClusterLoadAssignment // SubConns. It also handles drops. // // HandleChildPolicy and HandleEDSResponse must be called by the same goroutine. -func (xdsB *EDSBalancer) HandleEDSResponse(edsResp *xdspb.ClusterLoadAssignment) { +func (xdsB *EDSBalancer) HandleEDSResponse(edsResp *xdsclient.EDSUpdate) { // TODO: Unhandled fields from EDS response: // - edsResp.GetPolicy().GetOverprovisioningFactor() // - locality.GetPriority() @@ -213,7 +199,7 @@ func (xdsB *EDSBalancer) HandleEDSResponse(edsResp *xdspb.ClusterLoadAssignment) // - socketAddress.GetNamedPort(), socketAddress.GetResolverName() // - resolve endpoint's name with another resolver - xdsB.updateDrops(edsResp.GetPolicy().GetDropOverloads()) + xdsB.updateDrops(edsResp.Drops) // Filter out all localities with weight 0. // @@ -224,12 +210,12 @@ func (xdsB *EDSBalancer) HandleEDSResponse(edsResp *xdspb.ClusterLoadAssignment) // // In the future, we should look at the config in CDS response and decide // whether locality weight matters. - newLocalitiesWithPriority := make(map[priorityType][]*endpointpb.LocalityLbEndpoints) - for _, locality := range edsResp.Endpoints { - if locality.GetLoadBalancingWeight().GetValue() == 0 { + newLocalitiesWithPriority := make(map[priorityType][]xdsclient.Locality) + for _, locality := range edsResp.Localities { + if locality.Weight == 0 { continue } - priority := newPriorityType(locality.GetPriority()) + priority := newPriorityType(locality.Priority) newLocalitiesWithPriority[priority] = append(newLocalitiesWithPriority[priority], locality) } @@ -281,7 +267,7 @@ func (xdsB *EDSBalancer) HandleEDSResponse(edsResp *xdspb.ClusterLoadAssignment) } } -func (xdsB *EDSBalancer) handleEDSResponsePerPriority(bgwc *balancerGroupWithConfig, newLocalities []*endpointpb.LocalityLbEndpoints) { +func (xdsB *EDSBalancer) handleEDSResponsePerPriority(bgwc *balancerGroupWithConfig, newLocalities []xdsclient.Locality) { // newLocalitiesSet contains all names of localities in the new EDS response // for the same priority. It's used to delete localities that are removed in // the new EDS response. @@ -289,37 +275,26 @@ func (xdsB *EDSBalancer) handleEDSResponsePerPriority(bgwc *balancerGroupWithCon for _, locality := range newLocalities { // One balancer for each locality. - l := locality.GetLocality() - if l == nil { - grpclog.Warningf("xds: received LocalityLbEndpoints with Locality") - continue - } - lid := internal.Locality{ - Region: l.Region, - Zone: l.Zone, - SubZone: l.SubZone, - } + lid := locality.ID newLocalitiesSet[lid] = struct{}{} - newWeight := locality.GetLoadBalancingWeight().GetValue() + newWeight := locality.Weight var newAddrs []resolver.Address - for _, lbEndpoint := range locality.GetLbEndpoints() { + for _, lbEndpoint := range locality.Endpoints { // Filter out all "unhealthy" endpoints (unknown and // healthy are both considered to be healthy: // https://www.envoyproxy.io/docs/envoy/latest/api-v2/api/v2/core/health_check.proto#envoy-api-enum-core-healthstatus). - if lbEndpoint.GetHealthStatus() != corepb.HealthStatus_HEALTHY && - lbEndpoint.GetHealthStatus() != corepb.HealthStatus_UNKNOWN { + if lbEndpoint.HealthStatus != xdsclient.EndpointHealthStatusHealthy && + lbEndpoint.HealthStatus != xdsclient.EndpointHealthStatusUnknown { continue } - socketAddress := lbEndpoint.GetEndpoint().GetAddress().GetSocketAddress() address := resolver.Address{ - Addr: net.JoinHostPort(socketAddress.GetAddress(), strconv.Itoa(int(socketAddress.GetPortValue()))), + Addr: lbEndpoint.Address, } - if xdsB.subBalancerBuilder.Name() == weightedroundrobin.Name && - lbEndpoint.GetLoadBalancingWeight().GetValue() != 0 { + if xdsB.subBalancerBuilder.Name() == weightedroundrobin.Name && lbEndpoint.Weight != 0 { address.Metadata = &weightedroundrobin.AddrInfo{ - Weight: lbEndpoint.GetLoadBalancingWeight().GetValue(), + Weight: lbEndpoint.Weight, } } newAddrs = append(newAddrs, address) diff --git a/xds/internal/balancer/edsbalancer/edsbalancer_test.go b/xds/internal/balancer/edsbalancer/edsbalancer_test.go index 33327a6d..edc4b411 100644 --- a/xds/internal/balancer/edsbalancer/edsbalancer_test.go +++ b/xds/internal/balancer/edsbalancer/edsbalancer_test.go @@ -19,24 +19,19 @@ package edsbalancer import ( "context" "fmt" - "net" "reflect" "sort" - "strconv" "testing" "time" - xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" - endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint" - typepb "github.com/envoyproxy/go-control-plane/envoy/type" - typespb "github.com/golang/protobuf/ptypes/wrappers" "github.com/google/go-cmp/cmp" "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/roundrobin" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/resolver" "google.golang.org/grpc/xds/internal" + xdsclient "google.golang.org/grpc/xds/internal/client" ) var ( @@ -51,81 +46,6 @@ func init() { } } -type clusterLoadAssignmentBuilder struct { - v *xdspb.ClusterLoadAssignment -} - -func newClusterLoadAssignmentBuilder(clusterName string, dropPercents []uint32) *clusterLoadAssignmentBuilder { - var drops []*xdspb.ClusterLoadAssignment_Policy_DropOverload - for i, d := range dropPercents { - drops = append(drops, &xdspb.ClusterLoadAssignment_Policy_DropOverload{ - Category: fmt.Sprintf("test-drop-%d", i), - DropPercentage: &typepb.FractionalPercent{ - Numerator: d, - Denominator: typepb.FractionalPercent_HUNDRED, - }, - }) - } - - return &clusterLoadAssignmentBuilder{ - v: &xdspb.ClusterLoadAssignment{ - ClusterName: clusterName, - Policy: &xdspb.ClusterLoadAssignment_Policy{ - DropOverloads: drops, - }, - }, - } -} - -type addLocalityOptions struct { - health []corepb.HealthStatus -} - -func (clab *clusterLoadAssignmentBuilder) addLocality(subzone string, weight uint32, priority uint32, addrsWithPort []string, opts *addLocalityOptions) { - var lbEndPoints []*endpointpb.LbEndpoint - for i, a := range addrsWithPort { - host, portStr, err := net.SplitHostPort(a) - if err != nil { - panic("failed to split " + a) - } - port, err := strconv.Atoi(portStr) - if err != nil { - panic("failed to atoi " + portStr) - } - - lbe := &endpointpb.LbEndpoint{ - HostIdentifier: &endpointpb.LbEndpoint_Endpoint{ - Endpoint: &endpointpb.Endpoint{ - Address: &corepb.Address{ - Address: &corepb.Address_SocketAddress{ - SocketAddress: &corepb.SocketAddress{ - Protocol: corepb.SocketAddress_TCP, - Address: host, - PortSpecifier: &corepb.SocketAddress_PortValue{ - PortValue: uint32(port)}}}}}}, - } - if opts != nil && i < len(opts.health) { - lbe.HealthStatus = opts.health[i] - } - lbEndPoints = append(lbEndPoints, lbe) - } - - clab.v.Endpoints = append(clab.v.Endpoints, &endpointpb.LocalityLbEndpoints{ - Locality: &corepb.Locality{ - Region: "", - Zone: "", - SubZone: subzone, - }, - LbEndpoints: lbEndPoints, - LoadBalancingWeight: &typespb.UInt32Value{Value: weight}, - Priority: priority, - }) -} - -func (clab *clusterLoadAssignmentBuilder) build() *xdspb.ClusterLoadAssignment { - return clab.v -} - // One locality // - add backend // - remove backend @@ -136,9 +56,9 @@ func TestEDS_OneLocality(t *testing.T) { edsb := NewXDSBalancer(cc, nil) // One locality with one backend. - clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) - clab1.addLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) - edsb.HandleEDSResponse(clab1.build()) + clab1 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) + clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) + edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build())) sc1 := <-cc.newSubConnCh edsb.HandleSubConnStateChange(sc1, connectivity.Connecting) @@ -154,9 +74,9 @@ func TestEDS_OneLocality(t *testing.T) { } // The same locality, add one more backend. - clab2 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) - clab2.addLocality(testSubZones[0], 1, 0, testEndpointAddrs[:2], nil) - edsb.HandleEDSResponse(clab2.build()) + clab2 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) + clab2.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:2], nil) + edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab2.Build())) sc2 := <-cc.newSubConnCh edsb.HandleSubConnStateChange(sc2, connectivity.Connecting) @@ -173,9 +93,9 @@ func TestEDS_OneLocality(t *testing.T) { } // The same locality, delete first backend. - clab3 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) - clab3.addLocality(testSubZones[0], 1, 0, testEndpointAddrs[1:2], nil) - edsb.HandleEDSResponse(clab3.build()) + clab3 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) + clab3.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[1:2], nil) + edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab3.Build())) scToRemove := <-cc.removeSubConnCh if !reflect.DeepEqual(scToRemove, sc1) { @@ -193,9 +113,9 @@ func TestEDS_OneLocality(t *testing.T) { } // The same locality, replace backend. - clab4 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) - clab4.addLocality(testSubZones[0], 1, 0, testEndpointAddrs[2:3], nil) - edsb.HandleEDSResponse(clab4.build()) + clab4 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) + clab4.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[2:3], nil) + edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab4.Build())) sc3 := <-cc.newSubConnCh edsb.HandleSubConnStateChange(sc3, connectivity.Connecting) @@ -216,9 +136,9 @@ func TestEDS_OneLocality(t *testing.T) { } // The same locality, different drop rate, dropping 50%. - clab5 := newClusterLoadAssignmentBuilder(testClusterNames[0], []uint32{50}) - clab5.addLocality(testSubZones[0], 1, 0, testEndpointAddrs[2:3], nil) - edsb.HandleEDSResponse(clab5.build()) + clab5 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], []uint32{50}) + clab5.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[2:3], nil) + edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab5.Build())) // Picks with drops. p5 := <-cc.newPickerCh @@ -245,9 +165,9 @@ func TestEDS_TwoLocalities(t *testing.T) { edsb := NewXDSBalancer(cc, nil) // Two localities, each with one backend. - clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) - clab1.addLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) - edsb.HandleEDSResponse(clab1.build()) + clab1 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) + clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) + edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build())) sc1 := <-cc.newSubConnCh edsb.HandleSubConnStateChange(sc1, connectivity.Connecting) edsb.HandleSubConnStateChange(sc1, connectivity.Ready) @@ -255,8 +175,8 @@ func TestEDS_TwoLocalities(t *testing.T) { // Add the second locality later to make sure sc2 belongs to the second // locality. Otherwise the test is flaky because of a map is used in EDS to // keep localities. - clab1.addLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil) - edsb.HandleEDSResponse(clab1.build()) + clab1.AddLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil) + edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build())) sc2 := <-cc.newSubConnCh edsb.HandleSubConnStateChange(sc2, connectivity.Connecting) edsb.HandleSubConnStateChange(sc2, connectivity.Ready) @@ -272,11 +192,11 @@ func TestEDS_TwoLocalities(t *testing.T) { } // Add another locality, with one backend. - clab2 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) - clab2.addLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) - clab2.addLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil) - clab2.addLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:3], nil) - edsb.HandleEDSResponse(clab2.build()) + clab2 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) + clab2.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) + clab2.AddLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil) + clab2.AddLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:3], nil) + edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab2.Build())) sc3 := <-cc.newSubConnCh edsb.HandleSubConnStateChange(sc3, connectivity.Connecting) @@ -293,10 +213,10 @@ func TestEDS_TwoLocalities(t *testing.T) { } // Remove first locality. - clab3 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) - clab3.addLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil) - clab3.addLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:3], nil) - edsb.HandleEDSResponse(clab3.build()) + clab3 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) + clab3.AddLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil) + clab3.AddLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:3], nil) + edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab3.Build())) scToRemove := <-cc.removeSubConnCh if !reflect.DeepEqual(scToRemove, sc1) { @@ -315,10 +235,10 @@ func TestEDS_TwoLocalities(t *testing.T) { } // Add a backend to the last locality. - clab4 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) - clab4.addLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil) - clab4.addLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:4], nil) - edsb.HandleEDSResponse(clab4.build()) + clab4 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) + clab4.AddLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil) + clab4.AddLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:4], nil) + edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab4.Build())) sc4 := <-cc.newSubConnCh edsb.HandleSubConnStateChange(sc4, connectivity.Connecting) @@ -338,10 +258,10 @@ func TestEDS_TwoLocalities(t *testing.T) { } // Change weight of the locality[1]. - clab5 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) - clab5.addLocality(testSubZones[1], 2, 0, testEndpointAddrs[1:2], nil) - clab5.addLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:4], nil) - edsb.HandleEDSResponse(clab5.build()) + clab5 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) + clab5.AddLocality(testSubZones[1], 2, 0, testEndpointAddrs[1:2], nil) + clab5.AddLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:4], nil) + edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab5.Build())) // Test pick with two subconns different locality weight. p5 := <-cc.newPickerCh @@ -357,10 +277,10 @@ func TestEDS_TwoLocalities(t *testing.T) { } // Change weight of the locality[1] to 0, it should never be picked. - clab6 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) - clab6.addLocality(testSubZones[1], 0, 0, testEndpointAddrs[1:2], nil) - clab6.addLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:4], nil) - edsb.HandleEDSResponse(clab6.build()) + clab6 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) + clab6.AddLocality(testSubZones[1], 0, 0, testEndpointAddrs[1:2], nil) + clab6.AddLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:4], nil) + edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab6.Build())) // Changing weight of locality[1] to 0 caused it to be removed. It's subconn // should also be removed. @@ -393,9 +313,9 @@ func TestEDS_EndpointsHealth(t *testing.T) { edsb := NewXDSBalancer(cc, nil) // Two localities, each 3 backend, one Healthy, one Unhealthy, one Unknown. - clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) - clab1.addLocality(testSubZones[0], 1, 0, testEndpointAddrs[:6], &addLocalityOptions{ - health: []corepb.HealthStatus{ + clab1 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) + clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:6], &xdsclient.AddLocalityOptions{ + Health: []corepb.HealthStatus{ corepb.HealthStatus_HEALTHY, corepb.HealthStatus_UNHEALTHY, corepb.HealthStatus_UNKNOWN, @@ -404,8 +324,8 @@ func TestEDS_EndpointsHealth(t *testing.T) { corepb.HealthStatus_DEGRADED, }, }) - clab1.addLocality(testSubZones[1], 1, 0, testEndpointAddrs[6:12], &addLocalityOptions{ - health: []corepb.HealthStatus{ + clab1.AddLocality(testSubZones[1], 1, 0, testEndpointAddrs[6:12], &xdsclient.AddLocalityOptions{ + Health: []corepb.HealthStatus{ corepb.HealthStatus_HEALTHY, corepb.HealthStatus_UNHEALTHY, corepb.HealthStatus_UNKNOWN, @@ -414,7 +334,7 @@ func TestEDS_EndpointsHealth(t *testing.T) { corepb.HealthStatus_DEGRADED, }, }) - edsb.HandleEDSResponse(clab1.build()) + edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build())) var ( readySCs []balancer.SubConn @@ -527,10 +447,10 @@ func TestEDS_UpdateSubBalancerName(t *testing.T) { edsb.HandleChildPolicy("test-const-balancer", nil) // Two localities, each with one backend. - clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) - clab1.addLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) - clab1.addLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil) - edsb.HandleEDSResponse(clab1.build()) + clab1 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) + clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) + clab1.AddLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil) + edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build())) for i := 0; i < 2; i++ { sc := <-cc.newSubConnCh @@ -691,9 +611,9 @@ func TestEDS_LoadReport(t *testing.T) { backendToBalancerID := make(map[balancer.SubConn]internal.Locality) // Two localities, each with one backend. - clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) - clab1.addLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) - edsb.HandleEDSResponse(clab1.build()) + clab1 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) + clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) + edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build())) sc1 := <-cc.newSubConnCh edsb.HandleSubConnStateChange(sc1, connectivity.Connecting) edsb.HandleSubConnStateChange(sc1, connectivity.Ready) @@ -704,8 +624,8 @@ func TestEDS_LoadReport(t *testing.T) { // Add the second locality later to make sure sc2 belongs to the second // locality. Otherwise the test is flaky because of a map is used in EDS to // keep localities. - clab1.addLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil) - edsb.HandleEDSResponse(clab1.build()) + clab1.AddLocality(testSubZones[1], 1, 0, testEndpointAddrs[1:2], nil) + edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build())) sc2 := <-cc.newSubConnCh edsb.HandleSubConnStateChange(sc2, connectivity.Connecting) edsb.HandleSubConnStateChange(sc2, connectivity.Ready) diff --git a/xds/internal/balancer/edsbalancer/priority_test.go b/xds/internal/balancer/edsbalancer/priority_test.go index b2eb49b6..73d9cc63 100644 --- a/xds/internal/balancer/edsbalancer/priority_test.go +++ b/xds/internal/balancer/edsbalancer/priority_test.go @@ -25,6 +25,7 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/connectivity" + xdsclient "google.golang.org/grpc/xds/internal/client" ) // When a high priority is ready, adding/removing lower locality doesn't cause @@ -36,10 +37,10 @@ func TestEDSPriority_HighPriorityReady(t *testing.T) { edsb := NewXDSBalancer(cc, nil) // Two localities, with priorities [0, 1], each with one backend. - clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) - clab1.addLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) - clab1.addLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) - edsb.HandleEDSResponse(clab1.build()) + clab1 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) + clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) + clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) + edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build())) addrs1 := <-cc.newSubConnAddrsCh if got, want := addrs1[0].Addr, testEndpointAddrs[0]; got != want { @@ -63,11 +64,11 @@ func TestEDSPriority_HighPriorityReady(t *testing.T) { } // Add p2, it shouldn't cause any udpates. - clab2 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) - clab2.addLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) - clab2.addLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) - clab2.addLocality(testSubZones[2], 1, 2, testEndpointAddrs[2:3], nil) - edsb.HandleEDSResponse(clab2.build()) + clab2 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) + clab2.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) + clab2.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) + clab2.AddLocality(testSubZones[2], 1, 2, testEndpointAddrs[2:3], nil) + edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab2.Build())) select { case <-cc.newPickerCh: @@ -80,10 +81,10 @@ func TestEDSPriority_HighPriorityReady(t *testing.T) { } // Remove p2, no updates. - clab3 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) - clab3.addLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) - clab3.addLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) - edsb.HandleEDSResponse(clab3.build()) + clab3 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) + clab3.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) + clab3.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) + edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab3.Build())) select { case <-cc.newPickerCh: @@ -105,10 +106,10 @@ func TestEDSPriority_SwitchPriority(t *testing.T) { edsb := NewXDSBalancer(cc, nil) // Two localities, with priorities [0, 1], each with one backend. - clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) - clab1.addLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) - clab1.addLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) - edsb.HandleEDSResponse(clab1.build()) + clab1 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) + clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) + clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) + edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build())) addrs0 := <-cc.newSubConnAddrsCh if got, want := addrs0[0].Addr, testEndpointAddrs[0]; got != want { @@ -151,11 +152,11 @@ func TestEDSPriority_SwitchPriority(t *testing.T) { } // Add p2, it shouldn't cause any udpates. - clab2 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) - clab2.addLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) - clab2.addLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) - clab2.addLocality(testSubZones[2], 1, 2, testEndpointAddrs[2:3], nil) - edsb.HandleEDSResponse(clab2.build()) + clab2 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) + clab2.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) + clab2.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) + clab2.AddLocality(testSubZones[2], 1, 2, testEndpointAddrs[2:3], nil) + edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab2.Build())) select { case <-cc.newPickerCh: @@ -187,10 +188,10 @@ func TestEDSPriority_SwitchPriority(t *testing.T) { } // Remove 2, use 1. - clab3 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) - clab3.addLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) - clab3.addLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) - edsb.HandleEDSResponse(clab3.build()) + clab3 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) + clab3.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) + clab3.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) + edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab3.Build())) // p2 SubConns are removed. scToRemove := <-cc.removeSubConnCh @@ -215,10 +216,10 @@ func TestEDSPriority_HigherDownWhileAddingLower(t *testing.T) { edsb := NewXDSBalancer(cc, nil) // Two localities, with different priorities, each with one backend. - clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) - clab1.addLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) - clab1.addLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) - edsb.HandleEDSResponse(clab1.build()) + clab1 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) + clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) + clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) + edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build())) addrs0 := <-cc.newSubConnAddrsCh if got, want := addrs0[0].Addr, testEndpointAddrs[0]; got != want { @@ -245,11 +246,11 @@ func TestEDSPriority_HigherDownWhileAddingLower(t *testing.T) { } // Add p2, it should create a new SubConn. - clab2 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) - clab2.addLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) - clab2.addLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) - clab2.addLocality(testSubZones[2], 1, 2, testEndpointAddrs[2:3], nil) - edsb.HandleEDSResponse(clab2.build()) + clab2 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) + clab2.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) + clab2.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) + clab2.AddLocality(testSubZones[2], 1, 2, testEndpointAddrs[2:3], nil) + edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab2.Build())) addrs2 := <-cc.newSubConnAddrsCh if got, want := addrs2[0].Addr, testEndpointAddrs[2]; got != want { @@ -279,11 +280,11 @@ func TestEDSPriority_HigherReadyCloseAllLower(t *testing.T) { edsb := NewXDSBalancer(cc, nil) // Two localities, with priorities [0,1,2], each with one backend. - clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) - clab1.addLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) - clab1.addLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) - clab1.addLocality(testSubZones[2], 1, 2, testEndpointAddrs[2:3], nil) - edsb.HandleEDSResponse(clab1.build()) + clab1 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) + clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) + clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) + clab1.AddLocality(testSubZones[2], 1, 2, testEndpointAddrs[2:3], nil) + edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build())) addrs0 := <-cc.newSubConnAddrsCh if got, want := addrs0[0].Addr, testEndpointAddrs[0]; got != want { @@ -358,10 +359,10 @@ func TestEDSPriority_InitTimeout(t *testing.T) { edsb := NewXDSBalancer(cc, nil) // Two localities, with different priorities, each with one backend. - clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) - clab1.addLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) - clab1.addLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) - edsb.HandleEDSResponse(clab1.build()) + clab1 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) + clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) + clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) + edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build())) addrs0 := <-cc.newSubConnAddrsCh if got, want := addrs0[0].Addr, testEndpointAddrs[0]; got != want { @@ -407,10 +408,10 @@ func TestEDSPriority_MultipleLocalities(t *testing.T) { edsb := NewXDSBalancer(cc, nil) // Two localities, with different priorities, each with one backend. - clab0 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) - clab0.addLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) - clab0.addLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) - edsb.HandleEDSResponse(clab0.build()) + clab0 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) + clab0.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) + clab0.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) + edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab0.Build())) addrs0 := <-cc.newSubConnAddrsCh if got, want := addrs0[0].Addr, testEndpointAddrs[0]; got != want { @@ -472,12 +473,12 @@ func TestEDSPriority_MultipleLocalities(t *testing.T) { } // Add two localities, with two priorities, with one backend. - clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) - clab1.addLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) - clab1.addLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) - clab1.addLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:3], nil) - clab1.addLocality(testSubZones[3], 1, 1, testEndpointAddrs[3:4], nil) - edsb.HandleEDSResponse(clab1.build()) + clab1 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) + clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) + clab1.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) + clab1.AddLocality(testSubZones[2], 1, 0, testEndpointAddrs[2:3], nil) + clab1.AddLocality(testSubZones[3], 1, 1, testEndpointAddrs[3:4], nil) + edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build())) addrs2 := <-cc.newSubConnAddrsCh if got, want := addrs2[0].Addr, testEndpointAddrs[2]; got != want { @@ -534,10 +535,10 @@ func TestEDSPriority_RemovesAllLocalities(t *testing.T) { edsb := NewXDSBalancer(cc, nil) // Two localities, with different priorities, each with one backend. - clab0 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) - clab0.addLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) - clab0.addLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) - edsb.HandleEDSResponse(clab0.build()) + clab0 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) + clab0.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) + clab0.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[1:2], nil) + edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab0.Build())) addrs0 := <-cc.newSubConnAddrsCh if got, want := addrs0[0].Addr, testEndpointAddrs[0]; got != want { @@ -559,8 +560,8 @@ func TestEDSPriority_RemovesAllLocalities(t *testing.T) { } // Remove all priorities. - clab1 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) - edsb.HandleEDSResponse(clab1.build()) + clab1 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) + edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab1.Build())) // p0 subconn should be removed. scToRemove := <-cc.removeSubConnCh @@ -577,10 +578,10 @@ func TestEDSPriority_RemovesAllLocalities(t *testing.T) { } // Re-add two localities, with previous priorities, but different backends. - clab2 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) - clab2.addLocality(testSubZones[0], 1, 0, testEndpointAddrs[2:3], nil) - clab2.addLocality(testSubZones[1], 1, 1, testEndpointAddrs[3:4], nil) - edsb.HandleEDSResponse(clab2.build()) + clab2 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) + clab2.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[2:3], nil) + clab2.AddLocality(testSubZones[1], 1, 1, testEndpointAddrs[3:4], nil) + edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab2.Build())) addrs01 := <-cc.newSubConnAddrsCh if got, want := addrs01[0].Addr, testEndpointAddrs[2]; got != want { @@ -612,9 +613,9 @@ func TestEDSPriority_RemovesAllLocalities(t *testing.T) { } // Remove p1 from EDS, to fallback to p0. - clab3 := newClusterLoadAssignmentBuilder(testClusterNames[0], nil) - clab3.addLocality(testSubZones[0], 1, 0, testEndpointAddrs[2:3], nil) - edsb.HandleEDSResponse(clab3.build()) + clab3 := xdsclient.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) + clab3.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[2:3], nil) + edsb.HandleEDSResponse(xdsclient.ParseEDSRespProtoForTesting(clab3.Build())) // p1 subconn should be removed. scToRemove1 := <-cc.removeSubConnCh diff --git a/xds/internal/balancer/xds.go b/xds/internal/balancer/xds.go index 601762b4..b119231a 100644 --- a/xds/internal/balancer/xds.go +++ b/xds/internal/balancer/xds.go @@ -27,8 +27,6 @@ import ( "sync" "time" - xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" - "github.com/golang/protobuf/proto" "google.golang.org/grpc/balancer" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/grpclog" @@ -36,6 +34,7 @@ import ( "google.golang.org/grpc/serviceconfig" "google.golang.org/grpc/xds/internal/balancer/edsbalancer" "google.golang.org/grpc/xds/internal/balancer/lrs" + xdsclient "google.golang.org/grpc/xds/internal/client" ) const ( @@ -101,7 +100,7 @@ func (b *xdsBalancerBuilder) ParseConfig(c json.RawMessage) (serviceconfig.LoadB // It's implemented by the real eds balancer and a fake testing eds balancer. type edsBalancerInterface interface { // HandleEDSResponse passes the received EDS message from traffic director to eds balancer. - HandleEDSResponse(edsResp *xdspb.ClusterLoadAssignment) + HandleEDSResponse(edsResp *xdsclient.EDSUpdate) // HandleChildPolicy updates the eds balancer the intra-cluster load balancing policy to use. HandleChildPolicy(name string, config json.RawMessage) // HandleSubConnStateChange handles state change for SubConn. @@ -158,7 +157,7 @@ func (x *xdsBalancer) startNewXDSClient(u *XDSConfig) { var haveGotADS bool // set up callbacks for the xds client. - newADS := func(ctx context.Context, resp proto.Message) error { + newADS := func(ctx context.Context, resp *xdsclient.EDSUpdate) error { if !haveGotADS { if prevClient != nil { prevClient.close() @@ -410,21 +409,12 @@ func (x *xdsBalancer) UpdateClientConnState(s balancer.ClientConnState) error { type edsResp struct { ctx context.Context - resp *xdspb.ClusterLoadAssignment + resp *xdsclient.EDSUpdate } -func (x *xdsBalancer) newADSResponse(ctx context.Context, resp proto.Message) error { - var update interface{} - switch u := resp.(type) { - case *xdspb.ClusterLoadAssignment: - // nothing to check - update = &edsResp{ctx: ctx, resp: u} - default: - grpclog.Warningf("xdsBalancer: got a response that's not EDS, type = %T", u) - } - +func (x *xdsBalancer) newADSResponse(ctx context.Context, resp *xdsclient.EDSUpdate) error { select { - case x.xdsClientUpdate <- update: + case x.xdsClientUpdate <- &edsResp{ctx: ctx, resp: resp}: case <-x.ctx.Done(): case <-ctx.Done(): } diff --git a/xds/internal/balancer/xds_client.go b/xds/internal/balancer/xds_client.go index e89bc22f..48ac96f9 100644 --- a/xds/internal/balancer/xds_client.go +++ b/xds/internal/balancer/xds_client.go @@ -26,7 +26,6 @@ import ( xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" xdsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" - "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes" "google.golang.org/grpc" "google.golang.org/grpc/balancer" @@ -34,6 +33,7 @@ import ( "google.golang.org/grpc/internal/backoff" "google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/xds/internal/balancer/lrs" + xdsclient "google.golang.org/grpc/xds/internal/client" "google.golang.org/grpc/xds/internal/client/bootstrap" ) @@ -52,7 +52,7 @@ type client struct { cli xdsgrpc.AggregatedDiscoveryServiceClient dialer func(context.Context, string) (net.Conn, error) channelzParentID int64 - newADS func(ctx context.Context, resp proto.Message) error + newADS func(ctx context.Context, resp *xdsclient.EDSUpdate) error loseContact func(ctx context.Context) cleanup func() backoff backoff.Strategy @@ -197,7 +197,12 @@ func (c *client) adsCallAttempt() (firstRespReceived bool) { grpclog.Warningf("xds: failed to unmarshal resources due to %v.", err) return } - if err := c.newADS(c.ctx, adsResp.Message); err != nil { + parsed, err := xdsclient.ParseEDSRespProto(adsResp.Message.(*xdspb.ClusterLoadAssignment)) + if err != nil { + grpclog.Warningf("xds: parsing new EDS message failed due to %v.", err) + return + } + if err := c.newADS(c.ctx, parsed); err != nil { grpclog.Warningf("xds: processing new ADS message failed due to %v.", err) return } @@ -213,7 +218,7 @@ func (c *client) adsCallAttempt() (firstRespReceived bool) { } } -func newXDSClient(balancerName string, edsServiceName string, opts balancer.BuildOptions, loadStore lrs.Store, newADS func(context.Context, proto.Message) error, loseContact func(ctx context.Context), exitCleanup func()) *client { +func newXDSClient(balancerName string, edsServiceName string, opts balancer.BuildOptions, loadStore lrs.Store, newADS func(context.Context, *xdsclient.EDSUpdate) error, loseContact func(ctx context.Context), exitCleanup func()) *client { c := &client{ serviceName: edsServiceName, dialer: opts.Dialer, diff --git a/xds/internal/balancer/xds_client_test.go b/xds/internal/balancer/xds_client_test.go index d47bcf5e..36778391 100644 --- a/xds/internal/balancer/xds_client_test.go +++ b/xds/internal/balancer/xds_client_test.go @@ -35,11 +35,13 @@ import ( durationpb "github.com/golang/protobuf/ptypes/duration" structpb "github.com/golang/protobuf/ptypes/struct" wrpb "github.com/golang/protobuf/ptypes/wrappers" + "github.com/google/go-cmp/cmp" "google.golang.org/grpc" "google.golang.org/grpc/balancer" "google.golang.org/grpc/codes" "google.golang.org/grpc/resolver" "google.golang.org/grpc/status" + xdsclient "google.golang.org/grpc/xds/internal/client" ) var ( @@ -233,8 +235,8 @@ func (s) TestXdsClientResponseHandling(t *testing.T) { func testXdsClientResponseHandling(t *testing.T, test *testConfig) { addr, td, _, cleanup := setupServer(t) defer cleanup() - adsChan := make(chan proto.Message, 10) - newADS := func(ctx context.Context, i proto.Message) error { + adsChan := make(chan *xdsclient.EDSUpdate, 10) + newADS := func(ctx context.Context, i *xdsclient.EDSUpdate) error { adsChan <- i return nil } @@ -255,7 +257,11 @@ func testXdsClientResponseHandling(t *testing.T, test *testConfig) { for i, resp := range test.responsesToSend { td.sendResp(&response{resp: resp}) ads := <-adsChan - if !proto.Equal(ads, test.expectedADSResponses[i]) { + want, err := xdsclient.ParseEDSRespProto(test.expectedADSResponses[i].(*xdspb.ClusterLoadAssignment)) + if err != nil { + t.Fatalf("parsing wanted EDS response failed: %v", err) + } + if !cmp.Equal(ads, want) { t.Fatalf("received unexpected ads response, got %v, want %v", ads, test.expectedADSResponses[i]) } } @@ -286,8 +292,8 @@ func (s) TestXdsClientLoseContact(t *testing.T) { func testXdsClientLoseContactRemoteClose(t *testing.T, test *testConfig) { addr, td, _, cleanup := setupServer(t) defer cleanup() - adsChan := make(chan proto.Message, 10) - newADS := func(ctx context.Context, i proto.Message) error { + adsChan := make(chan *xdsclient.EDSUpdate, 10) + newADS := func(ctx context.Context, i *xdsclient.EDSUpdate) error { adsChan <- i return nil } @@ -320,8 +326,8 @@ func testXdsClientLoseContactADSRelatedErrorOccur(t *testing.T, test *testConfig addr, td, _, cleanup := setupServer(t) defer cleanup() - adsChan := make(chan proto.Message, 10) - newADS := func(ctx context.Context, i proto.Message) error { + adsChan := make(chan *xdsclient.EDSUpdate, 10) + newADS := func(ctx context.Context, i *xdsclient.EDSUpdate) error { adsChan <- i return test.adsErr } @@ -354,8 +360,8 @@ func (s) TestXdsClientExponentialRetry(t *testing.T) { addr, td, _, cleanup := setupServer(t) defer cleanup() - adsChan := make(chan proto.Message, 10) - newADS := func(ctx context.Context, i proto.Message) error { + adsChan := make(chan *xdsclient.EDSUpdate, 10) + newADS := func(ctx context.Context, i *xdsclient.EDSUpdate) error { adsChan <- i return nil } diff --git a/xds/internal/balancer/xds_test.go b/xds/internal/balancer/xds_test.go index 1577d4f4..235ccd64 100644 --- a/xds/internal/balancer/xds_test.go +++ b/xds/internal/balancer/xds_test.go @@ -28,7 +28,6 @@ import ( xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" "github.com/golang/protobuf/jsonpb" - "github.com/golang/protobuf/proto" wrapperspb "github.com/golang/protobuf/ptypes/wrappers" "github.com/google/go-cmp/cmp" "google.golang.org/grpc/balancer" @@ -39,6 +38,7 @@ import ( "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" "google.golang.org/grpc/xds/internal/balancer/lrs" + xdsclient "google.golang.org/grpc/xds/internal/client" ) var lbABuilder = &balancerABuilder{} @@ -178,7 +178,7 @@ type scStateChange struct { type fakeEDSBalancer struct { cc balancer.ClientConn - edsChan chan *xdspb.ClusterLoadAssignment + edsChan chan *xdsclient.EDSUpdate childPolicy chan *loadBalancingConfig fallbackPolicy chan *loadBalancingConfig subconnStateChange chan *scStateChange @@ -195,7 +195,7 @@ func (f *fakeEDSBalancer) Close() { latestFakeEdsBalancer = nil } -func (f *fakeEDSBalancer) HandleEDSResponse(edsResp *xdspb.ClusterLoadAssignment) { +func (f *fakeEDSBalancer) HandleEDSResponse(edsResp *xdsclient.EDSUpdate) { f.edsChan <- edsResp } @@ -209,7 +209,7 @@ func (f *fakeEDSBalancer) HandleChildPolicy(name string, config json.RawMessage) func newFakeEDSBalancer(cc balancer.ClientConn, loadStore lrs.Store) edsBalancerInterface { lb := &fakeEDSBalancer{ cc: cc, - edsChan: make(chan *xdspb.ClusterLoadAssignment, 10), + edsChan: make(chan *xdsclient.EDSUpdate, 10), childPolicy: make(chan *loadBalancingConfig, 10), fallbackPolicy: make(chan *loadBalancingConfig, 10), subconnStateChange: make(chan *scStateChange, 10), @@ -349,7 +349,11 @@ func (s) TestXdsBalanceHandleBalancerConfigBalancerNameUpdate(t *testing.T) { if edsLB := getLatestEdsBalancer(); edsLB != nil { // edsLB won't change between the two iterations select { case gotEDS := <-edsLB.edsChan: - if !proto.Equal(gotEDS, testClusterLoadAssignment) { + want, err := xdsclient.ParseEDSRespProto(testClusterLoadAssignment) + if err != nil { + t.Fatalf("parsing wanted EDS response failed: %v", err) + } + if !cmp.Equal(gotEDS, want) { t.Fatalf("edsBalancer got eds: %v, want %v", gotEDS, testClusterLoadAssignment) } case <-time.After(time.Second): diff --git a/xds/internal/client/eds.go b/xds/internal/client/eds.go new file mode 100644 index 00000000..afc72cdc --- /dev/null +++ b/xds/internal/client/eds.go @@ -0,0 +1,163 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package client + +import ( + "fmt" + "net" + "strconv" + + xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" + corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" + endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint" + typepb "github.com/envoyproxy/go-control-plane/envoy/type" + "google.golang.org/grpc/xds/internal" +) + +// OverloadDropConfig contains the config to drop overloads. +type OverloadDropConfig struct { + Category string + Numerator uint32 + Denominator uint32 +} + +// EndpointHealthStatus represents the health status of an endpoint. +type EndpointHealthStatus int32 + +const ( + // EndpointHealthStatusUnknown represents HealthStatus UNKNOWN. + EndpointHealthStatusUnknown EndpointHealthStatus = iota + // EndpointHealthStatusHealthy represents HealthStatus HEALTHY. + EndpointHealthStatusHealthy + // EndpointHealthStatusUnhealthy represents HealthStatus UNHEALTHY. + EndpointHealthStatusUnhealthy + // EndpointHealthStatusDraining represents HealthStatus DRAINING. + EndpointHealthStatusDraining + // EndpointHealthStatusTimeout represents HealthStatus TIMEOUT. + EndpointHealthStatusTimeout + // EndpointHealthStatusDegraded represents HealthStatus DEGRADED. + EndpointHealthStatusDegraded +) + +// Endpoint contains information of an endpoint. +type Endpoint struct { + Address string + HealthStatus EndpointHealthStatus + Weight uint32 +} + +// Locality contains information of a locality. +type Locality struct { + Endpoints []Endpoint + ID internal.Locality + Priority uint32 + Weight uint32 +} + +// EDSUpdate contains an EDS update. +type EDSUpdate struct { + Drops []OverloadDropConfig + Localities []Locality +} + +func parseAddress(socketAddress *corepb.SocketAddress) string { + return net.JoinHostPort(socketAddress.GetAddress(), strconv.Itoa(int(socketAddress.GetPortValue()))) +} + +func parseDropPolicy(dropPolicy *xdspb.ClusterLoadAssignment_Policy_DropOverload) OverloadDropConfig { + percentage := dropPolicy.GetDropPercentage() + var ( + numerator = percentage.GetNumerator() + denominator uint32 + ) + switch percentage.GetDenominator() { + case typepb.FractionalPercent_HUNDRED: + denominator = 100 + case typepb.FractionalPercent_TEN_THOUSAND: + denominator = 10000 + case typepb.FractionalPercent_MILLION: + denominator = 1000000 + } + return OverloadDropConfig{ + Category: dropPolicy.GetCategory(), + Numerator: numerator, + Denominator: denominator, + } +} + +func parseEndpoints(lbEndpoints []*endpointpb.LbEndpoint) []Endpoint { + endpoints := make([]Endpoint, 0, len(lbEndpoints)) + for _, lbEndpoint := range lbEndpoints { + endpoints = append(endpoints, Endpoint{ + HealthStatus: EndpointHealthStatus(lbEndpoint.GetHealthStatus()), + Address: parseAddress(lbEndpoint.GetEndpoint().GetAddress().GetSocketAddress()), + Weight: lbEndpoint.GetLoadBalancingWeight().GetValue(), + }) + } + return endpoints +} + +// ParseEDSRespProto turns EDS response proto message to EDSUpdate. +// +// This is temporarily exported to be used in eds balancer, before it switches +// to use xds client. TODO: unexport. +func ParseEDSRespProto(m *xdspb.ClusterLoadAssignment) (*EDSUpdate, error) { + ret := &EDSUpdate{} + for _, dropPolicy := range m.GetPolicy().GetDropOverloads() { + ret.Drops = append(ret.Drops, parseDropPolicy(dropPolicy)) + } + priorities := make(map[uint32]struct{}) + for _, locality := range m.Endpoints { + l := locality.GetLocality() + if l == nil { + return nil, fmt.Errorf("EDS response contains a locality without ID, locality: %+v", locality) + } + lid := internal.Locality{ + Region: l.Region, + Zone: l.Zone, + SubZone: l.SubZone, + } + priority := locality.GetPriority() + priorities[priority] = struct{}{} + ret.Localities = append(ret.Localities, Locality{ + ID: lid, + Endpoints: parseEndpoints(locality.GetLbEndpoints()), + Weight: locality.GetLoadBalancingWeight().GetValue(), + Priority: priority, + }) + } + for i := 0; i < len(priorities); i++ { + if _, ok := priorities[uint32(i)]; !ok { + return nil, fmt.Errorf("priority %v missing (with different priorities %v received)", i, priorities) + } + } + return ret, nil +} + +// ParseEDSRespProtoForTesting parses EDS response, and panic if parsing fails. +// This is used by EDS balancer tests. +// +// TODO: delete this. The EDS balancer should build an EDSUpdate directly, +// instead of building and parsing a proto message. +func ParseEDSRespProtoForTesting(m *xdspb.ClusterLoadAssignment) *EDSUpdate { + u, err := ParseEDSRespProto(m) + if err != nil { + panic(err.Error()) + } + return u +} diff --git a/xds/internal/client/eds_test.go b/xds/internal/client/eds_test.go new file mode 100644 index 00000000..eb29ec61 --- /dev/null +++ b/xds/internal/client/eds_test.go @@ -0,0 +1,39 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package client + +import ( + "testing" +) + +// Only error cases are tested, normal cases are covered because EDS balancer +// tests build an EDS responses and parses them. +// TODO: add more tests, with error cases and normal cases. + +// Test that parsing fails if EDS response doesn't have all priorities. +// Priorities should range from 0 (highest) to N (lowest) without skipping +func TestParseEDSRespProtoPriorityError(t *testing.T) { + clab0 := NewClusterLoadAssignmentBuilder("test", nil) + clab0.AddLocality("locality-1", 1, 0, []string{"addr1:314"}, nil) + clab0.AddLocality("locality-2", 1, 2, []string{"addr2:159"}, nil) + _, err := ParseEDSRespProto(clab0.Build()) + if err == nil { + t.Errorf("ParseEDSRespProto() error = %v, wantErr ", err) + return + } +} diff --git a/xds/internal/client/eds_testutil.go b/xds/internal/client/eds_testutil.go new file mode 100644 index 00000000..49363150 --- /dev/null +++ b/xds/internal/client/eds_testutil.go @@ -0,0 +1,117 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// All structs/functions in this file should be unexported. They are used in EDS +// balancer tests now, to generate test inputs. Eventually, EDS balancer tests +// should generate EDSUpdate directly, instead of generating and parsing the +// proto message. +// TODO: unexported everything in this file. + +package client + +import ( + "fmt" + "net" + "strconv" + + xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2" + corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" + endpointpb "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint" + typepb "github.com/envoyproxy/go-control-plane/envoy/type" + wrapperspb "github.com/golang/protobuf/ptypes/wrappers" +) + +// ClusterLoadAssignmentBuilder builds a ClusterLoadAssignment, aka EDS +// response. +type ClusterLoadAssignmentBuilder struct { + v *xdspb.ClusterLoadAssignment +} + +// NewClusterLoadAssignmentBuilder creates a ClusterLoadAssignmentBuilder. +func NewClusterLoadAssignmentBuilder(clusterName string, dropPercents []uint32) *ClusterLoadAssignmentBuilder { + var drops []*xdspb.ClusterLoadAssignment_Policy_DropOverload + for i, d := range dropPercents { + drops = append(drops, &xdspb.ClusterLoadAssignment_Policy_DropOverload{ + Category: fmt.Sprintf("test-drop-%d", i), + DropPercentage: &typepb.FractionalPercent{ + Numerator: d, + Denominator: typepb.FractionalPercent_HUNDRED, + }, + }) + } + + return &ClusterLoadAssignmentBuilder{ + v: &xdspb.ClusterLoadAssignment{ + ClusterName: clusterName, + Policy: &xdspb.ClusterLoadAssignment_Policy{ + DropOverloads: drops, + }, + }, + } +} + +// AddLocalityOptions contains options when adding locality to the builder. +type AddLocalityOptions struct { + Health []corepb.HealthStatus +} + +// AddLocality adds a locality to the builder. +func (clab *ClusterLoadAssignmentBuilder) AddLocality(subzone string, weight uint32, priority uint32, addrsWithPort []string, opts *AddLocalityOptions) { + var lbEndPoints []*endpointpb.LbEndpoint + for i, a := range addrsWithPort { + host, portStr, err := net.SplitHostPort(a) + if err != nil { + panic("failed to split " + a) + } + port, err := strconv.Atoi(portStr) + if err != nil { + panic("failed to atoi " + portStr) + } + + lbe := &endpointpb.LbEndpoint{ + HostIdentifier: &endpointpb.LbEndpoint_Endpoint{ + Endpoint: &endpointpb.Endpoint{ + Address: &corepb.Address{ + Address: &corepb.Address_SocketAddress{ + SocketAddress: &corepb.SocketAddress{ + Protocol: corepb.SocketAddress_TCP, + Address: host, + PortSpecifier: &corepb.SocketAddress_PortValue{ + PortValue: uint32(port)}}}}}}, + } + if opts != nil && i < len(opts.Health) { + lbe.HealthStatus = opts.Health[i] + } + lbEndPoints = append(lbEndPoints, lbe) + } + + clab.v.Endpoints = append(clab.v.Endpoints, &endpointpb.LocalityLbEndpoints{ + Locality: &corepb.Locality{ + Region: "", + Zone: "", + SubZone: subzone, + }, + LbEndpoints: lbEndPoints, + LoadBalancingWeight: &wrapperspb.UInt32Value{Value: weight}, + Priority: priority, + }) +} + +// Build builds ClusterLoadAssignment. +func (clab *ClusterLoadAssignmentBuilder) Build() *xdspb.ClusterLoadAssignment { + return clab.v +} diff --git a/xds/internal/internal.go b/xds/internal/internal.go index 850bd768..844a64dd 100644 --- a/xds/internal/internal.go +++ b/xds/internal/internal.go @@ -30,6 +30,8 @@ import ( // // This struct should only be used as map keys. Use the proto message directly // in all other places. +// +// TODO: rename to LocalityID. type Locality struct { Region string Zone string