From 7aa428f5d640f3ec53f4423b2560deec34a5ea4a Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Mon, 30 Nov 2015 16:41:52 -0800 Subject: [PATCH] Finish trace for premature error --- server.go | 47 ++++++++++++++++++++++++++++++++++++++++------- stream.go | 2 +- 2 files changed, 41 insertions(+), 8 deletions(-) diff --git a/server.go b/server.go index 487a75c5..655e7d86 100644 --- a/server.go +++ b/server.go @@ -410,11 +410,11 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp defer func() { ss.mu.Lock() if err != nil && err != io.EOF { - trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) - trInfo.tr.SetError() + ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) + ss.trInfo.tr.SetError() } - trInfo.tr.Finish() - trInfo.tr = nil + ss.trInfo.tr.Finish() + ss.trInfo.tr = nil ss.mu.Unlock() }() } @@ -430,10 +430,10 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp if trInfo != nil { ss.mu.Lock() if ss.statusCode != codes.OK { - trInfo.tr.LazyLog(stringer(ss.statusDesc), true) - trInfo.tr.SetError() + ss.trInfo.tr.LazyLog(stringer(ss.statusDesc), true) + ss.trInfo.tr.SetError() } else { - trInfo.tr.LazyLog(stringer("OK"), false) + ss.trInfo.tr.LazyLog(stringer("OK"), false) } ss.mu.Unlock() } @@ -448,18 +448,40 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str } pos := strings.LastIndex(sm, "/") if pos == -1 { + if trInfo != nil { + trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true) + trInfo.tr.SetError() + } if err := t.WriteStatus(stream, codes.InvalidArgument, fmt.Sprintf("malformed method name: %q", stream.Method())); err != nil { + if trInfo != nil { + trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) + trInfo.tr.SetError() + } grpclog.Printf("grpc: Server.handleStream failed to write status: %v", err) } + if trInfo != nil { + trInfo.tr.Finish() + } return } service := sm[:pos] method := sm[pos+1:] srv, ok := s.m[service] if !ok { + if trInfo != nil { + trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true) + trInfo.tr.SetError() + } if err := t.WriteStatus(stream, codes.Unimplemented, fmt.Sprintf("unknown service %v", service)); err != nil { + if trInfo != nil { + trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) + trInfo.tr.SetError() + } grpclog.Printf("grpc: Server.handleStream failed to write status: %v", err) } + if trInfo != nil { + trInfo.tr.Finish() + } return } // Unary RPC or Streaming RPC? @@ -471,9 +493,20 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str s.processStreamingRPC(t, stream, srv, sd, trInfo) return } + if trInfo != nil { + trInfo.tr.LazyLog(&fmtStringer{"Unknown method %v", []interface{}{method}}, true) + trInfo.tr.SetError() + } if err := t.WriteStatus(stream, codes.Unimplemented, fmt.Sprintf("unknown method %v", method)); err != nil { + if trInfo != nil { + trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) + trInfo.tr.SetError() + } grpclog.Printf("grpc: Server.handleStream failed to write status: %v", err) } + if trInfo != nil { + trInfo.tr.Finish() + } } // Stop stops the gRPC server. Once Stop returns, the server stops accepting diff --git a/stream.go b/stream.go index 2370dd0e..0ee572c2 100644 --- a/stream.go +++ b/stream.go @@ -149,7 +149,7 @@ type clientStream struct { tracing bool // set to EnableTracing when the clientStream is created. - mu sync.Mutex + mu sync.Mutex closed bool // trInfo.tr is set when the clientStream is created (if EnableTracing is true), // and is set to nil when the clientStream's finish method is called.