From b6c9c5a70f2466da74774f338f5397e6b9a362eb Mon Sep 17 00:00:00 2001 From: Sameer Ajmani <sameer@golang.org> Date: Wed, 23 Sep 2015 17:07:35 -0400 Subject: [PATCH 1/3] grpc: record the description of the status returned by server RPC handlers in request traces, and mark the trace as an error if the status is not OK. Install the trace into the Context passed to server handlers using trace.NewContext, so that code in the server handlers can annotate the trace using trace.FromContext. --- server.go | 28 +++++++++++++++++++++++++++- stream.go | 7 ++++--- 2 files changed, 31 insertions(+), 4 deletions(-) diff --git a/server.go b/server.go index d0a0e9a8..904d9156 100644 --- a/server.go +++ b/server.go @@ -282,12 +282,14 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str } func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc) (err error) { + ctx := stream.Context() var traceInfo traceInfo if EnableTracing { traceInfo.tr = trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method()) defer traceInfo.tr.Finish() traceInfo.firstLine.client = false traceInfo.tr.LazyLog(&traceInfo.firstLine, false) + ctx = trace.NewContext(ctx, traceInfo.tr) defer func() { if err != nil && err != io.EOF { traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) @@ -322,7 +324,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. case compressionNone: statusCode := codes.OK statusDesc := "" - reply, appErr := md.Handler(srv.server, stream.Context(), s.opts.codec, req) + reply, appErr := md.Handler(srv.server, ctx, s.opts.codec, req) if appErr != nil { if err, ok := appErr.(rpcError); ok { statusCode = err.code @@ -331,12 +333,20 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. statusCode = convertCode(appErr) statusDesc = appErr.Error() } + if traceInfo.tr != nil && statusCode != codes.OK { + traceInfo.tr.LazyLog(stringer(statusDesc), true) + traceInfo.tr.SetError() + } + if err := t.WriteStatus(stream, statusCode, statusDesc); err != nil { grpclog.Printf("grpc: Server.processUnaryRPC failed to write status: %v", err) return err } return nil } + if traceInfo.tr != nil { + traceInfo.tr.LazyLog(stringer("OK"), true) + } opts := &transport.Options{ Last: true, Delay: false, @@ -371,11 +381,13 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp p: &parser{s: stream}, codec: s.opts.codec, tracing: EnableTracing, + ctx: stream.Context(), } if ss.tracing { ss.traceInfo.tr = trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method()) ss.traceInfo.firstLine.client = false ss.traceInfo.tr.LazyLog(&ss.traceInfo.firstLine, false) + ss.ctx = trace.NewContext(ss.ctx, ss.traceInfo.tr) defer func() { ss.mu.Lock() if err != nil && err != io.EOF { @@ -396,10 +408,24 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp ss.statusDesc = appErr.Error() } } + if ss.tracing { + ss.mu.Lock() + if ss.statusCode != codes.OK { + ss.traceInfo.tr.LazyLog(stringer(ss.statusDesc), true) + ss.traceInfo.tr.SetError() + } else { + ss.traceInfo.tr.LazyLog(stringer("OK"), true) + } + ss.mu.Unlock() + } return t.WriteStatus(ss.s, ss.statusCode, ss.statusDesc) } +type stringer string + +func (s stringer) String() string { return string(s) } + func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream) { sm := stream.Method() if sm != "" && sm[0] == '/' { diff --git a/stream.go b/stream.go index e14664cb..91d8115d 100644 --- a/stream.go +++ b/stream.go @@ -113,6 +113,7 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth cs.traceInfo.firstLine.deadline = deadline.Sub(time.Now()) } cs.traceInfo.tr.LazyLog(&cs.traceInfo.firstLine, false) + ctx = trace.NewContext(ctx, cs.traceInfo.tr) } t, err := cc.wait(ctx) if err != nil { @@ -283,7 +284,8 @@ type serverStream struct { statusCode codes.Code statusDesc string - tracing bool // set to EnableTracing when the serverStream is created. + tracing bool // set to EnableTracing when the serverStream is created. + ctx context.Context // provides trace.FromContext when tracing mu sync.Mutex // protects traceInfo // traceInfo.tr is set when the serverStream is created (if EnableTracing is true), @@ -292,7 +294,7 @@ type serverStream struct { } func (ss *serverStream) Context() context.Context { - return ss.s.Context() + return ss.ctx } func (ss *serverStream) SendHeader(md metadata.MD) error { @@ -317,7 +319,6 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) { ss.traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) ss.traceInfo.tr.SetError() } - ss.mu.Unlock() } }() From ee98c48bb4306dd103e2fb61ad11854a3687f522 Mon Sep 17 00:00:00 2001 From: Sameer Ajmani <sameer@golang.org> Date: Wed, 23 Sep 2015 22:17:37 -0400 Subject: [PATCH 2/3] Incorporate dsymonds' comments. Fix another bug: cancel the Context provided to an RPC server handler as soon as that handler returns, so that goroutines started by that handler can detect that the handler is done and exit. Without this fix, goroutines started by a handler will keep running, unless the handler itself arranges to cancel the context. --- server.go | 15 +++++++-------- stream.go | 4 ++-- trace.go | 4 ++++ 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/server.go b/server.go index 904d9156..cd26c061 100644 --- a/server.go +++ b/server.go @@ -282,7 +282,8 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str } func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc) (err error) { - ctx := stream.Context() + ctx, cancel := context.WithCancel(stream.Context()) + defer cancel() var traceInfo traceInfo if EnableTracing { traceInfo.tr = trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method()) @@ -345,7 +346,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. return nil } if traceInfo.tr != nil { - traceInfo.tr.LazyLog(stringer("OK"), true) + traceInfo.tr.LazyLog(stringer("OK"), false) } opts := &transport.Options{ Last: true, @@ -375,13 +376,15 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. } func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc) (err error) { + ctx, cancel := context.WithCancel(stream.Context()) + defer cancel() ss := &serverStream{ t: t, s: stream, + ctx: ctx, p: &parser{s: stream}, codec: s.opts.codec, tracing: EnableTracing, - ctx: stream.Context(), } if ss.tracing { ss.traceInfo.tr = trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method()) @@ -414,7 +417,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp ss.traceInfo.tr.LazyLog(stringer(ss.statusDesc), true) ss.traceInfo.tr.SetError() } else { - ss.traceInfo.tr.LazyLog(stringer("OK"), true) + ss.traceInfo.tr.LazyLog(stringer("OK"), false) } ss.mu.Unlock() } @@ -422,10 +425,6 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp } -type stringer string - -func (s stringer) String() string { return string(s) } - func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream) { sm := stream.Method() if sm != "" && sm[0] == '/' { diff --git a/stream.go b/stream.go index 91d8115d..edd9bc84 100644 --- a/stream.go +++ b/stream.go @@ -279,13 +279,13 @@ type ServerStream interface { type serverStream struct { t transport.ServerTransport s *transport.Stream + ctx context.Context // provides trace.FromContext when tracing p *parser codec Codec statusCode codes.Code statusDesc string - tracing bool // set to EnableTracing when the serverStream is created. - ctx context.Context // provides trace.FromContext when tracing + tracing bool // set to EnableTracing when the serverStream is created. mu sync.Mutex // protects traceInfo // traceInfo.tr is set when the serverStream is created (if EnableTracing is true), diff --git a/trace.go b/trace.go index 24635740..cde04fbf 100644 --- a/trace.go +++ b/trace.go @@ -114,3 +114,7 @@ type fmtStringer struct { func (f *fmtStringer) String() string { return fmt.Sprintf(f.format, f.a...) } + +type stringer string + +func (s stringer) String() string { return string(s) } From 86db82df28b6de13aa7657d709d8524a4ce17dfb Mon Sep 17 00:00:00 2001 From: Sameer Ajmani <sameer@golang.org> Date: Mon, 28 Sep 2015 10:22:26 -0400 Subject: [PATCH 3/3] add TODO to fix trace --- server.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server.go b/server.go index cd26c061..2ebde5d0 100644 --- a/server.go +++ b/server.go @@ -319,7 +319,9 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. return err } if traceInfo.tr != nil { - traceInfo.tr.LazyLog(&payload{sent: false, msg: req}, true) + // TODO: set payload.msg to something that + // prints usefully with %s; req is a []byte. + traceInfo.tr.LazyLog(&payload{sent: false}, true) } switch pf { case compressionNone: