From 7c8ee356b11a3fba4a34c519a39e76f979e00dca Mon Sep 17 00:00:00 2001
From: David Symonds <dsymonds@golang.org>
Date: Thu, 25 Jun 2015 12:52:33 +1000
Subject: [PATCH] Include client streaming payloads in trace, and centralise
 trace termination logic.

---
 stream.go | 57 +++++++++++++++++++++++++++++++++++++++++--------------
 1 file changed, 43 insertions(+), 14 deletions(-)

diff --git a/stream.go b/stream.go
index 31d88e3c..c902034d 100644
--- a/stream.go
+++ b/stream.go
@@ -36,6 +36,7 @@ package grpc
 import (
 	"errors"
 	"io"
+	"sync"
 	"time"
 
 	"golang.org/x/net/context"
@@ -101,10 +102,11 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
 		Method: method,
 	}
 	cs := &clientStream{
-		desc:  desc,
-		codec: cc.dopts.codec,
+		desc:    desc,
+		codec:   cc.dopts.codec,
+		tracing: EnableTracing,
 	}
-	if EnableTracing {
+	if cs.tracing {
 		cs.traceInfo.tr = trace.New("Sent."+methodFamily(method), method)
 		cs.traceInfo.firstLine.client = true
 		if deadline, ok := ctx.Deadline(); ok {
@@ -128,11 +130,17 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
 
 // clientStream implements a client side Stream.
 type clientStream struct {
-	t         transport.ClientTransport
-	s         *transport.Stream
-	p         *parser
-	desc      *StreamDesc
-	codec     Codec
+	t     transport.ClientTransport
+	s     *transport.Stream
+	p     *parser
+	desc  *StreamDesc
+	codec Codec
+
+	tracing bool // set to EnableTracing when the clientStream is created.
+
+	mu sync.Mutex // protects traceInfo
+	// traceInfo.tr is set when the clientStream is created (if EnableTracing is true),
+	// and is set to nil when the clientStream's finish method is called.
 	traceInfo traceInfo
 }
 
@@ -155,6 +163,13 @@ func (cs *clientStream) Trailer() metadata.MD {
 }
 
 func (cs *clientStream) SendMsg(m interface{}) (err error) {
+	if cs.tracing {
+		cs.mu.Lock()
+		if cs.traceInfo.tr != nil {
+			cs.traceInfo.tr.LazyLog(payload{m}, true)
+		}
+		cs.mu.Unlock()
+	}
 	defer func() {
 		if err == nil || err == io.EOF {
 			return
@@ -175,12 +190,8 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) {
 	err = recv(cs.p, cs.codec, m)
 	defer func() {
 		// err != nil indicates the termination of the stream.
-		if EnableTracing && err != nil {
-			if err != io.EOF {
-				cs.traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
-				cs.traceInfo.tr.SetError()
-			}
-			cs.traceInfo.tr.Finish()
+		if err != nil {
+			cs.finish(err)
 		}
 	}()
 	if err == nil {
@@ -226,6 +237,24 @@ func (cs *clientStream) CloseSend() (err error) {
 	return
 }
 
+func (cs *clientStream) finish(err error) {
+	if !cs.tracing {
+		return
+	}
+	cs.mu.Lock()
+	defer cs.mu.Unlock()
+	if cs.traceInfo.tr != nil {
+		if err == nil || err == io.EOF {
+			cs.traceInfo.tr.LazyPrintf("RPC: [OK]")
+		} else {
+			cs.traceInfo.tr.LazyPrintf("RPC: [%v]", err)
+			cs.traceInfo.tr.SetError()
+		}
+		cs.traceInfo.tr.Finish()
+		cs.traceInfo.tr = nil
+	}
+}
+
 // ServerStream defines the interface a server stream has to satisfy.
 type ServerStream interface {
 	// SendHeader sends the header metadata. It should not be called