fix travis

This commit is contained in:
iamqizhao
2015-10-02 11:14:38 -07:00
18 changed files with 473 additions and 170 deletions

View File

@ -6,7 +6,6 @@ before_install:
- go get golang.org/x/tools/cmd/cover
install:
- export GOPATH="$HOME/gopath"
- mkdir -p "$GOPATH/src/google.golang.org"
- mv "$TRAVIS_BUILD_DIR" "$GOPATH/src/google.golang.org/grpc"

View File

@ -18,6 +18,10 @@ Prerequisites
This requires Go 1.4 or above.
Constraints
-----------
The grpc package should only depend on standard Go packages and a short list of exceptions. A new addition to the list requires a discussion with gRPC-Go authors and consultants.
Documentation
-------------
You can find more detailed documentation and examples in the [examples directory](examples/).

View File

@ -419,9 +419,9 @@ func RegisterTestServiceServer(s *grpc.Server, srv TestServiceServer) {
s.RegisterService(&_TestService_serviceDesc, srv)
}
func _TestService_UnaryCall_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) {
func _TestService_UnaryCall_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) {
in := new(SimpleRequest)
if err := codec.Unmarshal(buf, in); err != nil {
if err := dec(in); err != nil {
return nil, err
}
out, err := srv.(TestServiceServer).UnaryCall(ctx, in)

23
call.go
View File

@ -116,7 +116,6 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
o.after(&c)
}
}()
if EnableTracing {
c.traceInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method)
defer c.traceInfo.tr.Finish()
@ -133,10 +132,6 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
}
}()
}
callHdr := &transport.CallHdr{
Host: cc.authority,
Method: method,
}
topts := &transport.Options{
Last: true,
Delay: false,
@ -149,13 +144,25 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
err error
t transport.ClientTransport
stream *transport.Stream
conn *Conn
)
// TODO(zhaoq): Need a formal spec of retry strategy for non-failfast rpcs.
if lastErr != nil && c.failFast {
return toRPCErr(lastErr)
}
t, err = cc.wait(ctx)
conn, err = cc.dopts.picker.Pick()
if err != nil {
return toRPCErr(err)
}
callHdr := &transport.CallHdr{
Host: conn.authority,
Method: method,
}
t, err = conn.Wait(ctx)
if err != nil {
if err == ErrTransientFailure {
continue
}
if lastErr != nil {
// This was a retry; return the error from the last attempt.
return toRPCErr(lastErr)
@ -165,7 +172,7 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
if c.traceInfo.tr != nil {
c.traceInfo.tr.LazyLog(&payload{sent: true, msg: args}, true)
}
stream, err = sendRequest(ctx, cc.dopts.codec, callHdr, t, args, topts)
stream, err = sendRequest(ctx, conn.dopts.codec, callHdr, t, args, topts)
if err != nil {
if _, ok := err.(transport.ConnectionError); ok {
lastErr = err
@ -177,7 +184,7 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
return toRPCErr(err)
}
// Receive the response
lastErr = recvResponse(cc.dopts.codec, t, &c, stream, reply)
lastErr = recvResponse(conn.dopts.codec, t, &c, stream, reply)
if _, ok := lastErr.(transport.ConnectionError); ok {
continue
}

View File

@ -65,6 +65,8 @@ var (
// ErrClientConnTimeout indicates that the connection could not be
// established or re-established within the specified timeout.
ErrClientConnTimeout = errors.New("grpc: timed out trying to connect")
// ErrTransientFailure indicates the connection failed due to a transient error.
ErrTransientFailure = errors.New("grpc: transient connection failure")
// minimum time to give a connection to complete
minConnectTimeout = 20 * time.Second
)
@ -73,6 +75,7 @@ var (
// values passed to Dial.
type dialOptions struct {
codec Codec
picker Picker
block bool
insecure bool
copts transport.ConnectOptions
@ -142,90 +145,21 @@ func WithUserAgent(s string) DialOption {
// Dial creates a client connection the given target.
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
if target == "" {
return nil, ErrUnspecTarget
}
cc := &ClientConn{
target: target,
shutdownChan: make(chan struct{}),
}
if EnableTracing {
cc.events = trace.NewEventLog("grpc.ClientConn", target)
target: target,
}
for _, opt := range opts {
opt(&cc.dopts)
}
if !cc.dopts.insecure {
var ok bool
for _, c := range cc.dopts.copts.AuthOptions {
if _, ok := c.(credentials.TransportAuthenticator); !ok {
continue
}
ok = true
}
if !ok {
return nil, ErrNoTransportSecurity
}
} else {
for _, c := range cc.dopts.copts.AuthOptions {
if c.RequireTransportSecurity() {
return nil, ErrCredentialsMisuse
}
}
if cc.dopts.picker == nil {
cc.dopts.picker = &unicastPicker{}
}
colonPos := strings.LastIndex(target, ":")
if colonPos == -1 {
colonPos = len(target)
}
cc.authority = target[:colonPos]
if cc.dopts.codec == nil {
// Set the default codec.
cc.dopts.codec = protoCodec{}
}
cc.stateCV = sync.NewCond(&cc.mu)
if cc.dopts.block {
if err := cc.resetTransport(false); err != nil {
cc.mu.Lock()
cc.errorf("dial failed: %v", err)
cc.mu.Unlock()
cc.Close()
return nil, err
}
// Start to monitor the error status of transport.
go cc.transportMonitor()
} else {
// Start a goroutine connecting to the server asynchronously.
go func() {
if err := cc.resetTransport(false); err != nil {
cc.mu.Lock()
cc.errorf("dial failed: %v", err)
cc.mu.Unlock()
grpclog.Printf("Failed to dial %s: %v; please retry.", target, err)
cc.Close()
return
}
go cc.transportMonitor()
}()
if err := cc.dopts.picker.Init(cc); err != nil {
return nil, err
}
return cc, nil
}
// printf records an event in cc's event log, unless cc has been closed.
// REQUIRES cc.mu is held.
func (cc *ClientConn) printf(format string, a ...interface{}) {
if cc.events != nil {
cc.events.Printf(format, a...)
}
}
// errorf records an error in cc's event log, unless cc has been closed.
// REQUIRES cc.mu is held.
func (cc *ClientConn) errorf(format string, a ...interface{}) {
if cc.events != nil {
cc.events.Errorf(format, a...)
}
}
// ConnectivityState indicates the state of a client connection.
type ConnectivityState int
@ -261,6 +195,30 @@ func (s ConnectivityState) String() string {
// ClientConn represents a client connection to an RPC service.
type ClientConn struct {
target string
dopts dialOptions
}
// State returns the connectivity state of cc.
// This is EXPERIMENTAL API.
func (cc *ClientConn) State() ConnectivityState {
return cc.dopts.picker.State()
}
// WaitForStateChange blocks until the state changes to something other than the sourceState
// or timeout fires on cc. It returns false if timeout fires, and true otherwise.
// This is EXPERIMENTAL API.
func (cc *ClientConn) WaitForStateChange(timeout time.Duration, sourceState ConnectivityState) bool {
return cc.dopts.picker.WaitForStateChange(timeout, sourceState)
}
// Close starts to tear down the ClientConn.
func (cc *ClientConn) Close() error {
return cc.dopts.picker.Close()
}
// Conn is a client connection to a single destination.
type Conn struct {
target string
authority string
dopts dialOptions
@ -276,8 +234,86 @@ type ClientConn struct {
transport transport.ClientTransport
}
// State returns the connectivity state of the ClientConn
func (cc *ClientConn) State() ConnectivityState {
// NewConn creates a Conn.
func NewConn(cc *ClientConn) (*Conn, error) {
if cc.target == "" {
return nil, ErrUnspecTarget
}
c := &Conn{
target: cc.target,
dopts: cc.dopts,
shutdownChan: make(chan struct{}),
}
if EnableTracing {
c.events = trace.NewEventLog("grpc.ClientConn", c.target)
}
if !c.dopts.insecure {
var ok bool
for _, cd := range c.dopts.copts.AuthOptions {
if _, ok := cd.(credentials.TransportAuthenticator); !ok {
continue
}
ok = true
}
if !ok {
return nil, ErrNoTransportSecurity
}
} else {
for _, cd := range c.dopts.copts.AuthOptions {
if cd.RequireTransportSecurity() {
return nil, ErrCredentialsMisuse
}
}
}
colonPos := strings.LastIndex(c.target, ":")
if colonPos == -1 {
colonPos = len(c.target)
}
c.authority = c.target[:colonPos]
if c.dopts.codec == nil {
// Set the default codec.
c.dopts.codec = protoCodec{}
}
c.stateCV = sync.NewCond(&c.mu)
if c.dopts.block {
if err := c.resetTransport(false); err != nil {
c.Close()
return nil, err
}
// Start to monitor the error status of transport.
go c.transportMonitor()
} else {
// Start a goroutine connecting to the server asynchronously.
go func() {
if err := c.resetTransport(false); err != nil {
grpclog.Printf("Failed to dial %s: %v; please retry.", c.target, err)
c.Close()
return
}
go c.transportMonitor()
}()
}
return c, nil
}
// printf records an event in cc's event log, unless cc has been closed.
// REQUIRES cc.mu is held.
func (cc *Conn) printf(format string, a ...interface{}) {
if cc.events != nil {
cc.events.Printf(format, a...)
}
}
// errorf records an error in cc's event log, unless cc has been closed.
// REQUIRES cc.mu is held.
func (cc *Conn) errorf(format string, a ...interface{}) {
if cc.events != nil {
cc.events.Errorf(format, a...)
}
}
// State returns the connectivity state of the Conn
func (cc *Conn) State() ConnectivityState {
cc.mu.Lock()
defer cc.mu.Unlock()
return cc.state
@ -285,7 +321,8 @@ func (cc *ClientConn) State() ConnectivityState {
// WaitForStateChange blocks until the state changes to something other than the sourceState
// or timeout fires. It returns false if timeout fires and true otherwise.
func (cc *ClientConn) WaitForStateChange(timeout time.Duration, sourceState ConnectivityState) bool {
// TODO(zhaoq): Rewrite for complex Picker.
func (cc *Conn) WaitForStateChange(timeout time.Duration, sourceState ConnectivityState) bool {
start := time.Now()
cc.mu.Lock()
defer cc.mu.Unlock()
@ -317,7 +354,7 @@ func (cc *ClientConn) WaitForStateChange(timeout time.Duration, sourceState Conn
return true
}
func (cc *ClientConn) resetTransport(closeTransport bool) error {
func (cc *Conn) resetTransport(closeTransport bool) error {
var retries int
start := time.Now()
for {
@ -361,6 +398,10 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error {
cc.errorf("transient failure: %v", err)
cc.state = TransientFailure
cc.stateCV.Broadcast()
if cc.ready != nil {
close(cc.ready)
cc.ready = nil
}
cc.mu.Unlock()
sleepTime -= time.Since(connectTime)
if sleepTime < 0 {
@ -402,7 +443,7 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error {
// Run in a goroutine to track the error in transport and create the
// new transport if an error happens. It returns when the channel is closing.
func (cc *ClientConn) transportMonitor() {
func (cc *Conn) transportMonitor() {
for {
select {
// shutdownChan is needed to detect the teardown when
@ -427,9 +468,8 @@ func (cc *ClientConn) transportMonitor() {
}
}
// When wait returns, either the new transport is up or ClientConn is
// closing.
func (cc *ClientConn) wait(ctx context.Context) (transport.ClientTransport, error) {
// Wait blocks until i) the new transport is up or ii) ctx is done or iii)
func (cc *Conn) Wait(ctx context.Context) (transport.ClientTransport, error) {
for {
cc.mu.Lock()
switch {
@ -439,6 +479,11 @@ func (cc *ClientConn) wait(ctx context.Context) (transport.ClientTransport, erro
case cc.state == Ready:
cc.mu.Unlock()
return cc.transport, nil
case cc.state == TransientFailure:
cc.mu.Unlock()
// Break out so that the caller gets chance to pick another transport to
// perform rpc instead of sticking to this transport.
return nil, ErrTransientFailure
default:
ready := cc.ready
if ready == nil {
@ -456,12 +501,12 @@ func (cc *ClientConn) wait(ctx context.Context) (transport.ClientTransport, erro
}
}
// Close starts to tear down the ClientConn. Returns ErrClientConnClosing if
// Close starts to tear down the Conn. Returns ErrClientConnClosing if
// it has been closed (mostly due to dial time-out).
// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
// some edge cases (e.g., the caller opens and closes many ClientConn's in a
// tight loop.
func (cc *ClientConn) Close() error {
func (cc *Conn) Close() error {
cc.mu.Lock()
defer cc.mu.Unlock()
if cc.state == Shutdown {

View File

@ -84,9 +84,9 @@ func RegisterGreeterServer(s *grpc.Server, srv GreeterServer) {
s.RegisterService(&_Greeter_serviceDesc, srv)
}
func _Greeter_SayHello_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) {
func _Greeter_SayHello_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) {
in := new(HelloRequest)
if err := codec.Unmarshal(buf, in); err != nil {
if err := dec(in); err != nil {
return nil, err
}
out, err := srv.(GreeterServer).SayHello(ctx, in)

View File

@ -310,9 +310,9 @@ func RegisterRouteGuideServer(s *grpc.Server, srv RouteGuideServer) {
s.RegisterService(&_RouteGuide_serviceDesc, srv)
}
func _RouteGuide_GetFeature_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) {
func _RouteGuide_GetFeature_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) {
in := new(Point)
if err := codec.Unmarshal(buf, in); err != nil {
if err := dec(in); err != nil {
return nil, err
}
out, err := srv.(RouteGuideServer).GetFeature(ctx, in)

View File

@ -108,9 +108,9 @@ func RegisterHealthServer(s *grpc.Server, srv HealthServer) {
s.RegisterService(&_Health_serviceDesc, srv)
}
func _Health_Check_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) {
func _Health_Check_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) {
in := new(HealthCheckRequest)
if err := codec.Unmarshal(buf, in); err != nil {
if err := dec(in); err != nil {
return nil, err
}
out, err := srv.(HealthServer).Check(ctx, in)

View File

@ -539,9 +539,9 @@ func RegisterTestServiceServer(s *grpc.Server, srv TestServiceServer) {
s.RegisterService(&_TestService_serviceDesc, srv)
}
func _TestService_EmptyCall_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) {
func _TestService_EmptyCall_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) {
in := new(Empty)
if err := codec.Unmarshal(buf, in); err != nil {
if err := dec(in); err != nil {
return nil, err
}
out, err := srv.(TestServiceServer).EmptyCall(ctx, in)
@ -551,9 +551,9 @@ func _TestService_EmptyCall_Handler(srv interface{}, ctx context.Context, codec
return out, nil
}
func _TestService_UnaryCall_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) {
func _TestService_UnaryCall_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) {
in := new(SimpleRequest)
if err := codec.Unmarshal(buf, in); err != nil {
if err := dec(in); err != nil {
return nil, err
}
out, err := srv.(TestServiceServer).UnaryCall(ctx, in)

View File

@ -1,3 +1,36 @@
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
package etcd
import (
@ -5,41 +38,73 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc/naming"
)
// update defines an etcd key-value update.
type update struct {
key, val string
}
// getNode reports the set of changes starting from node recursively.
func getNode(node *etcdcl.Node) (updates []*update) {
for _, v := range node.Nodes {
updates = append(updates, getNode(v)...)
}
if !node.Dir {
u := &update{
key: node.Key,
val: node.Value,
}
updates = []*update{u}
}
return
}
type watcher struct {
wr etcdcl.Watcher
ctx context.Context
cancel context.CancelFunc
kv map[string]string
}
func (w *watcher) Next() (*naming.Update, error) {
func (w *watcher) Next() (nu []*naming.Update, _ error) {
for {
resp, err := w.wr.Next(w.ctx)
if err != nil {
return nil, err
}
if resp.Node.Dir {
continue
}
var act naming.OP
if resp.Action == "set" {
if resp.PrevNode == nil {
act = naming.Add
} else {
act = naming.Modify
updates := getNode(resp.Node)
for _, u := range updates {
switch resp.Action {
case "set":
if resp.PrevNode == nil {
w.kv[u.key] = u.val
nu = append(nu, &naming.Update{
Op: naming.Add,
Addr: u.val,
})
} else {
nu = append(nu, &naming.Update{
Op: naming.Delete,
Addr: w.kv[u.key],
})
nu = append(nu, &naming.Update{
Op: naming.Add,
Addr: u.val,
})
w.kv[u.key] = u.val
}
case "delete":
nu = append(nu, &naming.Update{
Op: naming.Delete,
Addr: w.kv[u.key],
})
delete(w.kv, u.key)
}
} else if resp.Action == "delete" {
act = naming.Delete
}
if act == naming.No {
continue
if len(nu) > 0 {
break
}
return &naming.Update{
Op: act,
Key: resp.Node.Key,
Val: resp.Node.Value,
}, nil
}
return nu, nil
}
func (w *watcher) Stop() {
@ -48,41 +113,36 @@ func (w *watcher) Stop() {
type resolver struct {
kapi etcdcl.KeysAPI
kv map[string]string
}
func (r *resolver) NewWatcher(target string) naming.Watcher {
ctx, cancel := context.WithCancel(context.Background())
return &watcher{
w := &watcher{
wr: r.kapi.Watcher(target, &etcdcl.WatcherOptions{Recursive: true}),
ctx: ctx,
cancel: cancel,
}
for k, v := range r.kv {
w.kv[k] = v
}
return w
}
// getNode reports the naming.Update starting from node recursively.
func getNode(node *etcdcl.Node) (updates []*naming.Update) {
for _, v := range node.Nodes {
updates = append(updates, getNode(v)...)
}
if !node.Dir {
entry := &naming.Update{
Op: naming.Add,
Key: node.Key,
Val: node.Value,
}
updates = []*naming.Update{entry}
}
return
}
func (r *resolver) Resolve(target string) ([]*naming.Update, error) {
func (r *resolver) Resolve(target string) (nu []*naming.Update, _ error) {
resp, err := r.kapi.Get(context.Background(), target, &etcdcl.GetOptions{Recursive: true})
if err != nil {
return nil, err
}
updates := getNode(resp.Node)
return updates, nil
for _, u := range updates {
r.kv[u.key] = u.val
nu = append(nu, &naming.Update{
Op: naming.Add,
Addr: u.val,
})
}
return nu, nil
}
// NewResolver creates an etcd-based naming.Resolver.
@ -93,5 +153,6 @@ func NewResolver(cfg etcdcl.Config) (naming.Resolver, error) {
}
return &resolver{
kapi: etcdcl.NewKeysAPI(c),
kv: make(map[string]string),
}, nil
}

View File

@ -1,17 +1,48 @@
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
// Package naming defines the naming API and related data structures for gRPC.
// The interface is EXPERIMENTAL and may be suject to change.
package naming
// OP defines the corresponding operations for a name resolution change.
type OP uint8
const (
// No indicates there are no changes.
No OP = iota
// Add indicates a new address is added.
Add
Add = iota
// Delete indicates an exisiting address is deleted.
Delete
// Modify indicates an existing address is modified.
Modify
)
type ServiceConfig interface{}
@ -20,8 +51,7 @@ type ServiceConfig interface{}
type Update struct {
// Op indicates the operation of the update.
Op OP
Key string
Val string
Addr string
Config ServiceConfig
}
@ -36,8 +66,9 @@ type Resolver interface {
// Watcher watches the updates for a particular target.
type Watcher interface {
// Next blocks until an update or error happens.
Next() (*Update, error)
// Next blocks until an update or error happens. It may return one or more
// updates.
Next() ([]*Update, error)
// Stop stops the Watcher.
Stop()
}

90
picker.go Normal file
View File

@ -0,0 +1,90 @@
/*
*
* Copyright 2014, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
package grpc
import (
"time"
)
// Picker picks a Conn for RPC requests.
// This is EXPERIMENTAL and Please do not implement your own Picker for now.
type Picker interface {
// Init does initial processing for the Picker, e.g., initiate some connections.
Init(cc *ClientConn) error
// Pick returns the Conn to use for the upcoming RPC. It may return different
// Conn's up to the implementation.
Pick() (*Conn, error)
// State returns the connectivity state of the underlying connections.
State() ConnectivityState
// WaitForStateChange blocks until the state changes to something other than
// the sourceState or timeout fires on cc. It returns false if timeout fires,
// and true otherwise.
WaitForStateChange(timeout time.Duration, sourceState ConnectivityState) bool
// Close closes all the Conn's owned by this Picker.
Close() error
}
// unicastPicker is the default Picker which is used when there is no custom Picker
// specified by users. It always picks the same Conn.
type unicastPicker struct {
conn *Conn
}
func (p *unicastPicker) Init(cc *ClientConn) error {
c, err := NewConn(cc)
if err != nil {
return err
}
p.conn = c
return nil
}
func (p *unicastPicker) Pick() (*Conn, error) {
return p.conn, nil
}
func (p *unicastPicker) State() ConnectivityState {
return p.conn.State()
}
func (p *unicastPicker) WaitForStateChange(timeout time.Duration, sourceState ConnectivityState) bool {
return p.conn.WaitForStateChange(timeout, sourceState)
}
func (p *unicastPicker) Close() error {
if p.conn != nil {
return p.conn.Close()
}
return nil
}

View File

@ -42,6 +42,7 @@ import (
"runtime"
"strings"
"sync"
"time"
"golang.org/x/net/context"
"golang.org/x/net/trace"
@ -52,7 +53,7 @@ import (
"google.golang.org/grpc/transport"
)
type methodHandler func(srv interface{}, ctx context.Context, codec Codec, buf []byte) (interface{}, error)
type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error)
// MethodDesc represents an RPC service's method specification.
type MethodDesc struct {
@ -284,12 +285,19 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str
}
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc) (err error) {
ctx, cancel := context.WithCancel(stream.Context())
defer cancel()
var traceInfo traceInfo
if EnableTracing {
traceInfo.tr = trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method())
defer traceInfo.tr.Finish()
traceInfo.firstLine.client = false
traceInfo.firstLine.remoteAddr = t.RemoteAddr()
if dl, ok := ctx.Deadline(); ok {
traceInfo.firstLine.deadline = dl.Sub(time.Now())
}
traceInfo.tr.LazyLog(&traceInfo.firstLine, false)
ctx = trace.NewContext(ctx, traceInfo.tr)
defer func() {
if err != nil && err != io.EOF {
traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
@ -317,14 +325,20 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
}
return err
}
if traceInfo.tr != nil {
traceInfo.tr.LazyLog(&payload{sent: false, msg: req}, true)
}
switch pf {
case compressionNone:
statusCode := codes.OK
statusDesc := ""
reply, appErr := md.Handler(srv.server, stream.Context(), s.opts.codec, req)
df := func(v interface{}) error {
if err := s.opts.codec.Unmarshal(req, v); err != nil {
return err
}
if traceInfo.tr != nil {
traceInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
}
return nil
}
reply, appErr := md.Handler(srv.server, ctx, df)
if appErr != nil {
if err, ok := appErr.(rpcError); ok {
statusCode = err.code
@ -333,12 +347,20 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
statusCode = convertCode(appErr)
statusDesc = appErr.Error()
}
if traceInfo.tr != nil && statusCode != codes.OK {
traceInfo.tr.LazyLog(stringer(statusDesc), true)
traceInfo.tr.SetError()
}
if err := t.WriteStatus(stream, statusCode, statusDesc); err != nil {
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status: %v", err)
return err
}
return nil
}
if traceInfo.tr != nil {
traceInfo.tr.LazyLog(stringer("OK"), false)
}
opts := &transport.Options{
Last: true,
Delay: false,
@ -367,9 +389,12 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
}
func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc) (err error) {
ctx, cancel := context.WithCancel(stream.Context())
defer cancel()
ss := &serverStream{
t: t,
s: stream,
ctx: ctx,
p: &parser{s: stream},
codec: s.opts.codec,
tracing: EnableTracing,
@ -377,7 +402,12 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
if ss.tracing {
ss.traceInfo.tr = trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method())
ss.traceInfo.firstLine.client = false
ss.traceInfo.firstLine.remoteAddr = t.RemoteAddr()
if dl, ok := ctx.Deadline(); ok {
ss.traceInfo.firstLine.deadline = dl.Sub(time.Now())
}
ss.traceInfo.tr.LazyLog(&ss.traceInfo.firstLine, false)
ss.ctx = trace.NewContext(ss.ctx, ss.traceInfo.tr)
defer func() {
ss.mu.Lock()
if err != nil && err != io.EOF {
@ -398,6 +428,16 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
ss.statusDesc = appErr.Error()
}
}
if ss.tracing {
ss.mu.Lock()
if ss.statusCode != codes.OK {
ss.traceInfo.tr.LazyLog(stringer(ss.statusDesc), true)
ss.traceInfo.tr.SetError()
} else {
ss.traceInfo.tr.LazyLog(stringer("OK"), false)
}
ss.mu.Unlock()
}
return t.WriteStatus(ss.s, ss.statusCode, ss.statusDesc)
}

View File

@ -96,14 +96,33 @@ type ClientStream interface {
// NewClientStream creates a new Stream for the client side. This is called
// by generated code.
func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
var (
conn *Conn
t transport.ClientTransport
err error
)
for {
conn, err = cc.dopts.picker.Pick()
if err != nil {
return nil, toRPCErr(err)
}
t, err = conn.Wait(ctx)
if err != nil {
if err == ErrTransientFailure {
continue
}
return nil, toRPCErr(err)
}
break
}
// TODO(zhaoq): CallOption is omitted. Add support when it is needed.
callHdr := &transport.CallHdr{
Host: cc.authority,
Host: conn.authority,
Method: method,
}
cs := &clientStream{
desc: desc,
codec: cc.dopts.codec,
codec: conn.dopts.codec,
tracing: EnableTracing,
}
if cs.tracing {
@ -113,10 +132,7 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
cs.traceInfo.firstLine.deadline = deadline.Sub(time.Now())
}
cs.traceInfo.tr.LazyLog(&cs.traceInfo.firstLine, false)
}
t, err := cc.wait(ctx)
if err != nil {
return nil, toRPCErr(err)
ctx = trace.NewContext(ctx, cs.traceInfo.tr)
}
s, err := t.NewStream(ctx, callHdr)
if err != nil {
@ -278,6 +294,7 @@ type ServerStream interface {
type serverStream struct {
t transport.ServerTransport
s *transport.Stream
ctx context.Context // provides trace.FromContext when tracing
p *parser
codec Codec
statusCode codes.Code
@ -292,7 +309,7 @@ type serverStream struct {
}
func (ss *serverStream) Context() context.Context {
return ss.s.Context()
return ss.ctx
}
func (ss *serverStream) SendHeader(md metadata.MD) error {
@ -317,7 +334,6 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
ss.traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
ss.traceInfo.tr.SetError()
}
ss.mu.Unlock()
}
}()

View File

@ -539,9 +539,9 @@ func RegisterTestServiceServer(s *grpc.Server, srv TestServiceServer) {
s.RegisterService(&_TestService_serviceDesc, srv)
}
func _TestService_EmptyCall_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) {
func _TestService_EmptyCall_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) {
in := new(Empty)
if err := codec.Unmarshal(buf, in); err != nil {
if err := dec(in); err != nil {
return nil, err
}
out, err := srv.(TestServiceServer).EmptyCall(ctx, in)
@ -551,9 +551,9 @@ func _TestService_EmptyCall_Handler(srv interface{}, ctx context.Context, codec
return out, nil
}
func _TestService_UnaryCall_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) {
func _TestService_UnaryCall_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) {
in := new(SimpleRequest)
if err := codec.Unmarshal(buf, in); err != nil {
if err := dec(in); err != nil {
return nil, err
}
out, err := srv.(TestServiceServer).UnaryCall(ctx, in)

View File

@ -114,3 +114,7 @@ type fmtStringer struct {
func (f *fmtStringer) String() string {
return fmt.Sprintf(f.format, f.a...)
}
type stringer string
func (s stringer) String() string { return string(s) }

View File

@ -689,3 +689,7 @@ func (t *http2Server) closeStream(s *Stream) {
// other goroutines.
s.cancel()
}
func (t *http2Server) RemoteAddr() net.Addr {
return t.conn.RemoteAddr()
}

View File

@ -390,6 +390,8 @@ type ServerTransport interface {
// should not be accessed any more. All the pending streams and their
// handlers will be terminated asynchronously.
Close() error
// RemoteAddr returns the remote network address.
RemoteAddr() net.Addr
}
// StreamErrorf creates an StreamError with the specified error code and description.