Merge pull request #458 from iamqizhao/master

Finish trace for premature error
This commit is contained in:
Qi Zhao
2015-11-30 16:53:39 -08:00
2 changed files with 41 additions and 8 deletions

View File

@ -410,11 +410,11 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
defer func() { defer func() {
ss.mu.Lock() ss.mu.Lock()
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
trInfo.tr.SetError() ss.trInfo.tr.SetError()
} }
trInfo.tr.Finish() ss.trInfo.tr.Finish()
trInfo.tr = nil ss.trInfo.tr = nil
ss.mu.Unlock() ss.mu.Unlock()
}() }()
} }
@ -430,10 +430,10 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
if trInfo != nil { if trInfo != nil {
ss.mu.Lock() ss.mu.Lock()
if ss.statusCode != codes.OK { if ss.statusCode != codes.OK {
trInfo.tr.LazyLog(stringer(ss.statusDesc), true) ss.trInfo.tr.LazyLog(stringer(ss.statusDesc), true)
trInfo.tr.SetError() ss.trInfo.tr.SetError()
} else { } else {
trInfo.tr.LazyLog(stringer("OK"), false) ss.trInfo.tr.LazyLog(stringer("OK"), false)
} }
ss.mu.Unlock() ss.mu.Unlock()
} }
@ -448,18 +448,40 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
} }
pos := strings.LastIndex(sm, "/") pos := strings.LastIndex(sm, "/")
if pos == -1 { if pos == -1 {
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
trInfo.tr.SetError()
}
if err := t.WriteStatus(stream, codes.InvalidArgument, fmt.Sprintf("malformed method name: %q", stream.Method())); err != nil { if err := t.WriteStatus(stream, codes.InvalidArgument, fmt.Sprintf("malformed method name: %q", stream.Method())); err != nil {
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
trInfo.tr.SetError()
}
grpclog.Printf("grpc: Server.handleStream failed to write status: %v", err) grpclog.Printf("grpc: Server.handleStream failed to write status: %v", err)
} }
if trInfo != nil {
trInfo.tr.Finish()
}
return return
} }
service := sm[:pos] service := sm[:pos]
method := sm[pos+1:] method := sm[pos+1:]
srv, ok := s.m[service] srv, ok := s.m[service]
if !ok { if !ok {
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true)
trInfo.tr.SetError()
}
if err := t.WriteStatus(stream, codes.Unimplemented, fmt.Sprintf("unknown service %v", service)); err != nil { if err := t.WriteStatus(stream, codes.Unimplemented, fmt.Sprintf("unknown service %v", service)); err != nil {
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
trInfo.tr.SetError()
}
grpclog.Printf("grpc: Server.handleStream failed to write status: %v", err) grpclog.Printf("grpc: Server.handleStream failed to write status: %v", err)
} }
if trInfo != nil {
trInfo.tr.Finish()
}
return return
} }
// Unary RPC or Streaming RPC? // Unary RPC or Streaming RPC?
@ -471,9 +493,20 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
s.processStreamingRPC(t, stream, srv, sd, trInfo) s.processStreamingRPC(t, stream, srv, sd, trInfo)
return return
} }
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"Unknown method %v", []interface{}{method}}, true)
trInfo.tr.SetError()
}
if err := t.WriteStatus(stream, codes.Unimplemented, fmt.Sprintf("unknown method %v", method)); err != nil { if err := t.WriteStatus(stream, codes.Unimplemented, fmt.Sprintf("unknown method %v", method)); err != nil {
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
trInfo.tr.SetError()
}
grpclog.Printf("grpc: Server.handleStream failed to write status: %v", err) grpclog.Printf("grpc: Server.handleStream failed to write status: %v", err)
} }
if trInfo != nil {
trInfo.tr.Finish()
}
} }
// Stop stops the gRPC server. Once Stop returns, the server stops accepting // Stop stops the gRPC server. Once Stop returns, the server stops accepting

View File

@ -149,7 +149,7 @@ type clientStream struct {
tracing bool // set to EnableTracing when the clientStream is created. tracing bool // set to EnableTracing when the clientStream is created.
mu sync.Mutex mu sync.Mutex
closed bool closed bool
// trInfo.tr is set when the clientStream is created (if EnableTracing is true), // trInfo.tr is set when the clientStream is created (if EnableTracing is true),
// and is set to nil when the clientStream's finish method is called. // and is set to nil when the clientStream's finish method is called.