diff --git a/clientconn.go b/clientconn.go index 2e9d18fc..b67a59bc 100644 --- a/clientconn.go +++ b/clientconn.go @@ -35,6 +35,7 @@ package grpc import ( "errors" + "log" "sync" "time" @@ -138,6 +139,7 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error { closeTransport = false time.Sleep(backoff(retries)) retries++ + log.Printf("grpc: ClientConn.resetTransport failed to create client transport: %v; Reconnecting to %q", err, cc.target) continue } cc.mu.Lock() diff --git a/metadata/metadata.go b/metadata/metadata.go index 8db7ce8b..8b4ed9e1 100644 --- a/metadata/metadata.go +++ b/metadata/metadata.go @@ -101,7 +101,7 @@ func New(m map[string]string) MD { // Pairs panics if len(kv) is odd. func Pairs(kv ...string) MD { if len(kv)%2 == 1 { - panic(fmt.Sprintf("Got the odd number of input pairs for metadata: %d", len(kv))) + panic(fmt.Sprintf("metadata: Pairs got the odd number of input pairs for metadata: %d", len(kv))) } md := MD{} var k string diff --git a/server.go b/server.go index 570dc810..9152d7ae 100644 --- a/server.go +++ b/server.go @@ -123,12 +123,12 @@ func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) { defer s.mu.Unlock() // Does some sanity checks. if _, ok := s.m[sd.ServiceName]; ok { - log.Fatalf("rpc: Duplicate service registration for %q", sd.ServiceName) + log.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName) } ht := reflect.TypeOf(sd.HandlerType).Elem() st := reflect.TypeOf(ss) if !st.Implements(ht) { - log.Fatalf("rpc: The handler of type %v that does not satisfy %v", st, ht) + log.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht) } srv := &service{ server: ss, @@ -186,7 +186,7 @@ func (s *Server) Serve(lis net.Listener) error { if err != nil { s.mu.Unlock() c.Close() - log.Println("failed to create ServerTransport: ", err) + log.Println("grpc: Server.Serve failed to create ServerTransport: ", err) continue } s.conns[st] = true @@ -213,7 +213,7 @@ func (s *Server) sendProto(t transport.ServerTransport, stream *transport.Stream // TODO(zhaoq): There exist other options also such as only closing the // faulty stream locally and remotely (Other streams can keep going). Find // the optimal option. - log.Fatalf("Server: failed to encode proto message %v", err) + log.Fatalf("grpc: Server failed to encode proto message %v", err) } return t.Write(stream, p, opts) } @@ -223,7 +223,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. for { pf, req, err := p.recvMsg() if err == io.EOF { - // The entire stream is done (for unary rpc only). + // The entire stream is done (for unary RPC only). return } if err != nil { @@ -231,9 +231,11 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. case transport.ConnectionError: // Nothing to do here. case transport.StreamError: - t.WriteStatus(stream, err.Code, err.Desc) + if err := t.WriteStatus(stream, err.Code, err.Desc); err != nil { + log.Printf("grpc: Server.processUnaryRPC failed to write status: %v", err) + } default: - panic(fmt.Sprintf("BUG: Unexpected error (%T) from recvMsg: %v", err, err)) + panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", err, err)) } return } @@ -241,7 +243,9 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. case compressionNone: reply, appErr := md.Handler(srv.server, stream.Context(), req) if appErr != nil { - t.WriteStatus(stream, convertCode(appErr), appErr.Error()) + if err := t.WriteStatus(stream, convertCode(appErr), appErr.Error()); err != nil { + log.Printf("grpc: Server.processUnaryRPC failed to write status: %v", err) + } return } opts := &transport.Options{ @@ -262,7 +266,9 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. statusDesc = err.Error() } } - t.WriteStatus(stream, statusCode, statusDesc) + if err := t.WriteStatus(stream, statusCode, statusDesc); err != nil { + log.Printf("grpc: Server.processUnaryRPC failed to write status: %v", err) + } default: panic(fmt.Sprintf("payload format to be supported: %d", pf)) } @@ -279,7 +285,9 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp ss.statusCode = convertCode(err) ss.statusDesc = err.Error() } - t.WriteStatus(ss.s, ss.statusCode, ss.statusDesc) + if err := t.WriteStatus(ss.s, ss.statusCode, ss.statusDesc); err != nil { + log.Printf("grpc: Server.processStreamingRPC failed to write status: %v", err) + } } func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream) { @@ -289,14 +297,18 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str } pos := strings.LastIndex(sm, "/") if pos == -1 { - t.WriteStatus(stream, codes.InvalidArgument, fmt.Sprintf("malformed method name: %q", stream.Method())) + if err := t.WriteStatus(stream, codes.InvalidArgument, fmt.Sprintf("malformed method name: %q", stream.Method())); err != nil { + log.Printf("grpc: Server.handleStream failed to write status: %v", err) + } return } service := sm[:pos] method := sm[pos+1:] srv, ok := s.m[service] if !ok { - t.WriteStatus(stream, codes.Unimplemented, fmt.Sprintf("unknown service %v", service)) + if err := t.WriteStatus(stream, codes.Unimplemented, fmt.Sprintf("unknown service %v", service)); err != nil { + log.Printf("grpc: Server.handleStream failed to write status: %v", err) + } return } // Unary RPC or Streaming RPC? @@ -308,7 +320,9 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str s.processStreamingRPC(t, stream, srv, sd) return } - t.WriteStatus(stream, codes.Unimplemented, fmt.Sprintf("unknown method %v", method)) + if err := t.WriteStatus(stream, codes.Unimplemented, fmt.Sprintf("unknown method %v", method)); err != nil { + log.Printf("grpc: Server.handleStream failed to write status: %v", err) + } } // Stop stops the gRPC server. Once Stop returns, the server stops accepting @@ -351,7 +365,7 @@ func SendHeader(ctx context.Context, md metadata.MD) error { } t := stream.ServerTransport() if t == nil { - log.Fatalf("rpc.SendHeader: %v has no ServerTransport to send header metadata.", stream) + log.Fatalf("grpc: SendHeader: %v has no ServerTransport to send header metadata.", stream) } return t.WriteHeader(stream, md) } diff --git a/transport/http2_client.go b/transport/http2_client.go index 9b2840a1..6eab9894 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -265,7 +265,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea err = t.framer.WriteContinuation(t.nextID, endHeaders, t.hBuf.Next(size)) } if err != nil { - t.notifyError() + t.notifyError(err) return nil, ConnectionErrorf("transport: %v", err) } } @@ -395,7 +395,7 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error { // by http2Client.Close(). No explicit CloseStream() needs to be // invoked. if err := t.framer.WriteData(s.id, endStream, p); err != nil { - t.notifyError() + t.notifyError(err) return ConnectionErrorf("transport: %v", err) } t.writableChan <- 0 @@ -477,7 +477,7 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) { s.state = streamDone s.statusCode, ok = http2RSTErrConvTab[http2.ErrCode(f.ErrCode)] if !ok { - log.Println("No gRPC status found for http2 error ", f.ErrCode) + log.Println("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error ", f.ErrCode) } s.mu.Unlock() s.write(recvMsg{err: io.EOF}) @@ -492,11 +492,11 @@ func (t *http2Client) handleSettings(f *http2.SettingsFrame) { } func (t *http2Client) handlePing(f *http2.PingFrame) { - log.Println("PingFrame handler to be implemented") + // TODO(zhaoq): PingFrame handler to be implemented" } func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { - log.Println("GoAwayFrame handler to be implemented") + // TODO(zhaoq): GoAwayFrame handler to be implemented" } func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) { @@ -565,12 +565,12 @@ func (t *http2Client) reader() { // Check the validity of server preface. frame, err := t.framer.ReadFrame() if err != nil { - t.notifyError() + t.notifyError(err) return } sf, ok := frame.(*http2.SettingsFrame) if !ok { - t.notifyError() + t.notifyError(err) return } t.handleSettings(sf) @@ -581,7 +581,7 @@ func (t *http2Client) reader() { for { frame, err := t.framer.ReadFrame() if err != nil { - t.notifyError() + t.notifyError(err) return } switch frame := frame.(type) { @@ -610,7 +610,7 @@ func (t *http2Client) reader() { case *http2.WindowUpdateFrame: t.handleWindowUpdate(frame) default: - log.Printf("http2Client: unhandled frame type %v.", frame) + log.Printf("transport: http2Client.reader got unhandled frame type %v.", frame) } } } @@ -632,7 +632,7 @@ func (t *http2Client) controller() { case *resetStream: t.framer.WriteRSTStream(i.streamID, i.code) default: - log.Printf("http2Client.controller got unexpected item type %v\n", i) + log.Printf("transport: http2Client.controller got unexpected item type %v\n", i) } t.writableChan <- 0 continue @@ -649,12 +649,13 @@ func (t *http2Client) Error() <-chan struct{} { return t.errorChan } -func (t *http2Client) notifyError() { +func (t *http2Client) notifyError(err error) { t.mu.Lock() defer t.mu.Unlock() // make sure t.errorChan is closed only once. if t.state == reachable { t.state = unreachable close(t.errorChan) + log.Printf("transport: http2Client.notifyError got notified that the client transport was broken %v.", err) } } diff --git a/transport/http2_server.go b/transport/http2_server.go index 53a09bf3..208153e4 100644 --- a/transport/http2_server.go +++ b/transport/http2_server.go @@ -132,7 +132,7 @@ func (t *http2Server) operateHeaders(hDec *hpackDecoder, s *Stream, frame header }() endHeaders, err := hDec.decodeServerHTTP2Headers(s, frame) if err != nil { - log.Print(err) + log.Printf("transport: http2Server.operateHeader found %v", err) if se, ok := err.(StreamError); ok { t.controlBuf.put(&resetStream{s.id, statusCodeConvTab[se.Code]}) } @@ -194,12 +194,12 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) { // Check the validity of client preface. preface := make([]byte, len(clientPreface)) if _, err := io.ReadFull(t.conn, preface); err != nil { - log.Printf("failed to receive the preface from client: %v", err) + log.Printf("transport: http2Server.HandleStreams failed to receive the preface from client: %v", err) t.Close() return } if !bytes.Equal(preface, clientPreface) { - log.Printf("received bogus greeting from client: %q", preface) + log.Printf("transport: http2Server.HandleStreams received bogus greeting from client: %q", preface) t.Close() return } @@ -211,7 +211,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) { } sf, ok := frame.(*http2.SettingsFrame) if !ok { - log.Printf("invalid preface type %T from client", frame) + log.Printf("transport: http2Server.HandleStreams saw invalid preface type %T from client", frame) t.Close() return } @@ -232,7 +232,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) { id := frame.Header().StreamID if id%2 != 1 || id <= t.maxStreamID { // illegal gRPC stream id. - log.Println("http2Server: received an illegal stream id: ", id) + log.Println("transport: http2Server.HandleStreams received an illegal stream id: ", id) t.Close() break } @@ -262,7 +262,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) { case *http2.WindowUpdateFrame: t.handleWindowUpdate(frame) default: - log.Printf("http2Server: unhandled frame type %v.", frame) + log.Printf("transport: http2Server.HanldeStreams found unhandled frame type %v.", frame) } } } @@ -346,7 +346,7 @@ func (t *http2Server) handleSettings(f *http2.SettingsFrame) { } func (t *http2Server) handlePing(f *http2.PingFrame) { - log.Println("PingFrame handler to be implemented") + // TODO(zhaoq): PingFrame handler to be implemented } func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) { @@ -430,7 +430,6 @@ func (t *http2Server) WriteStatus(s *Stream, statusCode codes.Code, statusDesc s } s.mu.RUnlock() if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil { - // TODO(zhaoq): Print some errors using glog, e.g., glog.V(1). return err } t.hBuf.Reset() @@ -550,7 +549,7 @@ func (t *http2Server) controller() { case *resetStream: t.framer.WriteRSTStream(i.streamID, i.code) default: - log.Printf("http2Server.controller got unexpected item type %v\n", i) + log.Printf("transport: http2Server.controller got unexpected item type %v\n", i) } t.writableChan <- 0 continue