diff --git a/balancer_test.go b/balancer_test.go
index b01b998e..a101fe15 100644
--- a/balancer_test.go
+++ b/balancer_test.go
@@ -320,3 +320,57 @@ func TestGetOnWaitChannel(t *testing.T) {
 	cc.Close()
 	servers[0].stop()
 }
+
+func TestOneConnectionDown(t *testing.T) {
+	// Start 2 servers.
+	numServers := 2
+	servers, r := startServers(t, numServers, math.MaxUint32)
+	cc, err := Dial("foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
+	if err != nil {
+		t.Fatalf("Failed to create ClientConn: %v", err)
+	}
+	// Add servers[1] to the service discovery.
+	var updates []*naming.Update
+	updates = append(updates, &naming.Update{
+		Op:   naming.Add,
+		Addr: "127.0.0.1:" + servers[1].port,
+	})
+	r.w.inject(updates)
+	req := "port"
+	var reply string
+	// Loop until servers[1] is up.
+	for {
+		if err := Invoke(context.Background(), "/foo/bar", &req, &reply, cc); err != nil && ErrorDesc(err) == servers[1].port {
+			break
+		}
+		time.Sleep(10 * time.Millisecond)
+	}
+
+	var wg sync.WaitGroup
+	numRPC := 100
+	sleepDuration := 10 * time.Millisecond
+	wg.Add(1)
+	go func() {
+		time.Sleep(sleepDuration)
+		// After sleepDuration, kill servers[0].
+		servers[0].stop()
+		wg.Done()
+	}()
+
+	// All non-failfast RPCs should not block because there's at least one connection available.
+	for i := 0; i < numRPC; i++ {
+		wg.Add(1)
+		go func() {
+			time.Sleep(sleepDuration)
+			// After sleepDuration, invoke RPC.
+			// servers[0] is killed around the same time to make it racy between the balancer and gRPC internals.
+			Invoke(context.Background(), "/foo/bar", &req, &reply, cc, FailFast(false))
+			wg.Done()
+		}()
+	}
+	wg.Wait()
+	cc.Close()
+	for i := 0; i < numServers; i++ {
+		servers[i].stop()
+	}
+}
diff --git a/clientconn.go b/clientconn.go
index abe82cb9..d16ea201 100644
--- a/clientconn.go
+++ b/clientconn.go
@@ -43,7 +43,6 @@ import (
 
 	"golang.org/x/net/context"
 	"golang.org/x/net/trace"
-	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/credentials"
 	"google.golang.org/grpc/grpclog"
 	"google.golang.org/grpc/transport"
@@ -502,7 +501,11 @@ func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions)
 		}
 		return nil, nil, errConnClosing
 	}
-	t, err := ac.wait(ctx, !opts.BlockingWait)
+	// ac.wait should block on transient failure only if balancer is nil and RPC is non-failfast.
+	// - If RPC is failfast, ac.wait should not block.
+	// - If balancer is not nil, ac.wait should return errConnClosing on transient failure
+	//   so that non-failfast RPCs will try to get a new transport instead of waiting on ac.
+	t, err := ac.wait(ctx, cc.dopts.balancer == nil && opts.BlockingWait)
 	if err != nil {
 		if put != nil {
 			put()
@@ -754,8 +757,8 @@ func (ac *addrConn) transportMonitor() {
 }
 
 // wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or
-// iv) transport is in TransientFailure and the RPC is fail-fast.
-func (ac *addrConn) wait(ctx context.Context, failFast bool) (transport.ClientTransport, error) {
+// iv) transport is in TransientFailure and blocking is false.
+func (ac *addrConn) wait(ctx context.Context, blocking bool) (transport.ClientTransport, error) {
 	for {
 		ac.mu.Lock()
 		switch {
@@ -767,9 +770,9 @@ func (ac *addrConn) wait(ctx context.Context, failFast bool) (transport.ClientTr
 			ct := ac.transport
 			ac.mu.Unlock()
 			return ct, nil
-		case ac.state == TransientFailure && failFast:
+		case ac.state == TransientFailure && !blocking:
 			ac.mu.Unlock()
-			return nil, Errorf(codes.Unavailable, "grpc: RPC failed fast due to transport failure")
+			return nil, errConnClosing
 		default:
 			ready := ac.ready
 			if ready == nil {
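For reference, below is a minimal, self-contained sketch of the decision this change encodes in getTransport. It is illustrative only: shouldBlock, hasBalancer, and blockingWait are invented names, not grpc-go identifiers. It shows the new rule that ac.wait blocks on TransientFailure only when there is no balancer and the RPC is non-failfast; with a balancer configured, wait returns errConnClosing so a non-failfast RPC can retry and be handed a different address instead of waiting on a dead connection.

// Illustrative sketch only -- simplified names, not the real grpc-go internals.
package main

import "fmt"

// shouldBlock mirrors the argument now passed to ac.wait in getTransport:
// block on TransientFailure only when no balancer is configured and the RPC
// is not fail-fast (i.e. opts.BlockingWait is true).
func shouldBlock(hasBalancer, blockingWait bool) bool {
	return !hasBalancer && blockingWait
}

func main() {
	cases := []struct {
		hasBalancer  bool
		blockingWait bool // true for non-failfast RPCs
	}{
		{hasBalancer: true, blockingWait: true},  // balancer + non-failfast: don't block, retry another address
		{hasBalancer: false, blockingWait: true}, // no balancer + non-failfast: block until the transport recovers
		{hasBalancer: true, blockingWait: false}, // fail-fast: never block
	}
	for _, c := range cases {
		fmt.Printf("balancer=%v blockingWait=%v -> wait blocks on TransientFailure: %v\n",
			c.hasBalancer, c.blockingWait, shouldBlock(c.hasBalancer, c.blockingWait))
	}
}

Running the sketch prints the three combinations, matching the comment added above the new ac.wait call in getTransport.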