refactor servier side trace again

This commit is contained in:
iamqizhao
2015-10-05 17:49:53 -07:00
parent 3e7b7e58f4
commit bc49d12737
7 changed files with 130 additions and 129 deletions

View File

@ -247,7 +247,7 @@ func (s *Server) Serve(lis net.Listener) error {
c.Close()
return nil
}
st, err := transport.NewServerTransport("http2", c, s.opts.maxConcurrentStreams, authInfo, EnableTracing)
st, err := transport.NewServerTransport("http2", c, s.opts.maxConcurrentStreams, authInfo)
if err != nil {
s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
s.mu.Unlock()
@ -259,8 +259,24 @@ func (s *Server) Serve(lis net.Listener) error {
s.mu.Unlock()
go func() {
st.HandleStreams(func(stream *transport.Stream) {
s.handleStream(st, stream)
st.HandleStreams(func(stream *transport.Stream, wg *sync.WaitGroup) {
var trInfo *traceInfo
if EnableTracing {
trInfo = &traceInfo{
tr: trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method()),
}
trInfo.firstLine.client = false
trInfo.firstLine.remoteAddr = st.RemoteAddr()
stream.TraceContext(trInfo.tr)
if dl, ok := stream.Context().Deadline(); ok {
trInfo.firstLine.deadline = dl.Sub(time.Now())
}
}
wg.Add(1)
go func() {
s.handleStream(st, stream, trInfo)
wg.Done()
}()
})
s.mu.Lock()
delete(s.conns, st)
@ -284,21 +300,15 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str
return t.Write(stream, p, opts)
}
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc) (err error) {
var traceInfo traceInfo
if EnableTracing {
traceInfo.tr = stream.Trace()
defer traceInfo.tr.Finish()
traceInfo.firstLine.client = false
traceInfo.firstLine.remoteAddr = t.RemoteAddr()
if dl, ok := stream.Context().Deadline(); ok {
traceInfo.firstLine.deadline = dl.Sub(time.Now())
}
traceInfo.tr.LazyLog(&traceInfo.firstLine, false)
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
if trInfo != nil {
defer trInfo.tr.Finish()
trInfo.firstLine.client = false
trInfo.tr.LazyLog(&trInfo.firstLine, false)
defer func() {
if err != nil && err != io.EOF {
traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
traceInfo.tr.SetError()
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
trInfo.tr.SetError()
}
}()
}
@ -330,8 +340,8 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
if err := s.opts.codec.Unmarshal(req, v); err != nil {
return err
}
if traceInfo.tr != nil {
traceInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
if trInfo != nil {
trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
}
return nil
}
@ -344,9 +354,9 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
statusCode = convertCode(appErr)
statusDesc = appErr.Error()
}
if traceInfo.tr != nil && statusCode != codes.OK {
traceInfo.tr.LazyLog(stringer(statusDesc), true)
traceInfo.tr.SetError()
if trInfo != nil && statusCode != codes.OK {
trInfo.tr.LazyLog(stringer(statusDesc), true)
trInfo.tr.SetError()
}
if err := t.WriteStatus(stream, statusCode, statusDesc); err != nil {
@ -355,8 +365,8 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
}
return nil
}
if traceInfo.tr != nil {
traceInfo.tr.LazyLog(stringer("OK"), false)
if trInfo != nil {
trInfo.tr.LazyLog(stringer("OK"), false)
}
opts := &transport.Options{
Last: true,
@ -375,8 +385,8 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
}
return err
}
if traceInfo.tr != nil {
traceInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
if trInfo != nil {
trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
}
return t.WriteStatus(stream, statusCode, statusDesc)
default:
@ -385,30 +395,24 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
}
}
func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc) (err error) {
func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
ss := &serverStream{
t: t,
s: stream,
p: &parser{s: stream},
codec: s.opts.codec,
tracing: EnableTracing,
trInfo: trInfo,
}
if ss.tracing {
ss.traceInfo.tr = stream.Trace()
ss.traceInfo.firstLine.client = false
ss.traceInfo.firstLine.remoteAddr = t.RemoteAddr()
if dl, ok := stream.Context().Deadline(); ok {
ss.traceInfo.firstLine.deadline = dl.Sub(time.Now())
}
ss.traceInfo.tr.LazyLog(&ss.traceInfo.firstLine, false)
if trInfo != nil {
trInfo.tr.LazyLog(&trInfo.firstLine, false)
defer func() {
ss.mu.Lock()
if err != nil && err != io.EOF {
ss.traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
ss.traceInfo.tr.SetError()
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
trInfo.tr.SetError()
}
ss.traceInfo.tr.Finish()
ss.traceInfo.tr = nil
trInfo.tr.Finish()
trInfo.tr = nil
ss.mu.Unlock()
}()
}
@ -421,13 +425,13 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
ss.statusDesc = appErr.Error()
}
}
if ss.tracing {
if trInfo != nil {
ss.mu.Lock()
if ss.statusCode != codes.OK {
ss.traceInfo.tr.LazyLog(stringer(ss.statusDesc), true)
ss.traceInfo.tr.SetError()
trInfo.tr.LazyLog(stringer(ss.statusDesc), true)
trInfo.tr.SetError()
} else {
ss.traceInfo.tr.LazyLog(stringer("OK"), false)
trInfo.tr.LazyLog(stringer("OK"), false)
}
ss.mu.Unlock()
}
@ -435,7 +439,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
}
func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream) {
func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
sm := stream.Method()
if sm != "" && sm[0] == '/' {
sm = sm[1:]
@ -458,11 +462,11 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
}
// Unary RPC or Streaming RPC?
if md, ok := srv.md[method]; ok {
s.processUnaryRPC(t, stream, srv, md)
s.processUnaryRPC(t, stream, srv, md, trInfo)
return
}
if sd, ok := srv.sd[method]; ok {
s.processStreamingRPC(t, stream, srv, sd)
s.processStreamingRPC(t, stream, srv, sd, trInfo)
return
}
if err := t.WriteStatus(stream, codes.Unimplemented, fmt.Sprintf("unknown method %v", method)); err != nil {