internal: split StateRecordingBalancer in test to balancer and builder (#2578)

And instead of setting state notify channel in balancer, create a new notify
channel at Build.

fixes #2576
This commit is contained in:
Menghan Li
2019-01-18 10:21:46 -08:00
committed by GitHub
parent 9f93218b13
commit ec9c18c8c6
2 changed files with 40 additions and 32 deletions

View File

@ -36,10 +36,10 @@ import (
const stateRecordingBalancerName = "state_recoding_balancer" const stateRecordingBalancerName = "state_recoding_balancer"
var testBalancer = &stateRecordingBalancer{} var testBalancerBuilder = newStateRecordingBalancerBuilder()
func init() { func init() {
balancer.Register(testBalancer) balancer.Register(testBalancerBuilder)
} }
// These tests use a pipeListener. This listener is similar to net.Listener // These tests use a pipeListener. This listener is similar to net.Listener
@ -148,9 +148,6 @@ client enters TRANSIENT FAILURE.`,
} }
func testStateTransitionSingleAddress(t *testing.T, want []connectivity.State, server func(net.Listener) net.Conn) { func testStateTransitionSingleAddress(t *testing.T, want []connectivity.State, server func(net.Listener) net.Conn) {
stateNotifications := make(chan connectivity.State, len(want))
testBalancer.ResetNotifier(stateNotifications)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel() defer cancel()
@ -173,6 +170,8 @@ func testStateTransitionSingleAddress(t *testing.T, want []connectivity.State, s
} }
defer client.Close() defer client.Close()
stateNotifications := testBalancerBuilder.nextStateNotifier()
timeout := time.After(5 * time.Second) timeout := time.After(5 * time.Second)
for i := 0; i < len(want); i++ { for i := 0; i < len(want); i++ {
@ -205,9 +204,6 @@ func (s) TestStateTransitions_ReadyToTransientFailure(t *testing.T) {
connectivity.Connecting, connectivity.Connecting,
} }
stateNotifications := make(chan connectivity.State, len(want))
testBalancer.ResetNotifier(stateNotifications)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel() defer cancel()
@ -247,6 +243,8 @@ func (s) TestStateTransitions_ReadyToTransientFailure(t *testing.T) {
} }
defer client.Close() defer client.Close()
stateNotifications := testBalancerBuilder.nextStateNotifier()
timeout := time.After(5 * time.Second) timeout := time.After(5 * time.Second)
for i := 0; i < len(want); i++ { for i := 0; i < len(want); i++ {
@ -272,9 +270,6 @@ func (s) TestStateTransitions_TriesAllAddrsBeforeTransientFailure(t *testing.T)
connectivity.Ready, connectivity.Ready,
} }
stateNotifications := make(chan connectivity.State, len(want))
testBalancer.ResetNotifier(stateNotifications)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel() defer cancel()
@ -334,6 +329,8 @@ func (s) TestStateTransitions_TriesAllAddrsBeforeTransientFailure(t *testing.T)
} }
defer client.Close() defer client.Close()
stateNotifications := testBalancerBuilder.nextStateNotifier()
timeout := time.After(5 * time.Second) timeout := time.After(5 * time.Second)
for i := 0; i < len(want); i++ { for i := 0; i < len(want); i++ {
@ -369,9 +366,6 @@ func (s) TestStateTransitions_MultipleAddrsEntersReady(t *testing.T) {
connectivity.Connecting, connectivity.Connecting,
} }
stateNotifications := make(chan connectivity.State, len(want))
testBalancer.ResetNotifier(stateNotifications)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel() defer cancel()
@ -431,6 +425,8 @@ func (s) TestStateTransitions_MultipleAddrsEntersReady(t *testing.T) {
} }
defer client.Close() defer client.Close()
stateNotifications := testBalancerBuilder.nextStateNotifier()
timeout := time.After(2 * time.Second) timeout := time.After(2 * time.Second)
for i := 0; i < len(want); i++ { for i := 0; i < len(want); i++ {
@ -454,42 +450,53 @@ func (s) TestStateTransitions_MultipleAddrsEntersReady(t *testing.T) {
} }
type stateRecordingBalancer struct { type stateRecordingBalancer struct {
mu sync.Mutex
notifier chan<- connectivity.State notifier chan<- connectivity.State
balancer.Balancer balancer.Balancer
} }
func (b *stateRecordingBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) { func (b *stateRecordingBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
b.mu.Lock()
b.notifier <- s b.notifier <- s
b.mu.Unlock()
b.Balancer.HandleSubConnStateChange(sc, s) b.Balancer.HandleSubConnStateChange(sc, s)
} }
func (b *stateRecordingBalancer) ResetNotifier(r chan<- connectivity.State) { func (b *stateRecordingBalancer) ResetNotifier(r chan<- connectivity.State) {
b.mu.Lock()
defer b.mu.Unlock()
b.notifier = r b.notifier = r
} }
func (b *stateRecordingBalancer) Close() { func (b *stateRecordingBalancer) Close() {
b.mu.Lock() b.Balancer.Close()
u := b.Balancer
b.mu.Unlock()
u.Close()
} }
func (b *stateRecordingBalancer) Name() string { type stateRecordingBalancerBuilder struct {
mu sync.Mutex
notifier chan connectivity.State // The notifier used in the last Balancer.
}
func newStateRecordingBalancerBuilder() *stateRecordingBalancerBuilder {
return &stateRecordingBalancerBuilder{}
}
func (b *stateRecordingBalancerBuilder) Name() string {
return stateRecordingBalancerName return stateRecordingBalancerName
} }
func (b *stateRecordingBalancer) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { func (b *stateRecordingBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
stateNotifications := make(chan connectivity.State, 10)
b.mu.Lock() b.mu.Lock()
b.Balancer = balancer.Get(PickFirstBalancerName).Build(cc, opts) b.notifier = stateNotifications
b.mu.Unlock() b.mu.Unlock()
return b return &stateRecordingBalancer{
notifier: stateNotifications,
Balancer: balancer.Get(PickFirstBalancerName).Build(cc, opts),
}
}
func (b *stateRecordingBalancerBuilder) nextStateNotifier() <-chan connectivity.State {
b.mu.Lock()
defer b.mu.Unlock()
ret := b.notifier
b.notifier = nil
return ret
} }
type noBackoff struct{} type noBackoff struct{}

View File

@ -1003,9 +1003,6 @@ func (s) TestUpdateAddresses_RetryFromFirstAddr(t *testing.T) {
server2ContactedSecondTime := make(chan struct{}) server2ContactedSecondTime := make(chan struct{})
server3Contacted := make(chan struct{}) server3Contacted := make(chan struct{})
stateNotifications := make(chan connectivity.State, 100)
testBalancer.ResetNotifier(stateNotifications)
// Launch server 1. // Launch server 1.
go func() { go func() {
// First, let's allow the initial connection to go READY. We need to do // First, let's allow the initial connection to go READY. We need to do
@ -1024,6 +1021,10 @@ func (s) TestUpdateAddresses_RetryFromFirstAddr(t *testing.T) {
return return
} }
// nextStateNotifier() is updated after balancerBuilder.Build(), which is
// called by grpc.Dial. It's safe to do it here because lis1.Accept blocks
// until balancer is built to process the addresses.
stateNotifications := testBalancerBuilder.nextStateNotifier()
// Wait for the transport to become ready. // Wait for the transport to become ready.
for s := range stateNotifications { for s := range stateNotifications {
if s == connectivity.Ready { if s == connectivity.Ready {