From b6c9c5a70f2466da74774f338f5397e6b9a362eb Mon Sep 17 00:00:00 2001 From: Sameer Ajmani Date: Wed, 23 Sep 2015 17:07:35 -0400 Subject: [PATCH 01/27] grpc: record the description of the status returned by server RPC handlers in request traces, and mark the trace as an error if the status is not OK. Install the trace into the Context passed to server handlers using trace.NewContext, so that code in the server handlers can annotate the trace using trace.FromContext. --- server.go | 28 +++++++++++++++++++++++++++- stream.go | 7 ++++--- 2 files changed, 31 insertions(+), 4 deletions(-) diff --git a/server.go b/server.go index d0a0e9a8..904d9156 100644 --- a/server.go +++ b/server.go @@ -282,12 +282,14 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str } func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc) (err error) { + ctx := stream.Context() var traceInfo traceInfo if EnableTracing { traceInfo.tr = trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method()) defer traceInfo.tr.Finish() traceInfo.firstLine.client = false traceInfo.tr.LazyLog(&traceInfo.firstLine, false) + ctx = trace.NewContext(ctx, traceInfo.tr) defer func() { if err != nil && err != io.EOF { traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) @@ -322,7 +324,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. case compressionNone: statusCode := codes.OK statusDesc := "" - reply, appErr := md.Handler(srv.server, stream.Context(), s.opts.codec, req) + reply, appErr := md.Handler(srv.server, ctx, s.opts.codec, req) if appErr != nil { if err, ok := appErr.(rpcError); ok { statusCode = err.code @@ -331,12 +333,20 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. statusCode = convertCode(appErr) statusDesc = appErr.Error() } + if traceInfo.tr != nil && statusCode != codes.OK { + traceInfo.tr.LazyLog(stringer(statusDesc), true) + traceInfo.tr.SetError() + } + if err := t.WriteStatus(stream, statusCode, statusDesc); err != nil { grpclog.Printf("grpc: Server.processUnaryRPC failed to write status: %v", err) return err } return nil } + if traceInfo.tr != nil { + traceInfo.tr.LazyLog(stringer("OK"), true) + } opts := &transport.Options{ Last: true, Delay: false, @@ -371,11 +381,13 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp p: &parser{s: stream}, codec: s.opts.codec, tracing: EnableTracing, + ctx: stream.Context(), } if ss.tracing { ss.traceInfo.tr = trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method()) ss.traceInfo.firstLine.client = false ss.traceInfo.tr.LazyLog(&ss.traceInfo.firstLine, false) + ss.ctx = trace.NewContext(ss.ctx, ss.traceInfo.tr) defer func() { ss.mu.Lock() if err != nil && err != io.EOF { @@ -396,10 +408,24 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp ss.statusDesc = appErr.Error() } } + if ss.tracing { + ss.mu.Lock() + if ss.statusCode != codes.OK { + ss.traceInfo.tr.LazyLog(stringer(ss.statusDesc), true) + ss.traceInfo.tr.SetError() + } else { + ss.traceInfo.tr.LazyLog(stringer("OK"), true) + } + ss.mu.Unlock() + } return t.WriteStatus(ss.s, ss.statusCode, ss.statusDesc) } +type stringer string + +func (s stringer) String() string { return string(s) } + func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream) { sm := stream.Method() if sm != "" && sm[0] == '/' { diff --git a/stream.go b/stream.go index e14664cb..91d8115d 100644 --- a/stream.go +++ b/stream.go @@ -113,6 +113,7 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth cs.traceInfo.firstLine.deadline = deadline.Sub(time.Now()) } cs.traceInfo.tr.LazyLog(&cs.traceInfo.firstLine, false) + ctx = trace.NewContext(ctx, cs.traceInfo.tr) } t, err := cc.wait(ctx) if err != nil { @@ -283,7 +284,8 @@ type serverStream struct { statusCode codes.Code statusDesc string - tracing bool // set to EnableTracing when the serverStream is created. + tracing bool // set to EnableTracing when the serverStream is created. + ctx context.Context // provides trace.FromContext when tracing mu sync.Mutex // protects traceInfo // traceInfo.tr is set when the serverStream is created (if EnableTracing is true), @@ -292,7 +294,7 @@ type serverStream struct { } func (ss *serverStream) Context() context.Context { - return ss.s.Context() + return ss.ctx } func (ss *serverStream) SendHeader(md metadata.MD) error { @@ -317,7 +319,6 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) { ss.traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) ss.traceInfo.tr.SetError() } - ss.mu.Unlock() } }() From 9afcd0c6977d24efce56e0102c0e050e1cec554b Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Wed, 23 Sep 2015 19:09:37 -0700 Subject: [PATCH 02/27] preliminary refactoring for custom naming and load balancing --- call.go | 13 ++-- clientconn.go | 211 ++++++++++++++++++++++++++++++-------------------- picker.go | 45 +++++++++++ stream.go | 10 ++- 4 files changed, 185 insertions(+), 94 deletions(-) create mode 100644 picker.go diff --git a/call.go b/call.go index 0115a28d..7ca088a8 100644 --- a/call.go +++ b/call.go @@ -116,7 +116,10 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli o.after(&c) } }() - + conn, err := cc.picker.Pick() + if err != nil { + return toRPCErr(err) + } if EnableTracing { c.traceInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method) defer c.traceInfo.tr.Finish() @@ -134,7 +137,7 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli }() } callHdr := &transport.CallHdr{ - Host: cc.authority, + Host: conn.authority, Method: method, } topts := &transport.Options{ @@ -154,7 +157,7 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli if lastErr != nil && c.failFast { return toRPCErr(lastErr) } - t, err = cc.wait(ctx) + t, err = conn.wait(ctx) if err != nil { if lastErr != nil { // This was a retry; return the error from the last attempt. @@ -165,7 +168,7 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli if c.traceInfo.tr != nil { c.traceInfo.tr.LazyLog(&payload{sent: true, msg: args}, true) } - stream, err = sendRequest(ctx, cc.dopts.codec, callHdr, t, args, topts) + stream, err = sendRequest(ctx, conn.dopts.codec, callHdr, t, args, topts) if err != nil { if _, ok := err.(transport.ConnectionError); ok { lastErr = err @@ -177,7 +180,7 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli return toRPCErr(err) } // Receive the response - lastErr = recvResponse(cc.dopts.codec, t, &c, stream, reply) + lastErr = recvResponse(conn.dopts.codec, t, &c, stream, reply) if _, ok := lastErr.(transport.ConnectionError); ok { continue } diff --git a/clientconn.go b/clientconn.go index 87f302fc..2b8e699d 100644 --- a/clientconn.go +++ b/clientconn.go @@ -73,6 +73,7 @@ var ( // values passed to Dial. type dialOptions struct { codec Codec + picker Picker block bool insecure bool copts transport.ConnectOptions @@ -142,88 +143,18 @@ func WithUserAgent(s string) DialOption { // Dial creates a client connection the given target. func Dial(target string, opts ...DialOption) (*ClientConn, error) { - if target == "" { - return nil, ErrUnspecTarget - } - cc := &ClientConn{ - target: target, - shutdownChan: make(chan struct{}), - } - if EnableTracing { - cc.events = trace.NewEventLog("grpc.ClientConn", target) - } + var dopts dialOptions for _, opt := range opts { - opt(&cc.dopts) + opt(&dopts) } - if !cc.dopts.insecure { - var ok bool - for _, c := range cc.dopts.copts.AuthOptions { - if _, ok := c.(credentials.TransportAuthenticator); !ok { - continue - } - ok = true - } - if !ok { - return nil, ErrNoTransportSecurity - } - } else { - for _, c := range cc.dopts.copts.AuthOptions { - if c.RequireTransportSecurity() { - return nil, ErrCredentialsMisuse - } - } - } - colonPos := strings.LastIndex(target, ":") - if colonPos == -1 { - colonPos = len(target) - } - cc.authority = target[:colonPos] - if cc.dopts.codec == nil { - // Set the default codec. - cc.dopts.codec = protoCodec{} - } - cc.stateCV = sync.NewCond(&cc.mu) - if cc.dopts.block { - if err := cc.resetTransport(false); err != nil { - cc.mu.Lock() - cc.errorf("dial failed: %v", err) - cc.mu.Unlock() - cc.Close() + if dopts.picker == nil { + p, err := newSimplePicker(target, dopts) + if err != nil { return nil, err } - // Start to monitor the error status of transport. - go cc.transportMonitor() - } else { - // Start a goroutine connecting to the server asynchronously. - go func() { - if err := cc.resetTransport(false); err != nil { - cc.mu.Lock() - cc.errorf("dial failed: %v", err) - cc.mu.Unlock() - grpclog.Printf("Failed to dial %s: %v; please retry.", target, err) - cc.Close() - return - } - go cc.transportMonitor() - }() - } - return cc, nil -} - -// printf records an event in cc's event log, unless cc has been closed. -// REQUIRES cc.mu is held. -func (cc *ClientConn) printf(format string, a ...interface{}) { - if cc.events != nil { - cc.events.Printf(format, a...) - } -} - -// errorf records an error in cc's event log, unless cc has been closed. -// REQUIRES cc.mu is held. -func (cc *ClientConn) errorf(format string, a ...interface{}) { - if cc.events != nil { - cc.events.Errorf(format, a...) + dopts.picker = p } + return &ClientConn{dopts.picker}, nil } // ConnectivityState indicates the state of a client connection. @@ -261,6 +192,36 @@ func (s ConnectivityState) String() string { // ClientConn represents a client connection to an RPC service. type ClientConn struct { + picker Picker +} + +// State returns the connectivity state of the Conn used for next upcoming RPC. +func (cc *ClientConn) State() ConnectivityState { + c := cc.picker.Peek() + if c == nil { + return Idle + } + return c.getState() +} + +// WaitForStateChange blocks until the state changes to something other than the sourceState +// or timeout fires on the Conn used for next upcoming RPC. It returns false if the Conn is nil +// or timeout fires, and true otherwise. +func (cc *ClientConn) WaitForStateChange(timeout time.Duration, sourceState ConnectivityState) bool { + c := cc.picker.Peek() + if c == nil { + return false + } + return c.waitForStateChange(timeout, sourceState) +} + +// Close starts to tear down the ClientConn. +func (cc *ClientConn) Close() error { + return cc.picker.Close() +} + +// Conn is a client connection to a single destination. +type Conn struct { target string authority string dopts dialOptions @@ -276,16 +237,94 @@ type ClientConn struct { transport transport.ClientTransport } -// State returns the connectivity state of the ClientConn -func (cc *ClientConn) State() ConnectivityState { +// NewConn creates a Conn. +func NewConn(target string, dopts dialOptions) (*Conn, error) { + if target == "" { + return nil, ErrUnspecTarget + } + c := &Conn{ + target: target, + dopts: dopts, + shutdownChan: make(chan struct{}), + } + if EnableTracing { + c.events = trace.NewEventLog("grpc.ClientConn", target) + } + if !c.dopts.insecure { + var ok bool + for _, cd := range c.dopts.copts.AuthOptions { + if _, ok := cd.(credentials.TransportAuthenticator); !ok { + continue + } + ok = true + } + if !ok { + return nil, ErrNoTransportSecurity + } + } else { + for _, cd := range c.dopts.copts.AuthOptions { + if cd.RequireTransportSecurity() { + return nil, ErrCredentialsMisuse + } + } + } + colonPos := strings.LastIndex(target, ":") + if colonPos == -1 { + colonPos = len(target) + } + c.authority = target[:colonPos] + if c.dopts.codec == nil { + // Set the default codec. + c.dopts.codec = protoCodec{} + } + c.stateCV = sync.NewCond(&c.mu) + if c.dopts.block { + if err := c.resetTransport(false); err != nil { + c.Close() + return nil, err + } + // Start to monitor the error status of transport. + go c.transportMonitor() + } else { + // Start a goroutine connecting to the server asynchronously. + go func() { + if err := c.resetTransport(false); err != nil { + grpclog.Printf("Failed to dial %s: %v; please retry.", target, err) + c.Close() + return + } + go c.transportMonitor() + }() + } + return c, nil +} + +// printf records an event in cc's event log, unless cc has been closed. +// REQUIRES cc.mu is held. +func (cc *Conn) printf(format string, a ...interface{}) { + if cc.events != nil { + cc.events.Printf(format, a...) + } +} + +// errorf records an error in cc's event log, unless cc has been closed. +// REQUIRES cc.mu is held. +func (cc *Conn) errorf(format string, a ...interface{}) { + if cc.events != nil { + cc.events.Errorf(format, a...) + } +} + +// getState returns the connectivity state of the Conn +func (cc *Conn) getState() ConnectivityState { cc.mu.Lock() defer cc.mu.Unlock() return cc.state } -// WaitForStateChange blocks until the state changes to something other than the sourceState +// waitForStateChange blocks until the state changes to something other than the sourceState // or timeout fires. It returns false if timeout fires and true otherwise. -func (cc *ClientConn) WaitForStateChange(timeout time.Duration, sourceState ConnectivityState) bool { +func (cc *Conn) waitForStateChange(timeout time.Duration, sourceState ConnectivityState) bool { start := time.Now() cc.mu.Lock() defer cc.mu.Unlock() @@ -317,7 +356,7 @@ func (cc *ClientConn) WaitForStateChange(timeout time.Duration, sourceState Conn return true } -func (cc *ClientConn) resetTransport(closeTransport bool) error { +func (cc *Conn) resetTransport(closeTransport bool) error { var retries int start := time.Now() for { @@ -402,7 +441,7 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error { // Run in a goroutine to track the error in transport and create the // new transport if an error happens. It returns when the channel is closing. -func (cc *ClientConn) transportMonitor() { +func (cc *Conn) transportMonitor() { for { select { // shutdownChan is needed to detect the teardown when @@ -429,7 +468,7 @@ func (cc *ClientConn) transportMonitor() { // When wait returns, either the new transport is up or ClientConn is // closing. -func (cc *ClientConn) wait(ctx context.Context) (transport.ClientTransport, error) { +func (cc *Conn) wait(ctx context.Context) (transport.ClientTransport, error) { for { cc.mu.Lock() switch { @@ -456,12 +495,12 @@ func (cc *ClientConn) wait(ctx context.Context) (transport.ClientTransport, erro } } -// Close starts to tear down the ClientConn. Returns ErrClientConnClosing if +// Close starts to tear down the Conn. Returns ErrClientConnClosing if // it has been closed (mostly due to dial time-out). // TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in // some edge cases (e.g., the caller opens and closes many ClientConn's in a // tight loop. -func (cc *ClientConn) Close() error { +func (cc *Conn) Close() error { cc.mu.Lock() defer cc.mu.Unlock() if cc.state == Shutdown { diff --git a/picker.go b/picker.go new file mode 100644 index 00000000..7c633471 --- /dev/null +++ b/picker.go @@ -0,0 +1,45 @@ +package grpc + +// Picker picks a Conn for RPC requests. +// This is EXPERIMENTAL and Please do not implement your own Picker for now. +type Picker interface { + // Pick returns the Conn to use for the upcoming RPC. It may return different + // Conn's up to the implementation. + Pick() (*Conn, error) + // Peek returns the Conn use use for the next upcoming RPC. It returns the same + // Conn until next time Pick gets invoked. + Peek() *Conn + // Close closes all the Conn's owned by this Picker. + Close() error +} + +func newSimplePicker(target string, dopts dialOptions) (Picker, error) { + c, err := NewConn(target, dopts) + if err != nil { + return nil, err + } + return &simplePicker{ + conn: c, + }, nil +} + +// simplePicker is default Picker which is used when there is no custom Picker +// specified by users. It always picks the same Conn. +type simplePicker struct { + conn *Conn +} + +func (p *simplePicker) Pick() (*Conn, error) { + return p.conn, nil +} + +func (p *simplePicker) Peek() *Conn { + return p.conn +} + +func (p *simplePicker) Close() error { + if p.conn != nil { + return p.conn.Close() + } + return nil +} diff --git a/stream.go b/stream.go index e14664cb..d66e1a41 100644 --- a/stream.go +++ b/stream.go @@ -96,14 +96,18 @@ type ClientStream interface { // NewClientStream creates a new Stream for the client side. This is called // by generated code. func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) { + conn, err := cc.picker.Pick() + if err != nil { + return nil, toRPCErr(err) + } // TODO(zhaoq): CallOption is omitted. Add support when it is needed. callHdr := &transport.CallHdr{ - Host: cc.authority, + Host: conn.authority, Method: method, } cs := &clientStream{ desc: desc, - codec: cc.dopts.codec, + codec: conn.dopts.codec, tracing: EnableTracing, } if cs.tracing { @@ -114,7 +118,7 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth } cs.traceInfo.tr.LazyLog(&cs.traceInfo.firstLine, false) } - t, err := cc.wait(ctx) + t, err := conn.wait(ctx) if err != nil { return nil, toRPCErr(err) } From ee98c48bb4306dd103e2fb61ad11854a3687f522 Mon Sep 17 00:00:00 2001 From: Sameer Ajmani Date: Wed, 23 Sep 2015 22:17:37 -0400 Subject: [PATCH 03/27] Incorporate dsymonds' comments. Fix another bug: cancel the Context provided to an RPC server handler as soon as that handler returns, so that goroutines started by that handler can detect that the handler is done and exit. Without this fix, goroutines started by a handler will keep running, unless the handler itself arranges to cancel the context. --- server.go | 15 +++++++-------- stream.go | 4 ++-- trace.go | 4 ++++ 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/server.go b/server.go index 904d9156..cd26c061 100644 --- a/server.go +++ b/server.go @@ -282,7 +282,8 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str } func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc) (err error) { - ctx := stream.Context() + ctx, cancel := context.WithCancel(stream.Context()) + defer cancel() var traceInfo traceInfo if EnableTracing { traceInfo.tr = trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method()) @@ -345,7 +346,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. return nil } if traceInfo.tr != nil { - traceInfo.tr.LazyLog(stringer("OK"), true) + traceInfo.tr.LazyLog(stringer("OK"), false) } opts := &transport.Options{ Last: true, @@ -375,13 +376,15 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. } func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc) (err error) { + ctx, cancel := context.WithCancel(stream.Context()) + defer cancel() ss := &serverStream{ t: t, s: stream, + ctx: ctx, p: &parser{s: stream}, codec: s.opts.codec, tracing: EnableTracing, - ctx: stream.Context(), } if ss.tracing { ss.traceInfo.tr = trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method()) @@ -414,7 +417,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp ss.traceInfo.tr.LazyLog(stringer(ss.statusDesc), true) ss.traceInfo.tr.SetError() } else { - ss.traceInfo.tr.LazyLog(stringer("OK"), true) + ss.traceInfo.tr.LazyLog(stringer("OK"), false) } ss.mu.Unlock() } @@ -422,10 +425,6 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp } -type stringer string - -func (s stringer) String() string { return string(s) } - func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream) { sm := stream.Method() if sm != "" && sm[0] == '/' { diff --git a/stream.go b/stream.go index 91d8115d..edd9bc84 100644 --- a/stream.go +++ b/stream.go @@ -279,13 +279,13 @@ type ServerStream interface { type serverStream struct { t transport.ServerTransport s *transport.Stream + ctx context.Context // provides trace.FromContext when tracing p *parser codec Codec statusCode codes.Code statusDesc string - tracing bool // set to EnableTracing when the serverStream is created. - ctx context.Context // provides trace.FromContext when tracing + tracing bool // set to EnableTracing when the serverStream is created. mu sync.Mutex // protects traceInfo // traceInfo.tr is set when the serverStream is created (if EnableTracing is true), diff --git a/trace.go b/trace.go index 24635740..cde04fbf 100644 --- a/trace.go +++ b/trace.go @@ -114,3 +114,7 @@ type fmtStringer struct { func (f *fmtStringer) String() string { return fmt.Sprintf(f.format, f.a...) } + +type stringer string + +func (s stringer) String() string { return string(s) } From ffbdf88f0fe2bbd6a170f643df5f279dd2b0deec Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Thu, 24 Sep 2015 10:36:56 -0700 Subject: [PATCH 04/27] add more comments --- clientconn.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/clientconn.go b/clientconn.go index 2b8e699d..d0d6af5c 100644 --- a/clientconn.go +++ b/clientconn.go @@ -196,6 +196,7 @@ type ClientConn struct { } // State returns the connectivity state of the Conn used for next upcoming RPC. +// This is EXPERIMENTAL API. func (cc *ClientConn) State() ConnectivityState { c := cc.picker.Peek() if c == nil { @@ -207,6 +208,7 @@ func (cc *ClientConn) State() ConnectivityState { // WaitForStateChange blocks until the state changes to something other than the sourceState // or timeout fires on the Conn used for next upcoming RPC. It returns false if the Conn is nil // or timeout fires, and true otherwise. +// This is EXPERIEMENTAL API. func (cc *ClientConn) WaitForStateChange(timeout time.Duration, sourceState ConnectivityState) bool { c := cc.picker.Peek() if c == nil { From c956ccf146604cf87bfc42bfeabfb23f3537aa01 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Thu, 24 Sep 2015 16:58:50 -0700 Subject: [PATCH 05/27] add test coverage --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 232ffaa1..60d5c5d2 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ #gRPC-Go -[![Build Status](https://travis-ci.org/grpc/grpc-go.svg)](https://travis-ci.org/grpc/grpc-go) [![GoDoc](https://godoc.org/google.golang.org/grpc?status.svg)](https://godoc.org/google.golang.org/grpc) +[![Build Status](https://travis-ci.org/grpc/grpc-go.svg)](https://travis-ci.org/grpc/grpc-go) [![GoDoc](https://godoc.org/google.golang.org/grpc?status.svg)](https://godoc.org/google.golang.org/grpc) [![Coverage Status](https://coveralls.io/repos/grpc/grpc-go/badge.svg?branch=master&service=github)](https://coveralls.io/github/grpc/grpc-go?branch=master) The Go implementation of [gRPC](http://www.grpc.io/): A high performance, open source, general RPC framework that puts mobile and HTTP/2 first. For more information see the [gRPC Quick Start](http://www.grpc.io/docs/) guide. From 1d1e4bf24de7c1322949c13faa56e05c4c1071d8 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Thu, 24 Sep 2015 17:03:21 -0700 Subject: [PATCH 06/27] revert README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 60d5c5d2..232ffaa1 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ #gRPC-Go -[![Build Status](https://travis-ci.org/grpc/grpc-go.svg)](https://travis-ci.org/grpc/grpc-go) [![GoDoc](https://godoc.org/google.golang.org/grpc?status.svg)](https://godoc.org/google.golang.org/grpc) [![Coverage Status](https://coveralls.io/repos/grpc/grpc-go/badge.svg?branch=master&service=github)](https://coveralls.io/github/grpc/grpc-go?branch=master) +[![Build Status](https://travis-ci.org/grpc/grpc-go.svg)](https://travis-ci.org/grpc/grpc-go) [![GoDoc](https://godoc.org/google.golang.org/grpc?status.svg)](https://godoc.org/google.golang.org/grpc) The Go implementation of [gRPC](http://www.grpc.io/): A high performance, open source, general RPC framework that puts mobile and HTTP/2 first. For more information see the [gRPC Quick Start](http://www.grpc.io/docs/) guide. From ec99a32572e9a94a9376c2e9396177a105eb93f3 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Fri, 25 Sep 2015 13:21:25 -0700 Subject: [PATCH 07/27] redesign the API --- call.go | 30 +++++++++++++++++++++--------- clientconn.go | 39 +++++++++++++++++++++------------------ picker.go | 29 ++++++++++++++++++----------- stream.go | 25 ++++++++++++++++++------- 4 files changed, 78 insertions(+), 45 deletions(-) diff --git a/call.go b/call.go index 7ca088a8..2f77175f 100644 --- a/call.go +++ b/call.go @@ -116,10 +116,10 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli o.after(&c) } }() - conn, err := cc.picker.Pick() - if err != nil { - return toRPCErr(err) - } + //conn, err := cc.picker.Pick() + //if err != nil { + // return toRPCErr(err) + //} if EnableTracing { c.traceInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method) defer c.traceInfo.tr.Finish() @@ -136,10 +136,10 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli } }() } - callHdr := &transport.CallHdr{ - Host: conn.authority, - Method: method, - } + //callHdr := &transport.CallHdr{ + // Host: conn.authority, + // Method: method, + //} topts := &transport.Options{ Last: true, Delay: false, @@ -152,13 +152,25 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli err error t transport.ClientTransport stream *transport.Stream + conn *Conn ) // TODO(zhaoq): Need a formal spec of retry strategy for non-failfast rpcs. if lastErr != nil && c.failFast { return toRPCErr(lastErr) } - t, err = conn.wait(ctx) + conn, err = cc.picker.Pick() if err != nil { + return toRPCErr(err) + } + callHdr := &transport.CallHdr{ + Host: conn.authority, + Method: method, + } + t, err = conn.Wait(ctx) + if err != nil { + if err == ErrTransientFailure { + continue + } if lastErr != nil { // This was a retry; return the error from the last attempt. return toRPCErr(lastErr) diff --git a/clientconn.go b/clientconn.go index d0d6af5c..33994b16 100644 --- a/clientconn.go +++ b/clientconn.go @@ -65,6 +65,8 @@ var ( // ErrClientConnTimeout indicates that the connection could not be // established or re-established within the specified timeout. ErrClientConnTimeout = errors.New("grpc: timed out trying to connect") + // ErrTransientFailure indicates the connection failed due to a transient error. + ErrTransientFailure = errors.New("transient connection failure") // minimum time to give a connection to complete minConnectTimeout = 20 * time.Second ) @@ -148,7 +150,7 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) { opt(&dopts) } if dopts.picker == nil { - p, err := newSimplePicker(target, dopts) + p, err := newUnicastPicker(target, dopts) if err != nil { return nil, err } @@ -198,11 +200,7 @@ type ClientConn struct { // State returns the connectivity state of the Conn used for next upcoming RPC. // This is EXPERIMENTAL API. func (cc *ClientConn) State() ConnectivityState { - c := cc.picker.Peek() - if c == nil { - return Idle - } - return c.getState() + return cc.picker.State() } // WaitForStateChange blocks until the state changes to something other than the sourceState @@ -210,11 +208,7 @@ func (cc *ClientConn) State() ConnectivityState { // or timeout fires, and true otherwise. // This is EXPERIEMENTAL API. func (cc *ClientConn) WaitForStateChange(timeout time.Duration, sourceState ConnectivityState) bool { - c := cc.picker.Peek() - if c == nil { - return false - } - return c.waitForStateChange(timeout, sourceState) + return cc.picker.WaitForStateChange(timeout, sourceState) } // Close starts to tear down the ClientConn. @@ -317,16 +311,17 @@ func (cc *Conn) errorf(format string, a ...interface{}) { } } -// getState returns the connectivity state of the Conn -func (cc *Conn) getState() ConnectivityState { +// State returns the connectivity state of the Conn +func (cc *Conn) State() ConnectivityState { cc.mu.Lock() defer cc.mu.Unlock() return cc.state } -// waitForStateChange blocks until the state changes to something other than the sourceState +// WaitForStateChange blocks until the state changes to something other than the sourceState // or timeout fires. It returns false if timeout fires and true otherwise. -func (cc *Conn) waitForStateChange(timeout time.Duration, sourceState ConnectivityState) bool { +// TODO(zhaoq): Rewrite for complex Picker. +func (cc *Conn) WaitForStateChange(timeout time.Duration, sourceState ConnectivityState) bool { start := time.Now() cc.mu.Lock() defer cc.mu.Unlock() @@ -402,6 +397,10 @@ func (cc *Conn) resetTransport(closeTransport bool) error { cc.errorf("transient failure: %v", err) cc.state = TransientFailure cc.stateCV.Broadcast() + if cc.ready != nil { + close(cc.ready) + cc.ready = nil + } cc.mu.Unlock() sleepTime -= time.Since(connectTime) if sleepTime < 0 { @@ -468,9 +467,8 @@ func (cc *Conn) transportMonitor() { } } -// When wait returns, either the new transport is up or ClientConn is -// closing. -func (cc *Conn) wait(ctx context.Context) (transport.ClientTransport, error) { +// Wait blocks until i) the new transport is up or ii) ctx is done or iii) +func (cc *Conn) Wait(ctx context.Context) (transport.ClientTransport, error) { for { cc.mu.Lock() switch { @@ -480,6 +478,11 @@ func (cc *Conn) wait(ctx context.Context) (transport.ClientTransport, error) { case cc.state == Ready: cc.mu.Unlock() return cc.transport, nil + case cc.state == TransientFailure: + cc.mu.Unlock() + // Break out so that the caller gets chance to pick another transport to + // perform rpc instead of sticking to this transport. + return nil, ErrTransientFailure default: ready := cc.ready if ready == nil { diff --git a/picker.go b/picker.go index 7c633471..53ec317e 100644 --- a/picker.go +++ b/picker.go @@ -1,43 +1,50 @@ package grpc +import ( + "time" +) + // Picker picks a Conn for RPC requests. // This is EXPERIMENTAL and Please do not implement your own Picker for now. type Picker interface { // Pick returns the Conn to use for the upcoming RPC. It may return different // Conn's up to the implementation. Pick() (*Conn, error) - // Peek returns the Conn use use for the next upcoming RPC. It returns the same - // Conn until next time Pick gets invoked. - Peek() *Conn + State() ConnectivityState + WaitForStateChange(timeout time.Duration, sourceState ConnectivityState) bool // Close closes all the Conn's owned by this Picker. Close() error } -func newSimplePicker(target string, dopts dialOptions) (Picker, error) { +func newUnicastPicker(target string, dopts dialOptions) (Picker, error) { c, err := NewConn(target, dopts) if err != nil { return nil, err } - return &simplePicker{ + return &unicastPicker{ conn: c, }, nil } -// simplePicker is default Picker which is used when there is no custom Picker +// unicastPicker is the default Picker which is used when there is no custom Picker // specified by users. It always picks the same Conn. -type simplePicker struct { +type unicastPicker struct { conn *Conn } -func (p *simplePicker) Pick() (*Conn, error) { +func (p *unicastPicker) Pick() (*Conn, error) { return p.conn, nil } -func (p *simplePicker) Peek() *Conn { - return p.conn +func (p *unicastPicker) State() ConnectivityState { + return p.conn.State() } -func (p *simplePicker) Close() error { +func (p *unicastPicker) WaitForStateChange(timeout time.Duration, sourceState ConnectivityState) bool { + return p.conn.WaitForStateChange(timeout, sourceState) +} + +func (p *unicastPicker) Close() error { if p.conn != nil { return p.conn.Close() } diff --git a/stream.go b/stream.go index d66e1a41..a9d7c49c 100644 --- a/stream.go +++ b/stream.go @@ -96,9 +96,24 @@ type ClientStream interface { // NewClientStream creates a new Stream for the client side. This is called // by generated code. func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) { - conn, err := cc.picker.Pick() - if err != nil { - return nil, toRPCErr(err) + var ( + conn *Conn + t transport.ClientTransport + err error + ) + for { + conn, err = cc.picker.Pick() + if err != nil { + return nil, toRPCErr(err) + } + t, err = conn.Wait(ctx) + if err != nil { + if err == ErrTransientFailure { + continue + } + return nil, toRPCErr(err) + } + break } // TODO(zhaoq): CallOption is omitted. Add support when it is needed. callHdr := &transport.CallHdr{ @@ -118,10 +133,6 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth } cs.traceInfo.tr.LazyLog(&cs.traceInfo.firstLine, false) } - t, err := conn.wait(ctx) - if err != nil { - return nil, toRPCErr(err) - } s, err := t.NewStream(ctx, callHdr) if err != nil { return nil, toRPCErr(err) From 23fea5c44a35f628e322125ceca92394b4d83639 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Fri, 25 Sep 2015 13:32:35 -0700 Subject: [PATCH 08/27] comments --- call.go | 8 -------- clientconn.go | 5 ++--- picker.go | 4 ++++ 3 files changed, 6 insertions(+), 11 deletions(-) diff --git a/call.go b/call.go index 2f77175f..5b64c243 100644 --- a/call.go +++ b/call.go @@ -116,10 +116,6 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli o.after(&c) } }() - //conn, err := cc.picker.Pick() - //if err != nil { - // return toRPCErr(err) - //} if EnableTracing { c.traceInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method) defer c.traceInfo.tr.Finish() @@ -136,10 +132,6 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli } }() } - //callHdr := &transport.CallHdr{ - // Host: conn.authority, - // Method: method, - //} topts := &transport.Options{ Last: true, Delay: false, diff --git a/clientconn.go b/clientconn.go index 33994b16..1cf0a4c1 100644 --- a/clientconn.go +++ b/clientconn.go @@ -197,15 +197,14 @@ type ClientConn struct { picker Picker } -// State returns the connectivity state of the Conn used for next upcoming RPC. +// State returns the connectivity state of cc. // This is EXPERIMENTAL API. func (cc *ClientConn) State() ConnectivityState { return cc.picker.State() } // WaitForStateChange blocks until the state changes to something other than the sourceState -// or timeout fires on the Conn used for next upcoming RPC. It returns false if the Conn is nil -// or timeout fires, and true otherwise. +// or timeout fires on cc. It returns false if timeout fires, and true otherwise. // This is EXPERIEMENTAL API. func (cc *ClientConn) WaitForStateChange(timeout time.Duration, sourceState ConnectivityState) bool { return cc.picker.WaitForStateChange(timeout, sourceState) diff --git a/picker.go b/picker.go index 53ec317e..9f34fad4 100644 --- a/picker.go +++ b/picker.go @@ -10,7 +10,11 @@ type Picker interface { // Pick returns the Conn to use for the upcoming RPC. It may return different // Conn's up to the implementation. Pick() (*Conn, error) + // State returns the connectivity state of the underlying connections. State() ConnectivityState + // WaitForStateChange blocks until the state changes to something other than + // the sourceState or timeout fires on cc. It returns false if timeout fires, + // and true otherwise. WaitForStateChange(timeout time.Duration, sourceState ConnectivityState) bool // Close closes all the Conn's owned by this Picker. Close() error From 2899844430ddc3ae3c244fead67c0472f04d03ac Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Fri, 25 Sep 2015 13:38:17 -0700 Subject: [PATCH 09/27] fix a typo --- clientconn.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clientconn.go b/clientconn.go index 1cf0a4c1..7186a23d 100644 --- a/clientconn.go +++ b/clientconn.go @@ -205,7 +205,7 @@ func (cc *ClientConn) State() ConnectivityState { // WaitForStateChange blocks until the state changes to something other than the sourceState // or timeout fires on cc. It returns false if timeout fires, and true otherwise. -// This is EXPERIEMENTAL API. +// This is EXPERIMENTAL API. func (cc *ClientConn) WaitForStateChange(timeout time.Duration, sourceState ConnectivityState) bool { return cc.picker.WaitForStateChange(timeout, sourceState) } From 6a445c544a75512573bbec15b154060af1ecfbf2 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Fri, 25 Sep 2015 16:10:16 -0700 Subject: [PATCH 10/27] document the no-deps design constraints --- README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/README.md b/README.md index 232ffaa1..ef66797a 100644 --- a/README.md +++ b/README.md @@ -18,6 +18,10 @@ Prerequisites This requires Go 1.4 or above. +Constraints +----------- +The grpc package should only depend on standard Golang packages and a short list of exceptions. A new addition to the list requires a discussion with gRPC-Go authors and consultants. + Documentation ------------- You can find more detailed documentation and examples in the [examples directory](examples/). From d9d230278edc80a7774e19c356b0d2a285029bb9 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Fri, 25 Sep 2015 17:17:35 -0700 Subject: [PATCH 11/27] golang->go --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index e648b2c9..94dc739e 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,7 @@ This requires Go 1.4 or above. Constraints ----------- -The grpc package should only depend on standard Golang packages and a short list of exceptions. A new addition to the list requires a discussion with gRPC-Go authors and consultants. +The grpc package should only depend on standard Go packages and a short list of exceptions. A new addition to the list requires a discussion with gRPC-Go authors and consultants. Documentation ------------- From 86db82df28b6de13aa7657d709d8524a4ce17dfb Mon Sep 17 00:00:00 2001 From: Sameer Ajmani Date: Mon, 28 Sep 2015 10:22:26 -0400 Subject: [PATCH 12/27] add TODO to fix trace --- server.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server.go b/server.go index cd26c061..2ebde5d0 100644 --- a/server.go +++ b/server.go @@ -319,7 +319,9 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. return err } if traceInfo.tr != nil { - traceInfo.tr.LazyLog(&payload{sent: false, msg: req}, true) + // TODO: set payload.msg to something that + // prints usefully with %s; req is a []byte. + traceInfo.tr.LazyLog(&payload{sent: false}, true) } switch pf { case compressionNone: From 8b2dce3164688085c4eb7a59725db56110d0a3a0 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Mon, 28 Sep 2015 13:16:26 -0700 Subject: [PATCH 13/27] revise naming API --- naming/etcd/etcd.go | 106 ++++++++++++++++++++++++++++---------------- naming/naming.go | 14 +++--- 2 files changed, 72 insertions(+), 48 deletions(-) diff --git a/naming/etcd/etcd.go b/naming/etcd/etcd.go index e140068a..2744a7a7 100644 --- a/naming/etcd/etcd.go +++ b/naming/etcd/etcd.go @@ -5,41 +5,73 @@ import ( "golang.org/x/net/context" "google.golang.org/grpc/naming" ) +// update defines an etcd key-value update. +type update struct { + key, val string +} + +// getNode reports the set of changes starting from node recursively. +func getNode(node *etcdcl.Node) (updates []*update) { + for _, v := range node.Nodes { + updates = append(updates, getNode(v)...) + } + if !node.Dir { + u := &update{ + key: node.Key, + val: node.Value, + } + updates = []*update{u} + } + return +} type watcher struct { wr etcdcl.Watcher ctx context.Context cancel context.CancelFunc + kv map[string]string } -func (w *watcher) Next() (*naming.Update, error) { +func (w *watcher) Next() (nu []*naming.Update, _ error) { for { resp, err := w.wr.Next(w.ctx) if err != nil { return nil, err } - if resp.Node.Dir { - continue - } - var act naming.OP - if resp.Action == "set" { - if resp.PrevNode == nil { - act = naming.Add - } else { - act = naming.Modify + updates := getNode(resp.Node) + for _, u := range updates { + switch resp.Action { + case "set": + if resp.PrevNode == nil { + w.kv[u.key] = u.val + nu = append(nu, &naming.Update{ + Op: naming.Add, + Addr: u.val, + }) + } else { + nu = append(nu, &naming.Update{ + Op: naming.Delete, + Addr: w.kv[u.key], + }) + nu = append(nu, &naming.Update{ + Op: naming.Add, + Addr: u.val, + }) + w.kv[u.key] = u.val + } + case "delete": + nu = append(nu, &naming.Update{ + Op: naming.Delete, + Addr: w.kv[u.key], + }) + delete(w.kv, u.key) } - } else if resp.Action == "delete" { - act = naming.Delete } - if act == naming.No { - continue + if len(nu) > 0 { + break } - return &naming.Update{ - Op: act, - Key: resp.Node.Key, - Val: resp.Node.Value, - }, nil } + return nu, nil } func (w *watcher) Stop() { @@ -48,41 +80,36 @@ func (w *watcher) Stop() { type resolver struct { kapi etcdcl.KeysAPI + kv map[string]string } func (r *resolver) NewWatcher(target string) naming.Watcher { ctx, cancel := context.WithCancel(context.Background()) - return &watcher{ + w := &watcher{ wr: r.kapi.Watcher(target, &etcdcl.WatcherOptions{Recursive: true}), ctx: ctx, cancel: cancel, } - + for k, v := range r.kv { + w.kv[k] = v + } + return w } -// getNode reports the naming.Update starting from node recursively. -func getNode(node *etcdcl.Node) (updates []*naming.Update) { - for _, v := range node.Nodes { - updates = append(updates, getNode(v)...) - } - if !node.Dir { - entry := &naming.Update{ - Op: naming.Add, - Key: node.Key, - Val: node.Value, - } - updates = []*naming.Update{entry} - } - return -} - -func (r *resolver) Resolve(target string) ([]*naming.Update, error) { +func (r *resolver) Resolve(target string) (nu []*naming.Update, _ error) { resp, err := r.kapi.Get(context.Background(), target, &etcdcl.GetOptions{Recursive: true}) if err != nil { return nil, err } updates := getNode(resp.Node) - return updates, nil + for _, u := range updates { + r.kv[u.key] = u.val + nu = append(nu, &naming.Update{ + Op: naming.Add, + Addr: u.val, + }) + } + return nu, nil } // NewResolver creates an etcd-based naming.Resolver. @@ -93,5 +120,6 @@ func NewResolver(cfg etcdcl.Config) (naming.Resolver, error) { } return &resolver{ kapi: etcdcl.NewKeysAPI(c), + kv: make(map[string]string), }, nil } diff --git a/naming/naming.go b/naming/naming.go index a1fd3357..b2fbc0d8 100644 --- a/naming/naming.go +++ b/naming/naming.go @@ -4,14 +4,10 @@ package naming type OP uint8 const ( - // No indicates there are no changes. - No OP = iota // Add indicates a new address is added. - Add + Add = iota // Delete indicates an exisiting address is deleted. Delete - // Modify indicates an existing address is modified. - Modify ) type ServiceConfig interface{} @@ -20,8 +16,7 @@ type ServiceConfig interface{} type Update struct { // Op indicates the operation of the update. Op OP - Key string - Val string + Addr string Config ServiceConfig } @@ -36,8 +31,9 @@ type Resolver interface { // Watcher watches the updates for a particular target. type Watcher interface { - // Next blocks until an update or error happens. - Next() (*Update, error) + // Next blocks until an update or error happens. It may return one or more + // updates. + Next() ([]*Update, error) // Stop stops the Watcher. Stop() } From bc3c9fde77d7f13b9e9b8217ffadfc38d8c1bc68 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Mon, 28 Sep 2015 13:26:25 -0700 Subject: [PATCH 14/27] add license --- naming/etcd/etcd.go | 33 +++++++++++++++++++++++++++++++++ naming/naming.go | 35 +++++++++++++++++++++++++++++++++++ 2 files changed, 68 insertions(+) diff --git a/naming/etcd/etcd.go b/naming/etcd/etcd.go index 2744a7a7..362649da 100644 --- a/naming/etcd/etcd.go +++ b/naming/etcd/etcd.go @@ -1,3 +1,36 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + package etcd import ( diff --git a/naming/naming.go b/naming/naming.go index b2fbc0d8..610eb811 100644 --- a/naming/naming.go +++ b/naming/naming.go @@ -1,3 +1,38 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +// Package naming defines the naming API and related data structures for gRPC. +// The interface is EXPERIMENTAL and may be suject to change. package naming // OP defines the corresponding operations for a name resolution change. From 6fa4aea17e88a09fa1c283ff008e991bc8f28be8 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Mon, 28 Sep 2015 13:33:15 -0700 Subject: [PATCH 15/27] add missing license --- picker.go | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/picker.go b/picker.go index 9f34fad4..05759f3a 100644 --- a/picker.go +++ b/picker.go @@ -1,3 +1,36 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + package grpc import ( From c01ea6e3591de0f3055e8119f469427ec35cbfde Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Tue, 29 Sep 2015 10:24:03 -0700 Subject: [PATCH 16/27] revise Picker API --- call.go | 2 +- clientconn.go | 48 +++++++++++++++++++++++++----------------------- picker.go | 21 +++++++++++---------- stream.go | 2 +- 4 files changed, 38 insertions(+), 35 deletions(-) diff --git a/call.go b/call.go index 5b64c243..8b688091 100644 --- a/call.go +++ b/call.go @@ -150,7 +150,7 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli if lastErr != nil && c.failFast { return toRPCErr(lastErr) } - conn, err = cc.picker.Pick() + conn, err = cc.dopts.picker.Pick() if err != nil { return toRPCErr(err) } diff --git a/clientconn.go b/clientconn.go index 7186a23d..ea3ccd0b 100644 --- a/clientconn.go +++ b/clientconn.go @@ -66,7 +66,7 @@ var ( // established or re-established within the specified timeout. ErrClientConnTimeout = errors.New("grpc: timed out trying to connect") // ErrTransientFailure indicates the connection failed due to a transient error. - ErrTransientFailure = errors.New("transient connection failure") + ErrTransientFailure = errors.New("grpc: transient connection failure") // minimum time to give a connection to complete minConnectTimeout = 20 * time.Second ) @@ -145,18 +145,19 @@ func WithUserAgent(s string) DialOption { // Dial creates a client connection the given target. func Dial(target string, opts ...DialOption) (*ClientConn, error) { - var dopts dialOptions + cc := &ClientConn{ + target: target, + } for _, opt := range opts { - opt(&dopts) + opt(&cc.dopts) } - if dopts.picker == nil { - p, err := newUnicastPicker(target, dopts) - if err != nil { - return nil, err - } - dopts.picker = p + if cc.dopts.picker == nil { + cc.dopts.picker = &unicastPicker{} } - return &ClientConn{dopts.picker}, nil + if err := cc.dopts.picker.Init(cc); err != nil { + return nil, err + } + return cc, nil } // ConnectivityState indicates the state of a client connection. @@ -194,25 +195,26 @@ func (s ConnectivityState) String() string { // ClientConn represents a client connection to an RPC service. type ClientConn struct { - picker Picker + target string + dopts dialOptions } // State returns the connectivity state of cc. // This is EXPERIMENTAL API. func (cc *ClientConn) State() ConnectivityState { - return cc.picker.State() + return cc.dopts.picker.State() } // WaitForStateChange blocks until the state changes to something other than the sourceState // or timeout fires on cc. It returns false if timeout fires, and true otherwise. // This is EXPERIMENTAL API. func (cc *ClientConn) WaitForStateChange(timeout time.Duration, sourceState ConnectivityState) bool { - return cc.picker.WaitForStateChange(timeout, sourceState) + return cc.dopts.picker.WaitForStateChange(timeout, sourceState) } // Close starts to tear down the ClientConn. func (cc *ClientConn) Close() error { - return cc.picker.Close() + return cc.dopts.picker.Close() } // Conn is a client connection to a single destination. @@ -233,17 +235,17 @@ type Conn struct { } // NewConn creates a Conn. -func NewConn(target string, dopts dialOptions) (*Conn, error) { - if target == "" { +func NewConn(cc *ClientConn) (*Conn, error) { + if cc.target == "" { return nil, ErrUnspecTarget } c := &Conn{ - target: target, - dopts: dopts, + target: cc.target, + dopts: cc.dopts, shutdownChan: make(chan struct{}), } if EnableTracing { - c.events = trace.NewEventLog("grpc.ClientConn", target) + c.events = trace.NewEventLog("grpc.ClientConn", c.target) } if !c.dopts.insecure { var ok bool @@ -263,11 +265,11 @@ func NewConn(target string, dopts dialOptions) (*Conn, error) { } } } - colonPos := strings.LastIndex(target, ":") + colonPos := strings.LastIndex(c.target, ":") if colonPos == -1 { - colonPos = len(target) + colonPos = len(c.target) } - c.authority = target[:colonPos] + c.authority = c.target[:colonPos] if c.dopts.codec == nil { // Set the default codec. c.dopts.codec = protoCodec{} @@ -284,7 +286,7 @@ func NewConn(target string, dopts dialOptions) (*Conn, error) { // Start a goroutine connecting to the server asynchronously. go func() { if err := c.resetTransport(false); err != nil { - grpclog.Printf("Failed to dial %s: %v; please retry.", target, err) + grpclog.Printf("Failed to dial %s: %v; please retry.", c.target, err) c.Close() return } diff --git a/picker.go b/picker.go index 05759f3a..79f98868 100644 --- a/picker.go +++ b/picker.go @@ -40,6 +40,8 @@ import ( // Picker picks a Conn for RPC requests. // This is EXPERIMENTAL and Please do not implement your own Picker for now. type Picker interface { + // Init does initial processing for the Picker, e.g., initiate some connections. + Init(cc *ClientConn) error // Pick returns the Conn to use for the upcoming RPC. It may return different // Conn's up to the implementation. Pick() (*Conn, error) @@ -53,22 +55,21 @@ type Picker interface { Close() error } -func newUnicastPicker(target string, dopts dialOptions) (Picker, error) { - c, err := NewConn(target, dopts) - if err != nil { - return nil, err - } - return &unicastPicker{ - conn: c, - }, nil -} - // unicastPicker is the default Picker which is used when there is no custom Picker // specified by users. It always picks the same Conn. type unicastPicker struct { conn *Conn } +func (p *unicastPicker) Init(cc *ClientConn) error { + c, err := NewConn(cc) + if err != nil { + return err + } + p.conn = c + return nil +} + func (p *unicastPicker) Pick() (*Conn, error) { return p.conn, nil } diff --git a/stream.go b/stream.go index a9d7c49c..605b873d 100644 --- a/stream.go +++ b/stream.go @@ -102,7 +102,7 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth err error ) for { - conn, err = cc.picker.Pick() + conn, err = cc.dopts.picker.Pick() if err != nil { return nil, toRPCErr(err) } From ce2ef8c9696e4de0c4e0bf049b9bc6c23d08ed48 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Tue, 29 Sep 2015 19:05:13 -0700 Subject: [PATCH 17/27] Enable payload tracing for unary rpc --- server.go | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/server.go b/server.go index ba68a215..c0728dce 100644 --- a/server.go +++ b/server.go @@ -52,7 +52,7 @@ import ( "google.golang.org/grpc/transport" ) -type methodHandler func(srv interface{}, ctx context.Context, codec Codec, buf []byte) (interface{}, error) +type methodHandler func(srv interface{}, ctx context.Context, decodeFunc func([]byte, interface{}) error, buf []byte) (interface{}, error) // MethodDesc represents an RPC service's method specification. type MethodDesc struct { @@ -320,16 +320,25 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. } return err } - if traceInfo.tr != nil { + //if traceInfo.tr != nil { // TODO: set payload.msg to something that // prints usefully with %s; req is a []byte. - traceInfo.tr.LazyLog(&payload{sent: false}, true) - } + //traceInfo.tr.LazyLog(&payload{sent: false}, true) + //} switch pf { case compressionNone: statusCode := codes.OK statusDesc := "" - reply, appErr := md.Handler(srv.server, ctx, s.opts.codec, req) + df := func(b []byte, v interface{}) error { + if err := s.opts.codec.Unmarshal(b, v); err != nil { + return err + } + if traceInfo.tr != nil { + traceInfo.tr.LazyLog(&payload{sent: false, msg: v}, true) + } + return nil + } + reply, appErr := md.Handler(srv.server, ctx, df, req) if appErr != nil { if err, ok := appErr.(rpcError); ok { statusCode = err.code From ef43a528c10e18bc77ed76b5d84379ceca813852 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Tue, 29 Sep 2015 19:07:43 -0700 Subject: [PATCH 18/27] example generated code --- benchmark/grpc_testing/test.pb.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/benchmark/grpc_testing/test.pb.go b/benchmark/grpc_testing/test.pb.go index 619c450c..5561c231 100644 --- a/benchmark/grpc_testing/test.pb.go +++ b/benchmark/grpc_testing/test.pb.go @@ -419,9 +419,10 @@ func RegisterTestServiceServer(s *grpc.Server, srv TestServiceServer) { s.RegisterService(&_TestService_serviceDesc, srv) } -func _TestService_UnaryCall_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) { +func _TestService_UnaryCall_Handler(srv interface{}, ctx context.Context, decodeFunc func([]byte, interface{}) error, buf []byte) (interface{}, error) { in := new(SimpleRequest) - if err := codec.Unmarshal(buf, in); err != nil { + //if err := codec.Unmarshal(buf, in); err != nil { + if err := decodeFunc(buf, in); err != nil { return nil, err } out, err := srv.(TestServiceServer).UnaryCall(ctx, in) From a27bb5b14ae74c8e9cb1a3385c96dd83042bc507 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Tue, 29 Sep 2015 19:08:55 -0700 Subject: [PATCH 19/27] remove dead code --- benchmark/grpc_testing/test.pb.go | 1 - server.go | 5 ----- 2 files changed, 6 deletions(-) diff --git a/benchmark/grpc_testing/test.pb.go b/benchmark/grpc_testing/test.pb.go index 5561c231..d8511b85 100644 --- a/benchmark/grpc_testing/test.pb.go +++ b/benchmark/grpc_testing/test.pb.go @@ -421,7 +421,6 @@ func RegisterTestServiceServer(s *grpc.Server, srv TestServiceServer) { func _TestService_UnaryCall_Handler(srv interface{}, ctx context.Context, decodeFunc func([]byte, interface{}) error, buf []byte) (interface{}, error) { in := new(SimpleRequest) - //if err := codec.Unmarshal(buf, in); err != nil { if err := decodeFunc(buf, in); err != nil { return nil, err } diff --git a/server.go b/server.go index c0728dce..0be82b7e 100644 --- a/server.go +++ b/server.go @@ -320,11 +320,6 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. } return err } - //if traceInfo.tr != nil { - // TODO: set payload.msg to something that - // prints usefully with %s; req is a []byte. - //traceInfo.tr.LazyLog(&payload{sent: false}, true) - //} switch pf { case compressionNone: statusCode := codes.OK From a71db53441b6d1cb85477c56894683f0f719e543 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Tue, 29 Sep 2015 20:10:20 -0700 Subject: [PATCH 20/27] remove slice param --- benchmark/grpc_testing/test.pb.go | 4 ++-- server.go | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/benchmark/grpc_testing/test.pb.go b/benchmark/grpc_testing/test.pb.go index d8511b85..889525ad 100644 --- a/benchmark/grpc_testing/test.pb.go +++ b/benchmark/grpc_testing/test.pb.go @@ -419,9 +419,9 @@ func RegisterTestServiceServer(s *grpc.Server, srv TestServiceServer) { s.RegisterService(&_TestService_serviceDesc, srv) } -func _TestService_UnaryCall_Handler(srv interface{}, ctx context.Context, decodeFunc func([]byte, interface{}) error, buf []byte) (interface{}, error) { +func _TestService_UnaryCall_Handler(srv interface{}, ctx context.Context, decodeFunc func(interface{}) error) (interface{}, error) { in := new(SimpleRequest) - if err := decodeFunc(buf, in); err != nil { + if err := decodeFunc(in); err != nil { return nil, err } out, err := srv.(TestServiceServer).UnaryCall(ctx, in) diff --git a/server.go b/server.go index 0be82b7e..6df688ce 100644 --- a/server.go +++ b/server.go @@ -52,7 +52,7 @@ import ( "google.golang.org/grpc/transport" ) -type methodHandler func(srv interface{}, ctx context.Context, decodeFunc func([]byte, interface{}) error, buf []byte) (interface{}, error) +type methodHandler func(srv interface{}, ctx context.Context, decodeFunc func(interface{}) error) (interface{}, error) // MethodDesc represents an RPC service's method specification. type MethodDesc struct { @@ -324,8 +324,8 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. case compressionNone: statusCode := codes.OK statusDesc := "" - df := func(b []byte, v interface{}) error { - if err := s.opts.codec.Unmarshal(b, v); err != nil { + df := func(v interface{}) error { + if err := s.opts.codec.Unmarshal(req, v); err != nil { return err } if traceInfo.tr != nil { @@ -333,7 +333,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. } return nil } - reply, appErr := md.Handler(srv.server, ctx, df, req) + reply, appErr := md.Handler(srv.server, ctx, df) if appErr != nil { if err, ok := appErr.(rpcError); ok { statusCode = err.code From 1d81cf4f2213b8029f0df1de9a7720ca5d566cf9 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Thu, 1 Oct 2015 14:35:07 -0700 Subject: [PATCH 21/27] fix all the generated code --- examples/helloworld/helloworld/helloworld.pb.go | 4 ++-- examples/route_guide/routeguide/route_guide.pb.go | 4 ++-- health/grpc_health_v1alpha/health.pb.go | 4 ++-- interop/grpc_testing/test.pb.go | 8 ++++---- test/grpc_testing/test.pb.go | 8 ++++---- 5 files changed, 14 insertions(+), 14 deletions(-) diff --git a/examples/helloworld/helloworld/helloworld.pb.go b/examples/helloworld/helloworld/helloworld.pb.go index 1ff931a3..f9390dbc 100644 --- a/examples/helloworld/helloworld/helloworld.pb.go +++ b/examples/helloworld/helloworld/helloworld.pb.go @@ -84,9 +84,9 @@ func RegisterGreeterServer(s *grpc.Server, srv GreeterServer) { s.RegisterService(&_Greeter_serviceDesc, srv) } -func _Greeter_SayHello_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) { +func _Greeter_SayHello_Handler(srv interface{}, ctx context.Context, decodeFunc func(interface{}) error) (interface{}, error) { in := new(HelloRequest) - if err := codec.Unmarshal(buf, in); err != nil { + if err := decodeFunc(in); err != nil { return nil, err } out, err := srv.(GreeterServer).SayHello(ctx, in) diff --git a/examples/route_guide/routeguide/route_guide.pb.go b/examples/route_guide/routeguide/route_guide.pb.go index fcf5c748..487a4dae 100644 --- a/examples/route_guide/routeguide/route_guide.pb.go +++ b/examples/route_guide/routeguide/route_guide.pb.go @@ -310,9 +310,9 @@ func RegisterRouteGuideServer(s *grpc.Server, srv RouteGuideServer) { s.RegisterService(&_RouteGuide_serviceDesc, srv) } -func _RouteGuide_GetFeature_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) { +func _RouteGuide_GetFeature_Handler(srv interface{}, ctx context.Context, decodeFunc func(interface{}) error) (interface{}, error) { in := new(Point) - if err := codec.Unmarshal(buf, in); err != nil { + if err := decodeFunc(in); err != nil { return nil, err } out, err := srv.(RouteGuideServer).GetFeature(ctx, in) diff --git a/health/grpc_health_v1alpha/health.pb.go b/health/grpc_health_v1alpha/health.pb.go index c333a975..6b5ef31f 100644 --- a/health/grpc_health_v1alpha/health.pb.go +++ b/health/grpc_health_v1alpha/health.pb.go @@ -108,9 +108,9 @@ func RegisterHealthServer(s *grpc.Server, srv HealthServer) { s.RegisterService(&_Health_serviceDesc, srv) } -func _Health_Check_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) { +func _Health_Check_Handler(srv interface{}, ctx context.Context, decodeFunc func(interface{}) error) (interface{}, error) { in := new(HealthCheckRequest) - if err := codec.Unmarshal(buf, in); err != nil { + if err := decodeFunc(in); err != nil { return nil, err } out, err := srv.(HealthServer).Check(ctx, in) diff --git a/interop/grpc_testing/test.pb.go b/interop/grpc_testing/test.pb.go index b25e98b8..26d1a44b 100755 --- a/interop/grpc_testing/test.pb.go +++ b/interop/grpc_testing/test.pb.go @@ -539,9 +539,9 @@ func RegisterTestServiceServer(s *grpc.Server, srv TestServiceServer) { s.RegisterService(&_TestService_serviceDesc, srv) } -func _TestService_EmptyCall_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) { +func _TestService_EmptyCall_Handler(srv interface{}, ctx context.Context, decodeFunc func(interface{}) error) (interface{}, error) { in := new(Empty) - if err := codec.Unmarshal(buf, in); err != nil { + if err := decodeFunc(in); err != nil { return nil, err } out, err := srv.(TestServiceServer).EmptyCall(ctx, in) @@ -551,9 +551,9 @@ func _TestService_EmptyCall_Handler(srv interface{}, ctx context.Context, codec return out, nil } -func _TestService_UnaryCall_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) { +func _TestService_UnaryCall_Handler(srv interface{}, ctx context.Context, decodeFunc func(interface{}) error) (interface{}, error) { in := new(SimpleRequest) - if err := codec.Unmarshal(buf, in); err != nil { + if err := decodeFunc(in); err != nil { return nil, err } out, err := srv.(TestServiceServer).UnaryCall(ctx, in) diff --git a/test/grpc_testing/test.pb.go b/test/grpc_testing/test.pb.go index b25e98b8..26d1a44b 100644 --- a/test/grpc_testing/test.pb.go +++ b/test/grpc_testing/test.pb.go @@ -539,9 +539,9 @@ func RegisterTestServiceServer(s *grpc.Server, srv TestServiceServer) { s.RegisterService(&_TestService_serviceDesc, srv) } -func _TestService_EmptyCall_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) { +func _TestService_EmptyCall_Handler(srv interface{}, ctx context.Context, decodeFunc func(interface{}) error) (interface{}, error) { in := new(Empty) - if err := codec.Unmarshal(buf, in); err != nil { + if err := decodeFunc(in); err != nil { return nil, err } out, err := srv.(TestServiceServer).EmptyCall(ctx, in) @@ -551,9 +551,9 @@ func _TestService_EmptyCall_Handler(srv interface{}, ctx context.Context, codec return out, nil } -func _TestService_UnaryCall_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) { +func _TestService_UnaryCall_Handler(srv interface{}, ctx context.Context, decodeFunc func(interface{}) error) (interface{}, error) { in := new(SimpleRequest) - if err := codec.Unmarshal(buf, in); err != nil { + if err := decodeFunc(in); err != nil { return nil, err } out, err := srv.(TestServiceServer).UnaryCall(ctx, in) From a15e3b9562b16d2dd5caf686a79e23d3711b9893 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Thu, 1 Oct 2015 16:26:00 -0700 Subject: [PATCH 22/27] decodeFunc -> dec --- benchmark/grpc_testing/test.pb.go | 4 ++-- examples/helloworld/helloworld/helloworld.pb.go | 4 ++-- examples/route_guide/routeguide/route_guide.pb.go | 4 ++-- health/grpc_health_v1alpha/health.pb.go | 4 ++-- interop/grpc_testing/test.pb.go | 8 ++++---- server.go | 2 +- test/grpc_testing/test.pb.go | 8 ++++---- 7 files changed, 17 insertions(+), 17 deletions(-) diff --git a/benchmark/grpc_testing/test.pb.go b/benchmark/grpc_testing/test.pb.go index 889525ad..74e13c9e 100644 --- a/benchmark/grpc_testing/test.pb.go +++ b/benchmark/grpc_testing/test.pb.go @@ -419,9 +419,9 @@ func RegisterTestServiceServer(s *grpc.Server, srv TestServiceServer) { s.RegisterService(&_TestService_serviceDesc, srv) } -func _TestService_UnaryCall_Handler(srv interface{}, ctx context.Context, decodeFunc func(interface{}) error) (interface{}, error) { +func _TestService_UnaryCall_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) { in := new(SimpleRequest) - if err := decodeFunc(in); err != nil { + if err := dec(in); err != nil { return nil, err } out, err := srv.(TestServiceServer).UnaryCall(ctx, in) diff --git a/examples/helloworld/helloworld/helloworld.pb.go b/examples/helloworld/helloworld/helloworld.pb.go index f9390dbc..366b23b6 100644 --- a/examples/helloworld/helloworld/helloworld.pb.go +++ b/examples/helloworld/helloworld/helloworld.pb.go @@ -84,9 +84,9 @@ func RegisterGreeterServer(s *grpc.Server, srv GreeterServer) { s.RegisterService(&_Greeter_serviceDesc, srv) } -func _Greeter_SayHello_Handler(srv interface{}, ctx context.Context, decodeFunc func(interface{}) error) (interface{}, error) { +func _Greeter_SayHello_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) { in := new(HelloRequest) - if err := decodeFunc(in); err != nil { + if err := dec(in); err != nil { return nil, err } out, err := srv.(GreeterServer).SayHello(ctx, in) diff --git a/examples/route_guide/routeguide/route_guide.pb.go b/examples/route_guide/routeguide/route_guide.pb.go index 487a4dae..9ac9029a 100644 --- a/examples/route_guide/routeguide/route_guide.pb.go +++ b/examples/route_guide/routeguide/route_guide.pb.go @@ -310,9 +310,9 @@ func RegisterRouteGuideServer(s *grpc.Server, srv RouteGuideServer) { s.RegisterService(&_RouteGuide_serviceDesc, srv) } -func _RouteGuide_GetFeature_Handler(srv interface{}, ctx context.Context, decodeFunc func(interface{}) error) (interface{}, error) { +func _RouteGuide_GetFeature_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) { in := new(Point) - if err := decodeFunc(in); err != nil { + if err := dec(in); err != nil { return nil, err } out, err := srv.(RouteGuideServer).GetFeature(ctx, in) diff --git a/health/grpc_health_v1alpha/health.pb.go b/health/grpc_health_v1alpha/health.pb.go index 6b5ef31f..96eba6f8 100644 --- a/health/grpc_health_v1alpha/health.pb.go +++ b/health/grpc_health_v1alpha/health.pb.go @@ -108,9 +108,9 @@ func RegisterHealthServer(s *grpc.Server, srv HealthServer) { s.RegisterService(&_Health_serviceDesc, srv) } -func _Health_Check_Handler(srv interface{}, ctx context.Context, decodeFunc func(interface{}) error) (interface{}, error) { +func _Health_Check_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) { in := new(HealthCheckRequest) - if err := decodeFunc(in); err != nil { + if err := dec(in); err != nil { return nil, err } out, err := srv.(HealthServer).Check(ctx, in) diff --git a/interop/grpc_testing/test.pb.go b/interop/grpc_testing/test.pb.go index 26d1a44b..bd492fef 100755 --- a/interop/grpc_testing/test.pb.go +++ b/interop/grpc_testing/test.pb.go @@ -539,9 +539,9 @@ func RegisterTestServiceServer(s *grpc.Server, srv TestServiceServer) { s.RegisterService(&_TestService_serviceDesc, srv) } -func _TestService_EmptyCall_Handler(srv interface{}, ctx context.Context, decodeFunc func(interface{}) error) (interface{}, error) { +func _TestService_EmptyCall_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) { in := new(Empty) - if err := decodeFunc(in); err != nil { + if err := dec(in); err != nil { return nil, err } out, err := srv.(TestServiceServer).EmptyCall(ctx, in) @@ -551,9 +551,9 @@ func _TestService_EmptyCall_Handler(srv interface{}, ctx context.Context, decode return out, nil } -func _TestService_UnaryCall_Handler(srv interface{}, ctx context.Context, decodeFunc func(interface{}) error) (interface{}, error) { +func _TestService_UnaryCall_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) { in := new(SimpleRequest) - if err := decodeFunc(in); err != nil { + if err := dec(in); err != nil { return nil, err } out, err := srv.(TestServiceServer).UnaryCall(ctx, in) diff --git a/server.go b/server.go index 6df688ce..181e38cc 100644 --- a/server.go +++ b/server.go @@ -52,7 +52,7 @@ import ( "google.golang.org/grpc/transport" ) -type methodHandler func(srv interface{}, ctx context.Context, decodeFunc func(interface{}) error) (interface{}, error) +type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) // MethodDesc represents an RPC service's method specification. type MethodDesc struct { diff --git a/test/grpc_testing/test.pb.go b/test/grpc_testing/test.pb.go index 26d1a44b..bd492fef 100644 --- a/test/grpc_testing/test.pb.go +++ b/test/grpc_testing/test.pb.go @@ -539,9 +539,9 @@ func RegisterTestServiceServer(s *grpc.Server, srv TestServiceServer) { s.RegisterService(&_TestService_serviceDesc, srv) } -func _TestService_EmptyCall_Handler(srv interface{}, ctx context.Context, decodeFunc func(interface{}) error) (interface{}, error) { +func _TestService_EmptyCall_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) { in := new(Empty) - if err := decodeFunc(in); err != nil { + if err := dec(in); err != nil { return nil, err } out, err := srv.(TestServiceServer).EmptyCall(ctx, in) @@ -551,9 +551,9 @@ func _TestService_EmptyCall_Handler(srv interface{}, ctx context.Context, decode return out, nil } -func _TestService_UnaryCall_Handler(srv interface{}, ctx context.Context, decodeFunc func(interface{}) error) (interface{}, error) { +func _TestService_UnaryCall_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) { in := new(SimpleRequest) - if err := decodeFunc(in); err != nil { + if err := dec(in); err != nil { return nil, err } out, err := srv.(TestServiceServer).UnaryCall(ctx, in) From 2bd5f5b1e1f5abeceb60ca32d8bc62fff9b47d38 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Thu, 1 Oct 2015 17:43:05 -0700 Subject: [PATCH 23/27] show RemoteAddr in server trace --- server.go | 2 ++ transport/http2_server.go | 4 ++++ transport/transport.go | 2 ++ 3 files changed, 8 insertions(+) diff --git a/server.go b/server.go index 181e38cc..e817f386 100644 --- a/server.go +++ b/server.go @@ -291,6 +291,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. traceInfo.tr = trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method()) defer traceInfo.tr.Finish() traceInfo.firstLine.client = false + traceInfo.firstLine.remoteAddr = t.RemoteAddr() traceInfo.tr.LazyLog(&traceInfo.firstLine, false) ctx = trace.NewContext(ctx, traceInfo.tr) defer func() { @@ -397,6 +398,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp if ss.tracing { ss.traceInfo.tr = trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method()) ss.traceInfo.firstLine.client = false + ss.traceInfo.firstLine.remoteAddr = t.RemoteAddr() ss.traceInfo.tr.LazyLog(&ss.traceInfo.firstLine, false) ss.ctx = trace.NewContext(ss.ctx, ss.traceInfo.tr) defer func() { diff --git a/transport/http2_server.go b/transport/http2_server.go index 057d9368..c9a2a36b 100644 --- a/transport/http2_server.go +++ b/transport/http2_server.go @@ -689,3 +689,7 @@ func (t *http2Server) closeStream(s *Stream) { // other goroutines. s.cancel() } + +func (t *http2Server) RemoteAddr() net.Addr { + return t.conn.RemoteAddr() +} diff --git a/transport/transport.go b/transport/transport.go index 2dd38a83..d33f2de7 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -390,6 +390,8 @@ type ServerTransport interface { // should not be accessed any more. All the pending streams and their // handlers will be terminated asynchronously. Close() error + // RemoteAddr returns the remote network address. + RemoteAddr() net.Addr } // StreamErrorf creates an StreamError with the specified error code and description. From 00c7deef344414c20e199e2f7af42fc5d935eade Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Thu, 1 Oct 2015 18:07:38 -0700 Subject: [PATCH 24/27] show deadline on server side trace --- server.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/server.go b/server.go index e817f386..08e2ea98 100644 --- a/server.go +++ b/server.go @@ -292,6 +292,9 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. defer traceInfo.tr.Finish() traceInfo.firstLine.client = false traceInfo.firstLine.remoteAddr = t.RemoteAddr() + if dl, ok := ctx.Deadline(); ok { + traceInfo.firstLine.deadline = dl.Sub(time.Now()) + } traceInfo.tr.LazyLog(&traceInfo.firstLine, false) ctx = trace.NewContext(ctx, traceInfo.tr) defer func() { @@ -399,6 +402,9 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp ss.traceInfo.tr = trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method()) ss.traceInfo.firstLine.client = false ss.traceInfo.firstLine.remoteAddr = t.RemoteAddr() + if dl, ok := ctx.Deadline(); ok { + traceInfo.firstLine.deadline = dl.Sub(time.Now()) + } ss.traceInfo.tr.LazyLog(&ss.traceInfo.firstLine, false) ss.ctx = trace.NewContext(ss.ctx, ss.traceInfo.tr) defer func() { From d12126f864d2660829b33a77bdf2cd3a578b9935 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Thu, 1 Oct 2015 18:08:09 -0700 Subject: [PATCH 25/27] show deadline on server side trace --- server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server.go b/server.go index 08e2ea98..049e8099 100644 --- a/server.go +++ b/server.go @@ -403,7 +403,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp ss.traceInfo.firstLine.client = false ss.traceInfo.firstLine.remoteAddr = t.RemoteAddr() if dl, ok := ctx.Deadline(); ok { - traceInfo.firstLine.deadline = dl.Sub(time.Now()) + ss.traceInfo.firstLine.deadline = dl.Sub(time.Now()) } ss.traceInfo.tr.LazyLog(&ss.traceInfo.firstLine, false) ss.ctx = trace.NewContext(ss.ctx, ss.traceInfo.tr) From ffba849039fcce986e367f41acbba92ade35bd68 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Thu, 1 Oct 2015 18:20:59 -0700 Subject: [PATCH 26/27] add the missing commit in previous PR --- server.go | 1 + 1 file changed, 1 insertion(+) diff --git a/server.go b/server.go index 049e8099..ee44d1e7 100644 --- a/server.go +++ b/server.go @@ -42,6 +42,7 @@ import ( "runtime" "strings" "sync" + "time" "golang.org/x/net/context" "golang.org/x/net/trace" From 8555e5d5a137d0a30deb5f07a84d30d9c432a39d Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Fri, 2 Oct 2015 10:52:53 -0700 Subject: [PATCH 27/27] try to address the issues caused by 2 repo names --- .travis.yml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/.travis.yml b/.travis.yml index 25035601..d4ca5329 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,4 +1,9 @@ language: go +install: + - export GOPATH="$HOME/gopath" + - mkdir -p "$GOPATH/src/google.golang.org" + - mv "$TRAVIS_BUILD_DIR" "$GOPATH/src/google.golang.org/grpc" + script: - make test testrace