@ -35,6 +35,7 @@ package grpc
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
|
"log"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -138,6 +139,7 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error {
|
|||||||
closeTransport = false
|
closeTransport = false
|
||||||
time.Sleep(backoff(retries))
|
time.Sleep(backoff(retries))
|
||||||
retries++
|
retries++
|
||||||
|
log.Printf("grpc: ClientConn.resetTransport failed to create client transport: %v; Reconnecting to %q", err, cc.target)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
cc.mu.Lock()
|
cc.mu.Lock()
|
||||||
|
@ -101,7 +101,7 @@ func New(m map[string]string) MD {
|
|||||||
// Pairs panics if len(kv) is odd.
|
// Pairs panics if len(kv) is odd.
|
||||||
func Pairs(kv ...string) MD {
|
func Pairs(kv ...string) MD {
|
||||||
if len(kv)%2 == 1 {
|
if len(kv)%2 == 1 {
|
||||||
panic(fmt.Sprintf("Got the odd number of input pairs for metadata: %d", len(kv)))
|
panic(fmt.Sprintf("metadata: Pairs got the odd number of input pairs for metadata: %d", len(kv)))
|
||||||
}
|
}
|
||||||
md := MD{}
|
md := MD{}
|
||||||
var k string
|
var k string
|
||||||
|
42
server.go
42
server.go
@ -123,12 +123,12 @@ func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
|
|||||||
defer s.mu.Unlock()
|
defer s.mu.Unlock()
|
||||||
// Does some sanity checks.
|
// Does some sanity checks.
|
||||||
if _, ok := s.m[sd.ServiceName]; ok {
|
if _, ok := s.m[sd.ServiceName]; ok {
|
||||||
log.Fatalf("rpc: Duplicate service registration for %q", sd.ServiceName)
|
log.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
|
||||||
}
|
}
|
||||||
ht := reflect.TypeOf(sd.HandlerType).Elem()
|
ht := reflect.TypeOf(sd.HandlerType).Elem()
|
||||||
st := reflect.TypeOf(ss)
|
st := reflect.TypeOf(ss)
|
||||||
if !st.Implements(ht) {
|
if !st.Implements(ht) {
|
||||||
log.Fatalf("rpc: The handler of type %v that does not satisfy %v", st, ht)
|
log.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
|
||||||
}
|
}
|
||||||
srv := &service{
|
srv := &service{
|
||||||
server: ss,
|
server: ss,
|
||||||
@ -186,7 +186,7 @@ func (s *Server) Serve(lis net.Listener) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
s.mu.Unlock()
|
s.mu.Unlock()
|
||||||
c.Close()
|
c.Close()
|
||||||
log.Println("failed to create ServerTransport: ", err)
|
log.Println("grpc: Server.Serve failed to create ServerTransport: ", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
s.conns[st] = true
|
s.conns[st] = true
|
||||||
@ -213,7 +213,7 @@ func (s *Server) sendProto(t transport.ServerTransport, stream *transport.Stream
|
|||||||
// TODO(zhaoq): There exist other options also such as only closing the
|
// TODO(zhaoq): There exist other options also such as only closing the
|
||||||
// faulty stream locally and remotely (Other streams can keep going). Find
|
// faulty stream locally and remotely (Other streams can keep going). Find
|
||||||
// the optimal option.
|
// the optimal option.
|
||||||
log.Fatalf("Server: failed to encode proto message %v", err)
|
log.Fatalf("grpc: Server failed to encode proto message %v", err)
|
||||||
}
|
}
|
||||||
return t.Write(stream, p, opts)
|
return t.Write(stream, p, opts)
|
||||||
}
|
}
|
||||||
@ -223,7 +223,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
|
|||||||
for {
|
for {
|
||||||
pf, req, err := p.recvMsg()
|
pf, req, err := p.recvMsg()
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
// The entire stream is done (for unary rpc only).
|
// The entire stream is done (for unary RPC only).
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -231,9 +231,11 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
|
|||||||
case transport.ConnectionError:
|
case transport.ConnectionError:
|
||||||
// Nothing to do here.
|
// Nothing to do here.
|
||||||
case transport.StreamError:
|
case transport.StreamError:
|
||||||
t.WriteStatus(stream, err.Code, err.Desc)
|
if err := t.WriteStatus(stream, err.Code, err.Desc); err != nil {
|
||||||
|
log.Printf("grpc: Server.processUnaryRPC failed to write status: %v", err)
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
panic(fmt.Sprintf("BUG: Unexpected error (%T) from recvMsg: %v", err, err))
|
panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", err, err))
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -241,7 +243,9 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
|
|||||||
case compressionNone:
|
case compressionNone:
|
||||||
reply, appErr := md.Handler(srv.server, stream.Context(), req)
|
reply, appErr := md.Handler(srv.server, stream.Context(), req)
|
||||||
if appErr != nil {
|
if appErr != nil {
|
||||||
t.WriteStatus(stream, convertCode(appErr), appErr.Error())
|
if err := t.WriteStatus(stream, convertCode(appErr), appErr.Error()); err != nil {
|
||||||
|
log.Printf("grpc: Server.processUnaryRPC failed to write status: %v", err)
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
opts := &transport.Options{
|
opts := &transport.Options{
|
||||||
@ -262,7 +266,9 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
|
|||||||
statusDesc = err.Error()
|
statusDesc = err.Error()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
t.WriteStatus(stream, statusCode, statusDesc)
|
if err := t.WriteStatus(stream, statusCode, statusDesc); err != nil {
|
||||||
|
log.Printf("grpc: Server.processUnaryRPC failed to write status: %v", err)
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
panic(fmt.Sprintf("payload format to be supported: %d", pf))
|
panic(fmt.Sprintf("payload format to be supported: %d", pf))
|
||||||
}
|
}
|
||||||
@ -279,7 +285,9 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
|
|||||||
ss.statusCode = convertCode(err)
|
ss.statusCode = convertCode(err)
|
||||||
ss.statusDesc = err.Error()
|
ss.statusDesc = err.Error()
|
||||||
}
|
}
|
||||||
t.WriteStatus(ss.s, ss.statusCode, ss.statusDesc)
|
if err := t.WriteStatus(ss.s, ss.statusCode, ss.statusDesc); err != nil {
|
||||||
|
log.Printf("grpc: Server.processStreamingRPC failed to write status: %v", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream) {
|
func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream) {
|
||||||
@ -289,14 +297,18 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
|
|||||||
}
|
}
|
||||||
pos := strings.LastIndex(sm, "/")
|
pos := strings.LastIndex(sm, "/")
|
||||||
if pos == -1 {
|
if pos == -1 {
|
||||||
t.WriteStatus(stream, codes.InvalidArgument, fmt.Sprintf("malformed method name: %q", stream.Method()))
|
if err := t.WriteStatus(stream, codes.InvalidArgument, fmt.Sprintf("malformed method name: %q", stream.Method())); err != nil {
|
||||||
|
log.Printf("grpc: Server.handleStream failed to write status: %v", err)
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
service := sm[:pos]
|
service := sm[:pos]
|
||||||
method := sm[pos+1:]
|
method := sm[pos+1:]
|
||||||
srv, ok := s.m[service]
|
srv, ok := s.m[service]
|
||||||
if !ok {
|
if !ok {
|
||||||
t.WriteStatus(stream, codes.Unimplemented, fmt.Sprintf("unknown service %v", service))
|
if err := t.WriteStatus(stream, codes.Unimplemented, fmt.Sprintf("unknown service %v", service)); err != nil {
|
||||||
|
log.Printf("grpc: Server.handleStream failed to write status: %v", err)
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// Unary RPC or Streaming RPC?
|
// Unary RPC or Streaming RPC?
|
||||||
@ -308,7 +320,9 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
|
|||||||
s.processStreamingRPC(t, stream, srv, sd)
|
s.processStreamingRPC(t, stream, srv, sd)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
t.WriteStatus(stream, codes.Unimplemented, fmt.Sprintf("unknown method %v", method))
|
if err := t.WriteStatus(stream, codes.Unimplemented, fmt.Sprintf("unknown method %v", method)); err != nil {
|
||||||
|
log.Printf("grpc: Server.handleStream failed to write status: %v", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop stops the gRPC server. Once Stop returns, the server stops accepting
|
// Stop stops the gRPC server. Once Stop returns, the server stops accepting
|
||||||
@ -351,7 +365,7 @@ func SendHeader(ctx context.Context, md metadata.MD) error {
|
|||||||
}
|
}
|
||||||
t := stream.ServerTransport()
|
t := stream.ServerTransport()
|
||||||
if t == nil {
|
if t == nil {
|
||||||
log.Fatalf("rpc.SendHeader: %v has no ServerTransport to send header metadata.", stream)
|
log.Fatalf("grpc: SendHeader: %v has no ServerTransport to send header metadata.", stream)
|
||||||
}
|
}
|
||||||
return t.WriteHeader(stream, md)
|
return t.WriteHeader(stream, md)
|
||||||
}
|
}
|
||||||
|
@ -265,7 +265,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
|
|||||||
err = t.framer.WriteContinuation(t.nextID, endHeaders, t.hBuf.Next(size))
|
err = t.framer.WriteContinuation(t.nextID, endHeaders, t.hBuf.Next(size))
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.notifyError()
|
t.notifyError(err)
|
||||||
return nil, ConnectionErrorf("transport: %v", err)
|
return nil, ConnectionErrorf("transport: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -395,7 +395,7 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
|
|||||||
// by http2Client.Close(). No explicit CloseStream() needs to be
|
// by http2Client.Close(). No explicit CloseStream() needs to be
|
||||||
// invoked.
|
// invoked.
|
||||||
if err := t.framer.WriteData(s.id, endStream, p); err != nil {
|
if err := t.framer.WriteData(s.id, endStream, p); err != nil {
|
||||||
t.notifyError()
|
t.notifyError(err)
|
||||||
return ConnectionErrorf("transport: %v", err)
|
return ConnectionErrorf("transport: %v", err)
|
||||||
}
|
}
|
||||||
t.writableChan <- 0
|
t.writableChan <- 0
|
||||||
@ -477,7 +477,7 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
|
|||||||
s.state = streamDone
|
s.state = streamDone
|
||||||
s.statusCode, ok = http2RSTErrConvTab[http2.ErrCode(f.ErrCode)]
|
s.statusCode, ok = http2RSTErrConvTab[http2.ErrCode(f.ErrCode)]
|
||||||
if !ok {
|
if !ok {
|
||||||
log.Println("No gRPC status found for http2 error ", f.ErrCode)
|
log.Println("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error ", f.ErrCode)
|
||||||
}
|
}
|
||||||
s.mu.Unlock()
|
s.mu.Unlock()
|
||||||
s.write(recvMsg{err: io.EOF})
|
s.write(recvMsg{err: io.EOF})
|
||||||
@ -492,11 +492,11 @@ func (t *http2Client) handleSettings(f *http2.SettingsFrame) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (t *http2Client) handlePing(f *http2.PingFrame) {
|
func (t *http2Client) handlePing(f *http2.PingFrame) {
|
||||||
log.Println("PingFrame handler to be implemented")
|
// TODO(zhaoq): PingFrame handler to be implemented"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
|
func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
|
||||||
log.Println("GoAwayFrame handler to be implemented")
|
// TODO(zhaoq): GoAwayFrame handler to be implemented"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
|
func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
|
||||||
@ -565,12 +565,12 @@ func (t *http2Client) reader() {
|
|||||||
// Check the validity of server preface.
|
// Check the validity of server preface.
|
||||||
frame, err := t.framer.ReadFrame()
|
frame, err := t.framer.ReadFrame()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.notifyError()
|
t.notifyError(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
sf, ok := frame.(*http2.SettingsFrame)
|
sf, ok := frame.(*http2.SettingsFrame)
|
||||||
if !ok {
|
if !ok {
|
||||||
t.notifyError()
|
t.notifyError(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
t.handleSettings(sf)
|
t.handleSettings(sf)
|
||||||
@ -581,7 +581,7 @@ func (t *http2Client) reader() {
|
|||||||
for {
|
for {
|
||||||
frame, err := t.framer.ReadFrame()
|
frame, err := t.framer.ReadFrame()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.notifyError()
|
t.notifyError(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
switch frame := frame.(type) {
|
switch frame := frame.(type) {
|
||||||
@ -610,7 +610,7 @@ func (t *http2Client) reader() {
|
|||||||
case *http2.WindowUpdateFrame:
|
case *http2.WindowUpdateFrame:
|
||||||
t.handleWindowUpdate(frame)
|
t.handleWindowUpdate(frame)
|
||||||
default:
|
default:
|
||||||
log.Printf("http2Client: unhandled frame type %v.", frame)
|
log.Printf("transport: http2Client.reader got unhandled frame type %v.", frame)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -632,7 +632,7 @@ func (t *http2Client) controller() {
|
|||||||
case *resetStream:
|
case *resetStream:
|
||||||
t.framer.WriteRSTStream(i.streamID, i.code)
|
t.framer.WriteRSTStream(i.streamID, i.code)
|
||||||
default:
|
default:
|
||||||
log.Printf("http2Client.controller got unexpected item type %v\n", i)
|
log.Printf("transport: http2Client.controller got unexpected item type %v\n", i)
|
||||||
}
|
}
|
||||||
t.writableChan <- 0
|
t.writableChan <- 0
|
||||||
continue
|
continue
|
||||||
@ -649,12 +649,13 @@ func (t *http2Client) Error() <-chan struct{} {
|
|||||||
return t.errorChan
|
return t.errorChan
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *http2Client) notifyError() {
|
func (t *http2Client) notifyError(err error) {
|
||||||
t.mu.Lock()
|
t.mu.Lock()
|
||||||
defer t.mu.Unlock()
|
defer t.mu.Unlock()
|
||||||
// make sure t.errorChan is closed only once.
|
// make sure t.errorChan is closed only once.
|
||||||
if t.state == reachable {
|
if t.state == reachable {
|
||||||
t.state = unreachable
|
t.state = unreachable
|
||||||
close(t.errorChan)
|
close(t.errorChan)
|
||||||
|
log.Printf("transport: http2Client.notifyError got notified that the client transport was broken %v.", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -132,7 +132,7 @@ func (t *http2Server) operateHeaders(hDec *hpackDecoder, s *Stream, frame header
|
|||||||
}()
|
}()
|
||||||
endHeaders, err := hDec.decodeServerHTTP2Headers(s, frame)
|
endHeaders, err := hDec.decodeServerHTTP2Headers(s, frame)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Print(err)
|
log.Printf("transport: http2Server.operateHeader found %v", err)
|
||||||
if se, ok := err.(StreamError); ok {
|
if se, ok := err.(StreamError); ok {
|
||||||
t.controlBuf.put(&resetStream{s.id, statusCodeConvTab[se.Code]})
|
t.controlBuf.put(&resetStream{s.id, statusCodeConvTab[se.Code]})
|
||||||
}
|
}
|
||||||
@ -194,12 +194,12 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) {
|
|||||||
// Check the validity of client preface.
|
// Check the validity of client preface.
|
||||||
preface := make([]byte, len(clientPreface))
|
preface := make([]byte, len(clientPreface))
|
||||||
if _, err := io.ReadFull(t.conn, preface); err != nil {
|
if _, err := io.ReadFull(t.conn, preface); err != nil {
|
||||||
log.Printf("failed to receive the preface from client: %v", err)
|
log.Printf("transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
|
||||||
t.Close()
|
t.Close()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if !bytes.Equal(preface, clientPreface) {
|
if !bytes.Equal(preface, clientPreface) {
|
||||||
log.Printf("received bogus greeting from client: %q", preface)
|
log.Printf("transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
|
||||||
t.Close()
|
t.Close()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -211,7 +211,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) {
|
|||||||
}
|
}
|
||||||
sf, ok := frame.(*http2.SettingsFrame)
|
sf, ok := frame.(*http2.SettingsFrame)
|
||||||
if !ok {
|
if !ok {
|
||||||
log.Printf("invalid preface type %T from client", frame)
|
log.Printf("transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
|
||||||
t.Close()
|
t.Close()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -232,7 +232,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) {
|
|||||||
id := frame.Header().StreamID
|
id := frame.Header().StreamID
|
||||||
if id%2 != 1 || id <= t.maxStreamID {
|
if id%2 != 1 || id <= t.maxStreamID {
|
||||||
// illegal gRPC stream id.
|
// illegal gRPC stream id.
|
||||||
log.Println("http2Server: received an illegal stream id: ", id)
|
log.Println("transport: http2Server.HandleStreams received an illegal stream id: ", id)
|
||||||
t.Close()
|
t.Close()
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
@ -262,7 +262,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) {
|
|||||||
case *http2.WindowUpdateFrame:
|
case *http2.WindowUpdateFrame:
|
||||||
t.handleWindowUpdate(frame)
|
t.handleWindowUpdate(frame)
|
||||||
default:
|
default:
|
||||||
log.Printf("http2Server: unhandled frame type %v.", frame)
|
log.Printf("transport: http2Server.HanldeStreams found unhandled frame type %v.", frame)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -346,7 +346,7 @@ func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (t *http2Server) handlePing(f *http2.PingFrame) {
|
func (t *http2Server) handlePing(f *http2.PingFrame) {
|
||||||
log.Println("PingFrame handler to be implemented")
|
// TODO(zhaoq): PingFrame handler to be implemented
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
|
func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
|
||||||
@ -430,7 +430,6 @@ func (t *http2Server) WriteStatus(s *Stream, statusCode codes.Code, statusDesc s
|
|||||||
}
|
}
|
||||||
s.mu.RUnlock()
|
s.mu.RUnlock()
|
||||||
if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil {
|
if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil {
|
||||||
// TODO(zhaoq): Print some errors using glog, e.g., glog.V(1).
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
t.hBuf.Reset()
|
t.hBuf.Reset()
|
||||||
@ -550,7 +549,7 @@ func (t *http2Server) controller() {
|
|||||||
case *resetStream:
|
case *resetStream:
|
||||||
t.framer.WriteRSTStream(i.streamID, i.code)
|
t.framer.WriteRSTStream(i.streamID, i.code)
|
||||||
default:
|
default:
|
||||||
log.Printf("http2Server.controller got unexpected item type %v\n", i)
|
log.Printf("transport: http2Server.controller got unexpected item type %v\n", i)
|
||||||
}
|
}
|
||||||
t.writableChan <- 0
|
t.writableChan <- 0
|
||||||
continue
|
continue
|
||||||
|
Reference in New Issue
Block a user