fix trace setError

This commit is contained in:
yangzhouhan
2015-07-28 15:27:46 -07:00
parent 82323098b5
commit 3616d6be54
2 changed files with 51 additions and 28 deletions

View File

@ -248,18 +248,15 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str
return t.Write(stream, p, opts) return t.Write(stream, p, opts)
} }
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc) { func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc) (err error) {
var ( var traceInfo traceInfo
traceInfo traceInfo
err error
)
if EnableTracing { if EnableTracing {
traceInfo.tr = trace.New("Recv."+methodFamily(stream.Method()), stream.Method()) traceInfo.tr = trace.New("Recv."+methodFamily(stream.Method()), stream.Method())
defer traceInfo.tr.Finish() defer traceInfo.tr.Finish()
traceInfo.firstLine.client = false traceInfo.firstLine.client = false
traceInfo.tr.LazyLog(&traceInfo.firstLine, false) traceInfo.tr.LazyLog(&traceInfo.firstLine, false)
defer func() { defer func() {
if err != nil { if err != nil || err != io.EOF {
traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
traceInfo.tr.SetError() traceInfo.tr.SetError()
} }
@ -270,7 +267,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
pf, req, err := p.recvMsg() pf, req, err := p.recvMsg()
if err == io.EOF { if err == io.EOF {
// The entire stream is done (for unary RPC only). // The entire stream is done (for unary RPC only).
return return err
} }
if err != nil { if err != nil {
switch err := err.(type) { switch err := err.(type) {
@ -283,7 +280,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
default: default:
panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", err, err)) panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", err, err))
} }
return return err
} }
if traceInfo.tr != nil { if traceInfo.tr != nil {
traceInfo.tr.LazyLog(&payload{sent: false, msg: req}, true) traceInfo.tr.LazyLog(&payload{sent: false, msg: req}, true)
@ -303,8 +300,9 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
} }
if err := t.WriteStatus(stream, statusCode, statusDesc); err != nil { if err := t.WriteStatus(stream, statusCode, statusDesc); err != nil {
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status: %v", err) grpclog.Printf("grpc: Server.processUnaryRPC failed to write status: %v", err)
return err
} }
return return nil
} }
opts := &transport.Options{ opts := &transport.Options{
Last: true, Last: true,
@ -312,17 +310,19 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
} }
if err := s.sendResponse(t, stream, reply, compressionNone, opts); err != nil { if err := s.sendResponse(t, stream, reply, compressionNone, opts); err != nil {
if _, ok := err.(transport.ConnectionError); ok { if _, ok := err.(transport.ConnectionError); ok {
return return err
} }
if e, ok := err.(transport.StreamError); ok { if e, ok := err.(transport.StreamError); ok {
statusCode = e.Code statusCode = e.Code
statusDesc = e.Desc statusDesc = e.Desc
return err
} else { } else {
statusCode = codes.Unknown statusCode = codes.Unknown
statusDesc = err.Error() statusDesc = err.Error()
return err
} }
} }
t.WriteStatus(stream, statusCode, statusDesc) return t.WriteStatus(stream, statusCode, statusDesc)
if traceInfo.tr != nil { if traceInfo.tr != nil {
traceInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true) traceInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
} }
@ -332,7 +332,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
} }
} }
func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc) { func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc) (err error) {
ss := &serverStream{ ss := &serverStream{
t: t, t: t,
s: stream, s: stream,
@ -344,23 +344,34 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
ss.traceInfo.tr = trace.New("Recv."+methodFamily(stream.Method()), stream.Method()) ss.traceInfo.tr = trace.New("Recv."+methodFamily(stream.Method()), stream.Method())
ss.traceInfo.firstLine.client = false ss.traceInfo.firstLine.client = false
ss.traceInfo.tr.LazyLog(&ss.traceInfo.firstLine, false) ss.traceInfo.tr.LazyLog(&ss.traceInfo.firstLine, false)
defer func() {
ss.mu.Lock()
ss.traceInfo.tr.Finish()
ss.traceInfo.tr = nil
ss.mu.Unlock()
}()
defer func() {
if err != nil {
ss.mu.Lock()
ss.traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
ss.traceInfo.tr.SetError()
ss.mu.Unlock()
}
}()
} }
if appErr := sd.Handler(srv.server, ss); appErr != nil { if appErr := sd.Handler(srv.server, ss); appErr != nil {
if err, ok := appErr.(rpcError); ok { if err, ok := appErr.(rpcError); ok {
ss.statusCode = err.code ss.statusCode = err.code
ss.statusDesc = err.desc ss.statusDesc = err.desc
return err
} else { } else {
ss.statusCode = convertCode(appErr) ss.statusCode = convertCode(appErr)
ss.statusDesc = appErr.Error() ss.statusDesc = appErr.Error()
return nil
} }
} }
t.WriteStatus(ss.s, ss.statusCode, ss.statusDesc) return t.WriteStatus(ss.s, ss.statusCode, ss.statusDesc)
if ss.tracing {
ss.mu.Lock()
ss.traceInfo.tr.Finish()
ss.traceInfo.tr = nil
ss.mu.Unlock()
}
} }
func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream) { func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream) {

View File

@ -307,14 +307,21 @@ func (ss *serverStream) SetTrailer(md metadata.MD) {
return return
} }
func (ss *serverStream) SendMsg(m interface{}) error { func (ss *serverStream) SendMsg(m interface{}) (err error) {
if ss.tracing { defer func() {
ss.mu.Lock() if ss.tracing {
if ss.traceInfo.tr != nil { ss.mu.Lock()
ss.traceInfo.tr.LazyLog(&payload{sent: true, msg: m}, true) if ss.traceInfo.tr != nil {
if err == nil {
ss.traceInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
} else {
ss.traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
ss.traceInfo.tr.SetError()
}
}
ss.mu.Unlock()
} }
ss.mu.Unlock() }()
}
out, err := encode(ss.codec, m, compressionNone) out, err := encode(ss.codec, m, compressionNone)
if err != nil { if err != nil {
err = transport.StreamErrorf(codes.Internal, "grpc: %v", err) err = transport.StreamErrorf(codes.Internal, "grpc: %v", err)
@ -323,12 +330,17 @@ func (ss *serverStream) SendMsg(m interface{}) error {
return ss.t.Write(ss.s, out, &transport.Options{Last: false}) return ss.t.Write(ss.s, out, &transport.Options{Last: false})
} }
func (ss *serverStream) RecvMsg(m interface{}) error { func (ss *serverStream) RecvMsg(m interface{}) (err error) {
defer func() { defer func() {
if ss.tracing { if ss.tracing {
ss.mu.Lock() ss.mu.Lock()
if ss.traceInfo.tr != nil { if ss.traceInfo.tr != nil {
ss.traceInfo.tr.LazyLog(&payload{sent: false, msg: m}, true) if err == nil {
ss.traceInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
} else {
ss.traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
ss.traceInfo.tr.SetError()
}
} }
ss.mu.Unlock() ss.mu.Unlock()
} }