Tune Invoke behavior for the new Balancer.

This commit is contained in:
iamqizhao
2016-05-18 11:18:10 -07:00
parent e9e748b978
commit 162d8d2d33
3 changed files with 43 additions and 49 deletions

51
call.go
View File

@ -132,20 +132,14 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
Last: true, Last: true,
Delay: false, Delay: false,
} }
var ( var put func()
lastErr error // record the error that happened
put func()
)
for { for {
var ( var (
err error err error
t transport.ClientTransport t transport.ClientTransport
stream *transport.Stream stream *transport.Stream
) )
// TODO(zhaoq): Need a formal spec of retry strategy for non-failfast rpcs. // TODO(zhaoq): Need a formal spec of retry strategy for non-failFast rpcs.
if lastErr != nil && c.failFast {
return toRPCErr(lastErr)
}
callHdr := &transport.CallHdr{ callHdr := &transport.CallHdr{
Host: cc.authority, Host: cc.authority,
Method: method, Method: method,
@ -155,11 +149,19 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
} }
t, put, err = cc.getTransport(ctx) t, put, err = cc.getTransport(ctx)
if err != nil { if err != nil {
if lastErr != nil { if err == ErrClientConnClosing {
// This was a retry; return the error from the last attempt. return toRPCErr(err)
return toRPCErr(lastErr)
} }
return toRPCErr(err) if _, ok := err.(transport.StreamError); ok {
return toRPCErr(err)
}
if _, ok := err.(transport.ConnectionError); ok {
if c.failFast {
return toRPCErr(err)
}
}
// All the remaining cases are treated as retryable.
continue
} }
if c.traceInfo.tr != nil { if c.traceInfo.tr != nil {
c.traceInfo.tr.LazyLog(&payload{sent: true, msg: args}, true) c.traceInfo.tr.LazyLog(&payload{sent: true, msg: args}, true)
@ -168,28 +170,31 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
if err != nil { if err != nil {
put() put()
if _, ok := err.(transport.ConnectionError); ok { if _, ok := err.(transport.ConnectionError); ok {
lastErr = err if c.failFast {
return toRPCErr(err)
}
continue continue
} }
if lastErr != nil {
return toRPCErr(lastErr)
}
return toRPCErr(err) return toRPCErr(err)
} }
// Receive the response // Receive the response
lastErr = recvResponse(cc.dopts, t, &c, stream, reply) err = recvResponse(cc.dopts, t, &c, stream, reply)
if _, ok := lastErr.(transport.ConnectionError); ok { if err != nil {
put() put()
continue if _, ok := err.(transport.ConnectionError); ok {
if c.failFast {
return toRPCErr(err)
}
continue
}
t.CloseStream(stream, err)
return toRPCErr(err)
} }
if c.traceInfo.tr != nil { if c.traceInfo.tr != nil {
c.traceInfo.tr.LazyLog(&payload{sent: false, msg: reply}, true) c.traceInfo.tr.LazyLog(&payload{sent: false, msg: reply}, true)
} }
t.CloseStream(stream, lastErr) t.CloseStream(stream, nil)
put() put()
if lastErr != nil {
return toRPCErr(lastErr)
}
return Errorf(stream.StatusCode(), "%s", stream.StatusDesc()) return Errorf(stream.StatusCode(), "%s", stream.StatusDesc())
} }
} }

View File

