From 3e71fb360ddc2df6701f6d6f4c11bb3351ab0730 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Mon, 27 Jun 2016 14:36:59 -0700 Subject: [PATCH 1/5] Support fail-fast mode and make it the default --- balancer.go | 22 +++++++++++++++----- balancer_test.go | 12 +++++------ call.go | 10 ++++++--- clientconn.go | 19 ++++++++++++++---- rpc_util.go | 14 +++++++++++++ stream.go | 22 +++++++++++++++----- test/end2end_test.go | 48 +++++++++++++++++++++++++++++++++++++++++--- 7 files changed, 121 insertions(+), 26 deletions(-) diff --git a/balancer.go b/balancer.go index c298ae91..307e3dc1 100644 --- a/balancer.go +++ b/balancer.go @@ -94,10 +94,10 @@ type Balancer interface { // instead of blocking. // // The function returns put which is called once the rpc has completed or failed. - // put can collect and report RPC stats to a remote load balancer. gRPC internals - // will try to call this again if err is non-nil (unless err is ErrClientConnClosing). + // put can collect and report RPC stats to a remote load balancer. // - // TODO: Add other non-recoverable errors? + // This function should only return the errors Balancer cannot recover by itself. + // gRPC internals will fail the RPC if an error is returned. Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) // Notify returns a channel that is used by gRPC internals to watch the addresses // gRPC needs to connect. The addresses might be from a name resolver or remote @@ -298,8 +298,20 @@ func (rr *roundRobin) Get(ctx context.Context, opts BalancerGetOptions) (addr Ad } } } - // There is no address available. Wait on rr.waitCh. - // TODO(zhaoq): Handle the case when opts.BlockingWait is false. + // There is no address available. + if !opts.BlockingWait { + if len(rr.addrs) == 0 { + rr.mu.Unlock() + err = fmt.Errorf("there is no address available") + return + } + // Returns the next addr on rr.addrs for failfast RPCs. + addr = rr.addrs[rr.next].addr + rr.next++ + rr.mu.Unlock() + return + } + // Wait on rr.waitCh for non-failfast RPCs. if rr.waitCh == nil { ch = make(chan struct{}) rr.waitCh = ch diff --git a/balancer_test.go b/balancer_test.go index 9d8d2bcd..d0cf0611 100644 --- a/balancer_test.go +++ b/balancer_test.go @@ -239,7 +239,7 @@ func TestCloseWithPendingRPC(t *testing.T) { t.Fatalf("Failed to create ClientConn: %v", err) } var reply string - if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc); err != nil { + if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); err != nil { t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port) } // Remove the server. @@ -251,7 +251,7 @@ func TestCloseWithPendingRPC(t *testing.T) { // Loop until the above update applies. for { ctx, _ := context.WithTimeout(context.Background(), 10*time.Millisecond) - if err := Invoke(ctx, "/foo/bar", &expectedRequest, &reply, cc); Code(err) == codes.DeadlineExceeded { + if err := Invoke(ctx, "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); Code(err) == codes.DeadlineExceeded { break } time.Sleep(10 * time.Millisecond) @@ -262,7 +262,7 @@ func TestCloseWithPendingRPC(t *testing.T) { go func() { defer wg.Done() var reply string - if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc); err == nil { + if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); err == nil { t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err) } }() @@ -270,7 +270,7 @@ func TestCloseWithPendingRPC(t *testing.T) { defer wg.Done() var reply string time.Sleep(5 * time.Millisecond) - if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc); err == nil { + if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); err == nil { t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err) } }() @@ -295,7 +295,7 @@ func TestGetOnWaitChannel(t *testing.T) { for { var reply string ctx, _ := context.WithTimeout(context.Background(), 10*time.Millisecond) - if err := Invoke(ctx, "/foo/bar", &expectedRequest, &reply, cc); Code(err) == codes.DeadlineExceeded { + if err := Invoke(ctx, "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); Code(err) == codes.DeadlineExceeded { break } time.Sleep(10 * time.Millisecond) @@ -305,7 +305,7 @@ func TestGetOnWaitChannel(t *testing.T) { go func() { defer wg.Done() var reply string - if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc); err != nil { + if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc, FailFast(false)); err != nil { t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want ", err) } }() diff --git a/call.go b/call.go index d6d993b4..fb2144b2 100644 --- a/call.go +++ b/call.go @@ -35,6 +35,7 @@ package grpc import ( "bytes" + //"fmt" "io" "time" @@ -101,7 +102,7 @@ func sendRequest(ctx context.Context, codec Codec, compressor Compressor, callHd // Invoke is called by generated code. Also users can call Invoke directly when it // is really needed in their use cases. func Invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (err error) { - var c callInfo + c := defaultCallInfo for _, o := range opts { if err := o.before(&c); err != nil { return toRPCErr(err) @@ -165,9 +166,12 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli if c.failFast { return toRPCErr(err) } + continue } - // All the remaining cases are treated as retryable. - continue + // ALl the other errors are treated as Internal errors. + return Errorf(codes.Internal, "%v", err) + // All the remaining cases are treated as fatal. + //panic(fmt.Sprintf("ClientConn.getTransport got an unsupported error: %v", err)) } if c.traceInfo.tr != nil { c.traceInfo.tr.LazyLog(&payload{sent: true, msg: args}, true) diff --git a/clientconn.go b/clientconn.go index 9b9c78d0..79965ec3 100644 --- a/clientconn.go +++ b/clientconn.go @@ -424,7 +424,6 @@ func (cc *ClientConn) newAddrConn(addr Address, skipWait bool) error { } func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) { - // TODO(zhaoq): Implement fail-fast logic. addr, put, err := cc.balancer.Get(ctx, opts) if err != nil { return nil, nil, err @@ -442,7 +441,7 @@ func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) } return nil, nil, transport.StreamErrorf(codes.Internal, "grpc: failed to find the transport to send the rpc") } - t, err := ac.wait(ctx) + t, err := ac.wait(ctx, !opts.BlockingWait) if err != nil { if put != nil { put() @@ -649,8 +648,9 @@ func (ac *addrConn) transportMonitor() { } } -// wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed. -func (ac *addrConn) wait(ctx context.Context) (transport.ClientTransport, error) { +// wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or +// iv) transport is in TransientFailure and the RPC is fail-fast. +func (ac *addrConn) wait(ctx context.Context, failFast bool) (transport.ClientTransport, error) { for { ac.mu.Lock() switch { @@ -662,6 +662,10 @@ func (ac *addrConn) wait(ctx context.Context) (transport.ClientTransport, error) ac.mu.Unlock() return ct, nil default: + if ac.state == TransientFailure && failFast { + ac.mu.Unlock() + return nil, transport.StreamErrorf(codes.Canceled, "grpc: RPC failed fast due to transport failure") + } ready := ac.ready if ready == nil { ready = make(chan struct{}) @@ -673,6 +677,13 @@ func (ac *addrConn) wait(ctx context.Context) (transport.ClientTransport, error) return nil, transport.ContextErr(ctx.Err()) // Wait until the new transport is ready or failed. case <-ready: + ac.mu.Lock() + if ac.state == TransientFailure && failFast { + ac.mu.Unlock() + return nil, transport.StreamErrorf(codes.Canceled, "grpc: RPC failed fast due to transport failure") + } + ac.mu.Unlock() + } } } diff --git a/rpc_util.go b/rpc_util.go index 080ebb14..36c60183 100644 --- a/rpc_util.go +++ b/rpc_util.go @@ -141,6 +141,8 @@ type callInfo struct { traceInfo traceInfo // in trace.go } +var defaultCallInfo = callInfo{failFast: true} + // CallOption configures a Call before it starts or extracts information from // a Call after it completes. type CallOption interface { @@ -179,6 +181,18 @@ func Trailer(md *metadata.MD) CallOption { }) } +// FailFast configures the action to take when an RPC is attempted on broken +// connections or unreachable servers. If failfast is true, the RPC will fail +// immediately. Otherwise, the RPC client will block the call until a +// connection is available (or the call is canceled or times out) and will retry +// the call if it fails due to a transient error. +func FailFast(failFast bool) CallOption { + return beforeCall(func(c *callInfo) error { + c.failFast = failFast + return nil + }) +} + // The format of the payload: compressed or not? type payloadFormat uint8 diff --git a/stream.go b/stream.go index 25be4b81..21b73a9c 100644 --- a/stream.go +++ b/stream.go @@ -105,9 +105,14 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth err error put func() ) - // TODO(zhaoq): CallOption is omitted. Add support when it is needed. + c := defaultCallInfo + for _, o := range opts { + if err := o.before(&c); err != nil { + return nil, toRPCErr(err) + } + } gopts := BalancerGetOptions{ - BlockingWait: false, + BlockingWait: !c.failFast, } t, put, err = cc.getTransport(ctx, gopts) if err != nil { @@ -122,6 +127,8 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth callHdr.SendCompress = cc.dopts.cp.Type() } cs := &clientStream{ + opts: opts, + c: c, desc: desc, put: put, codec: cc.dopts.codec, @@ -167,6 +174,8 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth // clientStream implements a client side Stream. type clientStream struct { + opts []CallOption + c callInfo t transport.ClientTransport s *transport.Stream p *parser @@ -312,15 +321,18 @@ func (cs *clientStream) closeTransportStream(err error) { } func (cs *clientStream) finish(err error) { - if !cs.tracing { - return - } cs.mu.Lock() defer cs.mu.Unlock() + for _, o := range cs.opts { + o.after(&cs.c) + } if cs.put != nil { cs.put() cs.put = nil } + if !cs.tracing { + return + } if cs.trInfo.tr != nil { if err == nil || err == io.EOF { cs.trInfo.tr.LazyPrintf("RPC: [OK]") diff --git a/test/end2end_test.go b/test/end2end_test.go index b539584b..5a0759f8 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -550,7 +550,7 @@ func testTimeoutOnDeadServer(t *testing.T, e env) { cc := te.clientConn() tc := testpb.NewTestServiceClient(cc) - if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { + if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil { t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, ", err) } te.srv.Stop() @@ -558,12 +558,54 @@ func testTimeoutOnDeadServer(t *testing.T, e env) { // notification in time the failure path of the 1st invoke of // ClientConn.wait hits the deadline exceeded error. ctx, _ := context.WithTimeout(context.Background(), -1) - if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); grpc.Code(err) != codes.DeadlineExceeded { + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded { t.Fatalf("TestService/EmptyCall(%v, _) = _, %v, want _, error code: %d", ctx, err, codes.DeadlineExceeded) } awaitNewConnLogOutput() } +func TestFailFast(t *testing.T) { + defer leakCheck(t)() + for _, e := range listTestEnv() { + testFailFast(t, e) + } +} + +func testFailFast(t *testing.T, e env) { + te := newTest(t, e) + te.userAgent = testAppUA + te.declareLogNoise( + "transport: http2Client.notifyError got notified that the client transport was broken EOF", + "grpc: Conn.transportMonitor exits due to: grpc: the client connection is closing", + "grpc: Conn.resetTransport failed to create client transport: connection error", + "grpc: Conn.resetTransport failed to create client transport: connection error: desc = \"transport: dial unix", + ) + te.startServer() + defer te.tearDown() + + cc := te.clientConn() + tc := testpb.NewTestServiceClient(cc) + if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { + t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, ", err) + } + // Stop the server and tear down all the exisiting connections. + te.srv.Stop() + // Issue an RPC to make sure the server teardown is propagated to the client already. + ctx, _ := context.WithTimeout(context.Background(), time.Millisecond) + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded { + t.Fatalf("TestService/EmptyCall(%v, _) = _, %v, want _, error code: %d", ctx, err, codes.DeadlineExceeded) + } + // The client keeps reconnecting and ongoing fail-fast RPCs should fail with code.Canceled. + if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.Canceled { + t.Fatalf("TestService/EmptyCall(_, _, _) = _, %v, want _, error code: %d", err, codes.Canceled) + } + if _, err := tc.StreamingInputCall(ctx); grpc.Code(err) != codes.Canceled { + t.Fatalf("TestService/StreamingInputCall(_) = _, %v, want _, error code: %d", err, codes.Canceled) + } + + awaitNewConnLogOutput() +} + func healthCheck(d time.Duration, cc *grpc.ClientConn, serviceName string) (*healthpb.HealthCheckResponse, error) { ctx, _ := context.WithTimeout(context.Background(), d) hc := healthpb.NewHealthClient(cc) @@ -879,7 +921,7 @@ func performOneRPC(t *testing.T, tc testpb.TestServiceClient, wg *sync.WaitGroup ResponseSize: proto.Int32(respSize), Payload: payload, } - reply, err := tc.UnaryCall(context.Background(), req) + reply, err := tc.UnaryCall(context.Background(), req, grpc.FailFast(false)) if err != nil { t.Errorf("TestService/UnaryCall(_, _) = _, %v, want _, ", err) return From 01ef81a4d918548bff15b1d6df8bc17620fea3c7 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Mon, 27 Jun 2016 15:30:20 -0700 Subject: [PATCH 2/5] minor fix including removal of debugging logs, error code fix, etc. --- balancer.go | 1 - call.go | 3 --- clientconn.go | 4 ++-- rpc_util.go | 5 +++-- test/end2end_test.go | 10 +++++----- 5 files changed, 10 insertions(+), 13 deletions(-) diff --git a/balancer.go b/balancer.go index 307e3dc1..8c588785 100644 --- a/balancer.go +++ b/balancer.go @@ -298,7 +298,6 @@ func (rr *roundRobin) Get(ctx context.Context, opts BalancerGetOptions) (addr Ad } } } - // There is no address available. if !opts.BlockingWait { if len(rr.addrs) == 0 { rr.mu.Unlock() diff --git a/call.go b/call.go index fb2144b2..baa912dd 100644 --- a/call.go +++ b/call.go @@ -35,7 +35,6 @@ package grpc import ( "bytes" - //"fmt" "io" "time" @@ -170,8 +169,6 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli } // ALl the other errors are treated as Internal errors. return Errorf(codes.Internal, "%v", err) - // All the remaining cases are treated as fatal. - //panic(fmt.Sprintf("ClientConn.getTransport got an unsupported error: %v", err)) } if c.traceInfo.tr != nil { c.traceInfo.tr.LazyLog(&payload{sent: true, msg: args}, true) diff --git a/clientconn.go b/clientconn.go index 79965ec3..864abab7 100644 --- a/clientconn.go +++ b/clientconn.go @@ -664,7 +664,7 @@ func (ac *addrConn) wait(ctx context.Context, failFast bool) (transport.ClientTr default: if ac.state == TransientFailure && failFast { ac.mu.Unlock() - return nil, transport.StreamErrorf(codes.Canceled, "grpc: RPC failed fast due to transport failure") + return nil, transport.StreamErrorf(codes.Unavailable, "grpc: RPC failed fast due to transport failure") } ready := ac.ready if ready == nil { @@ -680,7 +680,7 @@ func (ac *addrConn) wait(ctx context.Context, failFast bool) (transport.ClientTr ac.mu.Lock() if ac.state == TransientFailure && failFast { ac.mu.Unlock() - return nil, transport.StreamErrorf(codes.Canceled, "grpc: RPC failed fast due to transport failure") + return nil, transport.StreamErrorf(codes.Unavailable, "grpc: RPC failed fast due to transport failure") } ac.mu.Unlock() diff --git a/rpc_util.go b/rpc_util.go index 36c60183..3bdc7c84 100644 --- a/rpc_util.go +++ b/rpc_util.go @@ -183,9 +183,10 @@ func Trailer(md *metadata.MD) CallOption { // FailFast configures the action to take when an RPC is attempted on broken // connections or unreachable servers. If failfast is true, the RPC will fail -// immediately. Otherwise, the RPC client will block the call until a +// immediately. Otherwise, the RPC client will block the call until a // connection is available (or the call is canceled or times out) and will retry -// the call if it fails due to a transient error. +// the call if it fails due to a transient error. Please refer to +// https://github.com/grpc/grpc/blob/master/doc/fail_fast.md func FailFast(failFast bool) CallOption { return beforeCall(func(c *callInfo) error { c.failFast = failFast diff --git a/test/end2end_test.go b/test/end2end_test.go index 5a0759f8..8e399262 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -595,12 +595,12 @@ func testFailFast(t *testing.T, e env) { if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded { t.Fatalf("TestService/EmptyCall(%v, _) = _, %v, want _, error code: %d", ctx, err, codes.DeadlineExceeded) } - // The client keeps reconnecting and ongoing fail-fast RPCs should fail with code.Canceled. - if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.Canceled { - t.Fatalf("TestService/EmptyCall(_, _, _) = _, %v, want _, error code: %d", err, codes.Canceled) + // The client keeps reconnecting and ongoing fail-fast RPCs should fail with code.Unavailable. + if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.Unavailable { + t.Fatalf("TestService/EmptyCall(_, _, _) = _, %v, want _, error code: %d", err, codes.Unavailable) } - if _, err := tc.StreamingInputCall(ctx); grpc.Code(err) != codes.Canceled { - t.Fatalf("TestService/StreamingInputCall(_) = _, %v, want _, error code: %d", err, codes.Canceled) + if _, err := tc.StreamingInputCall(ctx); grpc.Code(err) != codes.Unavailable { + t.Fatalf("TestService/StreamingInputCall(_) = _, %v, want _, error code: %d", err, codes.Unavailable) } awaitNewConnLogOutput() From 213a20c4fe8f5e87d7eaf70598ee3c0413eefca7 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Tue, 28 Jun 2016 16:08:19 -0700 Subject: [PATCH 3/5] bug fix, typo fix and slight error refactoring --- balancer.go | 3 +-- call.go | 12 +++++----- clientconn.go | 20 +++++------------ rpc_util.go | 13 +++++++++++ stream.go | 53 ++++++++++++++++++++++++++++++++++---------- test/end2end_test.go | 4 +--- 6 files changed, 68 insertions(+), 37 deletions(-) diff --git a/balancer.go b/balancer.go index 8c588785..dd4be12f 100644 --- a/balancer.go +++ b/balancer.go @@ -40,7 +40,6 @@ import ( "golang.org/x/net/context" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/naming" - "google.golang.org/grpc/transport" ) // Address represents a server the client connects to. @@ -321,7 +320,7 @@ func (rr *roundRobin) Get(ctx context.Context, opts BalancerGetOptions) (addr Ad for { select { case <-ctx.Done(): - err = transport.ContextErr(ctx.Err()) + err = ctx.Err() return case <-ch: rr.mu.Lock() diff --git a/call.go b/call.go index baa912dd..49406637 100644 --- a/call.go +++ b/call.go @@ -155,19 +155,19 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli t, put, err = cc.getTransport(ctx, gopts) if err != nil { // TODO(zhaoq): Probably revisit the error handling. + if _, ok := err.(rpcError); ok { + return err + } if err == ErrClientConnClosing { return Errorf(codes.FailedPrecondition, "%v", err) } - if _, ok := err.(transport.StreamError); ok { - return toRPCErr(err) - } - if _, ok := err.(transport.ConnectionError); ok { + if err == errConnClosing { if c.failFast { - return toRPCErr(err) + return Errorf(codes.Unavailable, "%v", errConnClosing) } continue } - // ALl the other errors are treated as Internal errors. + // All the other errors are treated as Internal errors. return Errorf(codes.Internal, "%v", err) } if c.traceInfo.tr != nil { diff --git a/clientconn.go b/clientconn.go index 864abab7..2ef9fa4c 100644 --- a/clientconn.go +++ b/clientconn.go @@ -426,7 +426,7 @@ func (cc *ClientConn) newAddrConn(addr Address, skipWait bool) error { func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) { addr, put, err := cc.balancer.Get(ctx, opts) if err != nil { - return nil, nil, err + return nil, nil, toRPCErr(err) } cc.mu.RLock() if cc.conns == nil { @@ -439,7 +439,7 @@ func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) if put != nil { put() } - return nil, nil, transport.StreamErrorf(codes.Internal, "grpc: failed to find the transport to send the rpc") + return nil, nil, Errorf(codes.Internal, "grpc: failed to find the transport to send the rpc") } t, err := ac.wait(ctx, !opts.BlockingWait) if err != nil { @@ -661,11 +661,10 @@ func (ac *addrConn) wait(ctx context.Context, failFast bool) (transport.ClientTr ct := ac.transport ac.mu.Unlock() return ct, nil + case ac.state == TransientFailure && failFast: + ac.mu.Unlock() + return nil, Errorf(codes.Unavailable, "grpc: RPC failed fast due to transport failure") default: - if ac.state == TransientFailure && failFast { - ac.mu.Unlock() - return nil, transport.StreamErrorf(codes.Unavailable, "grpc: RPC failed fast due to transport failure") - } ready := ac.ready if ready == nil { ready = make(chan struct{}) @@ -674,16 +673,9 @@ func (ac *addrConn) wait(ctx context.Context, failFast bool) (transport.ClientTr ac.mu.Unlock() select { case <-ctx.Done(): - return nil, transport.ContextErr(ctx.Err()) + return nil, toRPCErr(ctx.Err()) // Wait until the new transport is ready or failed. case <-ready: - ac.mu.Lock() - if ac.state == TransientFailure && failFast { - ac.mu.Unlock() - return nil, transport.StreamErrorf(codes.Unavailable, "grpc: RPC failed fast due to transport failure") - } - ac.mu.Unlock() - } } } diff --git a/rpc_util.go b/rpc_util.go index 3bdc7c84..23d60e6c 100644 --- a/rpc_util.go +++ b/rpc_util.go @@ -389,6 +389,19 @@ func toRPCErr(err error) error { code: codes.Internal, desc: e.Desc, } + default: + switch err { + case context.DeadlineExceeded: + return rpcError{ + code: codes.DeadlineExceeded, + desc: err.Error(), + } + case context.Canceled: + return rpcError{ + code: codes.Canceled, + desc: err.Error(), + } + } } return Errorf(codes.Unknown, "%v", err) } diff --git a/stream.go b/stream.go index 21b73a9c..60319022 100644 --- a/stream.go +++ b/stream.go @@ -102,6 +102,7 @@ type ClientStream interface { func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) { var ( t transport.ClientTransport + s *transport.Stream err error put func() ) @@ -111,13 +112,6 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth return nil, toRPCErr(err) } } - gopts := BalancerGetOptions{ - BlockingWait: !c.failFast, - } - t, put, err = cc.getTransport(ctx, gopts) - if err != nil { - return nil, toRPCErr(err) - } callHdr := &transport.CallHdr{ Host: cc.authority, Method: method, @@ -130,7 +124,6 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth opts: opts, c: c, desc: desc, - put: put, codec: cc.dopts.codec, cp: cc.dopts.cp, dc: cc.dopts.dc, @@ -149,11 +142,47 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth cs.trInfo.tr.LazyLog(&cs.trInfo.firstLine, false) ctx = trace.NewContext(ctx, cs.trInfo.tr) } - s, err := t.NewStream(ctx, callHdr) - if err != nil { - cs.finish(err) - return nil, toRPCErr(err) + gopts := BalancerGetOptions{ + BlockingWait: !c.failFast, } + for { + t, put, err = cc.getTransport(ctx, gopts) + if err != nil { + // TODO(zhaoq): Probably revisit the error handling. + if _, ok := err.(rpcError); ok { + return nil, err + } + if err == ErrClientConnClosing { + return nil, Errorf(codes.FailedPrecondition, "%v", err) + } + if err == errConnClosing { + if c.failFast { + return nil, Errorf(codes.Unavailable, "%v", errConnClosing) + } + continue + } + // All the other errors are treated as Internal errors. + return nil, Errorf(codes.Internal, "%v", err) + } + + s, err = t.NewStream(ctx, callHdr) + if err != nil { + if put != nil { + put() + put = nil + } + if _, ok := err.(transport.ConnectionError); ok { + if c.failFast { + cs.finish(err) + return nil, toRPCErr(err) + } + continue + } + return nil, toRPCErr(err) + } + break + } + cs.put = put cs.t = t cs.s = s cs.p = &parser{r: s} diff --git a/test/end2end_test.go b/test/end2end_test.go index 8e399262..b65e505f 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -1148,9 +1148,7 @@ func testNoService(t *testing.T, e env) { cc := te.clientConn() tc := testpb.NewTestServiceClient(cc) - // Make sure setting ack has been sent. - time.Sleep(20 * time.Millisecond) - stream, err := tc.FullDuplexCall(te.ctx) + stream, err := tc.FullDuplexCall(te.ctx, grpc.FailFast(false)) if err != nil { t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) } From be59d023f27ac0bc543cb7bee7f33519091ea4a0 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Wed, 29 Jun 2016 15:21:44 -0700 Subject: [PATCH 4/5] refactor error handling a bit --- call.go | 3 --- clientconn.go | 2 +- rpc_util.go | 6 ++++++ stream.go | 3 --- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/call.go b/call.go index 49406637..93262360 100644 --- a/call.go +++ b/call.go @@ -158,9 +158,6 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli if _, ok := err.(rpcError); ok { return err } - if err == ErrClientConnClosing { - return Errorf(codes.FailedPrecondition, "%v", err) - } if err == errConnClosing { if c.failFast { return Errorf(codes.Unavailable, "%v", errConnClosing) diff --git a/clientconn.go b/clientconn.go index 2ef9fa4c..4e80ad73 100644 --- a/clientconn.go +++ b/clientconn.go @@ -431,7 +431,7 @@ func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) cc.mu.RLock() if cc.conns == nil { cc.mu.RUnlock() - return nil, nil, ErrClientConnClosing + return nil, nil, toRPCErr(ErrClientConnClosing) } ac, ok := cc.conns[addr] cc.mu.RUnlock() diff --git a/rpc_util.go b/rpc_util.go index 23d60e6c..91342bd8 100644 --- a/rpc_util.go +++ b/rpc_util.go @@ -401,7 +401,13 @@ func toRPCErr(err error) error { code: codes.Canceled, desc: err.Error(), } + case ErrClientConnClosing: + return rpcError{ + code: codes.FailedPrecondition, + desc: err.Error(), + } } + } return Errorf(codes.Unknown, "%v", err) } diff --git a/stream.go b/stream.go index 60319022..73d1da23 100644 --- a/stream.go +++ b/stream.go @@ -152,9 +152,6 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth if _, ok := err.(rpcError); ok { return nil, err } - if err == ErrClientConnClosing { - return nil, Errorf(codes.FailedPrecondition, "%v", err) - } if err == errConnClosing { if c.failFast { return nil, Errorf(codes.Unavailable, "%v", errConnClosing) From c477cb3b147be4bc47e5e3f28564b98546077a29 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Thu, 30 Jun 2016 14:26:31 -0700 Subject: [PATCH 5/5] fix the flaky test --- test/end2end_test.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index b65e505f..326b8869 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -590,16 +590,18 @@ func testFailFast(t *testing.T, e env) { } // Stop the server and tear down all the exisiting connections. te.srv.Stop() - // Issue an RPC to make sure the server teardown is propagated to the client already. - ctx, _ := context.WithTimeout(context.Background(), time.Millisecond) - if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded { - t.Fatalf("TestService/EmptyCall(%v, _) = _, %v, want _, error code: %d", ctx, err, codes.DeadlineExceeded) + // Loop until the server teardown is propagated to the client. + for { + if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) == codes.Unavailable { + break + } + time.Sleep(10 * time.Millisecond) } // The client keeps reconnecting and ongoing fail-fast RPCs should fail with code.Unavailable. if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.Unavailable { t.Fatalf("TestService/EmptyCall(_, _, _) = _, %v, want _, error code: %d", err, codes.Unavailable) } - if _, err := tc.StreamingInputCall(ctx); grpc.Code(err) != codes.Unavailable { + if _, err := tc.StreamingInputCall(context.Background()); grpc.Code(err) != codes.Unavailable { t.Fatalf("TestService/StreamingInputCall(_) = _, %v, want _, error code: %d", err, codes.Unavailable) }