Merge pull request #864 from tamird/retry-timeout

transport: robustly detect temporary errors
This commit is contained in:
Qi Zhao
2016-08-26 15:02:58 -07:00
committed by GitHub
2 changed files with 54 additions and 31 deletions

View File

@ -413,8 +413,7 @@ func newTest(t *testing.T, e env) *test {
// call to te.tearDown to clean up.
func (te *test) startServer(ts testpb.TestServiceServer) {
te.testServer = ts
e := te.e
te.t.Logf("Running test in %s environment...", e.name)
te.t.Logf("Running test in %s environment...", te.e.name)
sopts := []grpc.ServerOption{grpc.MaxConcurrentStreams(te.maxStream)}
if te.maxMsgSize > 0 {
sopts = append(sopts, grpc.MaxMsgSize(te.maxMsgSize))
@ -432,12 +431,12 @@ func (te *test) startServer(ts testpb.TestServiceServer) {
sopts = append(sopts, grpc.StreamInterceptor(te.streamInt))
}
la := "localhost:0"
switch e.network {
switch te.e.network {
case "unix":
la = "/tmp/testsock" + fmt.Sprintf("%d", time.Now())
syscall.Unlink(la)
}
lis, err := net.Listen(e.network, la)
lis, err := net.Listen(te.e.network, la)
if err != nil {
te.t.Fatalf("Failed to listen: %v", err)
}
@ -455,7 +454,7 @@ func (te *test) startServer(ts testpb.TestServiceServer) {
}
s := grpc.NewServer(sopts...)
te.srv = s
if e.httpHandler {
if te.e.httpHandler {
internal.TestingUseHandlerImpl(s)
}
if te.healthServer != nil {
@ -465,7 +464,7 @@ func (te *test) startServer(ts testpb.TestServiceServer) {
testpb.RegisterTestServiceServer(s, te.testServer)
}
addr := la
switch e.network {
switch te.e.network {
case "unix":
default:
_, port, err := net.SplitHostPort(lis.Addr().String())
@ -566,7 +565,7 @@ func testTimeoutOnDeadServer(t *testing.T, e env) {
te.srv.Stop()
ctx, _ := context.WithTimeout(context.Background(), time.Millisecond)
if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded {
t.Fatalf("TestService/EmptyCall(%v, _) = _, %v, want _, error code: %d", ctx, err, codes.DeadlineExceeded)
t.Fatalf("TestService/EmptyCall(%v, _) = _, %v, want _, error code: %s", ctx, err, codes.DeadlineExceeded)
}
awaitNewConnLogOutput()
}
@ -849,10 +848,10 @@ func testFailFast(t *testing.T, e env) {
}
// The client keeps reconnecting and ongoing fail-fast RPCs should fail with codes.Unavailable.
if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) != codes.Unavailable {
t.Fatalf("TestService/EmptyCall(_, _, _) = _, %v, want _, error code: %d", err, codes.Unavailable)
t.Fatalf("TestService/EmptyCall(_, _, _) = _, %v, want _, error code: %s", err, codes.Unavailable)
}
if _, err := tc.StreamingInputCall(context.Background()); grpc.Code(err) != codes.Unavailable {
t.Fatalf("TestService/StreamingInputCall(_) = _, %v, want _, error code: %d", err, codes.Unavailable)
t.Fatalf("TestService/StreamingInputCall(_) = _, %v, want _, error code: %s", err, codes.Unavailable)
}
awaitNewConnLogOutput()
@ -1112,7 +1111,7 @@ func testExceedMsgLimit(t *testing.T, e env) {
Payload: payload,
}
if _, err := tc.UnaryCall(context.Background(), req); err == nil || grpc.Code(err) != codes.Internal {
t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %d", err, codes.Internal)
t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, error code: %s", err, codes.Internal)
}
stream, err := tc.FullDuplexCall(te.ctx)
@ -1139,7 +1138,7 @@ func testExceedMsgLimit(t *testing.T, e env) {
t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, sreq, err)
}
if _, err := stream.Recv(); err == nil || grpc.Code(err) != codes.Internal {
t.Fatalf("%v.Recv() = _, %v, want _, error code: %d", stream, err, codes.Internal)
t.Fatalf("%v.Recv() = _, %v, want _, error code: %s", stream, err, codes.Internal)
}
}
@ -1331,7 +1330,7 @@ func testRPCTimeout(t *testing.T, e env) {
for i := -1; i <= 10; i++ {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(i)*time.Millisecond)
if _, err := tc.UnaryCall(ctx, req); grpc.Code(err) != codes.DeadlineExceeded {
t.Fatalf("TestService/UnaryCallv(_, _) = _, %v; want <nil>, error code: %d", err, codes.DeadlineExceeded)
t.Fatalf("TestService/UnaryCallv(_, _) = _, %v; want <nil>, error code: %s", err, codes.DeadlineExceeded)
}
}
}
@ -1368,7 +1367,7 @@ func testCancel(t *testing.T, e env) {
ctx, cancel := context.WithCancel(context.Background())
time.AfterFunc(1*time.Millisecond, cancel)
if r, err := tc.UnaryCall(ctx, req); grpc.Code(err) != codes.Canceled {
t.Fatalf("TestService/UnaryCall(_, _) = %v, %v; want _, error code: %d", r, err, codes.Canceled)
t.Fatalf("TestService/UnaryCall(_, _) = %v, %v; want _, error code: %s", r, err, codes.Canceled)
}
awaitNewConnLogOutput()
}
@ -2305,7 +2304,7 @@ func (c clientAlwaysFailCred) Info() credentials.ProtocolInfo {
func TestDialWithBlockErrorOnBadCertificates(t *testing.T) {
te := newTest(t, env{name: "bad-cred", network: "tcp", security: "clientAlwaysFailCred", balancer: true})
te.startServer(&testServer{security: "clientAlwaysFailCred"})
te.startServer(&testServer{security: te.e.security})
defer te.tearDown()
var (
@ -2321,7 +2320,7 @@ func TestDialWithBlockErrorOnBadCertificates(t *testing.T) {
func TestFailFastRPCErrorOnBadCertificates(t *testing.T) {
te := newTest(t, env{name: "bad-cred", network: "tcp", security: "clientAlwaysFailCred", balancer: true})
te.startServer(&testServer{security: "clientAlwaysFailCred"})
te.startServer(&testServer{security: te.e.security})
defer te.tearDown()
cc := te.clientConn()
@ -2333,7 +2332,7 @@ func TestFailFastRPCErrorOnBadCertificates(t *testing.T) {
func TestFailFastRPCWithNoBalancerErrorOnBadCertificates(t *testing.T) {
te := newTest(t, env{name: "bad-cred", network: "tcp", security: "clientAlwaysFailCred", balancer: false})
te.startServer(&testServer{security: "clientAlwaysFailCred"})
te.startServer(&testServer{security: te.e.security})
defer te.tearDown()
cc := te.clientConn()
@ -2345,7 +2344,7 @@ func TestFailFastRPCWithNoBalancerErrorOnBadCertificates(t *testing.T) {
func TestNonFailFastRPCWithNoBalancerErrorOnBadCertificates(t *testing.T) {
te := newTest(t, env{name: "bad-cred", network: "tcp", security: "clientAlwaysFailCred", balancer: false})
te.startServer(&testServer{security: "clientAlwaysFailCred"})
te.startServer(&testServer{security: te.e.security})
defer te.tearDown()
cc := te.clientConn()
@ -2376,7 +2375,7 @@ func (c *clientTimeoutCreds) Info() credentials.ProtocolInfo {
func TestNonFailFastRPCSucceedOnTimeoutCreds(t *testing.T) {
te := newTest(t, env{name: "timeout-cred", network: "tcp", security: "clientTimeoutCreds", balancer: false})
te.userAgent = testAppUA
te.startServer(&testServer{security: "clientTimeoutCreds"})
te.startServer(&testServer{security: te.e.security})
defer te.tearDown()
cc := te.clientConn()

View File

@ -114,14 +114,42 @@ func dial(fn func(context.Context, string) (net.Conn, error), ctx context.Contex
return dialContext(ctx, "tcp", addr)
}
// isTemporary reports whether err describes a condition that may be resolved
// by retrying. It returns true for io.EOF and context.DeadlineExceeded, and
// for any error whose Temporary() or Timeout() method returns true. All other
// errors, including nil, are reported as not temporary.
func isTemporary(err error) bool {
	switch err {
	case io.EOF:
		// Connection closures may be resolved upon retry, and are thus
		// treated as temporary.
		return true
	case context.DeadlineExceeded:
		// In Go 1.7, context.DeadlineExceeded implements Timeout(), and this
		// special case is not needed. Until then, we need to keep this
		// clause.
		return true
	}

	// The two interfaces are checked independently instead of in a single
	// type switch: an error that implements both methods would always match
	// the Temporary() case first, so a timeout reporting Temporary() == false
	// would never have its Timeout() result consulted.
	if t, ok := err.(interface {
		Temporary() bool
	}); ok && t.Temporary() {
		return true
	}
	if t, ok := err.(interface {
		Timeout() bool
	}); ok {
		// Timeouts may be resolved upon retry, and are thus treated as
		// temporary.
		return t.Timeout()
	}
	return false
}
// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
// and starts to receive messages on it. Non-nil error returns if construction
// fails.
func newHTTP2Client(ctx context.Context, addr string, opts ConnectOptions) (_ ClientTransport, err error) {
scheme := "http"
conn, connErr := dial(opts.Dialer, ctx, addr)
if connErr != nil {
return nil, ConnectionErrorf(true, connErr, "transport: %v", connErr)
conn, err := dial(opts.Dialer, ctx, addr)
if err != nil {
return nil, ConnectionErrorf(true, err, "transport: %v", err)
}
// Any further errors will close the underlying connection
defer func(conn net.Conn) {
@ -132,17 +160,13 @@ func newHTTP2Client(ctx context.Context, addr string, opts ConnectOptions) (_ Cl
var authInfo credentials.AuthInfo
if creds := opts.TransportCredentials; creds != nil {
scheme = "https"
conn, authInfo, connErr = creds.ClientHandshake(ctx, addr, conn)
conn, authInfo, err = creds.ClientHandshake(ctx, addr, conn)
if err != nil {
// Credentials handshake errors are typically considered permanent
// to avoid retrying on e.g. bad certificates.
temp := isTemporary(err)
return nil, ConnectionErrorf(temp, err, "transport: %v", err)
}
if connErr != nil {
// Credentials handshake error is not a temporary error (unless the error
// was the connection closing or deadline exceeded).
var temp bool
switch connErr {
case io.EOF, context.DeadlineExceeded:
temp = true
}
return nil, ConnectionErrorf(temp, connErr, "transport: %v", connErr)
}
ua := primaryUA
if opts.UserAgent != "" {