From 0b1df3bca2197b6c636cb19c9b6c4f310be95a03 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Tue, 24 May 2016 17:19:44 -0700 Subject: [PATCH] add BalancerGetOption --- balancer.go | 36 +++++++++++++++++++++++------------- call.go | 5 ++++- clientconn.go | 5 +++-- stream.go | 7 +++++-- 4 files changed, 35 insertions(+), 18 deletions(-) diff --git a/balancer.go b/balancer.go index 279474b9..1aa170dd 100644 --- a/balancer.go +++ b/balancer.go @@ -52,6 +52,13 @@ type Address struct { Metadata interface{} } +// BalancerGetOption can configure a Get call. +type BalancerGetOptions struct { + // BlockingWait specifies whether Get should block when there is no + // address ready for transmission. + BlockingWait bool +} + // Balancer chooses network addresses for RPCs. // This is the EXPERIMENTAL API and may be changed or extended in the future. type Balancer interface { @@ -65,19 +72,21 @@ type Balancer interface { // by Get. Up(addr Address) (down func(error)) // Get gets the address of a server for the rpc corresponding to ctx. - // It may block if there is no server available. It respects the - // timeout or cancellation of ctx when blocking. It returns put which - // is called once the rpc has completed or failed. put can collect and + // If opts.BlockingWait is true, it blocks if there is no connection available, + // i.e., invocations of Up(...) is equal to that of Down(...). It respects the + // timeout or cancellation of ctx when blocking. If opts.BlockingWait is + // false, it may return any address it has notified via Notify(...). It returns + // put which is called once the rpc has completed or failed. put can collect and // report rpc stats to remote load balancer. - Get(ctx context.Context) (addr Address, put func(), err error) - // Notify gRPC internals the list of Address which should be connected. gRPC - // internals will compare it with the exisiting connected addresses. If the - // address Balancer notified is not in the list of the connected addresses, - // gRPC starts to connect the address. If an address in the connected - // addresses is not in the notification list, the corresponding connect will be - // shutdown gracefully. Otherwise, there are no operations. Note that this - // function must return the full list of the Addrresses which should be connected. - // It is NOT delta. + Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) + // Notify gRPC internals the list of Address which should be connected. The list + // may be from a name resolver or remote load balancer. gRPC internals will + // compare it with the exisiting connected addresses. If the address Balancer + // notified is not in the list of the connected addresses, gRPC starts to connect + // the address. If an address in the connected addresses is not in the notification + // list, the corresponding connect will be shutdown gracefully. Otherwise, there + // are no operations. Note that this function must return the full list of the + // Addrresses which should be connected. It is NOT delta. Notify() <-chan []Address // Close shuts down the balancer. Close() error @@ -204,7 +213,7 @@ func (rr *roundRobin) down(addr Address, err error) { } // Get returns the next addr in the rotation. It blocks if there is no address available. -func (rr *roundRobin) Get(ctx context.Context) (addr Address, put func(), err error) { +func (rr *roundRobin) Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) { var ch chan struct{} rr.mu.Lock() if rr.done { @@ -222,6 +231,7 @@ func (rr *roundRobin) Get(ctx context.Context) (addr Address, put func(), err er return } // There is no address available. Wait on rr.waitCh. + // TODO(zhaoq): Handle the case when opts.BlockingWait is false. if rr.waitCh == nil { ch = make(chan struct{}) rr.waitCh = ch diff --git a/call.go b/call.go index 98b8e2b1..2149ce29 100644 --- a/call.go +++ b/call.go @@ -147,7 +147,10 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli if cc.dopts.cp != nil { callHdr.SendCompress = cc.dopts.cp.Type() } - t, put, err = cc.getTransport(ctx) + gopts := BalancerGetOptions{ + BlockingWait: !c.failFast, + } + t, put, err = cc.getTransport(ctx, gopts) if err != nil { // TODO(zhaoq): Probably revisit the error handling. if err == ErrClientConnClosing { diff --git a/clientconn.go b/clientconn.go index 141cbbf0..81e75d01 100644 --- a/clientconn.go +++ b/clientconn.go @@ -401,8 +401,9 @@ func (cc *ClientConn) newAddrConn(addr Address, skipWait bool) error { return nil } -func (cc *ClientConn) getTransport(ctx context.Context) (transport.ClientTransport, func(), error) { - addr, put, err := cc.balancer.Get(ctx) +func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) { + // TODO(zhaoq): Implement fail-fast logic. + addr, put, err := cc.balancer.Get(ctx, opts) if err != nil { return nil, nil, err } diff --git a/stream.go b/stream.go index 7f0e7c3e..de125d5b 100644 --- a/stream.go +++ b/stream.go @@ -105,11 +105,14 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth err error put func() ) - t, put, err = cc.getTransport(ctx) + // TODO(zhaoq): CallOption is omitted. Add support when it is needed. + gopts := BalancerGetOptions{ + BlockingWait: false, + } + t, put, err = cc.getTransport(ctx, gopts) if err != nil { return nil, toRPCErr(err) } - // TODO(zhaoq): CallOption is omitted. Add support when it is needed. callHdr := &transport.CallHdr{ Host: cc.authority, Method: method,