New implementation of roundrobin and pickfirst (#1506)

This commit is contained in:
Menghan Li
2017-10-02 09:22:57 -07:00
committed by dfawley
parent 8443e311d3
commit 4bbdf230d7
21 changed files with 2146 additions and 274 deletions

View File

@ -182,6 +182,10 @@ type Picker interface {
// the connectivity states.
//
// It also generates and updates the Picker used by gRPC to pick SubConns for RPCs.
//
// HandleSubConnectionStateChange, HandleResolvedAddrs and Close are guaranteed
// to be called synchronously from the same goroutine.
// There's no guarantee on picker.Pick, it may be called anytime.
type Balancer interface {
// HandleSubConnStateChange is called by gRPC when the connectivity state
// of sc has changed.
@ -196,6 +200,7 @@ type Balancer interface {
// An empty address slice and a non-nil error will be passed if the resolver returns
// non-nil error to gRPC.
HandleResolvedAddrs([]resolver.Address, error)
// Close closes the balancer.
// Close closes the balancer. The balancer is not required to call
// ClientConn.RemoveSubConn for its existing SubConns.
Close()
}

View File

@ -0,0 +1,241 @@
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package roundrobin defines a roundrobin balancer. Roundrobin balancer is
// installed as one of the default balancers in gRPC, users don't need to
// explicitly install this balancer.
package roundrobin
import (
"sync"
"golang.org/x/net/context"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/resolver"
)
// newBuilder creates a new roundrobin balancer builder.
func newBuilder() balancer.Builder {
return &rrBuilder{}
}
func init() {
balancer.Register(newBuilder())
}
type rrBuilder struct{}
func (*rrBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
return &rrBalancer{
cc: cc,
subConns: make(map[resolver.Address]balancer.SubConn),
scStates: make(map[balancer.SubConn]connectivity.State),
csEvltr: &connectivityStateEvaluator{},
// Initialize picker to a picker that always return
// ErrNoSubConnAvailable, because when state of a SubConn changes, we
// may call UpdateBalancerState with this picker.
picker: newPicker([]balancer.SubConn{}, nil),
}
}
func (*rrBuilder) Name() string {
return "roundrobin"
}
type rrBalancer struct {
cc balancer.ClientConn
csEvltr *connectivityStateEvaluator
state connectivity.State
subConns map[resolver.Address]balancer.SubConn
scStates map[balancer.SubConn]connectivity.State
picker *picker
}
func (b *rrBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
if err != nil {
grpclog.Infof("roundrobin.rrBalancer: HandleResolvedAddrs called with error %v", err)
return
}
grpclog.Infoln("roundrobin.rrBalancer: got new resolved addresses: ", addrs)
// addrsSet is the set converted from addrs, it's used for quick lookup of an address.
addrsSet := make(map[resolver.Address]struct{})
for _, a := range addrs {
addrsSet[a] = struct{}{}
if _, ok := b.subConns[a]; !ok {
// a is a new address (not existing in b.subConns).
sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{})
if err != nil {
grpclog.Warningf("roundrobin.rrBalancer: failed to create new SubConn: %v", err)
continue
}
b.subConns[a] = sc
b.scStates[sc] = connectivity.Idle
sc.Connect()
}
}
for a, sc := range b.subConns {
// a was removed by resolver.
if _, ok := addrsSet[a]; !ok {
b.cc.RemoveSubConn(sc)
delete(b.subConns, a)
// Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
// The entry will be deleted in HandleSubConnStateChange.
}
}
}
// regeneratePicker takes a snapshot of the balancer, and generates a picker
// from it. The picker
// - always returns ErrTransientFailure if the balancer is in TransientFailure,
// - or does round robin selection of all READY SubConns otherwise.
func (b *rrBalancer) regeneratePicker() {
if b.state == connectivity.TransientFailure {
b.picker = newPicker(nil, balancer.ErrTransientFailure)
return
}
var readySCs []balancer.SubConn
for sc, st := range b.scStates {
if st == connectivity.Ready {
readySCs = append(readySCs, sc)
}
}
b.picker = newPicker(readySCs, nil)
}
func (b *rrBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
grpclog.Infof("roundrobin.rrBalancer: handle SubConn state change: %p, %v", sc, s)
oldS, ok := b.scStates[sc]
if !ok {
grpclog.Infof("roundrobin.rrBalancer: got state changes for an unknown SubConn: %p, %v", sc, s)
return
}
b.scStates[sc] = s
switch s {
case connectivity.Idle:
sc.Connect()
case connectivity.Shutdown:
// When an address was removed by resolver, b called RemoveSubConn but
// kept the sc's state in scStates. Remove state for this sc here.
delete(b.scStates, sc)
}
oldAggrState := b.state
b.state = b.csEvltr.recordTransition(oldS, s)
// Regenerate picker when one of the following happens:
// - this sc became ready from not-ready
// - this sc became not-ready from ready
// - the aggregated state of balancer became TransientFailure from non-TransientFailure
// - the aggregated state of balancer became non-TransientFailure from TransientFailure
if (s == connectivity.Ready) != (oldS == connectivity.Ready) ||
(b.state == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) {
b.regeneratePicker()
}
b.cc.UpdateBalancerState(b.state, b.picker)
return
}
// Close is a nop because roundrobin balancer doesn't internal state to clean
// up, and it doesn't need to call RemoveSubConn for the SubConns.
func (b *rrBalancer) Close() {
}
type picker struct {
// If err is not nil, Pick always returns this err. It's immutable after
// picker is created.
err error
// subConns is the snapshot of the roundrobin balancer when this picker was
// created. The slice is immutable. Each Get() will do a round robin
// selection from it and return the selected SubConn.
subConns []balancer.SubConn
mu sync.Mutex
next int
}
func newPicker(scs []balancer.SubConn, err error) *picker {
grpclog.Infof("roundrobinPicker: newPicker called with scs: %v, %v", scs, err)
if err != nil {
return &picker{err: err}
}
return &picker{
subConns: scs,
}
}
func (p *picker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
if p.err != nil {
return nil, nil, p.err
}
if len(p.subConns) <= 0 {
return nil, nil, balancer.ErrNoSubConnAvailable
}
p.mu.Lock()
sc := p.subConns[p.next]
p.next = (p.next + 1) % len(p.subConns)
p.mu.Unlock()
return sc, nil, nil
}
// connectivityStateEvaluator gets updated by addrConns when their
// states transition, based on which it evaluates the state of
// ClientConn.
type connectivityStateEvaluator struct {
numReady uint64 // Number of addrConns in ready state.
numConnecting uint64 // Number of addrConns in connecting state.
numTransientFailure uint64 // Number of addrConns in transientFailure.
}
// recordTransition records state change happening in every subConn and based on
// that it evaluates what aggregated state should be.
// It can only transition between Ready, Connecting and TransientFailure. Other states,
// Idle and Shutdown are transitioned into by ClientConn; in the begining of the connection
// before any subConn is created ClientConn is in idle state. In the end when ClientConn
// closes it is in Shutdown state.
//
// recordTransition should only be called synchronously from the same goroutine.
func (cse *connectivityStateEvaluator) recordTransition(oldState, newState connectivity.State) connectivity.State {
// Update counters.
for idx, state := range []connectivity.State{oldState, newState} {
updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new.
switch state {
case connectivity.Ready:
cse.numReady += updateVal
case connectivity.Connecting:
cse.numConnecting += updateVal
case connectivity.TransientFailure:
cse.numTransientFailure += updateVal
}
}
// Evaluate.
if cse.numReady > 0 {
return connectivity.Ready
}
if cse.numConnecting > 0 {
return connectivity.Connecting
}
return connectivity.TransientFailure
}

View File

@ -0,0 +1,470 @@
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package roundrobin
import (
"fmt"
"net"
"sync"
"testing"
"time"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
_ "google.golang.org/grpc/grpclog/glogger"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
testpb "google.golang.org/grpc/test/grpc_testing"
"google.golang.org/grpc/test/leakcheck"
)
type testServer struct {
testpb.TestServiceServer
}
func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
return &testpb.Empty{}, nil
}
func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error {
return nil
}
type test struct {
servers []*grpc.Server
addresses []string
}
func (t *test) cleanup() {
for _, s := range t.servers {
s.Stop()
}
}
func startTestServers(count int) (_ *test, err error) {
t := &test{}
defer func() {
if err != nil {
for _, s := range t.servers {
s.Stop()
}
}
}()
for i := 0; i < count; i++ {
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
return nil, fmt.Errorf("Failed to listen %v", err)
}
s := grpc.NewServer()
testpb.RegisterTestServiceServer(s, &testServer{})
t.servers = append(t.servers, s)
t.addresses = append(t.addresses, lis.Addr().String())
go func(s *grpc.Server, l net.Listener) {
s.Serve(l)
}(s, lis)
}
return t, nil
}
func TestOneBackend(t *testing.T) {
defer leakcheck.Check(t)
r, cleanup := manual.GenerateAndRegisterManualResolver()
defer cleanup()
test, err := startTestServers(1)
if err != nil {
t.Fatalf("failed to start servers: %v", err)
}
defer test.cleanup()
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(newBuilder()))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
defer cc.Close()
testc := testpb.NewTestServiceClient(cc)
// The first RPC should fail because there's no address.
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err == nil || grpc.Code(err) != codes.DeadlineExceeded {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
}
r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}})
// The second RPC should succeed.
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
}
}
func TestBackendsRoundRobin(t *testing.T) {
defer leakcheck.Check(t)
r, cleanup := manual.GenerateAndRegisterManualResolver()
defer cleanup()
backendCount := 5
test, err := startTestServers(backendCount)
if err != nil {
t.Fatalf("failed to start servers: %v", err)
}
defer test.cleanup()
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(newBuilder()))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
defer cc.Close()
testc := testpb.NewTestServiceClient(cc)
// The first RPC should fail because there's no address.
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err == nil || grpc.Code(err) != codes.DeadlineExceeded {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
}
var resolvedAddrs []resolver.Address
for i := 0; i < backendCount; i++ {
resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: test.addresses[i]})
}
r.NewAddress(resolvedAddrs)
var p peer.Peer
// Make sure connections to all servers are up.
for si := 0; si < backendCount; si++ {
var connected bool
for i := 0; i < 1000; i++ {
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false), grpc.Peer(&p)); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
}
if p.Addr.String() == test.addresses[si] {
connected = true
break
}
time.Sleep(time.Millisecond)
}
if !connected {
t.Fatalf("Connection to %v was not up after more than 1 second", test.addresses[si])
}
}
for i := 0; i < 3*backendCount; i++ {
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false), grpc.Peer(&p)); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
}
if p.Addr.String() != test.addresses[i%backendCount] {
t.Fatalf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String())
}
}
}
func TestAddressesRemoved(t *testing.T) {
defer leakcheck.Check(t)
r, cleanup := manual.GenerateAndRegisterManualResolver()
defer cleanup()
test, err := startTestServers(1)
if err != nil {
t.Fatalf("failed to start servers: %v", err)
}
defer test.cleanup()
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(newBuilder()))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
defer cc.Close()
testc := testpb.NewTestServiceClient(cc)
// The first RPC should fail because there's no address.
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err == nil || grpc.Code(err) != codes.DeadlineExceeded {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
}
r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}})
// The second RPC should succeed.
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
}
r.NewAddress([]resolver.Address{})
for i := 0; i < 1000; i++ {
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); grpc.Code(err) == codes.DeadlineExceeded {
return
}
time.Sleep(time.Millisecond)
}
t.Fatalf("No RPC failed after removing all addresses, want RPC to fail with DeadlineExceeded")
}
func TestCloseWithPendingRPC(t *testing.T) {
defer leakcheck.Check(t)
r, cleanup := manual.GenerateAndRegisterManualResolver()
defer cleanup()
test, err := startTestServers(1)
if err != nil {
t.Fatalf("failed to start servers: %v", err)
}
defer test.cleanup()
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(newBuilder()))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
testc := testpb.NewTestServiceClient(cc)
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
wg.Add(1)
go func() {
defer wg.Done()
// This RPC blocks until cc is closed.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); grpc.Code(err) == codes.DeadlineExceeded {
t.Errorf("RPC failed because of deadline after cc is closed; want error the client connection is closing")
}
cancel()
}()
}
cc.Close()
wg.Wait()
}
func TestNewAddressWhileBlocking(t *testing.T) {
defer leakcheck.Check(t)
r, cleanup := manual.GenerateAndRegisterManualResolver()
defer cleanup()
test, err := startTestServers(1)
if err != nil {
t.Fatalf("failed to start servers: %v", err)
}
defer test.cleanup()
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(newBuilder()))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
defer cc.Close()
testc := testpb.NewTestServiceClient(cc)
// The first RPC should fail because there's no address.
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err == nil || grpc.Code(err) != codes.DeadlineExceeded {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
}
r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}})
// The second RPC should succeed.
ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, nil", err)
}
r.NewAddress([]resolver.Address{})
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
wg.Add(1)
go func() {
defer wg.Done()
// This RPC blocks until NewAddress is called.
testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false))
}()
}
time.Sleep(50 * time.Millisecond)
r.NewAddress([]resolver.Address{{Addr: test.addresses[0]}})
wg.Wait()
}
func TestOneServerDown(t *testing.T) {
defer leakcheck.Check(t)
r, cleanup := manual.GenerateAndRegisterManualResolver()
defer cleanup()
backendCount := 3
test, err := startTestServers(backendCount)
if err != nil {
t.Fatalf("failed to start servers: %v", err)
}
defer test.cleanup()
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(newBuilder()))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
defer cc.Close()
testc := testpb.NewTestServiceClient(cc)
// The first RPC should fail because there's no address.
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err == nil || grpc.Code(err) != codes.DeadlineExceeded {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
}
var resolvedAddrs []resolver.Address
for i := 0; i < backendCount; i++ {
resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: test.addresses[i]})
}
r.NewAddress(resolvedAddrs)
var p peer.Peer
// Make sure connections to all servers are up.
for si := 0; si < backendCount; si++ {
var connected bool
for i := 0; i < 1000; i++ {
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false), grpc.Peer(&p)); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
}
if p.Addr.String() == test.addresses[si] {
connected = true
break
}
time.Sleep(time.Millisecond)
}
if !connected {
t.Fatalf("Connection to %v was not up after more than 1 second", test.addresses[si])
}
}
for i := 0; i < 3*backendCount; i++ {
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false), grpc.Peer(&p)); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
}
if p.Addr.String() != test.addresses[i%backendCount] {
t.Fatalf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String())
}
}
// Stop one server, RPCs should roundrobin among the remaining servers.
backendCount--
test.servers[backendCount].Stop()
// Loop until see server[backendCount-1] twice without seeing server[backendCount].
var targetSeen int
for i := 0; i < 1000; i++ {
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false), grpc.Peer(&p)); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
}
switch p.Addr.String() {
case test.addresses[backendCount-1]:
targetSeen++
case test.addresses[backendCount]:
// Reset targetSeen if peer is server[backendCount].
targetSeen = 0
}
// Break to make sure the last picked address is server[-1], so the following for loop won't be flaky.
if targetSeen >= 2 {
break
}
}
if targetSeen != 2 {
t.Fatal("Failed to see server[backendCount-1] twice without seeing server[backendCount]")
}
for i := 0; i < 3*backendCount; i++ {
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false), grpc.Peer(&p)); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
}
if p.Addr.String() != test.addresses[i%backendCount] {
t.Errorf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String())
}
}
}
func TestAllServersDown(t *testing.T) {
defer leakcheck.Check(t)
r, cleanup := manual.GenerateAndRegisterManualResolver()
defer cleanup()
backendCount := 3
test, err := startTestServers(backendCount)
if err != nil {
t.Fatalf("failed to start servers: %v", err)
}
defer test.cleanup()
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithBalancerBuilder(newBuilder()))
if err != nil {
t.Fatalf("failed to dial: %v", err)
}
defer cc.Close()
testc := testpb.NewTestServiceClient(cc)
// The first RPC should fail because there's no address.
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err == nil || grpc.Code(err) != codes.DeadlineExceeded {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
}
var resolvedAddrs []resolver.Address
for i := 0; i < backendCount; i++ {
resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: test.addresses[i]})
}
r.NewAddress(resolvedAddrs)
var p peer.Peer
// Make sure connections to all servers are up.
for si := 0; si < backendCount; si++ {
var connected bool
for i := 0; i < 1000; i++ {
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false), grpc.Peer(&p)); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
}
if p.Addr.String() == test.addresses[si] {
connected = true
break
}
time.Sleep(time.Millisecond)
}
if !connected {
t.Fatalf("Connection to %v was not up after more than 1 second", test.addresses[si])
}
}
for i := 0; i < 3*backendCount; i++ {
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false), grpc.Peer(&p)); err != nil {
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
}
if p.Addr.String() != test.addresses[i%backendCount] {
t.Fatalf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String())
}
}
// All servers are stopped, failfast RPC should fail with unavailable.
for i := 0; i < backendCount; i++ {
test.servers[i].Stop()
}
time.Sleep(100 * time.Millisecond)
for i := 0; i < 1000; i++ {
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) == codes.Unavailable {
return
}
time.Sleep(time.Millisecond)
}
t.Fatalf("Failfast RPCs didn't fail with Unavailable after all servers are stopped")
}