add BalancerGetOption

This commit is contained in:
iamqizhao
2016-05-24 17:19:44 -07:00
parent 5b484e4099
commit 0b1df3bca2
4 changed files with 35 additions and 18 deletions

View File

@ -52,6 +52,13 @@ type Address struct {
Metadata interface{}
}
// BalancerGetOption can configure a Get call.
type BalancerGetOptions struct {
// BlockingWait specifies whether Get should block when there is no
// address ready for transmission.
BlockingWait bool
}
// Balancer chooses network addresses for RPCs.
// This is the EXPERIMENTAL API and may be changed or extended in the future.
type Balancer interface {
@ -65,19 +72,21 @@ type Balancer interface {
// by Get.
Up(addr Address) (down func(error))
// Get gets the address of a server for the rpc corresponding to ctx.
// It may block if there is no server available. It respects the
// timeout or cancellation of ctx when blocking. It returns put which
// is called once the rpc has completed or failed. put can collect and
// If opts.BlockingWait is true, it blocks if there is no connection available,
// i.e., invocations of Up(...) is equal to that of Down(...). It respects the
// timeout or cancellation of ctx when blocking. If opts.BlockingWait is
// false, it may return any address it has notified via Notify(...). It returns
// put which is called once the rpc has completed or failed. put can collect and
// report rpc stats to remote load balancer.
Get(ctx context.Context) (addr Address, put func(), err error)
// Notify gRPC internals the list of Address which should be connected. gRPC
// internals will compare it with the exisiting connected addresses. If the
// address Balancer notified is not in the list of the connected addresses,
// gRPC starts to connect the address. If an address in the connected
// addresses is not in the notification list, the corresponding connect will be
// shutdown gracefully. Otherwise, there are no operations. Note that this
// function must return the full list of the Addrresses which should be connected.
// It is NOT delta.
Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error)
// Notify gRPC internals the list of Address which should be connected. The list
// may be from a name resolver or remote load balancer. gRPC internals will
// compare it with the exisiting connected addresses. If the address Balancer
// notified is not in the list of the connected addresses, gRPC starts to connect
// the address. If an address in the connected addresses is not in the notification
// list, the corresponding connect will be shutdown gracefully. Otherwise, there
// are no operations. Note that this function must return the full list of the
// Addrresses which should be connected. It is NOT delta.
Notify() <-chan []Address
// Close shuts down the balancer.
Close() error
@ -204,7 +213,7 @@ func (rr *roundRobin) down(addr Address, err error) {
}
// Get returns the next addr in the rotation. It blocks if there is no address available.
func (rr *roundRobin) Get(ctx context.Context) (addr Address, put func(), err error) {
func (rr *roundRobin) Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) {
var ch chan struct{}
rr.mu.Lock()
if rr.done {
@ -222,6 +231,7 @@ func (rr *roundRobin) Get(ctx context.Context) (addr Address, put func(), err er
return
}
// There is no address available. Wait on rr.waitCh.
// TODO(zhaoq): Handle the case when opts.BlockingWait is false.
if rr.waitCh == nil {
ch = make(chan struct{})
rr.waitCh = ch

View File

@ -147,7 +147,10 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
if cc.dopts.cp != nil {
callHdr.SendCompress = cc.dopts.cp.Type()
}
t, put, err = cc.getTransport(ctx)
gopts := BalancerGetOptions{
BlockingWait: !c.failFast,
}
t, put, err = cc.getTransport(ctx, gopts)
if err != nil {
// TODO(zhaoq): Probably revisit the error handling.
if err == ErrClientConnClosing {

View File

@ -401,8 +401,9 @@ func (cc *ClientConn) newAddrConn(addr Address, skipWait bool) error {
return nil
}
func (cc *ClientConn) getTransport(ctx context.Context) (transport.ClientTransport, func(), error) {
addr, put, err := cc.balancer.Get(ctx)
func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) {
// TODO(zhaoq): Implement fail-fast logic.
addr, put, err := cc.balancer.Get(ctx, opts)
if err != nil {
return nil, nil, err
}

View File

@ -105,11 +105,14 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
err error
put func()
)
t, put, err = cc.getTransport(ctx)
// TODO(zhaoq): CallOption is omitted. Add support when it is needed.
gopts := BalancerGetOptions{
BlockingWait: false,
}
t, put, err = cc.getTransport(ctx, gopts)
if err != nil {
return nil, toRPCErr(err)
}
// TODO(zhaoq): CallOption is omitted. Add support when it is needed.
callHdr := &transport.CallHdr{
Host: cc.authority,
Method: method,