From bd20726bd87c29af312486564f018f3fccc585a9 Mon Sep 17 00:00:00 2001 From: yangzhouhan Date: Thu, 23 Jul 2015 18:14:52 -0700 Subject: [PATCH 1/4] add client streaming response trace --- call.go | 5 ++--- stream.go | 9 ++++++++- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/call.go b/call.go index 3b974721..f0dec479 100644 --- a/call.go +++ b/call.go @@ -133,7 +133,6 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli } }() } - callHdr := &transport.CallHdr{ Host: cc.authority, Method: method, @@ -165,7 +164,7 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli return toRPCErr(err) } if EnableTracing { - c.traceInfo.tr.LazyLog(payload{args}, true) + c.traceInfo.tr.LazyLog(&fmtStringer{"sent: %v", []interface{}{payload{args}}}, true) } stream, err = sendRequest(ctx, cc.dopts.codec, callHdr, t, args, topts) if err != nil { @@ -184,7 +183,7 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli continue } if EnableTracing { - c.traceInfo.tr.LazyLog(payload{reply}, true) + c.traceInfo.tr.LazyLog(&fmtStringer{"received: %v", []interface{}{payload{reply}}}, true) } t.CloseStream(stream, lastErr) if lastErr != nil { diff --git a/stream.go b/stream.go index c902034d..348fe9a5 100644 --- a/stream.go +++ b/stream.go @@ -166,7 +166,7 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { if cs.tracing { cs.mu.Lock() if cs.traceInfo.tr != nil { - cs.traceInfo.tr.LazyLog(payload{m}, true) + cs.traceInfo.tr.LazyLog(&fmtStringer{"sent: %v", []interface{}{payload{m}}}, true) } cs.mu.Unlock() } @@ -187,6 +187,13 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { } func (cs *clientStream) RecvMsg(m interface{}) (err error) { + if cs.tracing { + cs.mu.Lock() + if cs.traceInfo.tr != nil { + cs.traceInfo.tr.LazyLog(&fmtStringer{"received: %v", []interface{}{payload{m}}}, true) + } + cs.mu.Unlock() + } err = recv(cs.p, cs.codec, m) defer func() { // err != nil indicates the termination of the stream. From 6cfd2022afebccbb3129af08aa163cc40d4b41aa Mon Sep 17 00:00:00 2001 From: yangzhouhan Date: Fri, 24 Jul 2015 11:30:14 -0700 Subject: [PATCH 2/4] add a bool in payload struct --- call.go | 12 ++++++++++-- stream.go | 14 ++++++++++++-- trace.go | 9 +++++++-- 3 files changed, 29 insertions(+), 6 deletions(-) diff --git a/call.go b/call.go index f0dec479..a6dad304 100644 --- a/call.go +++ b/call.go @@ -164,7 +164,11 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli return toRPCErr(err) } if EnableTracing { - c.traceInfo.tr.LazyLog(&fmtStringer{"sent: %v", []interface{}{payload{args}}}, true) + p := &payload{ + sent: true, + msg: args, + } + c.traceInfo.tr.LazyLog(p, true) } stream, err = sendRequest(ctx, cc.dopts.codec, callHdr, t, args, topts) if err != nil { @@ -183,7 +187,11 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli continue } if EnableTracing { - c.traceInfo.tr.LazyLog(&fmtStringer{"received: %v", []interface{}{payload{reply}}}, true) + p := &payload{ + sent: false, + msg: reply, + } + c.traceInfo.tr.LazyLog(p, true) } t.CloseStream(stream, lastErr) if lastErr != nil { diff --git a/stream.go b/stream.go index 348fe9a5..256d0f01 100644 --- a/stream.go +++ b/stream.go @@ -166,7 +166,11 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { if cs.tracing { cs.mu.Lock() if cs.traceInfo.tr != nil { - cs.traceInfo.tr.LazyLog(&fmtStringer{"sent: %v", []interface{}{payload{m}}}, true) + p := &payload{ + sent: true, + msg: m, + } + cs.traceInfo.tr.LazyLog(p, true) } cs.mu.Unlock() } @@ -190,7 +194,13 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) { if cs.tracing { cs.mu.Lock() if cs.traceInfo.tr != nil { - cs.traceInfo.tr.LazyLog(&fmtStringer{"received: %v", []interface{}{payload{m}}}, true) + if cs.traceInfo.tr != nil { + p := &payload{ + sent: false, + msg: m, + } + cs.traceInfo.tr.LazyLog(p, true) + } } cs.mu.Unlock() } diff --git a/trace.go b/trace.go index f8df478d..32c086cb 100644 --- a/trace.go +++ b/trace.go @@ -93,12 +93,17 @@ func (f *firstLine) String() string { // payload represents an RPC request or response payload. type payload struct { - m interface{} // e.g. a proto.Message + sent bool // whether this is a request or response + msg interface{} // e.g. a proto.Message // TODO(dsymonds): add stringifying info to codec, and limit how much we hold here? } func (p payload) String() string { - return fmt.Sprint(p.m) + if p.sent { + return fmt.Sprintf("sent: %v", p.msg) + } else { + return fmt.Sprintf("recv: %v", p.msg) + } } type fmtStringer struct { From be4cb2a81abe9fb107b7fd3c2b26bf85d10932f4 Mon Sep 17 00:00:00 2001 From: yangzhouhan Date: Fri, 24 Jul 2015 15:39:25 -0700 Subject: [PATCH 3/4] relocate response log --- stream.go | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/stream.go b/stream.go index 256d0f01..47e8af5a 100644 --- a/stream.go +++ b/stream.go @@ -191,19 +191,6 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { } func (cs *clientStream) RecvMsg(m interface{}) (err error) { - if cs.tracing { - cs.mu.Lock() - if cs.traceInfo.tr != nil { - if cs.traceInfo.tr != nil { - p := &payload{ - sent: false, - msg: m, - } - cs.traceInfo.tr.LazyLog(p, true) - } - } - cs.mu.Unlock() - } err = recv(cs.p, cs.codec, m) defer func() { // err != nil indicates the termination of the stream. @@ -212,6 +199,19 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) { } }() if err == nil { + if cs.tracing { + cs.mu.Lock() + if cs.traceInfo.tr != nil { + if cs.traceInfo.tr != nil { + p := &payload{ + sent: false, + msg: m, + } + cs.traceInfo.tr.LazyLog(p, true) + } + } + cs.mu.Unlock() + } if !cs.desc.ClientStreams || cs.desc.ServerStreams { return } From e79ac3cb4bb5ef1adc69ef487e070dad7eab599e Mon Sep 17 00:00:00 2001 From: yangzhouhan Date: Fri, 24 Jul 2015 16:36:12 -0700 Subject: [PATCH 4/4] remove duplicate nil check separate var --- call.go | 12 ++---------- stream.go | 14 ++------------ trace.go | 2 +- 3 files changed, 5 insertions(+), 23 deletions(-) diff --git a/call.go b/call.go index a6dad304..72bd242f 100644 --- a/call.go +++ b/call.go @@ -164,11 +164,7 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli return toRPCErr(err) } if EnableTracing { - p := &payload{ - sent: true, - msg: args, - } - c.traceInfo.tr.LazyLog(p, true) + c.traceInfo.tr.LazyLog(&payload{sent: true, msg: args}, true) } stream, err = sendRequest(ctx, cc.dopts.codec, callHdr, t, args, topts) if err != nil { @@ -187,11 +183,7 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli continue } if EnableTracing { - p := &payload{ - sent: false, - msg: reply, - } - c.traceInfo.tr.LazyLog(p, true) + c.traceInfo.tr.LazyLog(&payload{sent: false, msg: reply}, true) } t.CloseStream(stream, lastErr) if lastErr != nil { diff --git a/stream.go b/stream.go index 47e8af5a..20c6b29d 100644 --- a/stream.go +++ b/stream.go @@ -166,11 +166,7 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { if cs.tracing { cs.mu.Lock() if cs.traceInfo.tr != nil { - p := &payload{ - sent: true, - msg: m, - } - cs.traceInfo.tr.LazyLog(p, true) + cs.traceInfo.tr.LazyLog(&payload{sent: true, msg: m}, true) } cs.mu.Unlock() } @@ -202,13 +198,7 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) { if cs.tracing { cs.mu.Lock() if cs.traceInfo.tr != nil { - if cs.traceInfo.tr != nil { - p := &payload{ - sent: false, - msg: m, - } - cs.traceInfo.tr.LazyLog(p, true) - } + cs.traceInfo.tr.LazyLog(&payload{sent: false, msg: m}, true) } cs.mu.Unlock() } diff --git a/trace.go b/trace.go index 32c086cb..24635740 100644 --- a/trace.go +++ b/trace.go @@ -93,7 +93,7 @@ func (f *firstLine) String() string { // payload represents an RPC request or response payload. type payload struct { - sent bool // whether this is a request or response + sent bool // whether this is an outgoing payload msg interface{} // e.g. a proto.Message // TODO(dsymonds): add stringifying info to codec, and limit how much we hold here? }