diff --git a/grpclb.go b/grpclb.go index d14a5d40..e8a92d12 100644 --- a/grpclb.go +++ b/grpclb.go @@ -127,7 +127,7 @@ func (b *lbBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) bal } lb := &lbBalancer{ - cc: cc, + cc: newLBCacheClientConn(cc), target: target, opt: opt, fallbackTimeout: b.fallbackTimeout, @@ -145,7 +145,7 @@ func (b *lbBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) bal } type lbBalancer struct { - cc balancer.ClientConn + cc *lbCacheClientConn target string opt balancer.BuildOptions fallbackTimeout time.Duration @@ -339,4 +339,5 @@ func (lb *lbBalancer) Close() { if lb.ccRemoteLB != nil { lb.ccRemoteLB.Close() } + lb.cc.close() } diff --git a/grpclb_util.go b/grpclb_util.go index 93ab2db3..0552c303 100644 --- a/grpclb_util.go +++ b/grpclb_util.go @@ -19,7 +19,12 @@ package grpc import ( + "fmt" + "sync" + "time" + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/resolver" ) @@ -88,3 +93,122 @@ func (r *lbManualResolver) NewAddress(addrs []resolver.Address) { func (r *lbManualResolver) NewServiceConfig(sc string) { r.ccr.NewServiceConfig(sc) } + +const subConnCacheTime = time.Second * 10 + +// lbCacheClientConn is a wrapper balancer.ClientConn with a SubConn cache. +// SubConns will be kept in cache for subConnCacheTime before being removed. +// +// Its new and remove methods are updated to do cache first. +type lbCacheClientConn struct { + cc balancer.ClientConn + timeout time.Duration + + mu sync.Mutex + // subConnCache only keeps subConns that are being deleted. + subConnCache map[resolver.Address]*subConnCacheEntry + subConnToAddr map[balancer.SubConn]resolver.Address +} + +type subConnCacheEntry struct { + sc balancer.SubConn + + cancel func() + abortDeleting bool +} + +func newLBCacheClientConn(cc balancer.ClientConn) *lbCacheClientConn { + return &lbCacheClientConn{ + cc: cc, + timeout: subConnCacheTime, + subConnCache: make(map[resolver.Address]*subConnCacheEntry), + subConnToAddr: make(map[balancer.SubConn]resolver.Address), + } +} + +func (ccc *lbCacheClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { + if len(addrs) != 1 { + return nil, fmt.Errorf("grpclb calling NewSubConn with addrs of length %v", len(addrs)) + } + addr := addrs[0] + + ccc.mu.Lock() + defer ccc.mu.Unlock() + if entry, ok := ccc.subConnCache[addr]; ok { + // If entry is in subConnCache, the SubConn was being deleted. + // cancel function will never be nil. + entry.cancel() + delete(ccc.subConnCache, addr) + return entry.sc, nil + } + + scNew, err := ccc.cc.NewSubConn(addrs, opts) + if err != nil { + return nil, err + } + + ccc.subConnToAddr[scNew] = addr + return scNew, nil +} + +func (ccc *lbCacheClientConn) RemoveSubConn(sc balancer.SubConn) { + ccc.mu.Lock() + defer ccc.mu.Unlock() + addr, ok := ccc.subConnToAddr[sc] + if !ok { + return + } + + if entry, ok := ccc.subConnCache[addr]; ok { + if entry.sc != sc { + // This could happen if NewSubConn was called multiple times for the + // same address, and those SubConns are all removed. We remove sc + // immediately here. + delete(ccc.subConnToAddr, sc) + ccc.cc.RemoveSubConn(sc) + } + return + } + + entry := &subConnCacheEntry{ + sc: sc, + } + ccc.subConnCache[addr] = entry + + timer := time.AfterFunc(ccc.timeout, func() { + ccc.mu.Lock() + if entry.abortDeleting { + return + } + ccc.cc.RemoveSubConn(sc) + delete(ccc.subConnToAddr, sc) + delete(ccc.subConnCache, addr) + ccc.mu.Unlock() + }) + entry.cancel = func() { + if !timer.Stop() { + // If stop was not successful, the timer has fired (this can only + // happen in a race). But the deleting function is blocked on ccc.mu + // because the mutex was held by the caller of this function. + // + // Set abortDeleting to true to abort the deleting function. When + // the lock is released, the deleting function will acquire the + // lock, check the value of abortDeleting and return. + entry.abortDeleting = true + } + } + return +} + +func (ccc *lbCacheClientConn) UpdateBalancerState(s connectivity.State, p balancer.Picker) { + ccc.cc.UpdateBalancerState(s, p) +} + +func (ccc *lbCacheClientConn) close() { + ccc.mu.Lock() + // Only cancel all existing timers. There's no need to remove SubConns. + for _, entry := range ccc.subConnCache { + entry.cancel() + } + ccc.mu.Unlock() +} diff --git a/grpclb_util_test.go b/grpclb_util_test.go new file mode 100644 index 00000000..2773fae7 --- /dev/null +++ b/grpclb_util_test.go @@ -0,0 +1,219 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package grpc + +import ( + "fmt" + "sync" + "testing" + "time" + + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/resolver" +) + +type mockSubConn struct { + balancer.SubConn +} + +type mockClientConn struct { + balancer.ClientConn + + mu sync.Mutex + subConns map[balancer.SubConn]resolver.Address +} + +func newMockClientConn() *mockClientConn { + return &mockClientConn{ + subConns: make(map[balancer.SubConn]resolver.Address), + } +} + +func (mcc *mockClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { + sc := &mockSubConn{} + mcc.mu.Lock() + defer mcc.mu.Unlock() + mcc.subConns[sc] = addrs[0] + return sc, nil +} + +func (mcc *mockClientConn) RemoveSubConn(sc balancer.SubConn) { + mcc.mu.Lock() + defer mcc.mu.Unlock() + delete(mcc.subConns, sc) +} + +const testCacheTimeout = 100 * time.Millisecond + +func checkMockCC(mcc *mockClientConn, scLen int) error { + mcc.mu.Lock() + defer mcc.mu.Unlock() + if len(mcc.subConns) != scLen { + return fmt.Errorf("mcc = %+v, want len(mcc.subConns) = %v", mcc.subConns, scLen) + } + return nil +} + +func checkCacheCC(ccc *lbCacheClientConn, sccLen, sctaLen int) error { + ccc.mu.Lock() + defer ccc.mu.Unlock() + if len(ccc.subConnCache) != sccLen { + return fmt.Errorf("ccc = %+v, want len(ccc.subConnCache) = %v", ccc.subConnCache, sccLen) + } + if len(ccc.subConnToAddr) != sctaLen { + return fmt.Errorf("ccc = %+v, want len(ccc.subConnToAddr) = %v", ccc.subConnToAddr, sctaLen) + } + return nil +} + +// Test that SubConn won't be immediately removed. +func TestLBCacheClientConnExpire(t *testing.T) { + mcc := newMockClientConn() + if err := checkMockCC(mcc, 0); err != nil { + t.Fatal(err) + } + + ccc := newLBCacheClientConn(mcc) + ccc.timeout = testCacheTimeout + if err := checkCacheCC(ccc, 0, 0); err != nil { + t.Fatal(err) + } + + sc, _ := ccc.NewSubConn([]resolver.Address{{Addr: "address1"}}, balancer.NewSubConnOptions{}) + // One subconn in MockCC. + if err := checkMockCC(mcc, 1); err != nil { + t.Fatal(err) + } + // No subconn being deleted, and one in CacheCC. + if err := checkCacheCC(ccc, 0, 1); err != nil { + t.Fatal(err) + } + + ccc.RemoveSubConn(sc) + // One subconn in MockCC before timeout. + if err := checkMockCC(mcc, 1); err != nil { + t.Fatal(err) + } + // One subconn being deleted, and one in CacheCC. + if err := checkCacheCC(ccc, 1, 1); err != nil { + t.Fatal(err) + } + + // Should all become empty after timeout. + var err error + for i := 0; i < 2; i++ { + time.Sleep(testCacheTimeout) + err = checkMockCC(mcc, 0) + if err != nil { + continue + } + err = checkCacheCC(ccc, 0, 0) + if err != nil { + continue + } + } + if err != nil { + t.Fatal(err) + } +} + +// Test that NewSubConn with the same address of a SubConn being removed will +// reuse the SubConn and cancel the removing. +func TestLBCacheClientConnReuse(t *testing.T) { + mcc := newMockClientConn() + if err := checkMockCC(mcc, 0); err != nil { + t.Fatal(err) + } + + ccc := newLBCacheClientConn(mcc) + ccc.timeout = testCacheTimeout + if err := checkCacheCC(ccc, 0, 0); err != nil { + t.Fatal(err) + } + + sc, _ := ccc.NewSubConn([]resolver.Address{{Addr: "address1"}}, balancer.NewSubConnOptions{}) + // One subconn in MockCC. + if err := checkMockCC(mcc, 1); err != nil { + t.Fatal(err) + } + // No subconn being deleted, and one in CacheCC. + if err := checkCacheCC(ccc, 0, 1); err != nil { + t.Fatal(err) + } + + ccc.RemoveSubConn(sc) + // One subconn in MockCC before timeout. + if err := checkMockCC(mcc, 1); err != nil { + t.Fatal(err) + } + // One subconn being deleted, and one in CacheCC. + if err := checkCacheCC(ccc, 1, 1); err != nil { + t.Fatal(err) + } + + // Recreate the old subconn, this should cancel the deleting process. + sc, _ = ccc.NewSubConn([]resolver.Address{{Addr: "address1"}}, balancer.NewSubConnOptions{}) + // One subconn in MockCC. + if err := checkMockCC(mcc, 1); err != nil { + t.Fatal(err) + } + // No subconn being deleted, and one in CacheCC. + if err := checkCacheCC(ccc, 0, 1); err != nil { + t.Fatal(err) + } + + var err error + // Should not become empty after 2*timeout. + time.Sleep(2 * testCacheTimeout) + err = checkMockCC(mcc, 1) + if err != nil { + t.Fatal(err) + } + err = checkCacheCC(ccc, 0, 1) + if err != nil { + t.Fatal(err) + } + + // Call remove again, will delete after timeout. + ccc.RemoveSubConn(sc) + // One subconn in MockCC before timeout. + if err := checkMockCC(mcc, 1); err != nil { + t.Fatal(err) + } + // One subconn being deleted, and one in CacheCC. + if err := checkCacheCC(ccc, 1, 1); err != nil { + t.Fatal(err) + } + + // Should all become empty after timeout. + for i := 0; i < 2; i++ { + time.Sleep(testCacheTimeout) + err = checkMockCC(mcc, 0) + if err != nil { + continue + } + err = checkCacheCC(ccc, 0, 0) + if err != nil { + continue + } + } + if err != nil { + t.Fatal(err) + } +}