From 95da23bbf9cf49915e41ec68b3b991804b9d62cd Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Wed, 26 Jun 2019 12:49:37 -0700 Subject: [PATCH] xds: load report for server loads (from trailer) (#2870) --- balancer/xds/edsbalancer/balancergroup.go | 13 ++ .../xds/edsbalancer/balancergroup_test.go | 19 +- balancer/xds/edsbalancer/test_util_test.go | 10 + balancer/xds/lrs/lrs.go | 120 +++++++++++- balancer/xds/lrs/lrs_test.go | 179 ++++++++++++------ 5 files changed, 272 insertions(+), 69 deletions(-) diff --git a/balancer/xds/edsbalancer/balancergroup.go b/balancer/xds/edsbalancer/balancergroup.go index ebd2aa37..62d54ea7 100644 --- a/balancer/xds/edsbalancer/balancergroup.go +++ b/balancer/xds/edsbalancer/balancergroup.go @@ -24,6 +24,7 @@ import ( "google.golang.org/grpc/balancer/base" "google.golang.org/grpc/balancer/internal/wrr" "google.golang.org/grpc/balancer/xds/internal" + orcapb "google.golang.org/grpc/balancer/xds/internal/proto/udpa/data/orca/v1/orca_load_report" "google.golang.org/grpc/balancer/xds/lrs" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/grpclog" @@ -329,6 +330,11 @@ func (pg *pickerGroup) Pick(ctx context.Context, opts balancer.PickOptions) (con return p.Pick(ctx, opts) } +const ( + serverLoadCPUName = "cpu_utilization" + serverLoadMemoryName = "mem_utilization" +) + type loadReportPicker struct { balancer.Picker @@ -351,6 +357,13 @@ func (lrp *loadReportPicker) Pick(ctx context.Context, opts balancer.PickOptions td := done done = func(info balancer.DoneInfo) { lrp.loadStore.CallFinished(lrp.id, info.Err) + if load, ok := info.ServerLoad.(*orcapb.OrcaLoadReport); ok { + lrp.loadStore.CallServerLoad(lrp.id, serverLoadCPUName, load.CpuUtilization) + lrp.loadStore.CallServerLoad(lrp.id, serverLoadMemoryName, load.MemUtilization) + for n, d := range load.RequestCostOrUtilization { + lrp.loadStore.CallServerLoad(lrp.id, n, d) + } + } if td != nil { td(info) } diff --git a/balancer/xds/edsbalancer/balancergroup_test.go b/balancer/xds/edsbalancer/balancergroup_test.go index 27c86719..e9cdaa54 100644 --- a/balancer/xds/edsbalancer/balancergroup_test.go +++ b/balancer/xds/edsbalancer/balancergroup_test.go @@ -24,6 +24,7 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/roundrobin" "google.golang.org/grpc/balancer/xds/internal" + orcapb "google.golang.org/grpc/balancer/xds/internal/proto/udpa/data/orca/v1/orca_load_report" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/resolver" ) @@ -415,14 +416,25 @@ func TestBalancerGroup_LoadReport(t *testing.T) { var ( wantStart []internal.Locality wantEnd []internal.Locality + wantCost []testServerLoad ) for i := 0; i < 10; i++ { sc, done, _ := p1.Pick(context.Background(), balancer.PickOptions{}) locality := backendToBalancerID[sc] wantStart = append(wantStart, locality) if done != nil && sc != sc1 { - done(balancer.DoneInfo{}) - wantEnd = append(wantEnd, backendToBalancerID[sc]) + done(balancer.DoneInfo{ + ServerLoad: &orcapb.OrcaLoadReport{ + CpuUtilization: 10, + MemUtilization: 5, + RequestCostOrUtilization: map[string]float64{"pi": 3.14}, + }, + }) + wantEnd = append(wantEnd, locality) + wantCost = append(wantCost, + testServerLoad{name: serverLoadCPUName, d: 10}, + testServerLoad{name: serverLoadMemoryName, d: 5}, + testServerLoad{name: "pi", d: 3.14}) } } @@ -432,4 +444,7 @@ func TestBalancerGroup_LoadReport(t *testing.T) { if !reflect.DeepEqual(testLoadStore.callsEnded, wantEnd) { t.Fatalf("want ended: %v, got: %v", testLoadStore.callsEnded, wantEnd) } + if 
!reflect.DeepEqual(testLoadStore.callsCost, wantCost) { + t.Fatalf("want cost: %v, got: %v", testLoadStore.callsCost, wantCost) + } } diff --git a/balancer/xds/edsbalancer/test_util_test.go b/balancer/xds/edsbalancer/test_util_test.go index 559a431e..79484893 100644 --- a/balancer/xds/edsbalancer/test_util_test.go +++ b/balancer/xds/edsbalancer/test_util_test.go @@ -123,9 +123,15 @@ func (tcc *testClientConn) Target() string { panic("not implemented") } +type testServerLoad struct { + name string + d float64 +} + type testLoadStore struct { callsStarted []internal.Locality callsEnded []internal.Locality + callsCost []testServerLoad } func newTestLoadStore() *testLoadStore { @@ -144,6 +150,10 @@ func (tls *testLoadStore) CallFinished(l internal.Locality, err error) { tls.callsEnded = append(tls.callsEnded, l) } +func (tls *testLoadStore) CallServerLoad(l internal.Locality, name string, d float64) { + tls.callsCost = append(tls.callsCost, testServerLoad{name: name, d: d}) +} + func (*testLoadStore) ReportTo(ctx context.Context, cc *grpc.ClientConn) { panic("not implemented") } diff --git a/balancer/xds/lrs/lrs.go b/balancer/xds/lrs/lrs.go index f82e235b..5b0ab562 100644 --- a/balancer/xds/lrs/lrs.go +++ b/balancer/xds/lrs/lrs.go @@ -43,6 +43,7 @@ type Store interface { CallDropped(category string) CallStarted(l internal.Locality) CallFinished(l internal.Locality, err error) + CallServerLoad(l internal.Locality, name string, d float64) ReportTo(ctx context.Context, cc *grpc.ClientConn) } @@ -51,6 +52,13 @@ type rpcCountData struct { succeeded *uint64 errored *uint64 inProgress *uint64 + + // Map from load name to load data (sum+count). Loading data from map is + // atomic, but updating data takes a lock, which could cause contention when + // multiple RPCs try to report loads for the same name. + // + // To fix the contention, shard this map. + serverLoads sync.Map // map[string]*rpcLoadData } func newRPCCountData() *rpcCountData { @@ -61,6 +69,75 @@ func newRPCCountData() *rpcCountData { } } +func (rcd *rpcCountData) incrSucceeded() { + atomic.AddUint64(rcd.succeeded, 1) +} + +func (rcd *rpcCountData) loadAndClearSucceeded() uint64 { + return atomic.SwapUint64(rcd.succeeded, 0) +} + +func (rcd *rpcCountData) incrErrored() { + atomic.AddUint64(rcd.errored, 1) +} + +func (rcd *rpcCountData) loadAndClearErrored() uint64 { + return atomic.SwapUint64(rcd.errored, 0) +} + +func (rcd *rpcCountData) incrInProgress() { + atomic.AddUint64(rcd.inProgress, 1) +} + +func (rcd *rpcCountData) decrInProgress() { + atomic.AddUint64(rcd.inProgress, negativeOneUInt64) // atomic.Add(x, -1) +} + +func (rcd *rpcCountData) loadInProgress() uint64 { + return atomic.LoadUint64(rcd.inProgress) // InProgress count is not clear when reading. +} + +func (rcd *rpcCountData) addServerLoad(name string, d float64) { + loads, ok := rcd.serverLoads.Load(name) + if !ok { + tl := newRPCLoadData() + loads, _ = rcd.serverLoads.LoadOrStore(name, tl) + } + loads.(*rpcLoadData).add(d) +} + +// Data for server loads (from trailers or oob). Fields in this struct must be +// updated consistently. +// +// The current solution is to hold a lock, which could cause contention. To fix, +// shard serverLoads map in rpcCountData. 
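+//
+// As a purely illustrative sketch (not part of this change; shardedLoadData,
+// its shard count of 8, and the round-robin counter are hypothetical), the
+// sharding mentioned above could spread writers reporting the same load name
+// across several independently locked accumulators and combine them on read,
+// reusing rpcLoadData below and the sync/atomic package already imported here:
+//
+//	type shardedLoadData struct {
+//		next   uint32
+//		shards [8]rpcLoadData
+//	}
+//
+//	func (s *shardedLoadData) add(v float64) {
+//		// Pick a shard round-robin so concurrent adds rarely contend on
+//		// the same mutex.
+//		i := atomic.AddUint32(&s.next, 1) % uint32(len(s.shards))
+//		s.shards[i].add(v)
+//	}
+//
+//	func (s *shardedLoadData) loadAndClear() (sum float64, count uint64) {
+//		// Reads are infrequent (once per report interval), so summing
+//		// across shards is cheap.
+//		for i := range s.shards {
+//			ss, sc := s.shards[i].loadAndClear()
+//			sum += ss
+//			count += sc
+//		}
+//		return sum, count
+//	}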
+type rpcLoadData struct { + mu sync.Mutex + sum float64 + count uint64 +} + +func newRPCLoadData() *rpcLoadData { + return &rpcLoadData{} +} + +func (rld *rpcLoadData) add(v float64) { + rld.mu.Lock() + rld.sum += v + rld.count++ + rld.mu.Unlock() +} + +func (rld *rpcLoadData) loadAndClear() (s float64, c uint64) { + rld.mu.Lock() + s = rld.sum + rld.sum = 0 + c = rld.count + rld.count = 0 + rld.mu.Unlock() + return +} + // lrsStore collects loads from xds balancer, and periodically sends load to the // server. type lrsStore struct { @@ -110,7 +187,7 @@ func (ls *lrsStore) CallStarted(l internal.Locality) { tp := newRPCCountData() p, _ = ls.localityRPCCount.LoadOrStore(l, tp) } - atomic.AddUint64(p.(*rpcCountData).inProgress, 1) + p.(*rpcCountData).incrInProgress() } func (ls *lrsStore) CallFinished(l internal.Locality, err error) { @@ -120,14 +197,24 @@ func (ls *lrsStore) CallFinished(l internal.Locality, err error) { // case where entry for call-finish is not found should never happen. return } - atomic.AddUint64(p.(*rpcCountData).inProgress, negativeOneUInt64) // atomic.Add(x, -1) + p.(*rpcCountData).decrInProgress() if err == nil { - atomic.AddUint64(p.(*rpcCountData).succeeded, 1) + p.(*rpcCountData).incrSucceeded() } else { - atomic.AddUint64(p.(*rpcCountData).errored, 1) + p.(*rpcCountData).incrErrored() } } +func (ls *lrsStore) CallServerLoad(l internal.Locality, name string, d float64) { + p, ok := ls.localityRPCCount.Load(l) + if !ok { + // The map is never cleared, only values in the map are reset. So the + // case where entry for CallServerLoad is not found should never happen. + return + } + p.(*rpcCountData).addServerLoad(name, d) +} + func (ls *lrsStore) buildStats(clusterName string) []*loadreportpb.ClusterStats { var ( totalDropped uint64 @@ -150,13 +237,30 @@ func (ls *lrsStore) buildStats(clusterName string) []*loadreportpb.ClusterStats tempLocality := locality.(internal.Locality) tempCount := countP.(*rpcCountData) - tempSucceeded := atomic.SwapUint64(tempCount.succeeded, 0) - tempInProgress := atomic.LoadUint64(tempCount.inProgress) // InProgress count is not clear when reading. - tempErrored := atomic.SwapUint64(tempCount.errored, 0) + tempSucceeded := tempCount.loadAndClearSucceeded() + tempInProgress := tempCount.loadInProgress() + tempErrored := tempCount.loadAndClearErrored() if tempSucceeded == 0 && tempInProgress == 0 && tempErrored == 0 { return true } + var loadMetricStats []*loadreportpb.EndpointLoadMetricStats + tempCount.serverLoads.Range(func(name, data interface{}) bool { + tempName := name.(string) + tempSum, tempCount := data.(*rpcLoadData).loadAndClear() + if tempCount == 0 { + return true + } + loadMetricStats = append(loadMetricStats, + &loadreportpb.EndpointLoadMetricStats{ + MetricName: tempName, + NumRequestsFinishedWithMetric: tempCount, + TotalMetricValue: tempSum, + }, + ) + return true + }) + localityStats = append(localityStats, &loadreportpb.UpstreamLocalityStats{ Locality: &basepb.Locality{ Region: tempLocality.Region, @@ -166,7 +270,7 @@ func (ls *lrsStore) buildStats(clusterName string) []*loadreportpb.ClusterStats TotalSuccessfulRequests: tempSucceeded, TotalRequestsInProgress: tempInProgress, TotalErrorRequests: tempErrored, - LoadMetricStats: nil, // TODO: populate for user loads. + LoadMetricStats: loadMetricStats, UpstreamEndpointStats: nil, // TODO: populate for per endpoint loads. 
}) return true diff --git a/balancer/xds/lrs/lrs_test.go b/balancer/xds/lrs/lrs_test.go index de26a23f..1dbafd8b 100644 --- a/balancer/xds/lrs/lrs_test.go +++ b/balancer/xds/lrs/lrs_test.go @@ -25,7 +25,6 @@ import ( "reflect" "sort" "sync" - "sync/atomic" "testing" "time" @@ -51,37 +50,47 @@ var ( errTest = fmt.Errorf("test error") ) -func newRPCCountDataWithInitData(succeeded, errored, inprogress uint64) *rpcCountData { - return &rpcCountData{ - succeeded: &succeeded, - errored: &errored, - inProgress: &inprogress, +type rpcCountDataForTest struct { + succeeded uint64 + errored uint64 + inProgress uint64 + serverLoads map[string]float64 +} + +func newRPCCountDataForTest(succeeded, errored, inprogress uint64, serverLoads map[string]float64) *rpcCountDataForTest { + return &rpcCountDataForTest{ + succeeded: succeeded, + errored: errored, + inProgress: inprogress, + serverLoads: serverLoads, } } -func (rcd *rpcCountData) Equal(b *rpcCountData) bool { - return *rcd.inProgress == *b.inProgress && *rcd.errored == *b.errored && *rcd.succeeded == *b.succeeded +// Equal() is needed to compare unexported fields. +func (rcd *rpcCountDataForTest) Equal(b *rpcCountDataForTest) bool { + return rcd.inProgress == b.inProgress && + rcd.errored == b.errored && + rcd.succeeded == b.succeeded && + reflect.DeepEqual(rcd.serverLoads, b.serverLoads) } // equalClusterStats sorts requests and clear report internal before comparing. func equalClusterStats(a, b []*loadreportpb.ClusterStats) bool { - for _, s := range a { - sort.Slice(s.DroppedRequests, func(i, j int) bool { - return s.DroppedRequests[i].Category < s.DroppedRequests[j].Category - }) - sort.Slice(s.UpstreamLocalityStats, func(i, j int) bool { - return s.UpstreamLocalityStats[i].Locality.String() < s.UpstreamLocalityStats[j].Locality.String() - }) - s.LoadReportInterval = nil - } - for _, s := range b { - sort.Slice(s.DroppedRequests, func(i, j int) bool { - return s.DroppedRequests[i].Category < s.DroppedRequests[j].Category - }) - sort.Slice(s.UpstreamLocalityStats, func(i, j int) bool { - return s.UpstreamLocalityStats[i].Locality.String() < s.UpstreamLocalityStats[j].Locality.String() - }) - s.LoadReportInterval = nil + for _, t := range [][]*loadreportpb.ClusterStats{a, b} { + for _, s := range t { + sort.Slice(s.DroppedRequests, func(i, j int) bool { + return s.DroppedRequests[i].Category < s.DroppedRequests[j].Category + }) + sort.Slice(s.UpstreamLocalityStats, func(i, j int) bool { + return s.UpstreamLocalityStats[i].Locality.String() < s.UpstreamLocalityStats[j].Locality.String() + }) + for _, us := range s.UpstreamLocalityStats { + sort.Slice(us.LoadMetricStats, func(i, j int) bool { + return us.LoadMetricStats[i].MetricName < us.LoadMetricStats[j].MetricName + }) + } + s.LoadReportInterval = nil + } } return reflect.DeepEqual(a, b) } @@ -114,7 +123,7 @@ func Test_lrsStore_buildStats_drops(t *testing.T) { dropCategories[0]: 31, dropCategories[1]: 41, }, { - dropCategories[0]: 0, // This is shouldn't cause an empty report for category[0]. + dropCategories[0]: 0, // This shouldn't cause an empty report for category[0]. dropCategories[1]: 26, }}, }, @@ -172,48 +181,77 @@ func Test_lrsStore_buildStats_rpcCounts(t *testing.T) { name string rpcs []map[internal.Locality]struct { start, success, failure uint64 + serverData map[string]float64 // Will be reported with successful RPCs. 
} }{ { name: "one rpcCount report", rpcs: []map[internal.Locality]struct { start, success, failure uint64 + serverData map[string]float64 }{{ - localities[0]: {8, 3, 1}, + localities[0]: {8, 3, 1, nil}, }}, }, { - name: "two localities rpcCount reports", + name: "two localities one rpcCount report", rpcs: []map[internal.Locality]struct { start, success, failure uint64 + serverData map[string]float64 }{{ - localities[0]: {8, 3, 1}, - localities[1]: {15, 1, 5}, + localities[0]: {8, 3, 1, nil}, + localities[1]: {15, 1, 5, nil}, }}, }, { - name: "two rpcCount reports", + name: "three rpcCount reports", rpcs: []map[internal.Locality]struct { start, success, failure uint64 + serverData map[string]float64 }{{ - localities[0]: {8, 3, 1}, - localities[1]: {15, 1, 5}, + localities[0]: {8, 3, 1, nil}, + localities[1]: {15, 1, 5, nil}, }, { - localities[0]: {8, 3, 1}, + localities[0]: {8, 3, 1, nil}, }, { - localities[1]: {15, 1, 5}, + localities[1]: {15, 1, 5, nil}, }}, }, { name: "no empty report", rpcs: []map[internal.Locality]struct { start, success, failure uint64 + serverData map[string]float64 }{{ - localities[0]: {4, 3, 1}, - localities[1]: {7, 1, 5}, + localities[0]: {4, 3, 1, nil}, + localities[1]: {7, 1, 5, nil}, }, { - localities[0]: {0, 0, 0}, // This is shouldn't cause an empty report for locality[0]. - localities[1]: {1, 1, 0}, + localities[0]: {0, 0, 0, nil}, // This shouldn't cause an empty report for locality[0]. + localities[1]: {1, 1, 0, nil}, + }}, + }, + { + name: "two localities one report with server loads", + rpcs: []map[internal.Locality]struct { + start, success, failure uint64 + serverData map[string]float64 + }{{ + localities[0]: {8, 3, 1, map[string]float64{"cpu": 15, "mem": 20}}, + localities[1]: {15, 4, 5, map[string]float64{"net": 5, "disk": 0.8}}, + }}, + }, + { + name: "three reports with server loads", + rpcs: []map[internal.Locality]struct { + start, success, failure uint64 + serverData map[string]float64 + }{{ + localities[0]: {8, 3, 1, map[string]float64{"cpu": 15, "mem": 20}}, + localities[1]: {15, 4, 5, map[string]float64{"net": 5, "disk": 0.8}}, + }, { + localities[0]: {8, 3, 1, map[string]float64{"cpu": 1, "mem": 2}}, + }, { + localities[1]: {15, 4, 5, map[string]float64{"net": 13, "disk": 1.4}}, }}, }, } @@ -234,11 +272,22 @@ func Test_lrsStore_buildStats_rpcCounts(t *testing.T) { if count.success == 0 && tempInProgress == 0 && count.failure == 0 { continue } + var loadMetricStats []*loadreportpb.EndpointLoadMetricStats + for n, d := range count.serverData { + loadMetricStats = append(loadMetricStats, + &loadreportpb.EndpointLoadMetricStats{ + MetricName: n, + NumRequestsFinishedWithMetric: count.success, + TotalMetricValue: d * float64(count.success), + }, + ) + } upstreamLocalityStats = append(upstreamLocalityStats, &loadreportpb.UpstreamLocalityStats{ Locality: l.ToProto(), TotalSuccessfulRequests: count.success, TotalRequestsInProgress: tempInProgress, TotalErrorRequests: count.failure, + LoadMetricStats: loadMetricStats, }) } // InProgress count doesn't get cleared at each buildStats, and @@ -262,26 +311,29 @@ func Test_lrsStore_buildStats_rpcCounts(t *testing.T) { for l, count := range counts { for i := 0; i < int(count.success); i++ { wg.Add(1) - go func(i int, l internal.Locality) { + go func(l internal.Locality, serverData map[string]float64) { ls.CallStarted(l) ls.CallFinished(l, nil) + for n, d := range serverData { + ls.CallServerLoad(l, n, d) + } wg.Done() - }(i, l) + }(l, count.serverData) } for i := 0; i < int(count.failure); i++ { wg.Add(1) - 
go func(i int, l internal.Locality) { + go func(l internal.Locality) { ls.CallStarted(l) ls.CallFinished(l, errTest) wg.Done() - }(i, l) + }(l) } for i := 0; i < int(count.start-count.success-count.failure); i++ { wg.Add(1) - go func(i int, l internal.Locality) { + go func(l internal.Locality) { ls.CallStarted(l) wg.Done() - }(i, l) + }(l) } } wg.Wait() @@ -301,7 +353,7 @@ type lrsServer struct { mu sync.Mutex dropTotal uint64 drops map[string]uint64 - rpcs map[internal.Locality]*rpcCountData + rpcs map[internal.Locality]*rpcCountDataForTest } func (lrss *lrsServer) StreamLoadStats(stream lrsgrpc.LoadReportingService_StreamLoadStatsServer) error { @@ -351,12 +403,18 @@ func (lrss *lrsServer) StreamLoadStats(stream lrsgrpc.LoadReportingService_Strea } counts, ok := lrss.rpcs[l] if !ok { - counts = newRPCCountDataWithInitData(0, 0, 0) + counts = newRPCCountDataForTest(0, 0, 0, nil) lrss.rpcs[l] = counts } - atomic.AddUint64(counts.succeeded, ss.TotalSuccessfulRequests) - atomic.StoreUint64(counts.inProgress, ss.TotalRequestsInProgress) - atomic.AddUint64(counts.errored, ss.TotalErrorRequests) + counts.succeeded += ss.TotalSuccessfulRequests + counts.inProgress = ss.TotalRequestsInProgress + counts.errored += ss.TotalErrorRequests + for _, ts := range ss.LoadMetricStats { + if counts.serverLoads == nil { + counts.serverLoads = make(map[string]float64) + } + counts.serverLoads[ts.MetricName] = ts.TotalMetricValue / float64(ts.NumRequestsFinishedWithMetric) + } } lrss.mu.Unlock() } @@ -371,7 +429,7 @@ func setupServer(t *testing.T, reportingInterval *durationpb.Duration) (addr str lrss = &lrsServer{ reportingInterval: reportingInterval, drops: make(map[string]uint64), - rpcs: make(map[internal.Locality]*rpcCountData), + rpcs: make(map[internal.Locality]*rpcCountDataForTest), } lrsgrpc.RegisterLoadReportingServiceServer(svr, lrss) go svr.Serve(lis) @@ -413,23 +471,26 @@ func Test_lrsStore_ReportTo(t *testing.T) { } } - rpcs := map[internal.Locality]*rpcCountData{ - localities[0]: newRPCCountDataWithInitData(3, 1, 4), - localities[1]: newRPCCountDataWithInitData(1, 5, 9), + rpcs := map[internal.Locality]*rpcCountDataForTest{ + localities[0]: newRPCCountDataForTest(3, 1, 4, nil), + localities[1]: newRPCCountDataForTest(1, 5, 9, map[string]float64{"pi": 3.14, "e": 2.71}), } for l, count := range rpcs { - for i := 0; i < int(*count.succeeded); i++ { - go func(i int, l internal.Locality) { + for i := 0; i < int(count.succeeded); i++ { + go func(i int, l internal.Locality, count *rpcCountDataForTest) { ls.CallStarted(l) ls.CallFinished(l, nil) - }(i, l) + for n, d := range count.serverLoads { + ls.CallServerLoad(l, n, d) + } + }(i, l, count) } - for i := 0; i < int(*count.inProgress); i++ { + for i := 0; i < int(count.inProgress); i++ { go func(i int, l internal.Locality) { ls.CallStarted(l) }(i, l) } - for i := 0; i < int(*count.errored); i++ { + for i := 0; i < int(count.errored); i++ { go func(i int, l internal.Locality) { ls.CallStarted(l) ls.CallFinished(l, errTest)