@ -61,14 +61,15 @@ var (
ErrCredentialsMisuse = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportAuthenticator() to set)") ErrCredentialsMisuse = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportAuthenticator() to set)")
// ErrClientConnClosing indicates that the operation is illegal because // ErrClientConnClosing indicates that the operation is illegal because
// the ClientConn is closing. // the ClientConn is closing.
ErrClientConnClosing = errors.New("grpc: the client connection is closing") ErrClientConnClosing = Errorf(codes.FailedPrecondition, "grpc: the client connection is closing")
// ErrClientConnTimeout indicates that the connection could not be // ErrClientConnTimeout indicates that the connection could not be
// established or re-established within the specified timeout. // established or re-established within the specified timeout.
ErrClientConnTimeout = errors.New("grpc: timed out trying to connect") ErrClientConnTimeout = errors.New("grpc: timed out trying to connect")
// ErrNetworkIP indicates that the connection is down due to some network I/O error.
ErrNetworkIO = errors.New("grpc: failed with network I/O error") // errNetworkIP indicates that the connection is down due to some network I/O error.
// ErrConnDrain indicates that the connection starts to be drained and does not accept any new RPCs. errNetworkIO = errors.New("grpc: failed with network I/O error")
ErrConnDrain = errors.New("grpc: the connection is drained") // errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
errConnDrain = errors.New("grpc: the connection is drained")
errConnClosing = errors.New("grpc: the addrConn is closing") errConnClosing = errors.New("grpc: the addrConn is closing")
// minimum time to give a connection to complete // minimum time to give a connection to complete
minConnectTimeout = 20 * time.Second minConnectTimeout = 20 * time.Second
@ -301,7 +302,7 @@ func (s ConnectivityState) String() string {
} }
} }
// ClientConn represents a client connection to an RPC service. // ClientConn represents a client connection to an RPC server.
type ClientConn struct { type ClientConn struct {
target string target string
watcher naming.Watcher watcher naming.Watcher
@ -348,7 +349,7 @@ func (cc *ClientConn) watchAddrUpdates() error {
continue continue
} }
cc.mu.RUnlock() cc.mu.RUnlock()
ac.tearDown(ErrConnDrain) ac.tearDown(errConnDrain)
default: default:
grpclog.Println("Unknown update.Op ", update.Op) grpclog.Println("Unknown update.Op ", update.Op)
} }
@ -528,15 +529,6 @@ func (ac *addrConn) waitForStateChange(ctx context.Context, sourceState Connecti
} }
func (ac *addrConn) resetTransport(closeTransport bool) error { func (ac *addrConn) resetTransport(closeTransport bool) error {
/*
ac.cc.mu.Lock()
if ac.cc.conns == nil {
ac.cc.mu.Unlock()
return ErrClientConnClosing
}
ac.cc.conns[ac.addr] = ac
ac.cc.mu.Unlock()
*/
var retries int var retries int
start := time.Now() start := time.Now()
for { for {
@ -548,7 +540,7 @@ func (ac *addrConn) resetTransport(closeTransport bool) error {
return errConnClosing return errConnClosing
} }
if ac.down != nil { if ac.down != nil {
ac.down(ErrNetworkIO) ac.down(errNetworkIO)
ac.down = nil ac.down = nil
} }
ac.state = Connecting ac.state = Connecting
@ -732,7 +724,7 @@ func (ac *addrConn) tearDown(err error) {
ac.ready = nil ac.ready = nil
} }
if ac.transport != nil { if ac.transport != nil {
if err == ErrConnDrain { if err == errConnDrain {
ac.transport.GracefulClose() ac.transport.GracefulClose()
} else { } else {
ac.transport.Close() ac.transport.Close()

View File

@ -159,7 +159,6 @@ func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*
return nil, fmt.Errorf("Unknown server name %q", serverName) return nil, fmt.Errorf("Unknown server name %q", serverName)
} }
} }
// Simulate some service delay. // Simulate some service delay.
time.Sleep(time.Second) time.Sleep(time.Second)
@ -1020,14 +1019,13 @@ func testRPCTimeout(t *testing.T, e env) {
} }
for i := -1; i <= 10; i++ { for i := -1; i <= 10; i++ {
ctx, _ := context.WithTimeout(context.Background(), time.Duration(i)*time.Millisecond) ctx, _ := context.WithTimeout(context.Background(), time.Duration(i)*time.Millisecond)
reply, err := tc.UnaryCall(ctx, req) if _, err := tc.UnaryCall(ctx, req); grpc.Code(err) != codes.DeadlineExceeded {
if grpc.Code(err) != codes.DeadlineExceeded { t.Fatalf("TestService/UnaryCallv(_, _) = _, %v; want <nil>, error code: %d", err, codes.DeadlineExceeded)
t.Fatalf(`TestService/UnaryCallv(_, _) = %v, %v; want <nil>, error code: %d`, reply, err, codes.DeadlineExceeded)
} }
} }
} }
func TestCancelX(t *testing.T) { func TestCancel(t *testing.T) {
defer leakCheck(t)() defer leakCheck(t)()
for _, e := range listTestEnv() { for _, e := range listTestEnv() {
testCancel(t, e) testCancel(t, e)
@ -1058,9 +1056,8 @@ func testCancel(t *testing.T, e env) {
} }
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
time.AfterFunc(1*time.Millisecond, cancel) time.AfterFunc(1*time.Millisecond, cancel)
reply, err := tc.UnaryCall(ctx, req) if r, err := tc.UnaryCall(ctx, req); grpc.Code(err) != codes.Canceled {
if grpc.Code(err) != codes.Canceled { t.Fatalf("TestService/UnaryCall(_, _) = %v, %v; want _, error code: %d", r, err, codes.Canceled)
t.Fatalf(`TestService/UnaryCall(_, _) = %v, %v; want <nil>, error code: %d`, reply, err, codes.Canceled)
} }
awaitNewConnLogOutput() awaitNewConnLogOutput()
} }