diff --git a/stream.go b/stream.go index 1cbb5f32..c1b07e89 100644 --- a/stream.go +++ b/stream.go @@ -117,35 +117,25 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth if cc.dopts.cp != nil { callHdr.SendCompress = cc.dopts.cp.Type() } - cs := &clientStream{ - opts: opts, - c: c, - desc: desc, - codec: cc.dopts.codec, - cp: cc.dopts.cp, - dc: cc.dopts.dc, - tracing: EnableTracing, - } - if cc.dopts.cp != nil { - callHdr.SendCompress = cc.dopts.cp.Type() - cs.cbuf = new(bytes.Buffer) - } - if cs.tracing { - cs.trInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method) - cs.trInfo.firstLine.client = true + var trInfo traceInfo + if EnableTracing { + trInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method) + trInfo.firstLine.client = true if deadline, ok := ctx.Deadline(); ok { - cs.trInfo.firstLine.deadline = deadline.Sub(time.Now()) + trInfo.firstLine.deadline = deadline.Sub(time.Now()) } - cs.trInfo.tr.LazyLog(&cs.trInfo.firstLine, false) - ctx = trace.NewContext(ctx, cs.trInfo.tr) + trInfo.tr.LazyLog(&trInfo.firstLine, false) + ctx = trace.NewContext(ctx, trInfo.tr) + defer func() { + if err != nil { + // Need to call tr.finish() if error is returned. + // Because tr will not be returned to caller. + trInfo.tr.LazyPrintf("RPC: [%v]", err) + trInfo.tr.SetError() + trInfo.tr.Finish() + } + }() } - defer func() { - if err != nil { - // Need to call cs.finish() if error is returned. - // Because cs will not be returned to caller. - cs.finish(err) - } - }() gopts := BalancerGetOptions{ BlockingWait: !c.failFast, } @@ -182,10 +172,25 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth } break } - cs.put = put - cs.t = t - cs.s = s - cs.p = &parser{r: s} + cs := &clientStream{ + opts: opts, + c: c, + desc: desc, + codec: cc.dopts.codec, + cp: cc.dopts.cp, + dc: cc.dopts.dc, + + put: put, + t: t, + s: s, + p: &parser{r: s}, + + tracing: EnableTracing, + trInfo: trInfo, + } + if cc.dopts.cp != nil { + cs.cbuf = new(bytes.Buffer) + } // Listen on ctx.Done() to detect cancellation and s.Done() to detect normal termination // when there is no pending I/O operations on this stream. go func() {