diff --git a/balancer_switching_test.go b/balancer_switching_test.go index 92c196d3..1fd2f564 100644 --- a/balancer_switching_test.go +++ b/balancer_switching_test.go @@ -38,7 +38,7 @@ func checkPickFirst(cc *ClientConn, servers []*server) error { err error ) connected := false - for i := 0; i < 1000; i++ { + for i := 0; i < 5000; i++ { if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); ErrorDesc(err) == servers[0].port { if connected { // connected is set to false if peer is not server[0]. So if @@ -53,7 +53,7 @@ func checkPickFirst(cc *ClientConn, servers []*server) error { time.Sleep(time.Millisecond) } if !connected { - return fmt.Errorf("pickfirst is not in effect after 1 second, EmptyCall() = _, %v, want _, %v", err, servers[0].port) + return fmt.Errorf("pickfirst is not in effect after 5 second, EmptyCall() = _, %v, want _, %v", err, servers[0].port) } // The following RPCs should all succeed with the first server. for i := 0; i < 3; i++ { @@ -78,7 +78,7 @@ func checkRoundRobin(cc *ClientConn, servers []*server) error { // picked by the closing pickfirst balancer, and the test becomes flaky. for _, s := range servers { var up bool - for i := 0; i < 1000; i++ { + for i := 0; i < 5000; i++ { if err = Invoke(context.Background(), "/foo/bar", &req, &reply, cc); ErrorDesc(err) == s.port { up = true break @@ -86,7 +86,7 @@ func checkRoundRobin(cc *ClientConn, servers []*server) error { time.Sleep(time.Millisecond) } if !up { - return fmt.Errorf("server %v is not up within 1 second", s.port) + return fmt.Errorf("server %v is not up within 5 second", s.port) } } } @@ -131,3 +131,284 @@ func TestSwitchBalancer(t *testing.T) { t.Fatalf("check pickfirst returned non-nil error: %v", err) } } + +// First addr update contains grpclb. +func TestSwitchBalancerGRPCLBFirst(t *testing.T) { + defer leakcheck.Check(t) + r, rcleanup := manual.GenerateAndRegisterManualResolver() + defer rcleanup() + + cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{})) + if err != nil { + t.Fatalf("failed to dial: %v", err) + } + defer cc.Close() + + // ClientConn will switch balancer to grpclb when receives an address of + // type GRPCLB. + r.NewAddress([]resolver.Address{{Addr: "backend"}, {Addr: "grpclb", Type: resolver.GRPCLB}}) + var isGRPCLB bool + for i := 0; i < 5000; i++ { + cc.mu.Lock() + isGRPCLB = cc.curBalancerName == "grpclb" + cc.mu.Unlock() + if isGRPCLB { + break + } + time.Sleep(time.Millisecond) + } + if !isGRPCLB { + t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName) + } + + // New update containing new backend and new grpclb. Should not switch + // balancer. + r.NewAddress([]resolver.Address{{Addr: "backend2"}, {Addr: "grpclb2", Type: resolver.GRPCLB}}) + for i := 0; i < 200; i++ { + cc.mu.Lock() + isGRPCLB = cc.curBalancerName == "grpclb" + cc.mu.Unlock() + if !isGRPCLB { + break + } + time.Sleep(time.Millisecond) + } + if !isGRPCLB { + t.Fatalf("within 200 ms, cc.balancer switched to !grpclb, want grpclb") + } + + var isPickFirst bool + // Switch balancer to pickfirst. + r.NewAddress([]resolver.Address{{Addr: "backend"}}) + for i := 0; i < 5000; i++ { + cc.mu.Lock() + isPickFirst = cc.curBalancerName == pickfirstName + cc.mu.Unlock() + if isPickFirst { + break + } + time.Sleep(time.Millisecond) + } + if !isPickFirst { + t.Fatalf("after 5 second, cc.balancer is of type %v, not pick_first", cc.curBalancerName) + } +} + +// First addr update does not contain grpclb. +func TestSwitchBalancerGRPCLBSecond(t *testing.T) { + defer leakcheck.Check(t) + r, rcleanup := manual.GenerateAndRegisterManualResolver() + defer rcleanup() + + cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{})) + if err != nil { + t.Fatalf("failed to dial: %v", err) + } + defer cc.Close() + + r.NewAddress([]resolver.Address{{Addr: "backend"}}) + var isPickFirst bool + for i := 0; i < 5000; i++ { + cc.mu.Lock() + isPickFirst = cc.curBalancerName == pickfirstName + cc.mu.Unlock() + if isPickFirst { + break + } + time.Sleep(time.Millisecond) + } + if !isPickFirst { + t.Fatalf("after 5 second, cc.balancer is of type %v, not pick_first", cc.curBalancerName) + } + + // ClientConn will switch balancer to grpclb when receives an address of + // type GRPCLB. + r.NewAddress([]resolver.Address{{Addr: "backend"}, {Addr: "grpclb", Type: resolver.GRPCLB}}) + var isGRPCLB bool + for i := 0; i < 5000; i++ { + cc.mu.Lock() + isGRPCLB = cc.curBalancerName == "grpclb" + cc.mu.Unlock() + if isGRPCLB { + break + } + time.Sleep(time.Millisecond) + } + if !isGRPCLB { + t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName) + } + + // New update containing new backend and new grpclb. Should not switch + // balancer. + r.NewAddress([]resolver.Address{{Addr: "backend2"}, {Addr: "grpclb2", Type: resolver.GRPCLB}}) + for i := 0; i < 200; i++ { + cc.mu.Lock() + isGRPCLB = cc.curBalancerName == "grpclb" + cc.mu.Unlock() + if !isGRPCLB { + break + } + time.Sleep(time.Millisecond) + } + if !isGRPCLB { + t.Fatalf("within 200 ms, cc.balancer switched to !grpclb, want grpclb") + } + + // Switch balancer back. + r.NewAddress([]resolver.Address{{Addr: "backend"}}) + for i := 0; i < 5000; i++ { + cc.mu.Lock() + isPickFirst = cc.curBalancerName == pickfirstName + cc.mu.Unlock() + if isPickFirst { + break + } + time.Sleep(time.Millisecond) + } + if !isPickFirst { + t.Fatalf("after 5 second, cc.balancer is of type %v, not pick_first", cc.curBalancerName) + } +} + +// Test that if the current balancer is roundrobin, after switching to grpclb, +// when the resolved address doesn't contain grpclb addresses, balancer will be +// switched back to roundrobin. +func TestSwitchBalancerGRPCLBRoundRobin(t *testing.T) { + defer leakcheck.Check(t) + r, rcleanup := manual.GenerateAndRegisterManualResolver() + defer rcleanup() + + cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{})) + if err != nil { + t.Fatalf("failed to dial: %v", err) + } + defer cc.Close() + + r.NewServiceConfig(`{"loadBalancingPolicy": "round_robin"}`) + + r.NewAddress([]resolver.Address{{Addr: "backend"}}) + var isRoundRobin bool + for i := 0; i < 5000; i++ { + cc.mu.Lock() + isRoundRobin = cc.curBalancerName == "round_robin" + cc.mu.Unlock() + if isRoundRobin { + break + } + time.Sleep(time.Millisecond) + } + if !isRoundRobin { + t.Fatalf("after 5 second, cc.balancer is of type %v, not round_robin", cc.curBalancerName) + } + + // ClientConn will switch balancer to grpclb when receives an address of + // type GRPCLB. + r.NewAddress([]resolver.Address{{Addr: "grpclb", Type: resolver.GRPCLB}}) + var isGRPCLB bool + for i := 0; i < 5000; i++ { + cc.mu.Lock() + isGRPCLB = cc.curBalancerName == "grpclb" + cc.mu.Unlock() + if isGRPCLB { + break + } + time.Sleep(time.Millisecond) + } + if !isGRPCLB { + t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName) + } + + // Switch balancer back. + r.NewAddress([]resolver.Address{{Addr: "backend"}}) + for i := 0; i < 5000; i++ { + cc.mu.Lock() + isRoundRobin = cc.curBalancerName == "round_robin" + cc.mu.Unlock() + if isRoundRobin { + break + } + time.Sleep(time.Millisecond) + } + if !isRoundRobin { + t.Fatalf("after 5 second, cc.balancer is of type %v, not round_robin", cc.curBalancerName) + } +} + +// Test that if resolved address list contains grpclb, the balancer option in +// service config won't take effect. But when there's no grpclb address in a new +// resolved address list, balancer will be switched to the new one. +func TestSwitchBalancerGRPCLBServiceConfig(t *testing.T) { + defer leakcheck.Check(t) + r, rcleanup := manual.GenerateAndRegisterManualResolver() + defer rcleanup() + + cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{})) + if err != nil { + t.Fatalf("failed to dial: %v", err) + } + defer cc.Close() + + r.NewAddress([]resolver.Address{{Addr: "backend"}}) + var isPickFirst bool + for i := 0; i < 5000; i++ { + cc.mu.Lock() + isPickFirst = cc.curBalancerName == pickfirstName + cc.mu.Unlock() + if isPickFirst { + break + } + time.Sleep(time.Millisecond) + } + if !isPickFirst { + t.Fatalf("after 5 second, cc.balancer is of type %v, not pick_first", cc.curBalancerName) + } + + // ClientConn will switch balancer to grpclb when receives an address of + // type GRPCLB. + r.NewAddress([]resolver.Address{{Addr: "grpclb", Type: resolver.GRPCLB}}) + var isGRPCLB bool + for i := 0; i < 5000; i++ { + cc.mu.Lock() + isGRPCLB = cc.curBalancerName == "grpclb" + cc.mu.Unlock() + if isGRPCLB { + break + } + time.Sleep(time.Millisecond) + } + if !isGRPCLB { + t.Fatalf("after 5 second, cc.balancer is of type %v, not grpclb", cc.curBalancerName) + } + + r.NewServiceConfig(`{"loadBalancingPolicy": "round_robin"}`) + var isRoundRobin bool + for i := 0; i < 200; i++ { + cc.mu.Lock() + isRoundRobin = cc.curBalancerName == "round_robin" + cc.mu.Unlock() + if isRoundRobin { + break + } + time.Sleep(time.Millisecond) + } + // Balancer shoult NOT switch to round_robin because resolved list contains + // grpclb. + if isRoundRobin { + t.Fatalf("within 200 ms, cc.balancer switched to round_robin, want grpclb") + } + + // Switch balancer back. + r.NewAddress([]resolver.Address{{Addr: "backend"}}) + for i := 0; i < 5000; i++ { + cc.mu.Lock() + isRoundRobin = cc.curBalancerName == "round_robin" + cc.mu.Unlock() + if isRoundRobin { + break + } + time.Sleep(time.Millisecond) + } + if !isRoundRobin { + t.Fatalf("after 5 second, cc.balancer is of type %v, not round_robin", cc.curBalancerName) + } +} diff --git a/clientconn.go b/clientconn.go index c072f69f..a047d194 100644 --- a/clientconn.go +++ b/clientconn.go @@ -610,6 +610,7 @@ type ClientConn struct { // Keepalive parameter can be updated if a GoAway is received. mkp keepalive.ClientParameters curBalancerName string + preBalancerName string // previous balancer name. curAddresses []resolver.Address balancerWrapper *ccBalancerWrapper } @@ -667,42 +668,65 @@ func (cc *ClientConn) handleResolvedAddrs(addrs []resolver.Address, err error) { return } - // TODO(bar switching) when grpclb is submitted, check address type and start grpclb. - if cc.balancerWrapper == nil { - // First time handling resolved addresses. Build a balancer use either - // the builder specified by dial option, or pickfirst. - builder := cc.dopts.balancerBuilder - if builder == nil { - // No customBalancer was specified by DialOption, and this is the first - // time handling resolved addresses, create a pickfirst balancer. - builder = newPickfirstBuilder() - } - cc.curBalancerName = builder.Name() - cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts) - } - cc.curAddresses = addrs + + if cc.dopts.balancerBuilder != nil && cc.balancerWrapper == nil { + cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts) + } else { + var isGRPCLB bool + for _, a := range addrs { + if a.Type == resolver.GRPCLB { + isGRPCLB = true + break + } + } + var newBalancerName string + if isGRPCLB { + newBalancerName = grpclbName + } else { + // Address list doesn't contain grpclb address. Try to pick a + // non-grpclb balancer. + newBalancerName = cc.curBalancerName + // If current balancer is grpclb, switch to the previous one. + if newBalancerName == grpclbName { + newBalancerName = cc.preBalancerName + } + // The following could be true in two cases: + // - the first time handling resolved addresses + // (curBalancerName="") + // - the first time handling non-grpclb addresses + // (curBalancerName="grpclb", preBalancerName="") + if newBalancerName == "" { + newBalancerName = pickfirstName + } + } + cc.switchBalancer(newBalancerName) + } cc.balancerWrapper.handleResolvedAddrs(addrs, nil) } -// switchBalancer starts the switching from current balancer to the balancer with name. +// switchBalancer starts the switching from current balancer to the balancer +// with the given name. +// +// It will NOT send the current address list to the new balancer. If needed, +// caller of this function should send address list to the new balancer after +// this function returns. // // Caller must hold cc.mu. func (cc *ClientConn) switchBalancer(name string) { if cc.conns == nil { return } - grpclog.Infof("ClientConn switching balancer to %q", name) - - if cc.dopts.balancerBuilder != nil { - grpclog.Infoln("ignoring service config balancer configuration: WithBalancer DialOption used instead") - return - } if strings.ToLower(cc.curBalancerName) == strings.ToLower(name) { return } + grpclog.Infof("ClientConn switching balancer to %q", name) + if cc.dopts.balancerBuilder != nil { + grpclog.Infoln("ignoring balancer switching: WithBalancer DialOption used instead") + return + } // TODO(bar switching) change this to two steps: drain and close. // Keep track of sc in wrapper. if cc.balancerWrapper != nil { @@ -711,12 +735,12 @@ func (cc *ClientConn) switchBalancer(name string) { builder := balancer.Get(name) if builder == nil { - grpclog.Infof("failed to get balancer builder for: %v (this should never happen...)", name) + grpclog.Infof("failed to get balancer builder for: %v, using pick_first instead", name) builder = newPickfirstBuilder() } + cc.preBalancerName = cc.curBalancerName cc.curBalancerName = builder.Name() cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts) - cc.balancerWrapper.handleResolvedAddrs(cc.curAddresses, nil) } func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) { @@ -867,8 +891,18 @@ func (cc *ClientConn) handleServiceConfig(js string) error { cc.mu.Lock() cc.scRaw = js cc.sc = sc - if sc.LB != nil { - cc.switchBalancer(*sc.LB) + if sc.LB != nil && *sc.LB != grpclbName { // "grpclb" is not a valid balancer option in service config. + if cc.curBalancerName == grpclbName { + // If current balancer is grpclb, there's at least one grpclb + // balancer address in the resolved list. Don't switch the balancer, + // but change the previous balancer name, so if a new resolved + // address list doesn't contain grpclb address, balancer will be + // switched to *sc.LB. + cc.preBalancerName = *sc.LB + } else { + cc.switchBalancer(*sc.LB) + cc.balancerWrapper.handleResolvedAddrs(cc.curAddresses, nil) + } } cc.mu.Unlock() return nil diff --git a/grpclb.go b/grpclb.go index 8489c690..0750fc1e 100644 --- a/grpclb.go +++ b/grpclb.go @@ -36,6 +36,7 @@ import ( const ( lbTokeyKey = "lb-token" defaultFallbackTimeout = 10 * time.Second + grpclbName = "grpclb" ) func convertDuration(d *lbpb.Duration) time.Duration { @@ -82,12 +83,15 @@ func (x *balanceLoadClientStream) Recv() (*lbpb.LoadBalanceResponse, error) { return m, nil } -// NewLBBuilder creates a builder for grpclb. For testing only. -func NewLBBuilder() balancer.Builder { - // TODO(bar grpclb) this function is exported for testing only, remove it when resolver supports selecting grpclb. +// newLBBuilder creates a builder for grpclb. +func newLBBuilder() balancer.Builder { return NewLBBuilderWithFallbackTimeout(defaultFallbackTimeout) } +func init() { + balancer.Register(newLBBuilder()) +} + // NewLBBuilderWithFallbackTimeout creates a grpclb builder with the given // fallbackTimeout. If no response is received from the remote balancer within // fallbackTimeout, the backend addresses from the resolved address list will be @@ -105,7 +109,7 @@ type lbBuilder struct { } func (b *lbBuilder) Name() string { - return "grpclb" + return grpclbName } func (b *lbBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer { diff --git a/grpclb/grpclb_test.go b/grpclb/grpclb_test.go index 9bab731c..8c4b0132 100644 --- a/grpclb/grpclb_test.go +++ b/grpclb/grpclb_test.go @@ -344,7 +344,6 @@ func TestGRPCLB(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, - grpc.WithBalancerBuilder(grpc.NewLBBuilder()), grpc.WithTransportCredentials(&creds), grpc.WithDialer(fakeNameDialer)) if err != nil { t.Fatalf("Failed to dial to the backend %v", err) @@ -396,7 +395,6 @@ func TestGRPCLBWeighted(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, - grpc.WithBalancerBuilder(grpc.NewLBBuilder()), grpc.WithTransportCredentials(&creds), grpc.WithDialer(fakeNameDialer)) if err != nil { t.Fatalf("Failed to dial to the backend %v", err) @@ -462,7 +460,6 @@ func TestDropRequest(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, - grpc.WithBalancerBuilder(grpc.NewLBBuilder()), grpc.WithTransportCredentials(&creds), grpc.WithDialer(fakeNameDialer)) if err != nil { t.Fatalf("Failed to dial to the backend %v", err) @@ -537,7 +534,6 @@ func TestBalancerDisconnects(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, - grpc.WithBalancerBuilder(grpc.NewLBBuilder()), grpc.WithTransportCredentials(&creds), grpc.WithDialer(fakeNameDialer)) if err != nil { t.Fatalf("Failed to dial to the backend %v", err) @@ -710,7 +706,6 @@ func runAndGetStats(t *testing.T, dropForLoadBalancing, dropForRateLimiting bool ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, - grpc.WithBalancerBuilder(grpc.NewLBBuilder()), grpc.WithTransportCredentials(&creds), grpc.WithPerRPCCredentials(failPreRPCCred{}), grpc.WithDialer(fakeNameDialer)) diff --git a/pickfirst.go b/pickfirst.go index e83ca2b0..69623c4b 100644 --- a/pickfirst.go +++ b/pickfirst.go @@ -26,6 +26,8 @@ import ( "google.golang.org/grpc/resolver" ) +const pickfirstName = "pick_first" + func newPickfirstBuilder() balancer.Builder { return &pickfirstBuilder{} } @@ -37,7 +39,7 @@ func (*pickfirstBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions } func (*pickfirstBuilder) Name() string { - return "pick_first" + return pickfirstName } type pickfirstBalancer struct {