xds: load report for server loads (from trailer) (#2870)

Menghan Li
2019-06-26 12:49:37 -07:00
committed by GitHub
parent 5caf962939
commit 95da23bbf9
5 changed files with 272 additions and 69 deletions
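At a glance, the change threads per-call server load from the RPC trailer into the LRS load report: the trailer-supplied ORCA load report is surfaced to the picker as balancer.DoneInfo.ServerLoad, loadReportPicker forwards it to the lrs.Store under fixed metric names ("cpu_utilization", "mem_utilization", plus any request costs), and lrsStore aggregates those values into LoadMetricStats for the next report. A condensed, illustrative sketch of that path, assuming a done callback obtained from the picker (the "queue_depth" metric name is invented for the example):

    done(balancer.DoneInfo{
        // gRPC delivers the ORCA report parsed from the trailer here.
        ServerLoad: &orcapb.OrcaLoadReport{
            CpuUtilization:           0.5, // recorded as "cpu_utilization"
            MemUtilization:           0.3, // recorded as "mem_utilization"
            RequestCostOrUtilization: map[string]float64{"queue_depth": 12}, // recorded under its own name
        },
    })
    // loadReportPicker's done wrapper calls loadStore.CallServerLoad(locality, name, value)
    // once per metric; lrsStore sums the values per name until buildStats drains them.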

View File

@ -24,6 +24,7 @@ import (
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/balancer/internal/wrr"
"google.golang.org/grpc/balancer/xds/internal"
orcapb "google.golang.org/grpc/balancer/xds/internal/proto/udpa/data/orca/v1/orca_load_report"
"google.golang.org/grpc/balancer/xds/lrs"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
@ -329,6 +330,11 @@ func (pg *pickerGroup) Pick(ctx context.Context, opts balancer.PickOptions) (con
return p.Pick(ctx, opts)
}
const (
serverLoadCPUName = "cpu_utilization"
serverLoadMemoryName = "mem_utilization"
)
type loadReportPicker struct {
balancer.Picker
@ -351,6 +357,13 @@ func (lrp *loadReportPicker) Pick(ctx context.Context, opts balancer.PickOptions
td := done
done = func(info balancer.DoneInfo) {
lrp.loadStore.CallFinished(lrp.id, info.Err)
if load, ok := info.ServerLoad.(*orcapb.OrcaLoadReport); ok {
lrp.loadStore.CallServerLoad(lrp.id, serverLoadCPUName, load.CpuUtilization)
lrp.loadStore.CallServerLoad(lrp.id, serverLoadMemoryName, load.MemUtilization)
for n, d := range load.RequestCostOrUtilization {
lrp.loadStore.CallServerLoad(lrp.id, n, d)
}
}
if td != nil {
td(info)
}

View File

@ -24,6 +24,7 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/balancer/xds/internal"
orcapb "google.golang.org/grpc/balancer/xds/internal/proto/udpa/data/orca/v1/orca_load_report"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/resolver"
)
@ -415,14 +416,25 @@ func TestBalancerGroup_LoadReport(t *testing.T) {
var (
wantStart []internal.Locality
wantEnd []internal.Locality
wantCost []testServerLoad
)
for i := 0; i < 10; i++ {
sc, done, _ := p1.Pick(context.Background(), balancer.PickOptions{})
locality := backendToBalancerID[sc]
wantStart = append(wantStart, locality)
if done != nil && sc != sc1 {
done(balancer.DoneInfo{})
wantEnd = append(wantEnd, backendToBalancerID[sc])
done(balancer.DoneInfo{
ServerLoad: &orcapb.OrcaLoadReport{
CpuUtilization: 10,
MemUtilization: 5,
RequestCostOrUtilization: map[string]float64{"pi": 3.14},
},
})
wantEnd = append(wantEnd, locality)
wantCost = append(wantCost,
testServerLoad{name: serverLoadCPUName, d: 10},
testServerLoad{name: serverLoadMemoryName, d: 5},
testServerLoad{name: "pi", d: 3.14})
}
}
@ -432,4 +444,7 @@ func TestBalancerGroup_LoadReport(t *testing.T) {
if !reflect.DeepEqual(testLoadStore.callsEnded, wantEnd) {
t.Fatalf("want ended: %v, got: %v", testLoadStore.callsEnded, wantEnd)
}
if !reflect.DeepEqual(testLoadStore.callsCost, wantCost) {
t.Fatalf("want cost: %v, got: %v", testLoadStore.callsCost, wantCost)
}
}

View File

@ -123,9 +123,15 @@ func (tcc *testClientConn) Target() string {
panic("not implemented")
}
type testServerLoad struct {
name string
d float64
}
type testLoadStore struct {
callsStarted []internal.Locality
callsEnded []internal.Locality
callsCost []testServerLoad
}
func newTestLoadStore() *testLoadStore {
@ -144,6 +150,10 @@ func (tls *testLoadStore) CallFinished(l internal.Locality, err error) {
tls.callsEnded = append(tls.callsEnded, l)
}
func (tls *testLoadStore) CallServerLoad(l internal.Locality, name string, d float64) {
tls.callsCost = append(tls.callsCost, testServerLoad{name: name, d: d})
}
func (*testLoadStore) ReportTo(ctx context.Context, cc *grpc.ClientConn) {
panic("not implemented")
}

View File

@ -43,6 +43,7 @@ type Store interface {
CallDropped(category string)
CallStarted(l internal.Locality)
CallFinished(l internal.Locality, err error)
CallServerLoad(l internal.Locality, name string, d float64)
ReportTo(ctx context.Context, cc *grpc.ClientConn)
}
@ -51,6 +52,13 @@ type rpcCountData struct {
succeeded *uint64
errored *uint64
inProgress *uint64
// Map from load name to load data (sum+count). Loading data from the map is
// atomic, but updating the data takes a lock, which could cause contention when
// multiple RPCs try to report loads for the same name.
//
// To fix the contention, shard this map.
serverLoads sync.Map // map[string]*rpcLoadData
}
func newRPCCountData() *rpcCountData {
@ -61,6 +69,75 @@ func newRPCCountData() *rpcCountData {
}
}
func (rcd *rpcCountData) incrSucceeded() {
atomic.AddUint64(rcd.succeeded, 1)
}
func (rcd *rpcCountData) loadAndClearSucceeded() uint64 {
return atomic.SwapUint64(rcd.succeeded, 0)
}
func (rcd *rpcCountData) incrErrored() {
atomic.AddUint64(rcd.errored, 1)
}
func (rcd *rpcCountData) loadAndClearErrored() uint64 {
return atomic.SwapUint64(rcd.errored, 0)
}
func (rcd *rpcCountData) incrInProgress() {
atomic.AddUint64(rcd.inProgress, 1)
}
func (rcd *rpcCountData) decrInProgress() {
atomic.AddUint64(rcd.inProgress, negativeOneUInt64) // atomic.Add(x, -1)
}
func (rcd *rpcCountData) loadInProgress() uint64 {
return atomic.LoadUint64(rcd.inProgress) // InProgress count is not cleared when reading.
}
func (rcd *rpcCountData) addServerLoad(name string, d float64) {
loads, ok := rcd.serverLoads.Load(name)
if !ok {
tl := newRPCLoadData()
loads, _ = rcd.serverLoads.LoadOrStore(name, tl)
}
loads.(*rpcLoadData).add(d)
}
// Data for server loads (from trailers or OOB). Fields in this struct must be
// updated consistently.
//
// The current solution is to hold a lock, which could cause contention. To fix
// this, shard the serverLoads map in rpcCountData.
type rpcLoadData struct {
mu sync.Mutex
sum float64
count uint64
}
func newRPCLoadData() *rpcLoadData {
return &rpcLoadData{}
}
func (rld *rpcLoadData) add(v float64) {
rld.mu.Lock()
rld.sum += v
rld.count++
rld.mu.Unlock()
}
func (rld *rpcLoadData) loadAndClear() (s float64, c uint64) {
rld.mu.Lock()
s = rld.sum
rld.sum = 0
c = rld.count
rld.count = 0
rld.mu.Unlock()
return
}
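The comments above flag the per-name lock as a potential contention point and leave sharding as a future fix; this commit keeps the single lock. One possible shape for that fix, sketched here only as an illustration (shardedLoadData and numLoadShards are invented names, not part of the commit; "sync" and "sync/atomic" are already imported by this file): spread writers for the same metric name across several independently locked accumulators and sum them when the report is built.

    const numLoadShards = 8 // assumed shard count

    // shardedLoadData is a drop-in idea for rpcLoadData: concurrent RPCs
    // reporting the same metric name usually land on different shards, so
    // they rarely contend on the same mutex.
    type shardedLoadData struct {
        shards [numLoadShards]struct {
            mu    sync.Mutex
            sum   float64
            count uint64
        }
        next uint32 // round-robin cursor for picking a shard
    }

    func (s *shardedLoadData) add(v float64) {
        sh := &s.shards[atomic.AddUint32(&s.next, 1)%numLoadShards]
        sh.mu.Lock()
        sh.sum += v
        sh.count++
        sh.mu.Unlock()
    }

    func (s *shardedLoadData) loadAndClear() (sum float64, count uint64) {
        for i := range s.shards {
            sh := &s.shards[i]
            sh.mu.Lock()
            sum += sh.sum
            count += sh.count
            sh.sum, sh.count = 0, 0
            sh.mu.Unlock()
        }
        return sum, count
    }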
// lrsStore collects loads from xds balancer, and periodically sends load to the
// server.
type lrsStore struct {
@ -110,7 +187,7 @@ func (ls *lrsStore) CallStarted(l internal.Locality) {
tp := newRPCCountData()
p, _ = ls.localityRPCCount.LoadOrStore(l, tp)
}
atomic.AddUint64(p.(*rpcCountData).inProgress, 1)
p.(*rpcCountData).incrInProgress()
}
func (ls *lrsStore) CallFinished(l internal.Locality, err error) {
@ -120,14 +197,24 @@ func (ls *lrsStore) CallFinished(l internal.Locality, err error) {
// case where entry for call-finish is not found should never happen.
return
}
atomic.AddUint64(p.(*rpcCountData).inProgress, negativeOneUInt64) // atomic.Add(x, -1)
p.(*rpcCountData).decrInProgress()
if err == nil {
atomic.AddUint64(p.(*rpcCountData).succeeded, 1)
p.(*rpcCountData).incrSucceeded()
} else {
atomic.AddUint64(p.(*rpcCountData).errored, 1)
p.(*rpcCountData).incrErrored()
}
}
func (ls *lrsStore) CallServerLoad(l internal.Locality, name string, d float64) {
p, ok := ls.localityRPCCount.Load(l)
if !ok {
// The map is never cleared, only values in the map are reset. So the
// case where entry for CallServerLoad is not found should never happen.
return
}
p.(*rpcCountData).addServerLoad(name, d)
}
func (ls *lrsStore) buildStats(clusterName string) []*loadreportpb.ClusterStats {
var (
totalDropped uint64
@ -150,13 +237,30 @@ func (ls *lrsStore) buildStats(clusterName string) []*loadreportpb.ClusterStats
tempLocality := locality.(internal.Locality)
tempCount := countP.(*rpcCountData)
tempSucceeded := atomic.SwapUint64(tempCount.succeeded, 0)
tempInProgress := atomic.LoadUint64(tempCount.inProgress) // InProgress count is not cleared when reading.
tempErrored := atomic.SwapUint64(tempCount.errored, 0)
tempSucceeded := tempCount.loadAndClearSucceeded()
tempInProgress := tempCount.loadInProgress()
tempErrored := tempCount.loadAndClearErrored()
if tempSucceeded == 0 && tempInProgress == 0 && tempErrored == 0 {
return true
}
var loadMetricStats []*loadreportpb.EndpointLoadMetricStats
tempCount.serverLoads.Range(func(name, data interface{}) bool {
tempName := name.(string)
tempSum, tempCount := data.(*rpcLoadData).loadAndClear()
if tempCount == 0 {
return true
}
loadMetricStats = append(loadMetricStats,
&loadreportpb.EndpointLoadMetricStats{
MetricName: tempName,
NumRequestsFinishedWithMetric: tempCount,
TotalMetricValue: tempSum,
},
)
return true
})
localityStats = append(localityStats, &loadreportpb.UpstreamLocalityStats{
Locality: &basepb.Locality{
Region: tempLocality.Region,
@ -166,7 +270,7 @@ func (ls *lrsStore) buildStats(clusterName string) []*loadreportpb.ClusterStats
TotalSuccessfulRequests: tempSucceeded,
TotalRequestsInProgress: tempInProgress,
TotalErrorRequests: tempErrored,
LoadMetricStats: nil, // TODO: populate for user loads.
LoadMetricStats: loadMetricStats,
UpstreamEndpointStats: nil, // TODO: populate for per endpoint loads.
})
return true
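To make the aggregation concrete, a worked example with illustrative numbers: three RPCs in one locality finish and each reports "cpu_utilization" (values 10, 20 and 30), so the rpcLoadData for that name accumulates sum=60, count=3, and buildStats emits an entry equivalent to:

    stats := &loadreportpb.EndpointLoadMetricStats{
        MetricName:                    "cpu_utilization",
        NumRequestsFinishedWithMetric: 3,  // RPCs that reported this metric
        TotalMetricValue:              60, // sum of the reported values
    }

A consumer can recover the per-call mean as TotalMetricValue / NumRequestsFinishedWithMetric = 20; the test LRS server later in this commit does exactly that division when it records serverLoads.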

View File

@ -25,7 +25,6 @@ import (
"reflect"
"sort"
"sync"
"sync/atomic"
"testing"
"time"
@ -51,37 +50,47 @@ var (
errTest = fmt.Errorf("test error")
)
func newRPCCountDataWithInitData(succeeded, errored, inprogress uint64) *rpcCountData {
return &rpcCountData{
succeeded: &succeeded,
errored: &errored,
inProgress: &inprogress,
type rpcCountDataForTest struct {
succeeded uint64
errored uint64
inProgress uint64
serverLoads map[string]float64
}
func newRPCCountDataForTest(succeeded, errored, inprogress uint64, serverLoads map[string]float64) *rpcCountDataForTest {
return &rpcCountDataForTest{
succeeded: succeeded,
errored: errored,
inProgress: inprogress,
serverLoads: serverLoads,
}
}
func (rcd *rpcCountData) Equal(b *rpcCountData) bool {
return *rcd.inProgress == *b.inProgress && *rcd.errored == *b.errored && *rcd.succeeded == *b.succeeded
// Equal() is needed to compare unexported fields.
func (rcd *rpcCountDataForTest) Equal(b *rpcCountDataForTest) bool {
return rcd.inProgress == b.inProgress &&
rcd.errored == b.errored &&
rcd.succeeded == b.succeeded &&
reflect.DeepEqual(rcd.serverLoads, b.serverLoads)
}
// equalClusterStats sorts requests and clears the report interval before comparing.
func equalClusterStats(a, b []*loadreportpb.ClusterStats) bool {
for _, s := range a {
sort.Slice(s.DroppedRequests, func(i, j int) bool {
return s.DroppedRequests[i].Category < s.DroppedRequests[j].Category
})
sort.Slice(s.UpstreamLocalityStats, func(i, j int) bool {
return s.UpstreamLocalityStats[i].Locality.String() < s.UpstreamLocalityStats[j].Locality.String()
})
s.LoadReportInterval = nil
}
for _, s := range b {
sort.Slice(s.DroppedRequests, func(i, j int) bool {
return s.DroppedRequests[i].Category < s.DroppedRequests[j].Category
})
sort.Slice(s.UpstreamLocalityStats, func(i, j int) bool {
return s.UpstreamLocalityStats[i].Locality.String() < s.UpstreamLocalityStats[j].Locality.String()
})
s.LoadReportInterval = nil
for _, t := range [][]*loadreportpb.ClusterStats{a, b} {
for _, s := range t {
sort.Slice(s.DroppedRequests, func(i, j int) bool {
return s.DroppedRequests[i].Category < s.DroppedRequests[j].Category
})
sort.Slice(s.UpstreamLocalityStats, func(i, j int) bool {
return s.UpstreamLocalityStats[i].Locality.String() < s.UpstreamLocalityStats[j].Locality.String()
})
for _, us := range s.UpstreamLocalityStats {
sort.Slice(us.LoadMetricStats, func(i, j int) bool {
return us.LoadMetricStats[i].MetricName < us.LoadMetricStats[j].MetricName
})
}
s.LoadReportInterval = nil
}
}
return reflect.DeepEqual(a, b)
}
@ -114,7 +123,7 @@ func Test_lrsStore_buildStats_drops(t *testing.T) {
dropCategories[0]: 31,
dropCategories[1]: 41,
}, {
dropCategories[0]: 0, // This is shouldn't cause an empty report for category[0].
dropCategories[0]: 0, // This shouldn't cause an empty report for category[0].
dropCategories[1]: 26,
}},
},
@ -172,48 +181,77 @@ func Test_lrsStore_buildStats_rpcCounts(t *testing.T) {
name string
rpcs []map[internal.Locality]struct {
start, success, failure uint64
serverData map[string]float64 // Will be reported with successful RPCs.
}
}{
{
name: "one rpcCount report",
rpcs: []map[internal.Locality]struct {
start, success, failure uint64
serverData map[string]float64
}{{
localities[0]: {8, 3, 1},
localities[0]: {8, 3, 1, nil},
}},
},
{
name: "two localities rpcCount reports",
name: "two localities one rpcCount report",
rpcs: []map[internal.Locality]struct {
start, success, failure uint64
serverData map[string]float64
}{{
localities[0]: {8, 3, 1},
localities[1]: {15, 1, 5},
localities[0]: {8, 3, 1, nil},
localities[1]: {15, 1, 5, nil},
}},
},
{
name: "two rpcCount reports",
name: "three rpcCount reports",
rpcs: []map[internal.Locality]struct {
start, success, failure uint64
serverData map[string]float64
}{{
localities[0]: {8, 3, 1},
localities[1]: {15, 1, 5},
localities[0]: {8, 3, 1, nil},
localities[1]: {15, 1, 5, nil},
}, {
localities[0]: {8, 3, 1},
localities[0]: {8, 3, 1, nil},
}, {
localities[1]: {15, 1, 5},
localities[1]: {15, 1, 5, nil},
}},
},
{
name: "no empty report",
rpcs: []map[internal.Locality]struct {
start, success, failure uint64
serverData map[string]float64
}{{
localities[0]: {4, 3, 1},
localities[1]: {7, 1, 5},
localities[0]: {4, 3, 1, nil},
localities[1]: {7, 1, 5, nil},
}, {
localities[0]: {0, 0, 0}, // This is shouldn't cause an empty report for locality[0].
localities[1]: {1, 1, 0},
localities[0]: {0, 0, 0, nil}, // This shouldn't cause an empty report for locality[0].
localities[1]: {1, 1, 0, nil},
}},
},
{
name: "two localities one report with server loads",
rpcs: []map[internal.Locality]struct {
start, success, failure uint64
serverData map[string]float64
}{{
localities[0]: {8, 3, 1, map[string]float64{"cpu": 15, "mem": 20}},
localities[1]: {15, 4, 5, map[string]float64{"net": 5, "disk": 0.8}},
}},
},
{
name: "three reports with server loads",
rpcs: []map[internal.Locality]struct {
start, success, failure uint64
serverData map[string]float64
}{{
localities[0]: {8, 3, 1, map[string]float64{"cpu": 15, "mem": 20}},
localities[1]: {15, 4, 5, map[string]float64{"net": 5, "disk": 0.8}},
}, {
localities[0]: {8, 3, 1, map[string]float64{"cpu": 1, "mem": 2}},
}, {
localities[1]: {15, 4, 5, map[string]float64{"net": 13, "disk": 1.4}},
}},
},
}
@ -234,11 +272,22 @@ func Test_lrsStore_buildStats_rpcCounts(t *testing.T) {
if count.success == 0 && tempInProgress == 0 && count.failure == 0 {
continue
}
var loadMetricStats []*loadreportpb.EndpointLoadMetricStats
for n, d := range count.serverData {
loadMetricStats = append(loadMetricStats,
&loadreportpb.EndpointLoadMetricStats{
MetricName: n,
NumRequestsFinishedWithMetric: count.success,
TotalMetricValue: d * float64(count.success),
},
)
}
upstreamLocalityStats = append(upstreamLocalityStats, &loadreportpb.UpstreamLocalityStats{
Locality: l.ToProto(),
TotalSuccessfulRequests: count.success,
TotalRequestsInProgress: tempInProgress,
TotalErrorRequests: count.failure,
LoadMetricStats: loadMetricStats,
})
}
// InProgress count doesn't get cleared at each buildStats, and
@ -262,26 +311,29 @@ func Test_lrsStore_buildStats_rpcCounts(t *testing.T) {
for l, count := range counts {
for i := 0; i < int(count.success); i++ {
wg.Add(1)
go func(i int, l internal.Locality) {
go func(l internal.Locality, serverData map[string]float64) {
ls.CallStarted(l)
ls.CallFinished(l, nil)
for n, d := range serverData {
ls.CallServerLoad(l, n, d)
}
wg.Done()
}(i, l)
}(l, count.serverData)
}
for i := 0; i < int(count.failure); i++ {
wg.Add(1)
go func(i int, l internal.Locality) {
go func(l internal.Locality) {
ls.CallStarted(l)
ls.CallFinished(l, errTest)
wg.Done()
}(i, l)
}(l)
}
for i := 0; i < int(count.start-count.success-count.failure); i++ {
wg.Add(1)
go func(i int, l internal.Locality) {
go func(l internal.Locality) {
ls.CallStarted(l)
wg.Done()
}(i, l)
}(l)
}
}
wg.Wait()
@ -301,7 +353,7 @@ type lrsServer struct {
mu sync.Mutex
dropTotal uint64
drops map[string]uint64
rpcs map[internal.Locality]*rpcCountData
rpcs map[internal.Locality]*rpcCountDataForTest
}
func (lrss *lrsServer) StreamLoadStats(stream lrsgrpc.LoadReportingService_StreamLoadStatsServer) error {
@ -351,12 +403,18 @@ func (lrss *lrsServer) StreamLoadStats(stream lrsgrpc.LoadReportingService_Strea
}
counts, ok := lrss.rpcs[l]
if !ok {
counts = newRPCCountDataWithInitData(0, 0, 0)
counts = newRPCCountDataForTest(0, 0, 0, nil)
lrss.rpcs[l] = counts
}
atomic.AddUint64(counts.succeeded, ss.TotalSuccessfulRequests)
atomic.StoreUint64(counts.inProgress, ss.TotalRequestsInProgress)
atomic.AddUint64(counts.errored, ss.TotalErrorRequests)
counts.succeeded += ss.TotalSuccessfulRequests
counts.inProgress = ss.TotalRequestsInProgress
counts.errored += ss.TotalErrorRequests
for _, ts := range ss.LoadMetricStats {
if counts.serverLoads == nil {
counts.serverLoads = make(map[string]float64)
}
counts.serverLoads[ts.MetricName] = ts.TotalMetricValue / float64(ts.NumRequestsFinishedWithMetric)
}
}
lrss.mu.Unlock()
}
@ -371,7 +429,7 @@ func setupServer(t *testing.T, reportingInterval *durationpb.Duration) (addr str
lrss = &lrsServer{
reportingInterval: reportingInterval,
drops: make(map[string]uint64),
rpcs: make(map[internal.Locality]*rpcCountData),
rpcs: make(map[internal.Locality]*rpcCountDataForTest),
}
lrsgrpc.RegisterLoadReportingServiceServer(svr, lrss)
go svr.Serve(lis)
@ -413,23 +471,26 @@ func Test_lrsStore_ReportTo(t *testing.T) {
}
}
rpcs := map[internal.Locality]*rpcCountData{
localities[0]: newRPCCountDataWithInitData(3, 1, 4),
localities[1]: newRPCCountDataWithInitData(1, 5, 9),
rpcs := map[internal.Locality]*rpcCountDataForTest{
localities[0]: newRPCCountDataForTest(3, 1, 4, nil),
localities[1]: newRPCCountDataForTest(1, 5, 9, map[string]float64{"pi": 3.14, "e": 2.71}),
}
for l, count := range rpcs {
for i := 0; i < int(*count.succeeded); i++ {
go func(i int, l internal.Locality) {
for i := 0; i < int(count.succeeded); i++ {
go func(i int, l internal.Locality, count *rpcCountDataForTest) {
ls.CallStarted(l)
ls.CallFinished(l, nil)
}(i, l)
for n, d := range count.serverLoads {
ls.CallServerLoad(l, n, d)
}
}(i, l, count)
}
for i := 0; i < int(*count.inProgress); i++ {
for i := 0; i < int(count.inProgress); i++ {
go func(i int, l internal.Locality) {
ls.CallStarted(l)
}(i, l)
}
for i := 0; i < int(*count.errored); i++ {
for i := 0; i < int(count.errored); i++ {
go func(i int, l internal.Locality) {
ls.CallStarted(l)
ls.CallFinished(l, errTest)