revise Picker API

This commit is contained in:
iamqizhao
2015-09-29 10:24:03 -07:00
parent 2ca13251f7
commit c01ea6e359
4 changed files with 38 additions and 35 deletions

@ -150,7 +150,7 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
if lastErr != nil && c.failFast { if lastErr != nil && c.failFast {
return toRPCErr(lastErr) return toRPCErr(lastErr)
} }
conn, err = cc.picker.Pick() conn, err = cc.dopts.picker.Pick()
if err != nil { if err != nil {
return toRPCErr(err) return toRPCErr(err)
} }

@ -66,7 +66,7 @@ var (
// established or re-established within the specified timeout. // established or re-established within the specified timeout.
ErrClientConnTimeout = errors.New("grpc: timed out trying to connect") ErrClientConnTimeout = errors.New("grpc: timed out trying to connect")
// ErrTransientFailure indicates the connection failed due to a transient error. // ErrTransientFailure indicates the connection failed due to a transient error.
ErrTransientFailure = errors.New("transient connection failure") ErrTransientFailure = errors.New("grpc: transient connection failure")
// minimum time to give a connection to complete // minimum time to give a connection to complete
minConnectTimeout = 20 * time.Second minConnectTimeout = 20 * time.Second
) )
@ -145,18 +145,19 @@ func WithUserAgent(s string) DialOption {
// Dial creates a client connection the given target. // Dial creates a client connection the given target.
func Dial(target string, opts ...DialOption) (*ClientConn, error) { func Dial(target string, opts ...DialOption) (*ClientConn, error) {
var dopts dialOptions cc := &ClientConn{
target: target,
}
for _, opt := range opts { for _, opt := range opts {
opt(&dopts) opt(&cc.dopts)
} }
if dopts.picker == nil { if cc.dopts.picker == nil {
p, err := newUnicastPicker(target, dopts) cc.dopts.picker = &unicastPicker{}
if err != nil {
return nil, err
}
dopts.picker = p
} }
return &ClientConn{dopts.picker}, nil if err := cc.dopts.picker.Init(cc); err != nil {
return nil, err
}
return cc, nil
} }
// ConnectivityState indicates the state of a client connection. // ConnectivityState indicates the state of a client connection.
@ -194,25 +195,26 @@ func (s ConnectivityState) String() string {
// ClientConn represents a client connection to an RPC service. // ClientConn represents a client connection to an RPC service.
type ClientConn struct { type ClientConn struct {
picker Picker target string
dopts dialOptions
} }
// State returns the connectivity state of cc. // State returns the connectivity state of cc.
// This is EXPERIMENTAL API. // This is EXPERIMENTAL API.
func (cc *ClientConn) State() ConnectivityState { func (cc *ClientConn) State() ConnectivityState {
return cc.picker.State() return cc.dopts.picker.State()
} }
// WaitForStateChange blocks until the state changes to something other than the sourceState // WaitForStateChange blocks until the state changes to something other than the sourceState
// or timeout fires on cc. It returns false if timeout fires, and true otherwise. // or timeout fires on cc. It returns false if timeout fires, and true otherwise.
// This is EXPERIMENTAL API. // This is EXPERIMENTAL API.
func (cc *ClientConn) WaitForStateChange(timeout time.Duration, sourceState ConnectivityState) bool { func (cc *ClientConn) WaitForStateChange(timeout time.Duration, sourceState ConnectivityState) bool {
return cc.picker.WaitForStateChange(timeout, sourceState) return cc.dopts.picker.WaitForStateChange(timeout, sourceState)
} }
// Close starts to tear down the ClientConn. // Close starts to tear down the ClientConn.
func (cc *ClientConn) Close() error { func (cc *ClientConn) Close() error {
return cc.picker.Close() return cc.dopts.picker.Close()
} }
// Conn is a client connection to a single destination. // Conn is a client connection to a single destination.
@ -233,17 +235,17 @@ type Conn struct {
} }
// NewConn creates a Conn. // NewConn creates a Conn.
func NewConn(target string, dopts dialOptions) (*Conn, error) { func NewConn(cc *ClientConn) (*Conn, error) {
if target == "" { if cc.target == "" {
return nil, ErrUnspecTarget return nil, ErrUnspecTarget
} }
c := &Conn{ c := &Conn{
target: target, target: cc.target,
dopts: dopts, dopts: cc.dopts,
shutdownChan: make(chan struct{}), shutdownChan: make(chan struct{}),
} }
if EnableTracing { if EnableTracing {
c.events = trace.NewEventLog("grpc.ClientConn", target) c.events = trace.NewEventLog("grpc.ClientConn", c.target)
} }
if !c.dopts.insecure { if !c.dopts.insecure {
var ok bool var ok bool
@ -263,11 +265,11 @@ func NewConn(target string, dopts dialOptions) (*Conn, error) {
} }
} }
} }
colonPos := strings.LastIndex(target, ":") colonPos := strings.LastIndex(c.target, ":")
if colonPos == -1 { if colonPos == -1 {
colonPos = len(target) colonPos = len(c.target)
} }
c.authority = target[:colonPos] c.authority = c.target[:colonPos]
if c.dopts.codec == nil { if c.dopts.codec == nil {
// Set the default codec. // Set the default codec.
c.dopts.codec = protoCodec{} c.dopts.codec = protoCodec{}
@ -284,7 +286,7 @@ func NewConn(target string, dopts dialOptions) (*Conn, error) {
// Start a goroutine connecting to the server asynchronously. // Start a goroutine connecting to the server asynchronously.
go func() { go func() {
if err := c.resetTransport(false); err != nil { if err := c.resetTransport(false); err != nil {
grpclog.Printf("Failed to dial %s: %v; please retry.", target, err) grpclog.Printf("Failed to dial %s: %v; please retry.", c.target, err)
c.Close() c.Close()
return return
} }

@ -40,6 +40,8 @@ import (
// Picker picks a Conn for RPC requests. // Picker picks a Conn for RPC requests.
// This is EXPERIMENTAL and Please do not implement your own Picker for now. // This is EXPERIMENTAL and Please do not implement your own Picker for now.
type Picker interface { type Picker interface {
// Init does initial processing for the Picker, e.g., initiate some connections.
Init(cc *ClientConn) error
// Pick returns the Conn to use for the upcoming RPC. It may return different // Pick returns the Conn to use for the upcoming RPC. It may return different
// Conn's up to the implementation. // Conn's up to the implementation.
Pick() (*Conn, error) Pick() (*Conn, error)
@ -53,22 +55,21 @@ type Picker interface {
Close() error Close() error
} }
func newUnicastPicker(target string, dopts dialOptions) (Picker, error) {
c, err := NewConn(target, dopts)
if err != nil {
return nil, err
}
return &unicastPicker{
conn: c,
}, nil
}
// unicastPicker is the default Picker which is used when there is no custom Picker // unicastPicker is the default Picker which is used when there is no custom Picker
// specified by users. It always picks the same Conn. // specified by users. It always picks the same Conn.
type unicastPicker struct { type unicastPicker struct {
conn *Conn conn *Conn
} }
func (p *unicastPicker) Init(cc *ClientConn) error {
c, err := NewConn(cc)
if err != nil {
return err
}
p.conn = c
return nil
}
func (p *unicastPicker) Pick() (*Conn, error) { func (p *unicastPicker) Pick() (*Conn, error) {
return p.conn, nil return p.conn, nil
} }

@ -102,7 +102,7 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
err error err error
) )
for { for {
conn, err = cc.picker.Pick() conn, err = cc.dopts.picker.Pick()
if err != nil { if err != nil {
return nil, toRPCErr(err) return nil, toRPCErr(err)
} }