grpc: record the description of the status returned by server RPC
handlers in request traces, and mark the trace as an error if the status is not OK. Install the trace into the Context passed to server handlers using trace.NewContext, so that code in the server handlers can annotate the trace using trace.FromContext.
This commit is contained in:
28
server.go
28
server.go
@ -282,12 +282,14 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc) (err error) {
|
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc) (err error) {
|
||||||
|
ctx := stream.Context()
|
||||||
var traceInfo traceInfo
|
var traceInfo traceInfo
|
||||||
if EnableTracing {
|
if EnableTracing {
|
||||||
traceInfo.tr = trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method())
|
traceInfo.tr = trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method())
|
||||||
defer traceInfo.tr.Finish()
|
defer traceInfo.tr.Finish()
|
||||||
traceInfo.firstLine.client = false
|
traceInfo.firstLine.client = false
|
||||||
traceInfo.tr.LazyLog(&traceInfo.firstLine, false)
|
traceInfo.tr.LazyLog(&traceInfo.firstLine, false)
|
||||||
|
ctx = trace.NewContext(ctx, traceInfo.tr)
|
||||||
defer func() {
|
defer func() {
|
||||||
if err != nil && err != io.EOF {
|
if err != nil && err != io.EOF {
|
||||||
traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
|
traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
|
||||||
@ -322,7 +324,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
|
|||||||
case compressionNone:
|
case compressionNone:
|
||||||
statusCode := codes.OK
|
statusCode := codes.OK
|
||||||
statusDesc := ""
|
statusDesc := ""
|
||||||
reply, appErr := md.Handler(srv.server, stream.Context(), s.opts.codec, req)
|
reply, appErr := md.Handler(srv.server, ctx, s.opts.codec, req)
|
||||||
if appErr != nil {
|
if appErr != nil {
|
||||||
if err, ok := appErr.(rpcError); ok {
|
if err, ok := appErr.(rpcError); ok {
|
||||||
statusCode = err.code
|
statusCode = err.code
|
||||||
@ -331,12 +333,20 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
|
|||||||
statusCode = convertCode(appErr)
|
statusCode = convertCode(appErr)
|
||||||
statusDesc = appErr.Error()
|
statusDesc = appErr.Error()
|
||||||
}
|
}
|
||||||
|
if traceInfo.tr != nil && statusCode != codes.OK {
|
||||||
|
traceInfo.tr.LazyLog(stringer(statusDesc), true)
|
||||||
|
traceInfo.tr.SetError()
|
||||||
|
}
|
||||||
|
|
||||||
if err := t.WriteStatus(stream, statusCode, statusDesc); err != nil {
|
if err := t.WriteStatus(stream, statusCode, statusDesc); err != nil {
|
||||||
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status: %v", err)
|
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status: %v", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
if traceInfo.tr != nil {
|
||||||
|
traceInfo.tr.LazyLog(stringer("OK"), true)
|
||||||
|
}
|
||||||
opts := &transport.Options{
|
opts := &transport.Options{
|
||||||
Last: true,
|
Last: true,
|
||||||
Delay: false,
|
Delay: false,
|
||||||
@ -371,11 +381,13 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
|
|||||||
p: &parser{s: stream},
|
p: &parser{s: stream},
|
||||||
codec: s.opts.codec,
|
codec: s.opts.codec,
|
||||||
tracing: EnableTracing,
|
tracing: EnableTracing,
|
||||||
|
ctx: stream.Context(),
|
||||||
}
|
}
|
||||||
if ss.tracing {
|
if ss.tracing {
|
||||||
ss.traceInfo.tr = trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method())
|
ss.traceInfo.tr = trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method())
|
||||||
ss.traceInfo.firstLine.client = false
|
ss.traceInfo.firstLine.client = false
|
||||||
ss.traceInfo.tr.LazyLog(&ss.traceInfo.firstLine, false)
|
ss.traceInfo.tr.LazyLog(&ss.traceInfo.firstLine, false)
|
||||||
|
ss.ctx = trace.NewContext(ss.ctx, ss.traceInfo.tr)
|
||||||
defer func() {
|
defer func() {
|
||||||
ss.mu.Lock()
|
ss.mu.Lock()
|
||||||
if err != nil && err != io.EOF {
|
if err != nil && err != io.EOF {
|
||||||
@ -396,10 +408,24 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
|
|||||||
ss.statusDesc = appErr.Error()
|
ss.statusDesc = appErr.Error()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if ss.tracing {
|
||||||
|
ss.mu.Lock()
|
||||||
|
if ss.statusCode != codes.OK {
|
||||||
|
ss.traceInfo.tr.LazyLog(stringer(ss.statusDesc), true)
|
||||||
|
ss.traceInfo.tr.SetError()
|
||||||
|
} else {
|
||||||
|
ss.traceInfo.tr.LazyLog(stringer("OK"), true)
|
||||||
|
}
|
||||||
|
ss.mu.Unlock()
|
||||||
|
}
|
||||||
return t.WriteStatus(ss.s, ss.statusCode, ss.statusDesc)
|
return t.WriteStatus(ss.s, ss.statusCode, ss.statusDesc)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type stringer string
|
||||||
|
|
||||||
|
func (s stringer) String() string { return string(s) }
|
||||||
|
|
||||||
func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream) {
|
func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream) {
|
||||||
sm := stream.Method()
|
sm := stream.Method()
|
||||||
if sm != "" && sm[0] == '/' {
|
if sm != "" && sm[0] == '/' {
|
||||||
|
@ -113,6 +113,7 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
|
|||||||
cs.traceInfo.firstLine.deadline = deadline.Sub(time.Now())
|
cs.traceInfo.firstLine.deadline = deadline.Sub(time.Now())
|
||||||
}
|
}
|
||||||
cs.traceInfo.tr.LazyLog(&cs.traceInfo.firstLine, false)
|
cs.traceInfo.tr.LazyLog(&cs.traceInfo.firstLine, false)
|
||||||
|
ctx = trace.NewContext(ctx, cs.traceInfo.tr)
|
||||||
}
|
}
|
||||||
t, err := cc.wait(ctx)
|
t, err := cc.wait(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -284,6 +285,7 @@ type serverStream struct {
|
|||||||
statusDesc string
|
statusDesc string
|
||||||
|
|
||||||
tracing bool // set to EnableTracing when the serverStream is created.
|
tracing bool // set to EnableTracing when the serverStream is created.
|
||||||
|
ctx context.Context // provides trace.FromContext when tracing
|
||||||
|
|
||||||
mu sync.Mutex // protects traceInfo
|
mu sync.Mutex // protects traceInfo
|
||||||
// traceInfo.tr is set when the serverStream is created (if EnableTracing is true),
|
// traceInfo.tr is set when the serverStream is created (if EnableTracing is true),
|
||||||
@ -292,7 +294,7 @@ type serverStream struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (ss *serverStream) Context() context.Context {
|
func (ss *serverStream) Context() context.Context {
|
||||||
return ss.s.Context()
|
return ss.ctx
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ss *serverStream) SendHeader(md metadata.MD) error {
|
func (ss *serverStream) SendHeader(md metadata.MD) error {
|
||||||
@ -317,7 +319,6 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
|
|||||||
ss.traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
|
ss.traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
|
||||||
ss.traceInfo.tr.SetError()
|
ss.traceInfo.tr.SetError()
|
||||||
}
|
}
|
||||||
|
|
||||||
ss.mu.Unlock()
|
ss.mu.Unlock()
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
Reference in New Issue
Block a user