From 844b2a520bcdca4783ec10c76d8bf02c13d9c902 Mon Sep 17 00:00:00 2001
From: Menghan Li
Date: Tue, 10 Apr 2018 14:23:07 -0700
Subject: [PATCH] grpclb: fix issues caused by caching SubConns (#1977)

---
 grpclb/grpclb_test.go     | 14 +++++++++++---
 grpclb_remote_balancer.go | 25 +++++++++++++++----------
 grpclb_util.go            |  9 +++++----
 3 files changed, 31 insertions(+), 17 deletions(-)

diff --git a/grpclb/grpclb_test.go b/grpclb/grpclb_test.go
index d83ea6b8..92959df3 100644
--- a/grpclb/grpclb_test.go
+++ b/grpclb/grpclb_test.go
@@ -473,9 +473,17 @@ func TestDropRequest(t *testing.T) {
 		ServerName: lbServerName,
 	}})
 
-	// The 1st, non-fail-fast RPC should succeed. This ensures both server
-	// connections are made, because the first one has DropForLoadBalancing set to true.
-	if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
+	// Wait for the 1st, non-fail-fast RPC to succeed. This ensures both server
+	// connections are made, because the first one has DropForLoadBalancing set
+	// to true.
+	var i int
+	for i = 0; i < 1000; i++ {
+		if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err == nil {
+			break
+		}
+		time.Sleep(time.Millisecond)
+	}
+	if i >= 1000 {
 		t.Fatalf("%v.SayHello(_, _) = _, %v, want _, <nil>", testC, err)
 	}
 	for _, failfast := range []bool{true, false} {
diff --git a/grpclb_remote_balancer.go b/grpclb_remote_balancer.go
index f87e9733..b8dd4f18 100644
--- a/grpclb_remote_balancer.go
+++ b/grpclb_remote_balancer.go
@@ -76,15 +76,16 @@ func (lb *lbBalancer) processServerList(l *lbpb.ServerList) {
 	}
 
 	// Call refreshSubConns to create/remove SubConns.
-	backendsUpdated := lb.refreshSubConns(backendAddrs)
-	// If no backend was updated, no SubConn will be newed/removed. But since
-	// the full serverList was different, there might be updates in drops or
-	// pick weights(different number of duplicates). We need to update picker
-	// with the fulllist.
-	if !backendsUpdated {
-		lb.regeneratePicker()
-		lb.cc.UpdateBalancerState(lb.state, lb.picker)
-	}
+	lb.refreshSubConns(backendAddrs)
+	// Regenerate and update the picker whether or not the backends changed:
+	// since the full serverList was different, there might be updates in
+	// drops or pick weights (a different number of duplicates), so the picker
+	// must be rebuilt from the full list.
+	//
+	// Also, now with the cache, even if a SubConn was created or removed,
+	// there might be no state changes.
+	lb.regeneratePicker()
+	lb.cc.UpdateBalancerState(lb.state, lb.picker)
 }
 
 // refreshSubConns creates/removes SubConns with backendAddrs. It returns a bool
@@ -114,7 +115,11 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address) bool {
 			continue
 		}
 		lb.subConns[addrWithoutMD] = sc // Use the addr without MD as key for the map.
-		lb.scStates[sc] = connectivity.Idle
+		if _, ok := lb.scStates[sc]; !ok {
+			// Only set the state of a new sc to IDLE. The state could
+			// already be READY for cached SubConns.
+			lb.scStates[sc] = connectivity.Idle
+		}
 		sc.Connect()
 	}
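Note on the refreshSubConns change above: with the SubConn cache, lb.cc.NewSubConn can hand back an existing SubConn whose connectivity state is already READY, so unconditionally resetting its scStates entry to IDLE would make the picker treat a healthy connection as unusable. Below is a minimal, self-contained sketch of the initialize-only-if-absent idiom the hunk adopts; the subConn type is a hypothetical stand-in for balancer.SubConn, not code from this patch.

package main

import (
	"fmt"

	"google.golang.org/grpc/connectivity"
)

// subConn is a stand-in for balancer.SubConn; an identity to key the map
// by is all this sketch needs.
type subConn struct{ id int }

func main() {
	scStates := make(map[*subConn]connectivity.State)

	cached := &subConn{id: 1}
	scStates[cached] = connectivity.Ready // a cached SubConn, already READY

	fresh := &subConn{id: 2} // a genuinely new SubConn

	for _, sc := range []*subConn{cached, fresh} {
		// Mirror the patched loop: only initialize the state of a SubConn
		// with no entry yet, so a cached READY SubConn is not clobbered
		// back to IDLE.
		if _, ok := scStates[sc]; !ok {
			scStates[sc] = connectivity.Idle
		}
	}

	fmt.Println(scStates[cached]) // READY
	fmt.Println(scStates[fresh])  // IDLE
}

This is also why processServerList now calls regeneratePicker unconditionally: a cache hit can change the server list (drops, duplicate weights) without producing any SubConn state transition that would otherwise trigger a picker update.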
diff --git a/grpclb_util.go b/grpclb_util.go
index 0552c303..46ba2747 100644
--- a/grpclb_util.go
+++ b/grpclb_util.go
@@ -130,15 +130,16 @@ func (ccc *lbCacheClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
 	if len(addrs) != 1 {
 		return nil, fmt.Errorf("grpclb calling NewSubConn with addrs of length %v", len(addrs))
 	}
-	addr := addrs[0]
+	addrWithoutMD := addrs[0]
+	addrWithoutMD.Metadata = nil
 
 	ccc.mu.Lock()
 	defer ccc.mu.Unlock()
-	if entry, ok := ccc.subConnCache[addr]; ok {
+	if entry, ok := ccc.subConnCache[addrWithoutMD]; ok {
 		// If entry is in subConnCache, the SubConn was being deleted.
 		// cancel function will never be nil.
 		entry.cancel()
-		delete(ccc.subConnCache, addr)
+		delete(ccc.subConnCache, addrWithoutMD)
 		return entry.sc, nil
 	}
 
@@ -147,7 +148,7 @@ func (ccc *lbCacheClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
 		return nil, err
 	}
 
-	ccc.subConnToAddr[scNew] = addr
+	ccc.subConnToAddr[scNew] = addrWithoutMD
 	return scNew, nil
 }
 
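Note on the grpclb_util.go change: grpclb carries a per-backend LB token in resolver.Address.Metadata, so two server-list entries for the same backend can differ only in their token. Clearing Metadata before using the address as the key for subConnCache and subConnToAddr makes such entries resolve to one cached SubConn, and keeps these keys consistent with lb.subConns in refreshSubConns, which already keys on the addr without MD. Below is a minimal sketch of the idea, assuming the resolver.Address struct of this era of grpc-go; the cacheKey helper and the int-valued map are hypothetical stand-ins, not part of the patch.

package main

import (
	"fmt"

	"google.golang.org/grpc/resolver"
)

// cacheKey returns a copy of addr with Metadata cleared, so addresses that
// differ only in their LB token map to the same cache entry. (Hypothetical
// helper; the patch inlines this.)
func cacheKey(addr resolver.Address) resolver.Address {
	addr.Metadata = nil // addr is a copy; the caller's value is untouched
	return addr
}

func main() {
	cache := make(map[resolver.Address]int) // int stands in for a SubConn

	a := resolver.Address{Addr: "10.0.0.1:443", Metadata: "token-1"}
	b := resolver.Address{Addr: "10.0.0.1:443", Metadata: "token-2"}

	cache[cacheKey(a)] = 42

	// Same backend, different token: hits the same cached entry.
	if sc, ok := cache[cacheKey(b)]; ok {
		fmt.Println("cache hit:", sc)
	}
}

Note that the patch clears Metadata on a value copy (addrWithoutMD := addrs[0]) while still passing the original addrs, token included, down to ccc.cc.NewSubConn, so only the map keys lose the token.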