grpclb: fix issues caused by caching SubConns (#1977)

@@ -473,9 +473,17 @@ func TestDropRequest(t *testing.T) {
 		ServerName: lbServerName,
 	}})
 
-	// The 1st, non-fail-fast RPC should succeed. This ensures both server
-	// connections are made, because the first one has DropForLoadBalancing set to true.
-	if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
+	// Wait for the 1st, non-fail-fast RPC to succeed. This ensures both server
+	// connections are made, because the first one has DropForLoadBalancing set
+	// to true.
+	var i int
+	for i = 0; i < 1000; i++ {
+		if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err == nil {
+			break
+		}
+		time.Sleep(time.Millisecond)
+	}
+	if i >= 1000 {
 		t.Fatalf("%v.SayHello(_, _) = _, %v, want _, <nil>", testC, err)
 	}
 	for _, failfast := range []bool{true, false} {
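
The loop above replaces a single assertion with a bounded poll: the test retries the non-fail-fast EmptyCall until it succeeds, rather than requiring the very first attempt to succeed, so it tolerates the two server connections taking a moment to come up. Below is a minimal, self-contained sketch of that polling pattern; the awaitFirstSuccess helper and the fake RPC are hypothetical and not part of this commit.

    package main

    import (
    	"context"
    	"errors"
    	"fmt"
    	"time"
    )

    // awaitFirstSuccess retries call until it returns nil or the attempt budget
    // is exhausted, sleeping one millisecond between attempts, mirroring the
    // bounded polling loop added to TestDropRequest above.
    func awaitFirstSuccess(ctx context.Context, attempts int, call func(context.Context) error) error {
    	err := errors.New("no attempts made")
    	for i := 0; i < attempts; i++ {
    		if err = call(ctx); err == nil {
    			return nil
    		}
    		time.Sleep(time.Millisecond)
    	}
    	return err
    }

    func main() {
    	// Stand-in for the non-fail-fast EmptyCall RPC: fails a few times, then succeeds.
    	remaining := 3
    	rpc := func(ctx context.Context) error {
    		if remaining > 0 {
    			remaining--
    			return errors.New("not ready")
    		}
    		return nil
    	}
    	fmt.Println(awaitFirstSuccess(context.Background(), 1000, rpc)) // <nil>
    }
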
@@ -76,15 +76,16 @@ func (lb *lbBalancer) processServerList(l *lbpb.ServerList) {
 	}
 
 	// Call refreshSubConns to create/remove SubConns.
-	backendsUpdated := lb.refreshSubConns(backendAddrs)
-	// If no backend was updated, no SubConn will be newed/removed. But since
-	// the full serverList was different, there might be updates in drops or
-	// pick weights(different number of duplicates). We need to update picker
-	// with the fulllist.
-	if !backendsUpdated {
-		lb.regeneratePicker()
-		lb.cc.UpdateBalancerState(lb.state, lb.picker)
-	}
+	lb.refreshSubConns(backendAddrs)
+	// Regenerate and update the picker regardless of whether any backend was
+	// updated (i.e. whether any SubConn was created or removed). Since the full
+	// serverList was different, there might be updates in drops or pick weights
+	// (different number of duplicates), so the picker must be rebuilt from the full list.
+	//
+	// Also, with the SubConn cache, even if a SubConn was created or removed,
+	// there might be no state change to trigger a picker update.
+	lb.regeneratePicker()
+	lb.cc.UpdateBalancerState(lb.state, lb.picker)
 }
 
 // refreshSubConns creates/removes SubConns with backendAddrs. It returns a bool
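
The old code regenerated the picker only when the backend set changed; the replacement comment explains why that check was dropped: drops and pick weights can change even when the backends do not, and with the SubConn cache a created or removed SubConn may produce no connectivity state change that would otherwise trigger a picker update. A rough sketch of the resulting control flow is below; serverList, balancerSketch, and the string-valued picker are illustrative stand-ins, not the grpclb implementation.

    package main

    import "fmt"

    // serverList is a stand-in for the balancer's view of a server list:
    // backend addresses plus drop/weight information.
    type serverList struct {
    	backends []string
    	drops    int
    }

    type balancerSketch struct {
    	picker string // stand-in for the generated picker
    }

    // processServerList mirrors the updated flow: refresh SubConns, then always
    // regenerate the picker, because drops or weights may have changed even when
    // the backend set (and hence SubConn state) did not.
    func (b *balancerSketch) processServerList(l serverList) {
    	b.refreshSubConns(l.backends)
    	b.picker = fmt.Sprintf("picker(backends=%v, drops=%d)", l.backends, l.drops)
    }

    func (b *balancerSketch) refreshSubConns(backends []string) {
    	// In the real balancer this creates/removes SubConns; with the cache, a
    	// reused SubConn may never report a new connectivity state.
    }

    func main() {
    	b := &balancerSketch{}
    	b.processServerList(serverList{backends: []string{"a:1"}, drops: 0})
    	b.processServerList(serverList{backends: []string{"a:1"}, drops: 2})
    	// Same backends, but the picker still reflects the new drop count.
    	fmt.Println(b.picker)
    }
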
@@ -114,7 +115,11 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address) bool {
 				continue
 			}
 			lb.subConns[addrWithoutMD] = sc // Use the addr without MD as key for the map.
-			lb.scStates[sc] = connectivity.Idle
+			if _, ok := lb.scStates[sc]; !ok {
+				// Only set state of new sc to IDLE. The state could already be
+				// READY for cached SubConns.
+				lb.scStates[sc] = connectivity.Idle
+			}
 			sc.Connect()
 		}
 	}
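
The new guard matters because a SubConn served from the cache may already be READY; unconditionally resetting its recorded state to IDLE would skew the balancer's aggregated state until the next connectivity update arrives. A small self-contained sketch of the guard, using simplified stand-ins for the SubConn type and the scStates map:

    package main

    import "fmt"

    type state int

    const (
    	idle state = iota
    	connecting
    	ready
    )

    type subConn struct{ id int }

    func main() {
    	scStates := map[*subConn]state{}

    	// A cached SubConn that has already reported READY.
    	cached := &subConn{id: 1}
    	scStates[cached] = ready

    	// A freshly created SubConn with no recorded state yet.
    	fresh := &subConn{id: 2}

    	for _, sc := range []*subConn{cached, fresh} {
    		if _, ok := scStates[sc]; !ok {
    			// Only initialize state for SubConns we haven't seen before; a
    			// cached SubConn keeps whatever state it already reported.
    			scStates[sc] = idle
    		}
    	}
    	// Prints "2 0": READY (2) is preserved, the new SubConn starts at IDLE (0).
    	fmt.Println(scStates[cached], scStates[fresh])
    }
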
@@ -130,15 +130,16 @@ func (ccc *lbCacheClientConn) NewSubConn(addrs []resolver.Address, opts balancer
 	if len(addrs) != 1 {
 		return nil, fmt.Errorf("grpclb calling NewSubConn with addrs of length %v", len(addrs))
 	}
-	addr := addrs[0]
+	addrWithoutMD := addrs[0]
+	addrWithoutMD.Metadata = nil
 
 	ccc.mu.Lock()
 	defer ccc.mu.Unlock()
-	if entry, ok := ccc.subConnCache[addr]; ok {
+	if entry, ok := ccc.subConnCache[addrWithoutMD]; ok {
 		// If entry is in subConnCache, the SubConn was being deleted.
 		// cancel function will never be nil.
 		entry.cancel()
-		delete(ccc.subConnCache, addr)
+		delete(ccc.subConnCache, addrWithoutMD)
 		return entry.sc, nil
 	}
 
@@ -147,7 +148,7 @@ func (ccc *lbCacheClientConn) NewSubConn(addrs []resolver.Address, opts balancer
 		return nil, err
 	}
 
-	ccc.subConnToAddr[scNew] = addr
+	ccc.subConnToAddr[scNew] = addrWithoutMD
 	return scNew, nil
 }
 
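
The two NewSubConn hunks make the same point: every map keyed by an address (the removal cache and subConnToAddr) must use the address with Metadata cleared, because the address passed to NewSubConn carries metadata while the cache key must not, and a mismatched key would silently bypass the cache. A simplified sketch of the cache-hit path follows; address, cacheEntry, and subConnCache are illustrative stand-ins, and the real lbCacheClientConn additionally schedules delayed removal and keeps the corresponding cancel function, as the comment in the hunk notes.

    package main

    import "fmt"

    type address struct {
    	addr     string
    	metadata interface{} // e.g. a load balance token attached by the balancer
    }

    type subConn struct{ addr string }

    type cacheEntry struct {
    	sc     *subConn
    	cancel func() // cancels the pending removal of sc
    }

    type subConnCache struct {
    	entries       map[address]*cacheEntry
    	subConnToAddr map[*subConn]address
    }

    // newSubConn returns the cached SubConn for addr if one is awaiting removal,
    // cancelling the pending delete; otherwise it creates a fresh one. The map
    // key always has metadata stripped so the same backend matches regardless of
    // the metadata attached to it.
    func (c *subConnCache) newSubConn(addr address) *subConn {
    	addrWithoutMD := addr
    	addrWithoutMD.metadata = nil

    	if entry, ok := c.entries[addrWithoutMD]; ok {
    		entry.cancel() // the SubConn was scheduled for removal; keep it instead
    		delete(c.entries, addrWithoutMD)
    		return entry.sc
    	}

    	sc := &subConn{addr: addr.addr}
    	c.subConnToAddr[sc] = addrWithoutMD
    	return sc
    }

    func main() {
    	c := &subConnCache{
    		entries:       map[address]*cacheEntry{},
    		subConnToAddr: map[*subConn]address{},
    	}
    	cached := &subConn{addr: "backend:80"}
    	c.entries[address{addr: "backend:80"}] = &cacheEntry{
    		sc:     cached,
    		cancel: func() { fmt.Println("pending removal cancelled") },
    	}

    	// A request carrying metadata still hits the cache, because the lookup
    	// key is the address with metadata cleared.
    	got := c.newSubConn(address{addr: "backend:80", metadata: "lb-token"})
    	fmt.Println(got == cached) // true
    }

Keying by the metadata-free address also means a backend that reappears with a different token can still reuse its cached SubConn.
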