From ec9c18c8c6dda21490f4840e9c9946d883fd3037 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Fri, 18 Jan 2019 10:21:46 -0800 Subject: [PATCH] internal: split StateRecordingBalancer in test to balancer and builder (#2578) And instead of setting state notify channel in balancer, create a new notify channel at Build. fixes #2576 --- clientconn_state_transition_test.go | 65 ++++++++++++++++------------- clientconn_test.go | 7 ++-- 2 files changed, 40 insertions(+), 32 deletions(-) diff --git a/clientconn_state_transition_test.go b/clientconn_state_transition_test.go index c878ee36..37fd3ad9 100644 --- a/clientconn_state_transition_test.go +++ b/clientconn_state_transition_test.go @@ -36,10 +36,10 @@ import ( const stateRecordingBalancerName = "state_recoding_balancer" -var testBalancer = &stateRecordingBalancer{} +var testBalancerBuilder = newStateRecordingBalancerBuilder() func init() { - balancer.Register(testBalancer) + balancer.Register(testBalancerBuilder) } // These tests use a pipeListener. This listener is similar to net.Listener @@ -148,9 +148,6 @@ client enters TRANSIENT FAILURE.`, } func testStateTransitionSingleAddress(t *testing.T, want []connectivity.State, server func(net.Listener) net.Conn) { - stateNotifications := make(chan connectivity.State, len(want)) - testBalancer.ResetNotifier(stateNotifications) - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() @@ -173,6 +170,8 @@ func testStateTransitionSingleAddress(t *testing.T, want []connectivity.State, s } defer client.Close() + stateNotifications := testBalancerBuilder.nextStateNotifier() + timeout := time.After(5 * time.Second) for i := 0; i < len(want); i++ { @@ -205,9 +204,6 @@ func (s) TestStateTransitions_ReadyToTransientFailure(t *testing.T) { connectivity.Connecting, } - stateNotifications := make(chan connectivity.State, len(want)) - testBalancer.ResetNotifier(stateNotifications) - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() @@ -247,6 +243,8 @@ func (s) TestStateTransitions_ReadyToTransientFailure(t *testing.T) { } defer client.Close() + stateNotifications := testBalancerBuilder.nextStateNotifier() + timeout := time.After(5 * time.Second) for i := 0; i < len(want); i++ { @@ -272,9 +270,6 @@ func (s) TestStateTransitions_TriesAllAddrsBeforeTransientFailure(t *testing.T) connectivity.Ready, } - stateNotifications := make(chan connectivity.State, len(want)) - testBalancer.ResetNotifier(stateNotifications) - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() @@ -334,6 +329,8 @@ func (s) TestStateTransitions_TriesAllAddrsBeforeTransientFailure(t *testing.T) } defer client.Close() + stateNotifications := testBalancerBuilder.nextStateNotifier() + timeout := time.After(5 * time.Second) for i := 0; i < len(want); i++ { @@ -369,9 +366,6 @@ func (s) TestStateTransitions_MultipleAddrsEntersReady(t *testing.T) { connectivity.Connecting, } - stateNotifications := make(chan connectivity.State, len(want)) - testBalancer.ResetNotifier(stateNotifications) - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() @@ -431,6 +425,8 @@ func (s) TestStateTransitions_MultipleAddrsEntersReady(t *testing.T) { } defer client.Close() + stateNotifications := testBalancerBuilder.nextStateNotifier() + timeout := time.After(2 * time.Second) for i := 0; i < len(want); i++ { @@ -454,42 +450,53 @@ func (s) TestStateTransitions_MultipleAddrsEntersReady(t *testing.T) { } type stateRecordingBalancer struct { - mu sync.Mutex notifier chan<- connectivity.State - balancer.Balancer } func (b *stateRecordingBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) { - b.mu.Lock() b.notifier <- s - b.mu.Unlock() - b.Balancer.HandleSubConnStateChange(sc, s) } func (b *stateRecordingBalancer) ResetNotifier(r chan<- connectivity.State) { - b.mu.Lock() - defer b.mu.Unlock() b.notifier = r } func (b *stateRecordingBalancer) Close() { - b.mu.Lock() - u := b.Balancer - b.mu.Unlock() - u.Close() + b.Balancer.Close() } -func (b *stateRecordingBalancer) Name() string { +type stateRecordingBalancerBuilder struct { + mu sync.Mutex + notifier chan connectivity.State // The notifier used in the last Balancer. +} + +func newStateRecordingBalancerBuilder() *stateRecordingBalancerBuilder { + return &stateRecordingBalancerBuilder{} +} + +func (b *stateRecordingBalancerBuilder) Name() string { return stateRecordingBalancerName } -func (b *stateRecordingBalancer) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { +func (b *stateRecordingBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { + stateNotifications := make(chan connectivity.State, 10) b.mu.Lock() - b.Balancer = balancer.Get(PickFirstBalancerName).Build(cc, opts) + b.notifier = stateNotifications b.mu.Unlock() - return b + return &stateRecordingBalancer{ + notifier: stateNotifications, + Balancer: balancer.Get(PickFirstBalancerName).Build(cc, opts), + } +} + +func (b *stateRecordingBalancerBuilder) nextStateNotifier() <-chan connectivity.State { + b.mu.Lock() + defer b.mu.Unlock() + ret := b.notifier + b.notifier = nil + return ret } type noBackoff struct{} diff --git a/clientconn_test.go b/clientconn_test.go index e61ba73e..3a744276 100644 --- a/clientconn_test.go +++ b/clientconn_test.go @@ -1003,9 +1003,6 @@ func (s) TestUpdateAddresses_RetryFromFirstAddr(t *testing.T) { server2ContactedSecondTime := make(chan struct{}) server3Contacted := make(chan struct{}) - stateNotifications := make(chan connectivity.State, 100) - testBalancer.ResetNotifier(stateNotifications) - // Launch server 1. go func() { // First, let's allow the initial connection to go READY. We need to do @@ -1024,6 +1021,10 @@ func (s) TestUpdateAddresses_RetryFromFirstAddr(t *testing.T) { return } + // nextStateNotifier() is updated after balancerBuilder.Build(), which is + // called by grpc.Dial. It's safe to do it here because lis1.Accept blocks + // until balancer is built to process the addresses. + stateNotifications := testBalancerBuilder.nextStateNotifier() // Wait for the transport to become ready. for s := range stateNotifications { if s == connectivity.Ready {