Merge branch 'master' into service_config_pr

This commit is contained in:
lyuxuan
2017-05-07 16:49:32 -07:00
committed by GitHub
35 changed files with 1658 additions and 618 deletions

64
call.go
View File

@ -93,11 +93,7 @@ func recvResponse(ctx context.Context, dopts dialOptions, t transport.ClientTran
}
// sendRequest writes out various information of an RPC such as Context and Message.
func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor, c *callInfo, callHdr *transport.CallHdr, t transport.ClientTransport, args interface{}, opts *transport.Options) (_ *transport.Stream, err error) {
stream, err := t.NewStream(ctx, callHdr)
if err != nil {
return nil, err
}
func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor, c *callInfo, callHdr *transport.CallHdr, stream *transport.Stream, t transport.ClientTransport, args interface{}, opts *transport.Options) (err error) {
defer func() {
if err != nil {
// If err is connection error, t will be closed, no need to close stream here.
@ -120,7 +116,7 @@ func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor,
}
outBuf, err := encode(dopts.codec, args, compressor, cbuf, outPayload)
if err != nil {
return nil, Errorf(codes.Internal, "grpc: %v", err)
return Errorf(codes.Internal, "grpc: %v", err)
}
if len(outBuf) > *c.maxSendMessageSize {
return nil, Errorf(codes.ResourceExhausted, "Sent message larger than max (%d vs. %d)", len(outBuf), *c.maxSendMessageSize)
@ -134,10 +130,10 @@ func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor,
// does not exist.) so that t.Write could get io.EOF from wait(...). Leave the following
// recvResponse to get the final status.
if err != nil && err != io.EOF {
return nil, err
return err
}
// Sent successfully.
return stream, nil
return nil
}
// Invoke sends the RPC request on the wire and returns after response is received.
@ -194,6 +190,7 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
}
}()
}
ctx = newContextWithRPCInfo(ctx)
sh := cc.dopts.copts.StatsHandler
if sh != nil {
ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method})
@ -203,17 +200,15 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
FailFast: c.failFast,
}
sh.HandleRPC(ctx, begin)
}
defer func() {
if sh != nil {
defer func() {
end := &stats.End{
Client: true,
EndTime: time.Now(),
Error: e,
}
sh.HandleRPC(ctx, end)
}
}()
}()
}
topts := &transport.Options{
Last: true,
Delay: false,
@ -257,19 +252,35 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
if c.traceInfo.tr != nil {
c.traceInfo.tr.LazyLog(&payload{sent: true, msg: args}, true)
}
stream, err = sendRequest(ctx, cc.dopts, cc.dopts.cp, &c, callHdr, t, args, topts)
stream, err = t.NewStream(ctx, callHdr)
if err != nil {
if put != nil {
if _, ok := err.(transport.ConnectionError); ok {
// If error is connection error, transport was sending data on wire,
// and we are not sure if anything has been sent on wire.
// If error is not connection error, we are sure nothing has been sent.
updateRPCInfoInContext(ctx, rpcInfo{bytesSent: true, bytesReceived: false})
}
put()
}
if _, ok := err.(transport.ConnectionError); (ok || err == transport.ErrStreamDrain) && !c.failFast {
continue
}
return toRPCErr(err)
}
err = sendRequest(ctx, cc.dopts, cc.dopts.cp, &c, callHdr, stream, t, args, topts)
if err != nil {
if put != nil {
updateRPCInfoInContext(ctx, rpcInfo{
bytesSent: stream.BytesSent(),
bytesReceived: stream.BytesReceived(),
})
put()
put = nil
}
// Retry a non-failfast RPC when
// i) there is a connection error; or
// ii) the server started to drain before this RPC was initiated.
if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain {
if c.failFast {
return toRPCErr(err)
}
if _, ok := err.(transport.ConnectionError); (ok || err == transport.ErrStreamDrain) && !c.failFast {
continue
}
return toRPCErr(err)
@ -277,13 +288,13 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
err = recvResponse(ctx, cc.dopts, t, &c, stream, reply)
if err != nil {
if put != nil {
updateRPCInfoInContext(ctx, rpcInfo{
bytesSent: stream.BytesSent(),
bytesReceived: stream.BytesReceived(),
})
put()
put = nil
}
if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain {
if c.failFast {
return toRPCErr(err)
}
if _, ok := err.(transport.ConnectionError); (ok || err == transport.ErrStreamDrain) && !c.failFast {
continue
}
return toRPCErr(err)
@ -293,8 +304,11 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
}
t.CloseStream(stream, nil)
if put != nil {
updateRPCInfoInContext(ctx, rpcInfo{
bytesSent: stream.BytesSent(),
bytesReceived: stream.BytesReceived(),
})
put()
put = nil
}
return stream.Status().Err()
}