Merge pull request #393 from iamqizhao/master

Part1: revise Picker API
This commit is contained in:
Qi Zhao
2015-10-08 11:18:22 -07:00
4 changed files with 31 additions and 41 deletions

13
call.go
View File

@ -144,21 +144,16 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
err error err error
t transport.ClientTransport t transport.ClientTransport
stream *transport.Stream stream *transport.Stream
conn *Conn
) )
// TODO(zhaoq): Need a formal spec of retry strategy for non-failfast rpcs. // TODO(zhaoq): Need a formal spec of retry strategy for non-failfast rpcs.
if lastErr != nil && c.failFast { if lastErr != nil && c.failFast {
return toRPCErr(lastErr) return toRPCErr(lastErr)
} }
conn, err = cc.dopts.picker.Pick()
if err != nil {
return toRPCErr(err)
}
callHdr := &transport.CallHdr{ callHdr := &transport.CallHdr{
Host: conn.authority, Host: cc.authority,
Method: method, Method: method,
} }
t, err = conn.Wait(ctx) t, err = cc.dopts.picker.Pick(ctx)
if err != nil { if err != nil {
if lastErr != nil { if lastErr != nil {
// This was a retry; return the error from the last attempt. // This was a retry; return the error from the last attempt.
@ -169,7 +164,7 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
if c.traceInfo.tr != nil { if c.traceInfo.tr != nil {
c.traceInfo.tr.LazyLog(&payload{sent: true, msg: args}, true) c.traceInfo.tr.LazyLog(&payload{sent: true, msg: args}, true)
} }
stream, err = sendRequest(ctx, conn.dopts.codec, callHdr, t, args, topts) stream, err = sendRequest(ctx, cc.dopts.codec, callHdr, t, args, topts)
if err != nil { if err != nil {
if _, ok := err.(transport.ConnectionError); ok { if _, ok := err.(transport.ConnectionError); ok {
lastErr = err lastErr = err
@ -181,7 +176,7 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
return toRPCErr(err) return toRPCErr(err)
} }
// Receive the response // Receive the response
lastErr = recvResponse(conn.dopts.codec, t, &c, stream, reply) lastErr = recvResponse(cc.dopts.codec, t, &c, stream, reply)
if _, ok := lastErr.(transport.ConnectionError); ok { if _, ok := lastErr.(transport.ConnectionError); ok {
continue continue
} }

View File

@ -149,12 +149,21 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) {
for _, opt := range opts { for _, opt := range opts {
opt(&cc.dopts) opt(&cc.dopts)
} }
if cc.dopts.codec == nil {
// Set the default codec.
cc.dopts.codec = protoCodec{}
}
if cc.dopts.picker == nil { if cc.dopts.picker == nil {
cc.dopts.picker = &unicastPicker{} cc.dopts.picker = &unicastPicker{}
} }
if err := cc.dopts.picker.Init(cc); err != nil { if err := cc.dopts.picker.Init(cc); err != nil {
return nil, err return nil, err
} }
colonPos := strings.LastIndex(target, ":")
if colonPos == -1 {
colonPos = len(target)
}
cc.authority = target[:colonPos]
return cc, nil return cc, nil
} }
@ -194,6 +203,7 @@ func (s ConnectivityState) String() string {
// ClientConn represents a client connection to an RPC service. // ClientConn represents a client connection to an RPC service.
type ClientConn struct { type ClientConn struct {
target string target string
authority string
dopts dialOptions dopts dialOptions
} }
@ -218,7 +228,6 @@ func (cc *ClientConn) Close() error {
// Conn is a client connection to a single destination. // Conn is a client connection to a single destination.
type Conn struct { type Conn struct {
target string target string
authority string
dopts dialOptions dopts dialOptions
shutdownChan chan struct{} shutdownChan chan struct{}
events trace.EventLog events trace.EventLog
@ -263,15 +272,6 @@ func NewConn(cc *ClientConn) (*Conn, error) {
} }
} }
} }
colonPos := strings.LastIndex(c.target, ":")
if colonPos == -1 {
colonPos = len(c.target)
}
c.authority = c.target[:colonPos]
if c.dopts.codec == nil {
// Set the default codec.
c.dopts.codec = protoCodec{}
}
c.stateCV = sync.NewCond(&c.mu) c.stateCV = sync.NewCond(&c.mu)
if c.dopts.block { if c.dopts.block {
if err := c.resetTransport(false); err != nil { if err := c.resetTransport(false); err != nil {

View File

@ -35,6 +35,9 @@ package grpc
import ( import (
"time" "time"
"golang.org/x/net/context"
"google.golang.org/grpc/transport"
) )
// Picker picks a Conn for RPC requests. // Picker picks a Conn for RPC requests.
@ -42,9 +45,9 @@ import (
type Picker interface { type Picker interface {
// Init does initial processing for the Picker, e.g., initiate some connections. // Init does initial processing for the Picker, e.g., initiate some connections.
Init(cc *ClientConn) error Init(cc *ClientConn) error
// Pick returns the Conn to use for the upcoming RPC. It may return different // Pick blocks until either a transport.ClientTransport is ready for the upcoming RPC
// Conn's up to the implementation. // or some error happens.
Pick() (*Conn, error) Pick(ctx context.Context) (transport.ClientTransport, error)
// State returns the connectivity state of the underlying connections. // State returns the connectivity state of the underlying connections.
State() ConnectivityState State() ConnectivityState
// WaitForStateChange blocks until the state changes to something other than // WaitForStateChange blocks until the state changes to something other than
@ -70,8 +73,8 @@ func (p *unicastPicker) Init(cc *ClientConn) error {
return nil return nil
} }
func (p *unicastPicker) Pick() (*Conn, error) { func (p *unicastPicker) Pick(ctx context.Context) (transport.ClientTransport, error) {
return p.conn, nil return p.conn.Wait(ctx)
} }
func (p *unicastPicker) State() ConnectivityState { func (p *unicastPicker) State() ConnectivityState {

View File

@ -97,29 +97,21 @@ type ClientStream interface {
// by generated code. // by generated code.
func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) { func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
var ( var (
conn *Conn
t transport.ClientTransport t transport.ClientTransport
err error err error
) )
for { t, err = cc.dopts.picker.Pick(ctx)
conn, err = cc.dopts.picker.Pick()
if err != nil { if err != nil {
return nil, toRPCErr(err) return nil, toRPCErr(err)
} }
t, err = conn.Wait(ctx)
if err != nil {
return nil, toRPCErr(err)
}
break
}
// TODO(zhaoq): CallOption is omitted. Add support when it is needed. // TODO(zhaoq): CallOption is omitted. Add support when it is needed.
callHdr := &transport.CallHdr{ callHdr := &transport.CallHdr{
Host: conn.authority, Host: cc.authority,
Method: method, Method: method,
} }
cs := &clientStream{ cs := &clientStream{
desc: desc, desc: desc,
codec: conn.dopts.codec, codec: cc.dopts.codec,
tracing: EnableTracing, tracing: EnableTracing,
} }
if cs.tracing { if cs.tracing {