diff --git a/internal/testutils/balancer.go b/internal/testutils/balancer.go index ff43fb73..e8c485e8 100644 --- a/internal/testutils/balancer.go +++ b/internal/testutils/balancer.go @@ -187,6 +187,83 @@ func (tcc *TestClientConn) WaitForErrPicker(ctx context.Context) error { return nil } +// WaitForPickerWithErr waits until an error picker is pushed to this +// ClientConn with the error matching the wanted error. Also drains the +// matching entry from the state channel. Returns an error if the provided +// context expires, including the last received picker error (if any). +func (tcc *TestClientConn) WaitForPickerWithErr(ctx context.Context, want error) error { + lastErr := errors.New("received no picker") + for { + select { + case <-ctx.Done(): + return fmt.Errorf("timeout when waiting for an error picker with %v; last picker error: %v", want, lastErr) + case picker := <-tcc.NewPickerCh: + <-tcc.NewStateCh + for i := 0; i < 5; i++ { + if _, lastErr = picker.Pick(balancer.PickInfo{}); lastErr == nil || lastErr.Error() != want.Error() { + break + } + return nil + } + } + } +} + +// WaitForConnectivityState waits until the state pushed to this ClientConn +// matches the wanted state. Also drains the matching entry from the picker +// channel. Returns an error if the provided context expires, including the +// last received state (if any). +func (tcc *TestClientConn) WaitForConnectivityState(ctx context.Context, want connectivity.State) error { + var lastState connectivity.State = -1 + for { + select { + case <-ctx.Done(): + return fmt.Errorf("timeout when waiting for state to be %s; last state: %s", want, lastState) + case s := <-tcc.NewStateCh: + <-tcc.NewPickerCh + if s == want { + return nil + } + lastState = s + } + } +} + +// WaitForRoundRobinPicker waits for a picker that passes IsRoundRobin. Also +// drains the matching state channel and requires it to be READY to be +// considered. Returns an error if the provided context expires, including the +// last received error from IsRoundRobin or the picker (if any). +func (tcc *TestClientConn) WaitForRoundRobinPicker(ctx context.Context, want ...balancer.SubConn) error { + lastErr := errors.New("received no picker") + for { + select { + case <-ctx.Done(): + return fmt.Errorf("timeout when waiting for round robin picker with %v; last error: %v", want, lastErr) + case s := <-tcc.NewStateCh: + p := <-tcc.NewPickerCh + if s != connectivity.Ready { + lastErr = fmt.Errorf("received state %v instead of ready", s) + break + } + var pickerErr error + if err := IsRoundRobin(want, func() balancer.SubConn { + sc, err := p.Pick(balancer.PickInfo{}) + if err != nil { + pickerErr = err + } + return sc.SubConn + }); pickerErr != nil { + lastErr = pickerErr + continue + } else if err != nil { + lastErr = err + continue + } + return nil + } + } +} + // IsRoundRobin checks whether f's return value is roundrobin of elements from // want. But it doesn't check for the order. Note that want can contain // duplicate items, which makes it weight-round-robin. diff --git a/xds/internal/balancer/priority/balancer_test.go b/xds/internal/balancer/priority/balancer_test.go index 98bbb5b8..5b96d710 100644 --- a/xds/internal/balancer/priority/balancer_test.go +++ b/xds/internal/balancer/priority/balancer_test.go @@ -37,6 +37,8 @@ import ( "google.golang.org/grpc/resolver" ) +const defaultTestTimeout = 5 * time.Second + type s struct { grpctest.Tester } @@ -83,6 +85,9 @@ func subConnFromPicker(t *testing.T, p balancer.Picker) func() balancer.SubConn // // Init 0 and 1; 0 is up, use 0; add 2, use 0; remove 2, use 0. func (s) TestPriority_HighPriorityReady(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + cc := testutils.NewTestClientConn(t) bb := balancer.Get(Name) pb := bb.Build(cc, balancer.BuildOptions{}) @@ -118,10 +123,8 @@ func (s) TestPriority_HighPriorityReady(t *testing.T) { pb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test roundrobin with only p0 subconns. - p1 := <-cc.NewPickerCh - want := []balancer.SubConn{sc1} - if err := testutils.IsRoundRobin(want, subConnFromPicker(t, p1)); err != nil { - t.Fatalf("want %v, got %v", want, err) + if err := cc.WaitForRoundRobinPicker(ctx, sc1); err != nil { + t.Fatal(err.Error()) } // Add p2, it shouldn't cause any updates. @@ -190,12 +193,15 @@ func (s) TestPriority_HighPriorityReady(t *testing.T) { // Init 0 and 1; 0 is up, use 0; 0 is down, 1 is up, use 1; add 2, use 1; 1 is // down, use 2; remove 2, use 1. func (s) TestPriority_SwitchPriority(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + cc := testutils.NewTestClientConn(t) bb := balancer.Get(Name) pb := bb.Build(cc, balancer.BuildOptions{}) defer pb.Close() - // Two localities, with priorities [0, 1], each with one backend. + t.Log("Two localities, with priorities [0, 1], each with one backend.") if err := pb.UpdateClientConnState(balancer.ClientConnState{ ResolverState: resolver.State{ Addresses: []resolver.Address{ @@ -220,30 +226,24 @@ func (s) TestPriority_SwitchPriority(t *testing.T) { } sc0 := <-cc.NewSubConnCh - // p0 is ready. + t.Log("Make p0 ready.") pb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) pb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test roundrobin with only p0 subconns. - p0 := <-cc.NewPickerCh - want := []balancer.SubConn{sc0} - if err := testutils.IsRoundRobin(want, subConnFromPicker(t, p0)); err != nil { - t.Fatalf("want %v, got %v", want, err) + if err := cc.WaitForRoundRobinPicker(ctx, sc0); err != nil { + t.Fatal(err.Error()) } - // Turn down 0, will start and use 1. + t.Log("Turn down 0, will start and use 1.") pb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) - // Before 1 gets READY, picker should return NoSubConnAvailable, so RPCs // will retry. - p1 := <-cc.NewPickerCh - for i := 0; i < 5; i++ { - if _, err := p1.Pick(balancer.PickInfo{}); err != balancer.ErrNoSubConnAvailable { - t.Fatalf("want pick error %v, got %v", balancer.ErrNoSubConnAvailable, err) - } + if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil { + t.Fatal(err.Error()) } - // Handle SubConn creation from 1. + t.Log("Handle SubConn creation from 1.") addrs1 := <-cc.NewSubConnAddrsCh if got, want := addrs1[0].Addr, testBackendAddrStrs[1]; got != want { t.Fatalf("sc is created with addr %v, want %v", got, want) @@ -253,15 +253,11 @@ func (s) TestPriority_SwitchPriority(t *testing.T) { pb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test pick with 1. - p2 := <-cc.NewPickerCh - for i := 0; i < 5; i++ { - gotSCSt, _ := p2.Pick(balancer.PickInfo{}) - if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { - t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1) - } + if err := cc.WaitForRoundRobinPicker(ctx, sc1); err != nil { + t.Fatal(err.Error()) } - // Add p2, it shouldn't cause any udpates. + t.Log("Add p2, it shouldn't cause any udpates.") if err := pb.UpdateClientConnState(balancer.ClientConnState{ ResolverState: resolver.State{ Addresses: []resolver.Address{ @@ -292,16 +288,13 @@ func (s) TestPriority_SwitchPriority(t *testing.T) { case <-time.After(time.Millisecond * 100): } - // Turn down 1, use 2 + t.Log("Turn down 1, use 2.") pb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) // Before 2 gets READY, picker should return NoSubConnAvailable, so RPCs // will retry. - p3 := <-cc.NewPickerCh - for i := 0; i < 5; i++ { - if _, err := p3.Pick(balancer.PickInfo{}); err != balancer.ErrNoSubConnAvailable { - t.Fatalf("want pick error %v, got %v", balancer.ErrNoSubConnAvailable, err) - } + if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil { + t.Fatal(err.Error()) } addrs2 := <-cc.NewSubConnAddrsCh @@ -313,15 +306,11 @@ func (s) TestPriority_SwitchPriority(t *testing.T) { pb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test pick with 2. - p4 := <-cc.NewPickerCh - for i := 0; i < 5; i++ { - gotSCSt, _ := p4.Pick(balancer.PickInfo{}) - if !cmp.Equal(gotSCSt.SubConn, sc2, cmp.AllowUnexported(testutils.TestSubConn{})) { - t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc2) - } + if err := cc.WaitForRoundRobinPicker(ctx, sc2); err != nil { + t.Fatal(err.Error()) } - // Remove 2, use 1. + t.Log("Remove 2, use 1.") if err := pb.UpdateClientConnState(balancer.ClientConnState{ ResolverState: resolver.State{ Addresses: []resolver.Address{ @@ -348,22 +337,18 @@ func (s) TestPriority_SwitchPriority(t *testing.T) { // Should get an update with 1's old transient failure picker, to override // 2's old picker. - p5 := <-cc.NewPickerCh - for i := 0; i < 5; i++ { - if _, err := p5.Pick(balancer.PickInfo{}); err == nil { - t.Fatalf("want pick error non-nil, got nil") - } + if err := cc.WaitForErrPicker(ctx); err != nil { + t.Fatal(err.Error()) } + <-cc.NewStateCh // Drain to match picker pb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + // Does not change the aggregate state, because round robin does not leave + // TRANIENT_FAILURE if a subconn goes CONNECTING. pb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) - p6 := <-cc.NewPickerCh - for i := 0; i < 5; i++ { - gotSCSt, _ := p6.Pick(balancer.PickInfo{}) - if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { - t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc2) - } + if err := cc.WaitForRoundRobinPicker(ctx, sc1); err != nil { + t.Fatal(err.Error()) } } @@ -373,6 +358,9 @@ func (s) TestPriority_SwitchPriority(t *testing.T) { // Init 0 and 1; 0 is up, use 0; 0 is connecting, 1 is up, use 1; 0 is ready, // use 0. func (s) TestPriority_HighPriorityToConnectingFromReady(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + cc := testutils.NewTestClientConn(t) bb := balancer.Get(Name) pb := bb.Build(cc, balancer.BuildOptions{}) @@ -408,10 +396,8 @@ func (s) TestPriority_HighPriorityToConnectingFromReady(t *testing.T) { pb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test roundrobin with only p0 subconns. - p0 := <-cc.NewPickerCh - want := []balancer.SubConn{sc0} - if err := testutils.IsRoundRobin(want, subConnFromPicker(t, p0)); err != nil { - t.Fatalf("want %v, got %v", want, err) + if err := cc.WaitForRoundRobinPicker(ctx, sc0); err != nil { + t.Fatal(err.Error()) } // Turn 0 to TransientFailure, will start and use 1. @@ -419,11 +405,8 @@ func (s) TestPriority_HighPriorityToConnectingFromReady(t *testing.T) { // Before 1 gets READY, picker should return NoSubConnAvailable, so RPCs // will retry. - p1 := <-cc.NewPickerCh - for i := 0; i < 5; i++ { - if _, err := p1.Pick(balancer.PickInfo{}); err != balancer.ErrNoSubConnAvailable { - t.Fatalf("want pick error %v, got %v", balancer.ErrNoSubConnAvailable, err) - } + if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil { + t.Fatal(err.Error()) } // Handle SubConn creation from 1. @@ -436,12 +419,8 @@ func (s) TestPriority_HighPriorityToConnectingFromReady(t *testing.T) { pb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test pick with 1. - p2 := <-cc.NewPickerCh - for i := 0; i < 5; i++ { - gotSCSt, _ := p2.Pick(balancer.PickInfo{}) - if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { - t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1) - } + if err := cc.WaitForRoundRobinPicker(ctx, sc1); err != nil { + t.Fatal(err.Error()) } // Turn 0 back to Ready. @@ -454,12 +433,8 @@ func (s) TestPriority_HighPriorityToConnectingFromReady(t *testing.T) { t.Fatalf("RemoveSubConn, want %v, got %v", sc0, scToRemove) } - p3 := <-cc.NewPickerCh - for i := 0; i < 5; i++ { - gotSCSt, _ := p3.Pick(balancer.PickInfo{}) - if !cmp.Equal(gotSCSt.SubConn, sc0, cmp.AllowUnexported(testutils.TestSubConn{})) { - t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc0) - } + if err := cc.WaitForRoundRobinPicker(ctx, sc0); err != nil { + t.Fatal(err.Error()) } } @@ -467,6 +442,9 @@ func (s) TestPriority_HighPriorityToConnectingFromReady(t *testing.T) { // // Init 0 and 1; 0 and 1 both down; add 2, use 2. func (s) TestPriority_HigherDownWhileAddingLower(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + cc := testutils.NewTestClientConn(t) bb := balancer.Get(Name) pb := bb.Build(cc, balancer.BuildOptions{}) @@ -497,16 +475,13 @@ func (s) TestPriority_HigherDownWhileAddingLower(t *testing.T) { } sc0 := <-cc.NewSubConnCh - // Turn down 0, 1 is used. + t.Log("Turn down 0, 1 is used.") pb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) // Before 1 gets READY, picker should return NoSubConnAvailable, so RPCs // will retry. - pFail0 := <-cc.NewPickerCh - for i := 0; i < 5; i++ { - if _, err := pFail0.Pick(balancer.PickInfo{}); err != balancer.ErrNoSubConnAvailable { - t.Fatalf("want pick error %v, got %v", balancer.ErrNoSubConnAvailable, err) - } + if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil { + t.Fatal(err.Error()) } addrs1 := <-cc.NewSubConnAddrsCh @@ -514,18 +489,17 @@ func (s) TestPriority_HigherDownWhileAddingLower(t *testing.T) { t.Fatalf("sc is created with addr %v, want %v", got, want) } sc1 := <-cc.NewSubConnCh - // Turn down 1, pick should error. + + t.Log("Turn down 1, pick should error.") pb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) // Test pick failure. - pFail1 := <-cc.NewPickerCh - for i := 0; i < 5; i++ { - if _, err := pFail1.Pick(balancer.PickInfo{}); err == nil { - t.Fatalf("want pick error non-nil, got nil") - } + if err := cc.WaitForErrPicker(ctx); err != nil { + t.Fatal(err.Error()) } + <-cc.NewStateCh // Drain to match picker - // Add p2, it should create a new SubConn. + t.Log("Add p2, it should create a new SubConn.") if err := pb.UpdateClientConnState(balancer.ClientConnState{ ResolverState: resolver.State{ Addresses: []resolver.Address{ @@ -547,11 +521,8 @@ func (s) TestPriority_HigherDownWhileAddingLower(t *testing.T) { } // A new connecting picker should be updated for the new priority. - p0 := <-cc.NewPickerCh - for i := 0; i < 5; i++ { - if _, err := p0.Pick(balancer.PickInfo{}); err != balancer.ErrNoSubConnAvailable { - t.Fatalf("want pick error %v, got %v", balancer.ErrNoSubConnAvailable, err) - } + if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil { + t.Fatal(err.Error()) } addrs2 := <-cc.NewSubConnAddrsCh @@ -563,12 +534,8 @@ func (s) TestPriority_HigherDownWhileAddingLower(t *testing.T) { pb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test pick with 2. - p1 := <-cc.NewPickerCh - for i := 0; i < 5; i++ { - gotSCSt, _ := p1.Pick(balancer.PickInfo{}) - if !cmp.Equal(gotSCSt.SubConn, sc2, cmp.AllowUnexported(testutils.TestSubConn{})) { - t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc2) - } + if err := cc.WaitForRoundRobinPicker(ctx, sc2); err != nil { + t.Fatal(err.Error()) } } @@ -576,7 +543,8 @@ func (s) TestPriority_HigherDownWhileAddingLower(t *testing.T) { // // Init 0,1,2; 0 and 1 down, use 2; 0 up, close 1 and 2. func (s) TestPriority_HigherReadyCloseAllLower(t *testing.T) { - // defer time.Sleep(10 * time.Millisecond) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() cc := testutils.NewTestClientConn(t) bb := balancer.Get(Name) @@ -614,11 +582,8 @@ func (s) TestPriority_HigherReadyCloseAllLower(t *testing.T) { pb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) // Before 1 gets READY, picker should return NoSubConnAvailable, so RPCs // will retry. - pFail0 := <-cc.NewPickerCh - for i := 0; i < 5; i++ { - if _, err := pFail0.Pick(balancer.PickInfo{}); err != balancer.ErrNoSubConnAvailable { - t.Fatalf("want pick error %v, got %v", balancer.ErrNoSubConnAvailable, err) - } + if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil { + t.Fatal(err.Error()) } addrs1 := <-cc.NewSubConnAddrsCh @@ -631,11 +596,8 @@ func (s) TestPriority_HigherReadyCloseAllLower(t *testing.T) { pb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) // Before 2 gets READY, picker should return NoSubConnAvailable, so RPCs // will retry. - pFail1 := <-cc.NewPickerCh - for i := 0; i < 5; i++ { - if _, err := pFail1.Pick(balancer.PickInfo{}); err != balancer.ErrNoSubConnAvailable { - t.Fatalf("want pick error %v, got %v", balancer.ErrNoSubConnAvailable, err) - } + if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil { + t.Fatal(err.Error()) } addrs2 := <-cc.NewSubConnAddrsCh @@ -647,12 +609,8 @@ func (s) TestPriority_HigherReadyCloseAllLower(t *testing.T) { pb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test pick with 2. - p2 := <-cc.NewPickerCh - for i := 0; i < 5; i++ { - gotSCSt, _ := p2.Pick(balancer.PickInfo{}) - if !cmp.Equal(gotSCSt.SubConn, sc2, cmp.AllowUnexported(testutils.TestSubConn{})) { - t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc2) - } + if err := cc.WaitForRoundRobinPicker(ctx, sc2); err != nil { + t.Fatal(err.Error()) } // When 0 becomes ready, 0 should be used, 1 and 2 should all be closed. @@ -671,12 +629,8 @@ func (s) TestPriority_HigherReadyCloseAllLower(t *testing.T) { } // Test pick with 0. - p0 := <-cc.NewPickerCh - for i := 0; i < 5; i++ { - gotSCSt, _ := p0.Pick(balancer.PickInfo{}) - if !cmp.Equal(gotSCSt.SubConn, sc0, cmp.AllowUnexported(testutils.TestSubConn{})) { - t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc0) - } + if err := cc.WaitForRoundRobinPicker(ctx, sc0); err != nil { + t.Fatal(err.Error()) } } @@ -685,7 +639,10 @@ func (s) TestPriority_HigherReadyCloseAllLower(t *testing.T) { // // Init 0,1; 0 is not ready (in connecting), after timeout, use 1. func (s) TestPriority_InitTimeout(t *testing.T) { - const testPriorityInitTimeout = time.Second + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + const testPriorityInitTimeout = 200 * time.Millisecond defer func() func() { old := DefaultPriorityInitTimeout DefaultPriorityInitTimeout = testPriorityInitTimeout @@ -742,23 +699,21 @@ func (s) TestPriority_InitTimeout(t *testing.T) { // After the init timer of p0, when switching to p1, a connecting picker // will be sent to the parent. Clear it here. - <-cc.NewPickerCh pb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) pb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test pick with 1. - p1 := <-cc.NewPickerCh - for i := 0; i < 5; i++ { - gotSCSt, _ := p1.Pick(balancer.PickInfo{}) - if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { - t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1) - } + if err := cc.WaitForRoundRobinPicker(ctx, sc1); err != nil { + t.Fatal(err.Error()) } } // EDS removes all priorities, and re-adds them. func (s) TestPriority_RemovesAllPriorities(t *testing.T) { - const testPriorityInitTimeout = time.Second + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + const testPriorityInitTimeout = 200 * time.Millisecond defer func() func() { old := DefaultPriorityInitTimeout DefaultPriorityInitTimeout = testPriorityInitTimeout @@ -800,10 +755,8 @@ func (s) TestPriority_RemovesAllPriorities(t *testing.T) { pb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test roundrobin with only p0 subconns. - p0 := <-cc.NewPickerCh - want := []balancer.SubConn{sc0} - if err := testutils.IsRoundRobin(want, subConnFromPicker(t, p0)); err != nil { - t.Fatalf("want %v, got %v", want, err) + if err := cc.WaitForRoundRobinPicker(ctx, sc0); err != nil { + t.Fatal(err.Error()) } // Remove all priorities. @@ -826,11 +779,8 @@ func (s) TestPriority_RemovesAllPriorities(t *testing.T) { } // Test pick return TransientFailure. - pFail := <-cc.NewPickerCh - for i := 0; i < 5; i++ { - if _, err := pFail.Pick(balancer.PickInfo{}); err != ErrAllPrioritiesRemoved { - t.Fatalf("want pick error %v, got %v", ErrAllPrioritiesRemoved, err) - } + if err := cc.WaitForPickerWithErr(ctx, ErrAllPrioritiesRemoved); err != nil { + t.Fatal(err.Error()) } // Re-add two localities, with previous priorities, but different backends. @@ -861,7 +811,6 @@ func (s) TestPriority_RemovesAllPriorities(t *testing.T) { // Don't send any update to p0, so to not override the old state of p0. // Later, connect to p1 and then remove p1. This will fallback to p0, and // will send p0's old picker if they are not correctly removed. - <-cc.NewPickerCh // Clear the picker from old p0. // p1 will be used after priority init timeout. addrs11 := <-cc.NewSubConnAddrsCh @@ -873,10 +822,8 @@ func (s) TestPriority_RemovesAllPriorities(t *testing.T) { pb.UpdateSubConnState(sc11, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test roundrobin with only p1 subconns. - p1 := <-cc.NewPickerCh - want = []balancer.SubConn{sc11} - if err := testutils.IsRoundRobin(want, subConnFromPicker(t, p1)); err != nil { - t.Fatalf("want %v, got %v", want, err) + if err := cc.WaitForRoundRobinPicker(ctx, sc11); err != nil { + t.Fatal(err.Error()) } // Remove p1, to fallback to p0. @@ -903,11 +850,8 @@ func (s) TestPriority_RemovesAllPriorities(t *testing.T) { } // Test pick return NoSubConn. - pFail1 := <-cc.NewPickerCh - for i := 0; i < 5; i++ { - if scst, err := pFail1.Pick(balancer.PickInfo{}); err != balancer.ErrNoSubConnAvailable { - t.Fatalf("want pick error _, %v, got %v, _ ,%v", balancer.ErrNoSubConnAvailable, scst, err) - } + if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil { + t.Fatal(err.Error()) } // Send an ready update for the p0 sc that was received when re-adding @@ -916,10 +860,8 @@ func (s) TestPriority_RemovesAllPriorities(t *testing.T) { pb.UpdateSubConnState(sc01, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test roundrobin with only p0 subconns. - p2 := <-cc.NewPickerCh - want = []balancer.SubConn{sc01} - if err := testutils.IsRoundRobin(want, subConnFromPicker(t, p2)); err != nil { - t.Fatalf("want %v, got %v", want, err) + if err := cc.WaitForRoundRobinPicker(ctx, sc01); err != nil { + t.Fatal(err.Error()) } select { @@ -936,6 +878,9 @@ func (s) TestPriority_RemovesAllPriorities(t *testing.T) { // Test the case where the high priority contains no backends. The low priority // will be used. func (s) TestPriority_HighPriorityNoEndpoints(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + cc := testutils.NewTestClientConn(t) bb := balancer.Get(Name) pb := bb.Build(cc, balancer.BuildOptions{}) @@ -971,10 +916,8 @@ func (s) TestPriority_HighPriorityNoEndpoints(t *testing.T) { pb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test roundrobin with only p0 subconns. - p1 := <-cc.NewPickerCh - want := []balancer.SubConn{sc1} - if err := testutils.IsRoundRobin(want, subConnFromPicker(t, p1)); err != nil { - t.Fatalf("want %v, got %v", want, err) + if err := cc.WaitForRoundRobinPicker(ctx, sc1); err != nil { + t.Fatal(err.Error()) } // Remove addresses from priority 0, should use p1. @@ -1008,11 +951,8 @@ func (s) TestPriority_HighPriorityNoEndpoints(t *testing.T) { // Before 1 gets READY, picker should return NoSubConnAvailable, so RPCs // will retry. - pFail1 := <-cc.NewPickerCh - for i := 0; i < 5; i++ { - if _, err := pFail1.Pick(balancer.PickInfo{}); err != balancer.ErrNoSubConnAvailable { - t.Fatalf("want pick error %v, got %v", balancer.ErrNoSubConnAvailable, err) - } + if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil { + t.Fatal(err.Error()) } // p1 is ready. @@ -1020,16 +960,14 @@ func (s) TestPriority_HighPriorityNoEndpoints(t *testing.T) { pb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test roundrobin with only p1 subconns. - p2 := <-cc.NewPickerCh - want = []balancer.SubConn{sc2} - if err := testutils.IsRoundRobin(want, subConnFromPicker(t, p2)); err != nil { - t.Fatalf("want %v, got %v", want, err) + if err := cc.WaitForRoundRobinPicker(ctx, sc2); err != nil { + t.Fatal(err.Error()) } } // Test the case where the first and only priority is removed. func (s) TestPriority_FirstPriorityUnavailable(t *testing.T) { - const testPriorityInitTimeout = time.Second + const testPriorityInitTimeout = 200 * time.Millisecond defer func(t time.Duration) { DefaultPriorityInitTimeout = t }(DefaultPriorityInitTimeout) @@ -1078,6 +1016,9 @@ func (s) TestPriority_FirstPriorityUnavailable(t *testing.T) { // // Init a(p0) and b(p1); a(p0) is up, use a; move b to p0, a to p1, use b. func (s) TestPriority_MoveChildToHigherPriority(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + cc := testutils.NewTestClientConn(t) bb := balancer.Get(Name) pb := bb.Build(cc, balancer.BuildOptions{}) @@ -1113,10 +1054,8 @@ func (s) TestPriority_MoveChildToHigherPriority(t *testing.T) { pb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test roundrobin with only p0 subconns. - p1 := <-cc.NewPickerCh - want := []balancer.SubConn{sc1} - if err := testutils.IsRoundRobin(want, subConnFromPicker(t, p1)); err != nil { - t.Fatalf("want %v, got %v", want, err) + if err := cc.WaitForRoundRobinPicker(ctx, sc1); err != nil { + t.Fatal(err.Error()) } // Swap child with p0 and p1, the child at lower priority should now be the @@ -1143,11 +1082,8 @@ func (s) TestPriority_MoveChildToHigherPriority(t *testing.T) { // balancer should immediately update the picker so the picker from old // child is not used. In this case, the picker becomes a // no-subconn-available picker because this child is just started. - pFail := <-cc.NewPickerCh - for i := 0; i < 5; i++ { - if _, err := pFail.Pick(balancer.PickInfo{}); err != balancer.ErrNoSubConnAvailable { - t.Fatalf("want pick error %v, got %v", balancer.ErrNoSubConnAvailable, err) - } + if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil { + t.Fatal(err.Error()) } // Old subconn should be removed. @@ -1167,10 +1103,8 @@ func (s) TestPriority_MoveChildToHigherPriority(t *testing.T) { pb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test roundrobin with only new subconns. - p2 := <-cc.NewPickerCh - want2 := []balancer.SubConn{sc2} - if err := testutils.IsRoundRobin(want2, subConnFromPicker(t, p2)); err != nil { - t.Fatalf("want %v, got %v", want2, err) + if err := cc.WaitForRoundRobinPicker(ctx, sc2); err != nil { + t.Fatal(err.Error()) } } @@ -1179,6 +1113,9 @@ func (s) TestPriority_MoveChildToHigherPriority(t *testing.T) { // // Init a(p0) and b(p1); a(p0) is down, use b; move b to p0, a to p1, use b. func (s) TestPriority_MoveReadyChildToHigherPriority(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + cc := testutils.NewTestClientConn(t) bb := balancer.Get(Name) pb := bb.Build(cc, balancer.BuildOptions{}) @@ -1213,11 +1150,8 @@ func (s) TestPriority_MoveReadyChildToHigherPriority(t *testing.T) { pb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) // Before 1 gets READY, picker should return NoSubConnAvailable, so RPCs // will retry. - pFail0 := <-cc.NewPickerCh - for i := 0; i < 5; i++ { - if _, err := pFail0.Pick(balancer.PickInfo{}); err != balancer.ErrNoSubConnAvailable { - t.Fatalf("want pick error %v, got %v", balancer.ErrNoSubConnAvailable, err) - } + if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil { + t.Fatal(err.Error()) } addrs1 := <-cc.NewSubConnAddrsCh @@ -1229,10 +1163,8 @@ func (s) TestPriority_MoveReadyChildToHigherPriority(t *testing.T) { pb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test roundrobin with only p1 subconns. - p0 := <-cc.NewPickerCh - want := []balancer.SubConn{sc1} - if err := testutils.IsRoundRobin(want, subConnFromPicker(t, p0)); err != nil { - t.Fatalf("want %v, got %v", want, err) + if err := cc.WaitForRoundRobinPicker(ctx, sc1); err != nil { + t.Fatal(err.Error()) } // Swap child with p0 and p1, the child at lower priority should now be the @@ -1279,6 +1211,9 @@ func (s) TestPriority_MoveReadyChildToHigherPriority(t *testing.T) { // // Init a(p0) and b(p1); a(p0) is down, use b; move b to p0, a to p1, use b. func (s) TestPriority_RemoveReadyLowestChild(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + cc := testutils.NewTestClientConn(t) bb := balancer.Get(Name) pb := bb.Build(cc, balancer.BuildOptions{}) @@ -1313,11 +1248,8 @@ func (s) TestPriority_RemoveReadyLowestChild(t *testing.T) { pb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) // Before 1 gets READY, picker should return NoSubConnAvailable, so RPCs // will retry. - pFail0 := <-cc.NewPickerCh - for i := 0; i < 5; i++ { - if _, err := pFail0.Pick(balancer.PickInfo{}); err != balancer.ErrNoSubConnAvailable { - t.Fatalf("want pick error %v, got %v", balancer.ErrNoSubConnAvailable, err) - } + if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil { + t.Fatal(err.Error()) } addrs1 := <-cc.NewSubConnAddrsCh @@ -1329,10 +1261,8 @@ func (s) TestPriority_RemoveReadyLowestChild(t *testing.T) { pb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test roundrobin with only p1 subconns. - p0 := <-cc.NewPickerCh - want := []balancer.SubConn{sc1} - if err := testutils.IsRoundRobin(want, subConnFromPicker(t, p0)); err != nil { - t.Fatalf("want %v, got %v", want, err) + if err := cc.WaitForRoundRobinPicker(ctx, sc1); err != nil { + t.Fatal(err.Error()) } // Remove child with p1, the child at higher priority should now be used. @@ -1358,12 +1288,10 @@ func (s) TestPriority_RemoveReadyLowestChild(t *testing.T) { t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scToRemove) } - pFail := <-cc.NewPickerCh - for i := 0; i < 5; i++ { - if _, err := pFail.Pick(balancer.PickInfo{}); err == nil { - t.Fatalf("want pick error , got %v", err) - } + if err := cc.WaitForErrPicker(ctx); err != nil { + t.Fatal(err.Error()) } + <-cc.NewStateCh // Drain to match picker // Because there was no new child, no new subconn should be created. select { @@ -1378,6 +1306,9 @@ func (s) TestPriority_RemoveReadyLowestChild(t *testing.T) { // Init 0; 0 is up, use 0; remove 0, only picker is updated, no subconn is // removed; re-add 0, picker is updated. func (s) TestPriority_ReadyChildRemovedButInCache(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + const testChildCacheTimeout = time.Second defer func() func() { old := balancergroup.DefaultSubBalancerCloseTimeout @@ -1420,10 +1351,8 @@ func (s) TestPriority_ReadyChildRemovedButInCache(t *testing.T) { pb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test roundrobin with only p0 subconns. - p1 := <-cc.NewPickerCh - want := []balancer.SubConn{sc1} - if err := testutils.IsRoundRobin(want, subConnFromPicker(t, p1)); err != nil { - t.Fatalf("want %v, got %v", want, err) + if err := cc.WaitForRoundRobinPicker(ctx, sc1); err != nil { + t.Fatal(err.Error()) } // Remove the child, it shouldn't cause any conn changed, but picker should @@ -1435,11 +1364,8 @@ func (s) TestPriority_ReadyChildRemovedButInCache(t *testing.T) { t.Fatalf("failed to update ClientConn state: %v", err) } - pFail := <-cc.NewPickerCh - for i := 0; i < 5; i++ { - if _, err := pFail.Pick(balancer.PickInfo{}); err != ErrAllPrioritiesRemoved { - t.Fatalf("want pick error %v, got %v", ErrAllPrioritiesRemoved, err) - } + if err := cc.WaitForPickerWithErr(ctx, ErrAllPrioritiesRemoved); err != nil { + t.Fatal(err.Error()) } // But no conn changes should happen. Child balancer is in cache. @@ -1469,10 +1395,8 @@ func (s) TestPriority_ReadyChildRemovedButInCache(t *testing.T) { } // Test roundrobin with only p0 subconns. - p2 := <-cc.NewPickerCh - want2 := []balancer.SubConn{sc1} - if err := testutils.IsRoundRobin(want2, subConnFromPicker(t, p2)); err != nil { - t.Fatalf("want %v, got %v", want2, err) + if err := cc.WaitForRoundRobinPicker(ctx, sc1); err != nil { + t.Fatal(err.Error()) } // But no conn changes should happen. Child balancer is just taken out from @@ -1490,6 +1414,9 @@ func (s) TestPriority_ReadyChildRemovedButInCache(t *testing.T) { // // Init 0; 0 is up, use 0; change 0's policy, 0 is used. func (s) TestPriority_ChildPolicyChange(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + cc := testutils.NewTestClientConn(t) bb := balancer.Get(Name) pb := bb.Build(cc, balancer.BuildOptions{}) @@ -1523,10 +1450,8 @@ func (s) TestPriority_ChildPolicyChange(t *testing.T) { pb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test roundrobin with only p0 subconns. - p1 := <-cc.NewPickerCh - want := []balancer.SubConn{sc1} - if err := testutils.IsRoundRobin(want, subConnFromPicker(t, p1)); err != nil { - t.Fatalf("want %v, got %v", want, err) + if err := cc.WaitForRoundRobinPicker(ctx, sc1); err != nil { + t.Fatal(err.Error()) } // Change the policy for the child (still roundrobin, but with a different @@ -1563,10 +1488,8 @@ func (s) TestPriority_ChildPolicyChange(t *testing.T) { pb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test pickfirst with the new subconns. - p2 := <-cc.NewPickerCh - want2 := []balancer.SubConn{sc2} - if err := testutils.IsRoundRobin(want2, subConnFromPicker(t, p2)); err != nil { - t.Fatalf("want %v, got %v", want2, err) + if err := cc.WaitForRoundRobinPicker(ctx, sc2); err != nil { + t.Fatal(err.Error()) } } @@ -1590,6 +1513,9 @@ func init() { // (e.g., roundrobin handling empty addresses). There could be deadlock caused // by acquiring a locked mutex. func (s) TestPriority_ChildPolicyUpdatePickerInline(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + cc := testutils.NewTestClientConn(t) bb := balancer.Get(Name) pb := bb.Build(cc, balancer.BuildOptions{}) @@ -1612,18 +1538,17 @@ func (s) TestPriority_ChildPolicyUpdatePickerInline(t *testing.T) { t.Fatalf("failed to update ClientConn state: %v", err) } - p0 := <-cc.NewPickerCh - for i := 0; i < 5; i++ { - _, err := p0.Pick(balancer.PickInfo{}) - if err != errTestInlineStateUpdate { - t.Fatalf("picker.Pick, got err %q, want err %q", err, errTestInlineStateUpdate) - } + if err := cc.WaitForPickerWithErr(ctx, errTestInlineStateUpdate); err != nil { + t.Fatal(err.Error()) } } // When the child policy's configured to ignore reresolution requests, the // ResolveNow() calls from this child should be all ignored. func (s) TestPriority_IgnoreReresolutionRequest(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + cc := testutils.NewTestClientConn(t) bb := balancer.Get(Name) pb := bb.Build(cc, balancer.BuildOptions{}) @@ -1650,8 +1575,6 @@ func (s) TestPriority_IgnoreReresolutionRequest(t *testing.T) { t.Fatalf("failed to update ClientConn state: %v", err) } - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() // This is the balancer.ClientConn that the inner resolverNowBalancer is // built with. balancerCCI, err := resolveNowBalancerCCCh.Receive(ctx) @@ -1705,6 +1628,9 @@ func (s) TestPriority_IgnoreReresolutionRequest(t *testing.T) { // ResolveNow() calls from this child should be all ignored, from the other // children are forwarded. func (s) TestPriority_IgnoreReresolutionRequestTwoChildren(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + cc := testutils.NewTestClientConn(t) bb := balancer.Get(Name) pb := bb.Build(cc, balancer.BuildOptions{}) @@ -1735,8 +1661,6 @@ func (s) TestPriority_IgnoreReresolutionRequestTwoChildren(t *testing.T) { t.Fatalf("failed to update ClientConn state: %v", err) } - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() // This is the balancer.ClientConn from p0. balancerCCI0, err := resolveNowBalancerCCCh.Receive(ctx) if err != nil { @@ -1795,6 +1719,10 @@ func init() { stub.Register(fmt.Sprintf("%s-%d", initIdleBalancerName, ii), stub.BalancerFuncs{ UpdateClientConnState: func(bd *stub.BalancerData, opts balancer.ClientConnState) error { bd.ClientConn.NewSubConn(opts.ResolverState.Addresses, balancer.NewSubConnOptions{}) + bd.ClientConn.UpdateState(balancer.State{ + ConnectivityState: connectivity.Connecting, + Picker: &testutils.TestConstPicker{Err: balancer.ErrNoSubConnAvailable}, + }) return nil }, UpdateSubConnState: func(bd *stub.BalancerData, sc balancer.SubConn, state balancer.SubConnState) { @@ -1817,6 +1745,9 @@ func init() { // // Init 0, 1; 0 is Idle, use 0; 0 is down, start 1; 1 is Idle, use 1. func (s) TestPriority_HighPriorityInitIdle(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + cc := testutils.NewTestClientConn(t) bb := balancer.Get(Name) pb := bb.Build(cc, balancer.BuildOptions{}) @@ -1849,20 +1780,16 @@ func (s) TestPriority_HighPriorityInitIdle(t *testing.T) { // Send an Idle state update to trigger an Idle picker update. pb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Idle}) - p0 := <-cc.NewPickerCh - if pr, err := p0.Pick(balancer.PickInfo{}); err != errsTestInitIdle[0] { - t.Fatalf("pick returned %v, %v, want _, %v", pr, err, errsTestInitIdle[0]) + if err := cc.WaitForPickerWithErr(ctx, errsTestInitIdle[0]); err != nil { + t.Fatal(err.Error()) } // Turn p0 down, to start p1. pb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) // Before 1 gets READY, picker should return NoSubConnAvailable, so RPCs // will retry. - p1 := <-cc.NewPickerCh - for i := 0; i < 5; i++ { - if _, err := p1.Pick(balancer.PickInfo{}); err != balancer.ErrNoSubConnAvailable { - t.Fatalf("want pick error %v, got %v", balancer.ErrNoSubConnAvailable, err) - } + if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil { + t.Fatal(err.Error()) } addrs1 := <-cc.NewSubConnAddrsCh @@ -1872,9 +1799,8 @@ func (s) TestPriority_HighPriorityInitIdle(t *testing.T) { sc1 := <-cc.NewSubConnCh // Idle picker from p1 should also be forwarded. pb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Idle}) - p2 := <-cc.NewPickerCh - if pr, err := p2.Pick(balancer.PickInfo{}); err != errsTestInitIdle[1] { - t.Fatalf("pick returned %v, %v, want _, %v", pr, err, errsTestInitIdle[1]) + if err := cc.WaitForPickerWithErr(ctx, errsTestInitIdle[1]); err != nil { + t.Fatal(err.Error()) } } @@ -1885,6 +1811,9 @@ func (s) TestPriority_HighPriorityInitIdle(t *testing.T) { // // Init 0; 0 is Idle, use 0; add 1, use 0. func (s) TestPriority_AddLowPriorityWhenHighIsInIdle(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + cc := testutils.NewTestClientConn(t) bb := balancer.Get(Name) pb := bb.Build(cc, balancer.BuildOptions{}) @@ -1915,9 +1844,8 @@ func (s) TestPriority_AddLowPriorityWhenHighIsInIdle(t *testing.T) { // Send an Idle state update to trigger an Idle picker update. pb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Idle}) - p0 := <-cc.NewPickerCh - if pr, err := p0.Pick(balancer.PickInfo{}); err != errsTestInitIdle[0] { - t.Fatalf("pick returned %v, %v, want _, %v", pr, err, errsTestInitIdle[0]) + if err := cc.WaitForPickerWithErr(ctx, errsTestInitIdle[0]); err != nil { + t.Fatal(err.Error()) } // Add 1, should keep using 0.