From 58dcee752648ab7c1d71745a68b55eed1b08417c Mon Sep 17 00:00:00 2001 From: yangzhouhan Date: Wed, 17 Jun 2015 19:30:57 -0700 Subject: [PATCH] add tracing for streaming rpc --- call.go | 3 --- stream.go | 49 +++++++++++++++++++++++++++++++++++++++---------- trace.go | 4 ++++ 3 files changed, 43 insertions(+), 13 deletions(-) diff --git a/call.go b/call.go index 3c0f26a5..765036bf 100644 --- a/call.go +++ b/call.go @@ -102,9 +102,6 @@ type callInfo struct { traceInfo traceInfo // in trace.go } -// EnableTracing controls whether to trace RPCs using the golang.org/x/net/trace package. -// This should only be set before any RPCs are sent or received by this program. -var EnableTracing = true // Invoke is called by the generated code. It sends the RPC request on the // wire and returns after response is received. diff --git a/stream.go b/stream.go index 43fdcbec..3f30f148 100644 --- a/stream.go +++ b/stream.go @@ -36,8 +36,10 @@ package grpc import ( "errors" "io" + "time" "golang.org/x/net/context" + "golang.org/x/net/trace" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/transport" @@ -94,6 +96,7 @@ type ClientStream interface { // by generated code. func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) { // TODO(zhaoq): CallOption is omitted. Add support when it is needed. + var trInfo traceInfo callHdr := &transport.CallHdr{ Host: cc.authority, Method: method, @@ -102,26 +105,36 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth if err != nil { return nil, toRPCErr(err) } + if EnableTracing { + trInfo.tr = trace.New("Sent."+methodFamily(desc.StreamName), desc.StreamName) + trInfo.firstLine.client = true + if deadline, ok := ctx.Deadline(); ok { + trInfo.firstLine.deadline = deadline.Sub(time.Now()) + } + trInfo.tr.LazyLog(&trInfo.firstLine, false) + } s, err := t.NewStream(ctx, callHdr) if err != nil { return nil, toRPCErr(err) } return &clientStream{ - t: t, - s: s, - p: &parser{s: s}, - desc: desc, - codec: cc.dopts.codec, + t: t, + s: s, + p: &parser{s: s}, + desc: desc, + codec: cc.dopts.codec, + traceInfo: trInfo, }, nil } // clientStream implements a client side Stream. type clientStream struct { - t transport.ClientTransport - s *transport.Stream - p *parser - desc *StreamDesc - codec Codec + t transport.ClientTransport + s *transport.Stream + p *parser + desc *StreamDesc + codec Codec + traceInfo traceInfo } func (cs *clientStream) Context() context.Context { @@ -152,6 +165,7 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { } err = toRPCErr(err) }() + out, err := encode(cs.codec, m, compressionNone) if err != nil { return transport.StreamErrorf(codes.Internal, "grpc: %v", err) @@ -172,23 +186,38 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) { return toRPCErr(errors.New("grpc: client streaming protocol violation: get , want ")) } if err == io.EOF { + if EnableTracing { + defer cs.traceInfo.tr.Finish() + } if cs.s.StatusCode() == codes.OK { return nil } return Errorf(cs.s.StatusCode(), cs.s.StatusDesc()) } + if EnableTracing { + cs.traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) + cs.traceInfo.tr.SetError() + } return toRPCErr(err) } if _, ok := err.(transport.ConnectionError); !ok { cs.t.CloseStream(cs.s, err) } if err == io.EOF { + if EnableTracing { + cs.traceInfo.tr.Finish() + } + if cs.s.StatusCode() == codes.OK { // Returns io.EOF to indicate the end of the stream. return } return Errorf(cs.s.StatusCode(), cs.s.StatusDesc()) } + if EnableTracing { + cs.traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) + cs.traceInfo.tr.SetError() + } return toRPCErr(err) } diff --git a/trace.go b/trace.go index 31c3a6a7..f8df478d 100644 --- a/trace.go +++ b/trace.go @@ -44,6 +44,10 @@ import ( "golang.org/x/net/trace" ) +// EnableTracing controls whether to trace RPCs using the golang.org/x/net/trace package. +// This should only be set before any RPCs are sent or received by this program. +var EnableTracing = true + // methodFamily returns the trace family for the given method. // It turns "/pkg.Service/GetFoo" into "pkg.Service". func methodFamily(m string) string {