diff --git a/.travis.yml b/.travis.yml index b0ce1f7d..3f83776e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,7 +6,6 @@ before_install: - go get golang.org/x/tools/cmd/cover install: - - export GOPATH="$HOME/gopath" - mkdir -p "$GOPATH/src/google.golang.org" - mv "$TRAVIS_BUILD_DIR" "$GOPATH/src/google.golang.org/grpc" diff --git a/README.md b/README.md index 60d5c5d2..94dc739e 100644 --- a/README.md +++ b/README.md @@ -18,6 +18,10 @@ Prerequisites This requires Go 1.4 or above. +Constraints +----------- +The grpc package should only depend on standard Go packages and a short list of exceptions. A new addition to the list requires a discussion with gRPC-Go authors and consultants. + Documentation ------------- You can find more detailed documentation and examples in the [examples directory](examples/). diff --git a/benchmark/grpc_testing/test.pb.go b/benchmark/grpc_testing/test.pb.go index 619c450c..74e13c9e 100644 --- a/benchmark/grpc_testing/test.pb.go +++ b/benchmark/grpc_testing/test.pb.go @@ -419,9 +419,9 @@ func RegisterTestServiceServer(s *grpc.Server, srv TestServiceServer) { s.RegisterService(&_TestService_serviceDesc, srv) } -func _TestService_UnaryCall_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) { +func _TestService_UnaryCall_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) { in := new(SimpleRequest) - if err := codec.Unmarshal(buf, in); err != nil { + if err := dec(in); err != nil { return nil, err } out, err := srv.(TestServiceServer).UnaryCall(ctx, in) diff --git a/call.go b/call.go index 0115a28d..8b688091 100644 --- a/call.go +++ b/call.go @@ -116,7 +116,6 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli o.after(&c) } }() - if EnableTracing { c.traceInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method) defer c.traceInfo.tr.Finish() @@ -133,10 +132,6 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli } }() } - callHdr := &transport.CallHdr{ - Host: cc.authority, - Method: method, - } topts := &transport.Options{ Last: true, Delay: false, @@ -149,13 +144,25 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli err error t transport.ClientTransport stream *transport.Stream + conn *Conn ) // TODO(zhaoq): Need a formal spec of retry strategy for non-failfast rpcs. if lastErr != nil && c.failFast { return toRPCErr(lastErr) } - t, err = cc.wait(ctx) + conn, err = cc.dopts.picker.Pick() if err != nil { + return toRPCErr(err) + } + callHdr := &transport.CallHdr{ + Host: conn.authority, + Method: method, + } + t, err = conn.Wait(ctx) + if err != nil { + if err == ErrTransientFailure { + continue + } if lastErr != nil { // This was a retry; return the error from the last attempt. return toRPCErr(lastErr) @@ -165,7 +172,7 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli if c.traceInfo.tr != nil { c.traceInfo.tr.LazyLog(&payload{sent: true, msg: args}, true) } - stream, err = sendRequest(ctx, cc.dopts.codec, callHdr, t, args, topts) + stream, err = sendRequest(ctx, conn.dopts.codec, callHdr, t, args, topts) if err != nil { if _, ok := err.(transport.ConnectionError); ok { lastErr = err @@ -177,7 +184,7 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli return toRPCErr(err) } // Receive the response - lastErr = recvResponse(cc.dopts.codec, t, &c, stream, reply) + lastErr = recvResponse(conn.dopts.codec, t, &c, stream, reply) if _, ok := lastErr.(transport.ConnectionError); ok { continue } diff --git a/clientconn.go b/clientconn.go index 87f302fc..ea3ccd0b 100644 --- a/clientconn.go +++ b/clientconn.go @@ -65,6 +65,8 @@ var ( // ErrClientConnTimeout indicates that the connection could not be // established or re-established within the specified timeout. ErrClientConnTimeout = errors.New("grpc: timed out trying to connect") + // ErrTransientFailure indicates the connection failed due to a transient error. + ErrTransientFailure = errors.New("grpc: transient connection failure") // minimum time to give a connection to complete minConnectTimeout = 20 * time.Second ) @@ -73,6 +75,7 @@ var ( // values passed to Dial. type dialOptions struct { codec Codec + picker Picker block bool insecure bool copts transport.ConnectOptions @@ -142,90 +145,21 @@ func WithUserAgent(s string) DialOption { // Dial creates a client connection the given target. func Dial(target string, opts ...DialOption) (*ClientConn, error) { - if target == "" { - return nil, ErrUnspecTarget - } cc := &ClientConn{ - target: target, - shutdownChan: make(chan struct{}), - } - if EnableTracing { - cc.events = trace.NewEventLog("grpc.ClientConn", target) + target: target, } for _, opt := range opts { opt(&cc.dopts) } - if !cc.dopts.insecure { - var ok bool - for _, c := range cc.dopts.copts.AuthOptions { - if _, ok := c.(credentials.TransportAuthenticator); !ok { - continue - } - ok = true - } - if !ok { - return nil, ErrNoTransportSecurity - } - } else { - for _, c := range cc.dopts.copts.AuthOptions { - if c.RequireTransportSecurity() { - return nil, ErrCredentialsMisuse - } - } + if cc.dopts.picker == nil { + cc.dopts.picker = &unicastPicker{} } - colonPos := strings.LastIndex(target, ":") - if colonPos == -1 { - colonPos = len(target) - } - cc.authority = target[:colonPos] - if cc.dopts.codec == nil { - // Set the default codec. - cc.dopts.codec = protoCodec{} - } - cc.stateCV = sync.NewCond(&cc.mu) - if cc.dopts.block { - if err := cc.resetTransport(false); err != nil { - cc.mu.Lock() - cc.errorf("dial failed: %v", err) - cc.mu.Unlock() - cc.Close() - return nil, err - } - // Start to monitor the error status of transport. - go cc.transportMonitor() - } else { - // Start a goroutine connecting to the server asynchronously. - go func() { - if err := cc.resetTransport(false); err != nil { - cc.mu.Lock() - cc.errorf("dial failed: %v", err) - cc.mu.Unlock() - grpclog.Printf("Failed to dial %s: %v; please retry.", target, err) - cc.Close() - return - } - go cc.transportMonitor() - }() + if err := cc.dopts.picker.Init(cc); err != nil { + return nil, err } return cc, nil } -// printf records an event in cc's event log, unless cc has been closed. -// REQUIRES cc.mu is held. -func (cc *ClientConn) printf(format string, a ...interface{}) { - if cc.events != nil { - cc.events.Printf(format, a...) - } -} - -// errorf records an error in cc's event log, unless cc has been closed. -// REQUIRES cc.mu is held. -func (cc *ClientConn) errorf(format string, a ...interface{}) { - if cc.events != nil { - cc.events.Errorf(format, a...) - } -} - // ConnectivityState indicates the state of a client connection. type ConnectivityState int @@ -261,6 +195,30 @@ func (s ConnectivityState) String() string { // ClientConn represents a client connection to an RPC service. type ClientConn struct { + target string + dopts dialOptions +} + +// State returns the connectivity state of cc. +// This is EXPERIMENTAL API. +func (cc *ClientConn) State() ConnectivityState { + return cc.dopts.picker.State() +} + +// WaitForStateChange blocks until the state changes to something other than the sourceState +// or timeout fires on cc. It returns false if timeout fires, and true otherwise. +// This is EXPERIMENTAL API. +func (cc *ClientConn) WaitForStateChange(timeout time.Duration, sourceState ConnectivityState) bool { + return cc.dopts.picker.WaitForStateChange(timeout, sourceState) +} + +// Close starts to tear down the ClientConn. +func (cc *ClientConn) Close() error { + return cc.dopts.picker.Close() +} + +// Conn is a client connection to a single destination. +type Conn struct { target string authority string dopts dialOptions @@ -276,8 +234,86 @@ type ClientConn struct { transport transport.ClientTransport } -// State returns the connectivity state of the ClientConn -func (cc *ClientConn) State() ConnectivityState { +// NewConn creates a Conn. +func NewConn(cc *ClientConn) (*Conn, error) { + if cc.target == "" { + return nil, ErrUnspecTarget + } + c := &Conn{ + target: cc.target, + dopts: cc.dopts, + shutdownChan: make(chan struct{}), + } + if EnableTracing { + c.events = trace.NewEventLog("grpc.ClientConn", c.target) + } + if !c.dopts.insecure { + var ok bool + for _, cd := range c.dopts.copts.AuthOptions { + if _, ok := cd.(credentials.TransportAuthenticator); !ok { + continue + } + ok = true + } + if !ok { + return nil, ErrNoTransportSecurity + } + } else { + for _, cd := range c.dopts.copts.AuthOptions { + if cd.RequireTransportSecurity() { + return nil, ErrCredentialsMisuse + } + } + } + colonPos := strings.LastIndex(c.target, ":") + if colonPos == -1 { + colonPos = len(c.target) + } + c.authority = c.target[:colonPos] + if c.dopts.codec == nil { + // Set the default codec. + c.dopts.codec = protoCodec{} + } + c.stateCV = sync.NewCond(&c.mu) + if c.dopts.block { + if err := c.resetTransport(false); err != nil { + c.Close() + return nil, err + } + // Start to monitor the error status of transport. + go c.transportMonitor() + } else { + // Start a goroutine connecting to the server asynchronously. + go func() { + if err := c.resetTransport(false); err != nil { + grpclog.Printf("Failed to dial %s: %v; please retry.", c.target, err) + c.Close() + return + } + go c.transportMonitor() + }() + } + return c, nil +} + +// printf records an event in cc's event log, unless cc has been closed. +// REQUIRES cc.mu is held. +func (cc *Conn) printf(format string, a ...interface{}) { + if cc.events != nil { + cc.events.Printf(format, a...) + } +} + +// errorf records an error in cc's event log, unless cc has been closed. +// REQUIRES cc.mu is held. +func (cc *Conn) errorf(format string, a ...interface{}) { + if cc.events != nil { + cc.events.Errorf(format, a...) + } +} + +// State returns the connectivity state of the Conn +func (cc *Conn) State() ConnectivityState { cc.mu.Lock() defer cc.mu.Unlock() return cc.state @@ -285,7 +321,8 @@ func (cc *ClientConn) State() ConnectivityState { // WaitForStateChange blocks until the state changes to something other than the sourceState // or timeout fires. It returns false if timeout fires and true otherwise. -func (cc *ClientConn) WaitForStateChange(timeout time.Duration, sourceState ConnectivityState) bool { +// TODO(zhaoq): Rewrite for complex Picker. +func (cc *Conn) WaitForStateChange(timeout time.Duration, sourceState ConnectivityState) bool { start := time.Now() cc.mu.Lock() defer cc.mu.Unlock() @@ -317,7 +354,7 @@ func (cc *ClientConn) WaitForStateChange(timeout time.Duration, sourceState Conn return true } -func (cc *ClientConn) resetTransport(closeTransport bool) error { +func (cc *Conn) resetTransport(closeTransport bool) error { var retries int start := time.Now() for { @@ -361,6 +398,10 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error { cc.errorf("transient failure: %v", err) cc.state = TransientFailure cc.stateCV.Broadcast() + if cc.ready != nil { + close(cc.ready) + cc.ready = nil + } cc.mu.Unlock() sleepTime -= time.Since(connectTime) if sleepTime < 0 { @@ -402,7 +443,7 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error { // Run in a goroutine to track the error in transport and create the // new transport if an error happens. It returns when the channel is closing. -func (cc *ClientConn) transportMonitor() { +func (cc *Conn) transportMonitor() { for { select { // shutdownChan is needed to detect the teardown when @@ -427,9 +468,8 @@ func (cc *ClientConn) transportMonitor() { } } -// When wait returns, either the new transport is up or ClientConn is -// closing. -func (cc *ClientConn) wait(ctx context.Context) (transport.ClientTransport, error) { +// Wait blocks until i) the new transport is up or ii) ctx is done or iii) +func (cc *Conn) Wait(ctx context.Context) (transport.ClientTransport, error) { for { cc.mu.Lock() switch { @@ -439,6 +479,11 @@ func (cc *ClientConn) wait(ctx context.Context) (transport.ClientTransport, erro case cc.state == Ready: cc.mu.Unlock() return cc.transport, nil + case cc.state == TransientFailure: + cc.mu.Unlock() + // Break out so that the caller gets chance to pick another transport to + // perform rpc instead of sticking to this transport. + return nil, ErrTransientFailure default: ready := cc.ready if ready == nil { @@ -456,12 +501,12 @@ func (cc *ClientConn) wait(ctx context.Context) (transport.ClientTransport, erro } } -// Close starts to tear down the ClientConn. Returns ErrClientConnClosing if +// Close starts to tear down the Conn. Returns ErrClientConnClosing if // it has been closed (mostly due to dial time-out). // TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in // some edge cases (e.g., the caller opens and closes many ClientConn's in a // tight loop. -func (cc *ClientConn) Close() error { +func (cc *Conn) Close() error { cc.mu.Lock() defer cc.mu.Unlock() if cc.state == Shutdown { diff --git a/examples/helloworld/helloworld/helloworld.pb.go b/examples/helloworld/helloworld/helloworld.pb.go index 1ff931a3..366b23b6 100644 --- a/examples/helloworld/helloworld/helloworld.pb.go +++ b/examples/helloworld/helloworld/helloworld.pb.go @@ -84,9 +84,9 @@ func RegisterGreeterServer(s *grpc.Server, srv GreeterServer) { s.RegisterService(&_Greeter_serviceDesc, srv) } -func _Greeter_SayHello_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) { +func _Greeter_SayHello_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) { in := new(HelloRequest) - if err := codec.Unmarshal(buf, in); err != nil { + if err := dec(in); err != nil { return nil, err } out, err := srv.(GreeterServer).SayHello(ctx, in) diff --git a/examples/route_guide/routeguide/route_guide.pb.go b/examples/route_guide/routeguide/route_guide.pb.go index fcf5c748..9ac9029a 100644 --- a/examples/route_guide/routeguide/route_guide.pb.go +++ b/examples/route_guide/routeguide/route_guide.pb.go @@ -310,9 +310,9 @@ func RegisterRouteGuideServer(s *grpc.Server, srv RouteGuideServer) { s.RegisterService(&_RouteGuide_serviceDesc, srv) } -func _RouteGuide_GetFeature_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) { +func _RouteGuide_GetFeature_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) { in := new(Point) - if err := codec.Unmarshal(buf, in); err != nil { + if err := dec(in); err != nil { return nil, err } out, err := srv.(RouteGuideServer).GetFeature(ctx, in) diff --git a/health/grpc_health_v1alpha/health.pb.go b/health/grpc_health_v1alpha/health.pb.go index c333a975..96eba6f8 100644 --- a/health/grpc_health_v1alpha/health.pb.go +++ b/health/grpc_health_v1alpha/health.pb.go @@ -108,9 +108,9 @@ func RegisterHealthServer(s *grpc.Server, srv HealthServer) { s.RegisterService(&_Health_serviceDesc, srv) } -func _Health_Check_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) { +func _Health_Check_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) { in := new(HealthCheckRequest) - if err := codec.Unmarshal(buf, in); err != nil { + if err := dec(in); err != nil { return nil, err } out, err := srv.(HealthServer).Check(ctx, in) diff --git a/interop/grpc_testing/test.pb.go b/interop/grpc_testing/test.pb.go index b25e98b8..bd492fef 100755 --- a/interop/grpc_testing/test.pb.go +++ b/interop/grpc_testing/test.pb.go @@ -539,9 +539,9 @@ func RegisterTestServiceServer(s *grpc.Server, srv TestServiceServer) { s.RegisterService(&_TestService_serviceDesc, srv) } -func _TestService_EmptyCall_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) { +func _TestService_EmptyCall_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) { in := new(Empty) - if err := codec.Unmarshal(buf, in); err != nil { + if err := dec(in); err != nil { return nil, err } out, err := srv.(TestServiceServer).EmptyCall(ctx, in) @@ -551,9 +551,9 @@ func _TestService_EmptyCall_Handler(srv interface{}, ctx context.Context, codec return out, nil } -func _TestService_UnaryCall_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) { +func _TestService_UnaryCall_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) { in := new(SimpleRequest) - if err := codec.Unmarshal(buf, in); err != nil { + if err := dec(in); err != nil { return nil, err } out, err := srv.(TestServiceServer).UnaryCall(ctx, in) diff --git a/naming/etcd/etcd.go b/naming/etcd/etcd.go index e140068a..362649da 100644 --- a/naming/etcd/etcd.go +++ b/naming/etcd/etcd.go @@ -1,3 +1,36 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + package etcd import ( @@ -5,41 +38,73 @@ import ( "golang.org/x/net/context" "google.golang.org/grpc/naming" ) +// update defines an etcd key-value update. +type update struct { + key, val string +} + +// getNode reports the set of changes starting from node recursively. +func getNode(node *etcdcl.Node) (updates []*update) { + for _, v := range node.Nodes { + updates = append(updates, getNode(v)...) + } + if !node.Dir { + u := &update{ + key: node.Key, + val: node.Value, + } + updates = []*update{u} + } + return +} type watcher struct { wr etcdcl.Watcher ctx context.Context cancel context.CancelFunc + kv map[string]string } -func (w *watcher) Next() (*naming.Update, error) { +func (w *watcher) Next() (nu []*naming.Update, _ error) { for { resp, err := w.wr.Next(w.ctx) if err != nil { return nil, err } - if resp.Node.Dir { - continue - } - var act naming.OP - if resp.Action == "set" { - if resp.PrevNode == nil { - act = naming.Add - } else { - act = naming.Modify + updates := getNode(resp.Node) + for _, u := range updates { + switch resp.Action { + case "set": + if resp.PrevNode == nil { + w.kv[u.key] = u.val + nu = append(nu, &naming.Update{ + Op: naming.Add, + Addr: u.val, + }) + } else { + nu = append(nu, &naming.Update{ + Op: naming.Delete, + Addr: w.kv[u.key], + }) + nu = append(nu, &naming.Update{ + Op: naming.Add, + Addr: u.val, + }) + w.kv[u.key] = u.val + } + case "delete": + nu = append(nu, &naming.Update{ + Op: naming.Delete, + Addr: w.kv[u.key], + }) + delete(w.kv, u.key) } - } else if resp.Action == "delete" { - act = naming.Delete } - if act == naming.No { - continue + if len(nu) > 0 { + break } - return &naming.Update{ - Op: act, - Key: resp.Node.Key, - Val: resp.Node.Value, - }, nil } + return nu, nil } func (w *watcher) Stop() { @@ -48,41 +113,36 @@ func (w *watcher) Stop() { type resolver struct { kapi etcdcl.KeysAPI + kv map[string]string } func (r *resolver) NewWatcher(target string) naming.Watcher { ctx, cancel := context.WithCancel(context.Background()) - return &watcher{ + w := &watcher{ wr: r.kapi.Watcher(target, &etcdcl.WatcherOptions{Recursive: true}), ctx: ctx, cancel: cancel, } - + for k, v := range r.kv { + w.kv[k] = v + } + return w } -// getNode reports the naming.Update starting from node recursively. -func getNode(node *etcdcl.Node) (updates []*naming.Update) { - for _, v := range node.Nodes { - updates = append(updates, getNode(v)...) - } - if !node.Dir { - entry := &naming.Update{ - Op: naming.Add, - Key: node.Key, - Val: node.Value, - } - updates = []*naming.Update{entry} - } - return -} - -func (r *resolver) Resolve(target string) ([]*naming.Update, error) { +func (r *resolver) Resolve(target string) (nu []*naming.Update, _ error) { resp, err := r.kapi.Get(context.Background(), target, &etcdcl.GetOptions{Recursive: true}) if err != nil { return nil, err } updates := getNode(resp.Node) - return updates, nil + for _, u := range updates { + r.kv[u.key] = u.val + nu = append(nu, &naming.Update{ + Op: naming.Add, + Addr: u.val, + }) + } + return nu, nil } // NewResolver creates an etcd-based naming.Resolver. @@ -93,5 +153,6 @@ func NewResolver(cfg etcdcl.Config) (naming.Resolver, error) { } return &resolver{ kapi: etcdcl.NewKeysAPI(c), + kv: make(map[string]string), }, nil } diff --git a/naming/naming.go b/naming/naming.go index a1fd3357..610eb811 100644 --- a/naming/naming.go +++ b/naming/naming.go @@ -1,17 +1,48 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +// Package naming defines the naming API and related data structures for gRPC. +// The interface is EXPERIMENTAL and may be suject to change. package naming // OP defines the corresponding operations for a name resolution change. type OP uint8 const ( - // No indicates there are no changes. - No OP = iota // Add indicates a new address is added. - Add + Add = iota // Delete indicates an exisiting address is deleted. Delete - // Modify indicates an existing address is modified. - Modify ) type ServiceConfig interface{} @@ -20,8 +51,7 @@ type ServiceConfig interface{} type Update struct { // Op indicates the operation of the update. Op OP - Key string - Val string + Addr string Config ServiceConfig } @@ -36,8 +66,9 @@ type Resolver interface { // Watcher watches the updates for a particular target. type Watcher interface { - // Next blocks until an update or error happens. - Next() (*Update, error) + // Next blocks until an update or error happens. It may return one or more + // updates. + Next() ([]*Update, error) // Stop stops the Watcher. Stop() } diff --git a/picker.go b/picker.go new file mode 100644 index 00000000..79f98868 --- /dev/null +++ b/picker.go @@ -0,0 +1,90 @@ +/* + * + * Copyright 2014, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +package grpc + +import ( + "time" +) + +// Picker picks a Conn for RPC requests. +// This is EXPERIMENTAL and Please do not implement your own Picker for now. +type Picker interface { + // Init does initial processing for the Picker, e.g., initiate some connections. + Init(cc *ClientConn) error + // Pick returns the Conn to use for the upcoming RPC. It may return different + // Conn's up to the implementation. + Pick() (*Conn, error) + // State returns the connectivity state of the underlying connections. + State() ConnectivityState + // WaitForStateChange blocks until the state changes to something other than + // the sourceState or timeout fires on cc. It returns false if timeout fires, + // and true otherwise. + WaitForStateChange(timeout time.Duration, sourceState ConnectivityState) bool + // Close closes all the Conn's owned by this Picker. + Close() error +} + +// unicastPicker is the default Picker which is used when there is no custom Picker +// specified by users. It always picks the same Conn. +type unicastPicker struct { + conn *Conn +} + +func (p *unicastPicker) Init(cc *ClientConn) error { + c, err := NewConn(cc) + if err != nil { + return err + } + p.conn = c + return nil +} + +func (p *unicastPicker) Pick() (*Conn, error) { + return p.conn, nil +} + +func (p *unicastPicker) State() ConnectivityState { + return p.conn.State() +} + +func (p *unicastPicker) WaitForStateChange(timeout time.Duration, sourceState ConnectivityState) bool { + return p.conn.WaitForStateChange(timeout, sourceState) +} + +func (p *unicastPicker) Close() error { + if p.conn != nil { + return p.conn.Close() + } + return nil +} diff --git a/server.go b/server.go index 274f7329..ee44d1e7 100644 --- a/server.go +++ b/server.go @@ -42,6 +42,7 @@ import ( "runtime" "strings" "sync" + "time" "golang.org/x/net/context" "golang.org/x/net/trace" @@ -52,7 +53,7 @@ import ( "google.golang.org/grpc/transport" ) -type methodHandler func(srv interface{}, ctx context.Context, codec Codec, buf []byte) (interface{}, error) +type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) // MethodDesc represents an RPC service's method specification. type MethodDesc struct { @@ -284,12 +285,19 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str } func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc) (err error) { + ctx, cancel := context.WithCancel(stream.Context()) + defer cancel() var traceInfo traceInfo if EnableTracing { traceInfo.tr = trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method()) defer traceInfo.tr.Finish() traceInfo.firstLine.client = false + traceInfo.firstLine.remoteAddr = t.RemoteAddr() + if dl, ok := ctx.Deadline(); ok { + traceInfo.firstLine.deadline = dl.Sub(time.Now()) + } traceInfo.tr.LazyLog(&traceInfo.firstLine, false) + ctx = trace.NewContext(ctx, traceInfo.tr) defer func() { if err != nil && err != io.EOF { traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) @@ -317,14 +325,20 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. } return err } - if traceInfo.tr != nil { - traceInfo.tr.LazyLog(&payload{sent: false, msg: req}, true) - } switch pf { case compressionNone: statusCode := codes.OK statusDesc := "" - reply, appErr := md.Handler(srv.server, stream.Context(), s.opts.codec, req) + df := func(v interface{}) error { + if err := s.opts.codec.Unmarshal(req, v); err != nil { + return err + } + if traceInfo.tr != nil { + traceInfo.tr.LazyLog(&payload{sent: false, msg: v}, true) + } + return nil + } + reply, appErr := md.Handler(srv.server, ctx, df) if appErr != nil { if err, ok := appErr.(rpcError); ok { statusCode = err.code @@ -333,12 +347,20 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. statusCode = convertCode(appErr) statusDesc = appErr.Error() } + if traceInfo.tr != nil && statusCode != codes.OK { + traceInfo.tr.LazyLog(stringer(statusDesc), true) + traceInfo.tr.SetError() + } + if err := t.WriteStatus(stream, statusCode, statusDesc); err != nil { grpclog.Printf("grpc: Server.processUnaryRPC failed to write status: %v", err) return err } return nil } + if traceInfo.tr != nil { + traceInfo.tr.LazyLog(stringer("OK"), false) + } opts := &transport.Options{ Last: true, Delay: false, @@ -367,9 +389,12 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. } func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc) (err error) { + ctx, cancel := context.WithCancel(stream.Context()) + defer cancel() ss := &serverStream{ t: t, s: stream, + ctx: ctx, p: &parser{s: stream}, codec: s.opts.codec, tracing: EnableTracing, @@ -377,7 +402,12 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp if ss.tracing { ss.traceInfo.tr = trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method()) ss.traceInfo.firstLine.client = false + ss.traceInfo.firstLine.remoteAddr = t.RemoteAddr() + if dl, ok := ctx.Deadline(); ok { + ss.traceInfo.firstLine.deadline = dl.Sub(time.Now()) + } ss.traceInfo.tr.LazyLog(&ss.traceInfo.firstLine, false) + ss.ctx = trace.NewContext(ss.ctx, ss.traceInfo.tr) defer func() { ss.mu.Lock() if err != nil && err != io.EOF { @@ -398,6 +428,16 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp ss.statusDesc = appErr.Error() } } + if ss.tracing { + ss.mu.Lock() + if ss.statusCode != codes.OK { + ss.traceInfo.tr.LazyLog(stringer(ss.statusDesc), true) + ss.traceInfo.tr.SetError() + } else { + ss.traceInfo.tr.LazyLog(stringer("OK"), false) + } + ss.mu.Unlock() + } return t.WriteStatus(ss.s, ss.statusCode, ss.statusDesc) } diff --git a/stream.go b/stream.go index e14664cb..34774f03 100644 --- a/stream.go +++ b/stream.go @@ -96,14 +96,33 @@ type ClientStream interface { // NewClientStream creates a new Stream for the client side. This is called // by generated code. func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) { + var ( + conn *Conn + t transport.ClientTransport + err error + ) + for { + conn, err = cc.dopts.picker.Pick() + if err != nil { + return nil, toRPCErr(err) + } + t, err = conn.Wait(ctx) + if err != nil { + if err == ErrTransientFailure { + continue + } + return nil, toRPCErr(err) + } + break + } // TODO(zhaoq): CallOption is omitted. Add support when it is needed. callHdr := &transport.CallHdr{ - Host: cc.authority, + Host: conn.authority, Method: method, } cs := &clientStream{ desc: desc, - codec: cc.dopts.codec, + codec: conn.dopts.codec, tracing: EnableTracing, } if cs.tracing { @@ -113,10 +132,7 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth cs.traceInfo.firstLine.deadline = deadline.Sub(time.Now()) } cs.traceInfo.tr.LazyLog(&cs.traceInfo.firstLine, false) - } - t, err := cc.wait(ctx) - if err != nil { - return nil, toRPCErr(err) + ctx = trace.NewContext(ctx, cs.traceInfo.tr) } s, err := t.NewStream(ctx, callHdr) if err != nil { @@ -278,6 +294,7 @@ type ServerStream interface { type serverStream struct { t transport.ServerTransport s *transport.Stream + ctx context.Context // provides trace.FromContext when tracing p *parser codec Codec statusCode codes.Code @@ -292,7 +309,7 @@ type serverStream struct { } func (ss *serverStream) Context() context.Context { - return ss.s.Context() + return ss.ctx } func (ss *serverStream) SendHeader(md metadata.MD) error { @@ -317,7 +334,6 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) { ss.traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) ss.traceInfo.tr.SetError() } - ss.mu.Unlock() } }() diff --git a/test/grpc_testing/test.pb.go b/test/grpc_testing/test.pb.go index b25e98b8..bd492fef 100644 --- a/test/grpc_testing/test.pb.go +++ b/test/grpc_testing/test.pb.go @@ -539,9 +539,9 @@ func RegisterTestServiceServer(s *grpc.Server, srv TestServiceServer) { s.RegisterService(&_TestService_serviceDesc, srv) } -func _TestService_EmptyCall_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) { +func _TestService_EmptyCall_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) { in := new(Empty) - if err := codec.Unmarshal(buf, in); err != nil { + if err := dec(in); err != nil { return nil, err } out, err := srv.(TestServiceServer).EmptyCall(ctx, in) @@ -551,9 +551,9 @@ func _TestService_EmptyCall_Handler(srv interface{}, ctx context.Context, codec return out, nil } -func _TestService_UnaryCall_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) { +func _TestService_UnaryCall_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) { in := new(SimpleRequest) - if err := codec.Unmarshal(buf, in); err != nil { + if err := dec(in); err != nil { return nil, err } out, err := srv.(TestServiceServer).UnaryCall(ctx, in) diff --git a/trace.go b/trace.go index 24635740..cde04fbf 100644 --- a/trace.go +++ b/trace.go @@ -114,3 +114,7 @@ type fmtStringer struct { func (f *fmtStringer) String() string { return fmt.Sprintf(f.format, f.a...) } + +type stringer string + +func (s stringer) String() string { return string(s) } diff --git a/transport/http2_server.go b/transport/http2_server.go index 057d9368..c9a2a36b 100644 --- a/transport/http2_server.go +++ b/transport/http2_server.go @@ -689,3 +689,7 @@ func (t *http2Server) closeStream(s *Stream) { // other goroutines. s.cancel() } + +func (t *http2Server) RemoteAddr() net.Addr { + return t.conn.RemoteAddr() +} diff --git a/transport/transport.go b/transport/transport.go index 2dd38a83..d33f2de7 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -390,6 +390,8 @@ type ServerTransport interface { // should not be accessed any more. All the pending streams and their // handlers will be terminated asynchronously. Close() error + // RemoteAddr returns the remote network address. + RemoteAddr() net.Addr } // StreamErrorf creates an StreamError with the specified error code and description.