diff --git a/clientconn.go b/clientconn.go
index 3dc42f79..87f302fc 100644
--- a/clientconn.go
+++ b/clientconn.go
@@ -42,6 +42,7 @@ import (
 	"time"
 
 	"golang.org/x/net/context"
+	"golang.org/x/net/trace"
 	"google.golang.org/grpc/credentials"
 	"google.golang.org/grpc/grpclog"
 	"google.golang.org/grpc/transport"
@@ -148,6 +149,9 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) {
 		target:       target,
 		shutdownChan: make(chan struct{}),
 	}
+	if EnableTracing {
+		cc.events = trace.NewEventLog("grpc.ClientConn", target)
+	}
 	for _, opt := range opts {
 		opt(&cc.dopts)
 	}
@@ -181,6 +185,9 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) {
 	cc.stateCV = sync.NewCond(&cc.mu)
 	if cc.dopts.block {
 		if err := cc.resetTransport(false); err != nil {
+			cc.mu.Lock()
+			cc.errorf("dial failed: %v", err)
+			cc.mu.Unlock()
 			cc.Close()
 			return nil, err
 		}
@@ -190,6 +197,9 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) {
 		// Start a goroutine connecting to the server asynchronously.
 		go func() {
 			if err := cc.resetTransport(false); err != nil {
+				cc.mu.Lock()
+				cc.errorf("dial failed: %v", err)
+				cc.mu.Unlock()
 				grpclog.Printf("Failed to dial %s: %v; please retry.", target, err)
 				cc.Close()
 				return
@@ -200,6 +210,22 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) {
 	return cc, nil
 }
 
+// printf records an event in cc's event log, unless cc has been closed.
+// REQUIRES cc.mu is held.
+func (cc *ClientConn) printf(format string, a ...interface{}) {
+	if cc.events != nil {
+		cc.events.Printf(format, a...)
+	}
+}
+
+// errorf records an error in cc's event log, unless cc has been closed.
+// REQUIRES cc.mu is held.
+func (cc *ClientConn) errorf(format string, a ...interface{}) {
+	if cc.events != nil {
+		cc.events.Errorf(format, a...)
+	}
+}
+
 // ConnectivityState indicates the state of a client connection.
 type ConnectivityState int
 
@@ -239,14 +265,15 @@ type ClientConn struct {
 	authority    string
 	dopts        dialOptions
 	shutdownChan chan struct{}
+	events       trace.EventLog
 
 	mu      sync.Mutex
 	state   ConnectivityState
 	stateCV *sync.Cond
 	// ready is closed and becomes nil when a new transport is up or failed
 	// due to timeout.
-	ready chan struct{}
-	transport transport.ClientTransport
+	ready     chan struct{}
+	transport transport.ClientTransport
 }
 
 // State returns the connectivity state of the ClientConn
@@ -295,6 +322,7 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error {
 	start := time.Now()
 	for {
 		cc.mu.Lock()
+		cc.printf("connecting")
 		if cc.state == Shutdown {
 			cc.mu.Unlock()
 			return ErrClientConnClosing
@@ -330,6 +358,7 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error {
 		newTransport, err := transport.NewClientTransport(cc.target, &copts)
 		if err != nil {
 			cc.mu.Lock()
+			cc.errorf("transient failure: %v", err)
 			cc.state = TransientFailure
 			cc.stateCV.Broadcast()
 			cc.mu.Unlock()
@@ -339,6 +368,9 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error {
 			}
 			// Fail early before falling into sleep.
 			if cc.dopts.copts.Timeout > 0 && cc.dopts.copts.Timeout < sleepTime+time.Since(start) {
+				cc.mu.Lock()
+				cc.errorf("connection timeout")
+				cc.mu.Unlock()
 				cc.Close()
 				return ErrClientConnTimeout
 			}
@@ -349,6 +381,7 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error {
 			continue
 		}
 		cc.mu.Lock()
+		cc.printf("ready")
 		if cc.state == Shutdown {
 			// cc.Close() has been invoked.
 			cc.mu.Unlock()
@@ -383,6 +416,9 @@ func (cc *ClientConn) transportMonitor() {
 			cc.mu.Unlock()
 			if err := cc.resetTransport(true); err != nil {
 				// The ClientConn is closing.
+				cc.mu.Lock()
+				cc.printf("transport exiting: %v", err)
+				cc.mu.Unlock()
 				grpclog.Printf("grpc: ClientConn.transportMonitor exits due to: %v", err)
 				return
 			}
@@ -433,6 +469,10 @@ func (cc *ClientConn) Close() error {
 	}
 	cc.state = Shutdown
 	cc.stateCV.Broadcast()
+	if cc.events != nil {
+		cc.events.Finish()
+		cc.events = nil
+	}
 	if cc.ready != nil {
 		close(cc.ready)
 		cc.ready = nil
diff --git a/server.go b/server.go
index feb29887..d0a0e9a8 100644
--- a/server.go
+++ b/server.go
@@ -39,6 +39,7 @@ import (
 	"io"
 	"net"
 	"reflect"
+	"runtime"
 	"strings"
 	"sync"
 
@@ -79,11 +80,12 @@ type service struct {
 
 // Server is a gRPC server to serve RPC requests.
 type Server struct {
-	opts  options
-	mu    sync.Mutex
-	lis   map[net.Listener]bool
-	conns map[transport.ServerTransport]bool
-	m     map[string]*service // service name -> service info
+	opts   options
+	mu     sync.Mutex
+	lis    map[net.Listener]bool
+	conns  map[transport.ServerTransport]bool
+	m      map[string]*service // service name -> service info
+	events trace.EventLog
 }
 
 type options struct {
@@ -128,12 +130,33 @@ func NewServer(opt ...ServerOption) *Server {
 		// Set the default codec.
 		opts.codec = protoCodec{}
 	}
-	return &Server{
+	s := &Server{
 		lis:   make(map[net.Listener]bool),
 		opts:  opts,
 		conns: make(map[transport.ServerTransport]bool),
 		m:     make(map[string]*service),
 	}
+	if EnableTracing {
+		_, file, line, _ := runtime.Caller(1)
+		s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
+	}
+	return s
+}
+
+// printf records an event in s's event log, unless s has been stopped.
+// REQUIRES s.mu is held.
+func (s *Server) printf(format string, a ...interface{}) {
+	if s.events != nil {
+		s.events.Printf(format, a...)
+	}
+}
+
+// errorf records an error in s's event log, unless s has been stopped.
+// REQUIRES s.mu is held.
+func (s *Server) errorf(format string, a ...interface{}) {
+	if s.events != nil {
+		s.events.Errorf(format, a...)
+	}
 }
 
 // RegisterService register a service and its implementation to the gRPC
@@ -151,6 +174,7 @@ func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
 func (s *Server) register(sd *ServiceDesc, ss interface{}) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
+	s.printf("RegisterService(%q)", sd.ServiceName)
 	if _, ok := s.m[sd.ServiceName]; ok {
 		grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
 	}
@@ -182,6 +206,7 @@ var (
 // Service returns when lis.Accept fails.
 func (s *Server) Serve(lis net.Listener) error {
 	s.mu.Lock()
+	s.printf("serving")
 	if s.lis == nil {
 		s.mu.Unlock()
 		return ErrServerStopped
@@ -197,12 +222,21 @@ func (s *Server) Serve(lis net.Listener) error {
 	for {
 		c, err := lis.Accept()
 		if err != nil {
+			s.mu.Lock()
+			s.printf("done serving; Accept = %v", err)
+			s.mu.Unlock()
 			return err
 		}
 		var authInfo credentials.AuthInfo
 		if creds, ok := s.opts.creds.(credentials.TransportAuthenticator); ok {
+			// Remember the accepted conn: c is reassigned by ServerHandshake
+			// and is not guaranteed to be non-nil when err != nil.
+			rawConn := c
 			c, authInfo, err = creds.ServerHandshake(c)
 			if err != nil {
+				s.mu.Lock()
+				s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
+				s.mu.Unlock()
 				grpclog.Println("grpc: Server.Serve failed to complete security handshake.")
 				continue
 			}
@@ -215,6 +249,7 @@ func (s *Server) Serve(lis net.Listener) error {
 		}
 		st, err := transport.NewServerTransport("http2", c, s.opts.maxConcurrentStreams, authInfo)
 		if err != nil {
+			s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
 			s.mu.Unlock()
 			c.Close()
 			grpclog.Println("grpc: Server.Serve failed to create ServerTransport: ", err)
@@ -418,6 +453,12 @@ func (s *Server) Stop() {
 	for c := range cs {
 		c.Close()
 	}
+	s.mu.Lock()
+	if s.events != nil {
+		s.events.Finish()
+		s.events = nil
+	}
+	s.mu.Unlock()
 }
 
 // TestingCloseConns closes all exiting transports but keeps s.lis accepting new