diff --git a/stream.go b/stream.go index 31d88e3c..c902034d 100644 --- a/stream.go +++ b/stream.go @@ -36,6 +36,7 @@ package grpc import ( "errors" "io" + "sync" "time" "golang.org/x/net/context" @@ -101,10 +102,11 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth Method: method, } cs := &clientStream{ - desc: desc, - codec: cc.dopts.codec, + desc: desc, + codec: cc.dopts.codec, + tracing: EnableTracing, } - if EnableTracing { + if cs.tracing { cs.traceInfo.tr = trace.New("Sent."+methodFamily(method), method) cs.traceInfo.firstLine.client = true if deadline, ok := ctx.Deadline(); ok { @@ -128,11 +130,17 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth // clientStream implements a client side Stream. type clientStream struct { - t transport.ClientTransport - s *transport.Stream - p *parser - desc *StreamDesc - codec Codec + t transport.ClientTransport + s *transport.Stream + p *parser + desc *StreamDesc + codec Codec + + tracing bool // set to EnableTracing when the clientStream is created. + + mu sync.Mutex // protects traceInfo + // traceInfo.tr is set when the clientStream is created (if EnableTracing is true), + // and is set to nil when the clientStream's finish method is called. traceInfo traceInfo } @@ -155,6 +163,13 @@ func (cs *clientStream) Trailer() metadata.MD { } func (cs *clientStream) SendMsg(m interface{}) (err error) { + if cs.tracing { + cs.mu.Lock() + if cs.traceInfo.tr != nil { + cs.traceInfo.tr.LazyLog(payload{m}, true) + } + cs.mu.Unlock() + } defer func() { if err == nil || err == io.EOF { return @@ -175,12 +190,8 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) { err = recv(cs.p, cs.codec, m) defer func() { // err != nil indicates the termination of the stream. - if EnableTracing && err != nil { - if err != io.EOF { - cs.traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) - cs.traceInfo.tr.SetError() - } - cs.traceInfo.tr.Finish() + if err != nil { + cs.finish(err) } }() if err == nil { @@ -226,6 +237,24 @@ func (cs *clientStream) CloseSend() (err error) { return } +func (cs *clientStream) finish(err error) { + if !cs.tracing { + return + } + cs.mu.Lock() + defer cs.mu.Unlock() + if cs.traceInfo.tr != nil { + if err == nil || err == io.EOF { + cs.traceInfo.tr.LazyPrintf("RPC: [OK]") + } else { + cs.traceInfo.tr.LazyPrintf("RPC: [%v]", err) + cs.traceInfo.tr.SetError() + } + cs.traceInfo.tr.Finish() + cs.traceInfo.tr = nil + } +} + // ServerStream defines the interface a server stream has to satisfy. type ServerStream interface { // SendHeader sends the header metadata. It should not be called