grpclb: handle service config and switch to pickfirst (#2719)

Menghan Li
2019-04-11 11:25:27 -07:00
committed by GitHub
parent 289eb50d88
commit 3f98697f44
5 changed files with 363 additions and 60 deletions

View File

@@ -129,19 +129,8 @@ func newLBBuilderWithFallbackTimeout(fallbackTimeout time.Duration) balancer.Bui
}
}
// newLBBuilderWithPickFirst creates a grpclb builder with pick-first.
func newLBBuilderWithPickFirst() balancer.Builder {
return &lbBuilder{
usePickFirst: true,
}
}
type lbBuilder struct {
fallbackTimeout time.Duration
// TODO: delete this when balancer can handle service config. This should be
// updated by service config.
usePickFirst bool // Use roundrobin or pickfirst for backends.
}
func (b *lbBuilder) Name() string {
@@ -167,7 +156,6 @@ func (b *lbBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) bal
cc: newLBCacheClientConn(cc),
target: target,
opt: opt,
usePickFirst: b.usePickFirst,
fallbackTimeout: b.fallbackTimeout,
doneCh: make(chan struct{}),
@@ -231,11 +219,14 @@ type lbBalancer struct {
// serverList contains anything new. Each generated picker will also have
// a reference to this list to do the first-layer pick.
fullServerList []*lbpb.Server
// Backend addresses. It's kept so the addresses are available when
// switching between round_robin and pickfirst.
backendAddrs []resolver.Address
// All backend addresses, with metadata set to nil. This list contains all
// backend addresses in the same order and with the same duplicates as in
// serverlist. When generating the picker, a SubConn slice with the same order
// but with only READY SCs will be generated.
backendAddrs []resolver.Address
backendAddrsWithoutMetadata []resolver.Address
// Roundrobin functionalities.
state connectivity.State
subConns map[resolver.Address]balancer.SubConn // Used to new/remove SubConn.
@@ -275,7 +266,7 @@ func (lb *lbBalancer) regeneratePicker(resetDrop bool) {
break
}
} else {
for _, a := range lb.backendAddrs {
for _, a := range lb.backendAddrsWithoutMetadata {
if sc, ok := lb.subConns[a]; ok {
if st, ok := lb.scStates[sc]; ok && st == connectivity.Ready {
readySCs = append(readySCs, sc)
@@ -339,6 +330,11 @@ func (lb *lbBalancer) aggregateSubConnStates() connectivity.State {
}
func (lb *lbBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
panic("not used")
}
func (lb *lbBalancer) UpdateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) {
s := scs.ConnectivityState
if grpclog.V(2) {
grpclog.Infof("lbBalancer: handle SubConn state change: %p, %v", sc, s)
}
@@ -371,7 +367,7 @@ func (lb *lbBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivi
if lb.state != connectivity.Ready {
if !lb.inFallback && !lb.remoteBalancerConnected {
// Enter fallback.
lb.refreshSubConns(lb.resolvedBackendAddrs, false)
lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst)
}
}
}
@@ -410,7 +406,7 @@ func (lb *lbBalancer) fallbackToBackendsAfter(fallbackTimeout time.Duration) {
return
}
// Enter fallback.
lb.refreshSubConns(lb.resolvedBackendAddrs, false)
lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst)
lb.mu.Unlock()
}
@@ -418,9 +414,30 @@ func (lb *lbBalancer) fallbackToBackendsAfter(fallbackTimeout time.Duration) {
// clientConn. The remoteLB clientConn will handle creating/removing remoteLB
// connections.
func (lb *lbBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
if grpclog.V(2) {
grpclog.Infof("lbBalancer: handleResolvedResult: %+v", addrs)
panic("not used")
}
func (lb *lbBalancer) handleServiceConfig(sc string) {
lb.mu.Lock()
defer lb.mu.Unlock()
newUsePickFirst := childIsPickFirst(sc)
if lb.usePickFirst == newUsePickFirst {
return
}
if grpclog.V(2) {
grpclog.Infof("lbBalancer: switching mode, new usePickFirst: %+v", newUsePickFirst)
}
lb.refreshSubConns(lb.backendAddrs, lb.inFallback, newUsePickFirst)
}
func (lb *lbBalancer) UpdateResolverState(rs resolver.State) {
if grpclog.V(2) {
grpclog.Infof("lbBalancer: UpdateResolverState: %+v", rs)
}
lb.handleServiceConfig(rs.ServiceConfig)
addrs := rs.Addresses
if len(addrs) <= 0 {
return
}
@@ -457,7 +474,7 @@ func (lb *lbBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
// This means we received a new list of resolved backends, and we are
// still in fallback mode. Need to update the list of backends we are
// using to the new list of backends.
lb.refreshSubConns(lb.resolvedBackendAddrs, false)
lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst)
}
lb.mu.Unlock()
}
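
Note on the refreshSubConns calls above: the boolean argument inverted its meaning in this commit. The old second parameter meant "this address list came from the gRPC LB server"; the new one means "we are entering (or staying in) fallback", and the pick-first mode is now passed explicitly. In sketch form (the signatures come from this diff; the before/after pairing is a condensed summary, not lines from the source):

// Entering fallback with resolver-provided backends:
//   before: lb.refreshSubConns(lb.resolvedBackendAddrs, false)
//   after:  lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst)
//
// Applying a fresh server list from the remote balancer:
//   before: lb.refreshSubConns(backendAddrs, true)
//   after:  lb.refreshSubConns(backendAddrs, false, lb.usePickFirst)
//
// Switching the child policy after a service config update (new in this commit):
//   after:  lb.refreshSubConns(lb.backendAddrs, lb.inFallback, newUsePickFirst)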

View File

@@ -0,0 +1,87 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpclb
import (
"encoding/json"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer/roundrobin"
)
type serviceConfig struct {
LoadBalancingConfig *[]map[string]*grpclbServiceConfig
}
type grpclbServiceConfig struct {
ChildPolicy *[]map[string]json.RawMessage
}
func parseFullServiceConfig(s string) *serviceConfig {
var ret serviceConfig
err := json.Unmarshal([]byte(s), &ret)
if err != nil {
return nil
}
return &ret
}
func parseServiceConfig(s string) *grpclbServiceConfig {
parsedSC := parseFullServiceConfig(s)
if parsedSC == nil {
return nil
}
lbConfigs := parsedSC.LoadBalancingConfig
if lbConfigs == nil {
return nil
}
for _, lbC := range *lbConfigs {
if v, ok := lbC[grpclbName]; ok {
return v
}
}
return nil
}
const (
roundRobinName = roundrobin.Name
pickFirstName = grpc.PickFirstBalancerName
)
func childIsPickFirst(s string) bool {
parsedSC := parseServiceConfig(s)
if parsedSC == nil {
return false
}
childConfigs := parsedSC.ChildPolicy
if childConfigs == nil {
return false
}
for _, childC := range *childConfigs {
// If round_robin appears before pick_first, return false.
if _, ok := childC[roundRobinName]; ok {
return false
}
// If pick_first appears before round_robin, return true.
if _, ok := childC[pickFirstName]; ok {
return true
}
}
return false
}
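
To see the child-policy ordering rule in isolation, here is a small standalone sketch. It re-implements the unexported helpers above in a main package (the real ones live inside the grpclb package and are not importable), so treat it as an illustration of the parsing behavior rather than the library code itself:

package main

import (
	"encoding/json"
	"fmt"
)

// Standalone copies of the config structs above, for illustration only.
type grpclbServiceConfig struct {
	ChildPolicy *[]map[string]json.RawMessage
}

type serviceConfig struct {
	LoadBalancingConfig *[]map[string]*grpclbServiceConfig
}

// childIsPickFirst mirrors the logic above: within the first grpclb entry,
// the first recognized child policy wins, and anything unparsable or absent
// defaults to round_robin (false).
func childIsPickFirst(s string) bool {
	var sc serviceConfig
	if json.Unmarshal([]byte(s), &sc) != nil || sc.LoadBalancingConfig == nil {
		return false
	}
	for _, lbC := range *sc.LoadBalancingConfig {
		cfg, ok := lbC["grpclb"]
		if !ok {
			continue
		}
		if cfg == nil || cfg.ChildPolicy == nil {
			return false
		}
		for _, childC := range *cfg.ChildPolicy {
			if _, ok := childC["round_robin"]; ok {
				return false
			}
			if _, ok := childC["pick_first"]; ok {
				return true
			}
		}
		return false
	}
	return false
}

func main() {
	fmt.Println(childIsPickFirst(`{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"pick_first":{}}]}}]}`))                    // true
	fmt.Println(childIsPickFirst(`{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"round_robin":{}},{"pick_first":{}}]}}]}`)) // false
	fmt.Println(childIsPickFirst(`{}`))                                                                                          // false
}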

View File

@@ -0,0 +1,154 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpclb
import (
"encoding/json"
"reflect"
"testing"
)
func Test_parseFullServiceConfig(t *testing.T) {
tests := []struct {
name string
s string
want *serviceConfig
}{
{
name: "empty",
s: "",
want: nil,
},
{
name: "success1",
s: `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"pick_first":{}}]}}]}`,
want: &serviceConfig{
LoadBalancingConfig: &[]map[string]*grpclbServiceConfig{
{"grpclb": &grpclbServiceConfig{
ChildPolicy: &[]map[string]json.RawMessage{
{"pick_first": json.RawMessage("{}")},
},
}},
},
},
},
{
name: "success2",
s: `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"round_robin":{}},{"pick_first":{}}]}}]}`,
want: &serviceConfig{
LoadBalancingConfig: &[]map[string]*grpclbServiceConfig{
{"grpclb": &grpclbServiceConfig{
ChildPolicy: &[]map[string]json.RawMessage{
{"round_robin": json.RawMessage("{}")},
{"pick_first": json.RawMessage("{}")},
},
}},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := parseFullServiceConfig(tt.s); !reflect.DeepEqual(got, tt.want) {
t.Errorf("parseFullServiceConfig() = %+v, want %+v", got, tt.want)
}
})
}
}
func Test_parseServiceConfig(t *testing.T) {
tests := []struct {
name string
s string
want *grpclbServiceConfig
}{
{
name: "empty",
s: "",
want: nil,
},
{
name: "success1",
s: `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"pick_first":{}}]}}]}`,
want: &grpclbServiceConfig{
ChildPolicy: &[]map[string]json.RawMessage{
{"pick_first": json.RawMessage("{}")},
},
},
},
{
name: "success2",
s: `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"round_robin":{}},{"pick_first":{}}]}}]}`,
want: &grpclbServiceConfig{
ChildPolicy: &[]map[string]json.RawMessage{
{"round_robin": json.RawMessage("{}")},
{"pick_first": json.RawMessage("{}")},
},
},
},
{
name: "no_grpclb",
s: `{"loadBalancingConfig":[{"notgrpclb":{"childPolicy":[{"round_robin":{}},{"pick_first":{}}]}}]}`,
want: nil,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := parseServiceConfig(tt.s); !reflect.DeepEqual(got, tt.want) {
t.Errorf("parseFullServiceConfig() = %+v, want %+v", got, tt.want)
}
})
}
}
func Test_childIsPickFirst(t *testing.T) {
tests := []struct {
name string
s string
want bool
}{
{
name: "invalid",
s: "",
want: false,
},
{
name: "pickfirst_only",
s: `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"pick_first":{}}]}}]}`,
want: true,
},
{
name: "pickfirst_before_rr",
s: `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"pick_first":{}},{"round_robin":{}}]}}]}`,
want: true,
},
{
name: "rr_before_pickfirst",
s: `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"round_robin":{}},{"pick_first":{}}]}}]}`,
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := childIsPickFirst(tt.s); got != tt.want {
t.Errorf("childIsPickFirst() = %v, want %v", got, tt.want)
}
})
}
}

View File

@@ -87,14 +87,14 @@ func (lb *lbBalancer) processServerList(l *lbpb.ServerList) {
// Call refreshSubConns to create/remove SubConns. If we are in fallback,
// this is also exiting fallback.
lb.refreshSubConns(backendAddrs, true)
lb.refreshSubConns(backendAddrs, false, lb.usePickFirst)
}
// refreshSubConns creates/removes SubConns with backendAddrs, and refreshes
// balancer state and picker.
//
// Caller must hold lb.mu.
func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fromGRPCLBServer bool) {
func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback bool, pickFirst bool) {
defer func() {
// Regenerate and update picker after refreshing subconns because with
// cache, even if SubConn was newed/removed, there might be no state
@@ -103,14 +103,28 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fromGRPCL
lb.updateStateAndPicker(true, true)
}()
lb.inFallback = !fromGRPCLBServer
lb.inFallback = fallback
opts := balancer.NewSubConnOptions{}
if fromGRPCLBServer {
if !fallback {
opts.CredsBundle = lb.grpclbBackendCreds
}
lb.backendAddrs = nil
lb.backendAddrs = backendAddrs
lb.backendAddrsWithoutMetadata = nil
if lb.usePickFirst != pickFirst {
// Remove all SubConns when switching modes.
for a, sc := range lb.subConns {
if lb.usePickFirst {
lb.cc.cc.RemoveSubConn(sc)
} else {
lb.cc.RemoveSubConn(sc)
}
delete(lb.subConns, a)
}
lb.usePickFirst = pickFirst
}
if lb.usePickFirst {
var sc balancer.SubConn
@@ -134,7 +148,7 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fromGRPCL
return
}
// addrsSet is the set converted from backendAddrs, it's used to quick
// addrsSet is the set converted from backendAddrsWithoutMetadata; it's used
// for quick lookup of an address.
addrsSet := make(map[resolver.Address]struct{})
// Create new SubConns.
@@ -142,7 +156,7 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fromGRPCL
addrWithoutMD := addr
addrWithoutMD.Metadata = nil
addrsSet[addrWithoutMD] = struct{}{}
lb.backendAddrs = append(lb.backendAddrs, addrWithoutMD)
lb.backendAddrsWithoutMetadata = append(lb.backendAddrsWithoutMetadata, addrWithoutMD)
if _, ok := lb.subConns[addrWithoutMD]; !ok {
// Use addrWithMD to create the SubConn.
@@ -282,7 +296,7 @@ func (lb *lbBalancer) watchRemoteBalancer() {
// aggregated state is not Ready.
if !lb.inFallback && lb.state != connectivity.Ready {
// Entering fallback.
lb.refreshSubConns(lb.resolvedBackendAddrs, false)
lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst)
}
lb.mu.Unlock()

View File

@@ -807,10 +807,8 @@ func TestFallback(t *testing.T) {
}
}
// The remote balancer sends a response with duplicates to the grpclb client.
func TestGRPCLBPickFirst(t *testing.T) {
balancer.Register(newLBBuilderWithPickFirst())
defer balancer.Register(newLBBuilder())
const grpclbServiceConfigWithPickFirst = `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"pick_first":{}}]}}]}`
defer leakcheck.Check(t)
@@ -854,57 +852,90 @@ func TestGRPCLBPickFirst(t *testing.T) {
defer cc.Close()
testC := testpb.NewTestServiceClient(cc)
r.UpdateState(resolver.State{Addresses: []resolver.Address{{
var (
p peer.Peer
result string
)
tss.ls.sls <- &lbpb.ServerList{Servers: beServers[0:3]}
// Start with sub-policy pick_first.
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{
Addr: tss.lbAddr,
Type: resolver.GRPCLB,
ServerName: lbServerName,
}}})
}},
ServiceConfig: grpclbServiceConfigWithPickFirst,
})
var p peer.Peer
portPicked1 := 0
tss.ls.sls <- &lbpb.ServerList{Servers: beServers[1:2]}
result = ""
for i := 0; i < 1000; i++ {
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
t.Fatalf("_.EmptyCall(_, _) = _, %v, want _, <nil>", err)
}
if portPicked1 == 0 {
portPicked1 = p.Addr.(*net.TCPAddr).Port
continue
}
if portPicked1 != p.Addr.(*net.TCPAddr).Port {
t.Fatalf("Different backends are picked for RPCs: %v vs %v", portPicked1, p.Addr.(*net.TCPAddr).Port)
result += strconv.Itoa(portsToIndex[p.Addr.(*net.TCPAddr).Port])
}
if seq := "00000"; !strings.Contains(result, strings.Repeat(seq, 100)) {
t.Errorf("got result sequence %q, want patten %q", result, seq)
}
portPicked2 := portPicked1
tss.ls.sls <- &lbpb.ServerList{Servers: beServers[:1]}
tss.ls.sls <- &lbpb.ServerList{Servers: beServers[2:]}
result = ""
for i := 0; i < 1000; i++ {
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
t.Fatalf("_.EmptyCall(_, _) = _, %v, want _, <nil>", err)
}
if portPicked2 == portPicked1 {
portPicked2 = p.Addr.(*net.TCPAddr).Port
continue
}
if portPicked2 != p.Addr.(*net.TCPAddr).Port {
t.Fatalf("Different backends are picked for RPCs: %v vs %v", portPicked2, p.Addr.(*net.TCPAddr).Port)
result += strconv.Itoa(portsToIndex[p.Addr.(*net.TCPAddr).Port])
}
if seq := "22222"; !strings.Contains(result, strings.Repeat(seq, 100)) {
t.Errorf("got result sequence %q, want patten %q", result, seq)
}
portPicked := portPicked2
tss.ls.sls <- &lbpb.ServerList{Servers: beServers[1:]}
result = ""
for i := 0; i < 1000; i++ {
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
t.Fatalf("_.EmptyCall(_, _) = _, %v, want _, <nil>", err)
}
if portPicked == portPicked2 {
portPicked = p.Addr.(*net.TCPAddr).Port
continue
result += strconv.Itoa(portsToIndex[p.Addr.(*net.TCPAddr).Port])
}
if portPicked != p.Addr.(*net.TCPAddr).Port {
t.Fatalf("Different backends are picked for RPCs: %v vs %v", portPicked, p.Addr.(*net.TCPAddr).Port)
if seq := "22222"; !strings.Contains(result, strings.Repeat(seq, 100)) {
t.Errorf("got result sequence %q, want patten %q", result, seq)
}
// Switch sub-policy to round_robin.
r.UpdateState(resolver.State{
Addresses: []resolver.Address{{
Addr: tss.lbAddr,
Type: resolver.GRPCLB,
ServerName: lbServerName,
}},
ServiceConfig: `{}`,
})
result = ""
for i := 0; i < 1000; i++ {
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
t.Fatalf("_.EmptyCall(_, _) = _, %v, want _, <nil>", err)
}
result += strconv.Itoa(portsToIndex[p.Addr.(*net.TCPAddr).Port])
}
if seq := "121212"; !strings.Contains(result, strings.Repeat(seq, 100)) {
t.Errorf("got result sequence %q, want patten %q", result, seq)
}
tss.ls.sls <- &lbpb.ServerList{Servers: beServers[0:3]}
result = ""
for i := 0; i < 1000; i++ {
if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil {
t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, <nil>", testC, err)
}
result += strconv.Itoa(portsToIndex[p.Addr.(*net.TCPAddr).Port])
}
if seq := "012012012"; !strings.Contains(result, strings.Repeat(seq, 2)) {
t.Errorf("got result sequence %q, want patten %q", result, seq)
}
}
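
The test drives the policy switch purely through resolver updates, and an application-side resolver would do the same. Mirroring the calls above (r, tss, and lbServerName are the test fixtures; the JSON strings are the ones the test already uses):

// Select pick_first as grpclb's child policy:
r.UpdateState(resolver.State{
	Addresses: []resolver.Address{{
		Addr:       tss.lbAddr,
		Type:       resolver.GRPCLB,
		ServerName: lbServerName,
	}},
	ServiceConfig: grpclbServiceConfigWithPickFirst,
})

// Revert to the default round_robin child policy:
r.UpdateState(resolver.State{
	Addresses: []resolver.Address{{
		Addr:       tss.lbAddr,
		Type:       resolver.GRPCLB,
		ServerName: lbServerName,
	}},
	ServiceConfig: `{}`,
})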