Revert "balancer: move Balancer and Picker to V2; delete legacy… (#3315)

This reverts commit 336cf8d76145dc5ebd517fd9c19e14c6822450b3.
Doug Fawley, 2020-01-13 13:12:55 -08:00, committed by GitHub
parent 20bce9a0c6
commit c35a580b0c
36 changed files with 2144 additions and 559 deletions

balancer.go (new file)

@@ -0,0 +1,391 @@
/*
*
* Copyright 2016 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpc
import (
"context"
"net"
"sync"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/naming"
"google.golang.org/grpc/status"
)
// Address represents a server the client connects to.
//
// Deprecated: please use package balancer.
type Address struct {
// Addr is the server address on which a connection will be established.
Addr string
// Metadata is the information associated with Addr, which may be used
// to make load balancing decisions.
Metadata interface{}
}
// BalancerConfig specifies the configurations for Balancer.
//
// Deprecated: please use package balancer. May be removed in a future 1.x release.
type BalancerConfig struct {
// DialCreds is the transport credential the Balancer implementation can
// use to dial a remote load balancer server. The Balancer implementation
// can ignore this if it does not need to talk to another party securely.
DialCreds credentials.TransportCredentials
// Dialer is the custom dialer the Balancer implementation can use to dial
// to a remote load balancer server. The Balancer implementation
// can ignore this if it does not need to talk to a remote balancer.
Dialer func(context.Context, string) (net.Conn, error)
}
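// Illustrative sketch (not part of this file): a Balancer that needs to talk
// to a remote load balancer would typically dial it with the injected
// credentials and dialer; remoteLBAddr is hypothetical.
//
//	opts := []grpc.DialOption{grpc.WithContextDialer(config.Dialer)}
//	if config.DialCreds != nil {
//		opts = append(opts, grpc.WithTransportCredentials(config.DialCreds))
//	} else {
//		opts = append(opts, grpc.WithInsecure())
//	}
//	lbConn, err := grpc.Dial(remoteLBAddr, opts...)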
// BalancerGetOptions configures a Get call.
//
// Deprecated: please use package balancer. May be removed in a future 1.x release.
type BalancerGetOptions struct {
// BlockingWait specifies whether Get should block when there is no
// connected address.
BlockingWait bool
}
// Balancer chooses network addresses for RPCs.
//
// Deprecated: please use package balancer. May be removed in a future 1.x release.
type Balancer interface {
// Start does the initialization work to bootstrap a Balancer. For example,
// this function may start name resolution and watch for updates. It will
// be called when dialing.
Start(target string, config BalancerConfig) error
// Up informs the Balancer that gRPC has a connection to the server at
// addr. It returns down which is called once the connection to addr gets
// lost or closed.
// TODO: It is not clear how to construct and take advantage of the meaningful error
// parameter for down. Need realistic demands to guide.
Up(addr Address) (down func(error))
// Get gets the address of a server for the RPC corresponding to ctx.
// i) If it returns a connected address, gRPC internals issues the RPC on the
// connection to this address;
// ii) If it returns an address on which the connection is under construction
// (initiated by Notify(...)) but not connected, gRPC internals
// * fails RPC if the RPC is fail-fast and connection is in the TransientFailure or
// Shutdown state;
// or
// * issues RPC on the connection otherwise.
// iii) If it returns an address on which the connection does not exist, gRPC
// internals treats it as an error and will fail the corresponding RPC.
//
// Therefore, the following is the recommended rule when writing a custom Balancer.
// If opts.BlockingWait is true, it should return a connected address or
// block if there is no connected address. It should respect the timeout or
// cancellation of ctx when blocking. If opts.BlockingWait is false (for fail-fast
// RPCs), it should return an address it has notified via Notify(...) immediately
// instead of blocking.
//
// The function returns put, which is called once the RPC has completed or failed.
// put can collect and report RPC stats to a remote load balancer.
//
// This function should only return errors the Balancer cannot recover from by itself.
// gRPC internals will fail the RPC if an error is returned.
Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error)
// Notify returns a channel that is used by gRPC internals to watch the addresses
// gRPC needs to connect. The addresses might be from a name resolver or remote
// load balancer. gRPC internals will compare it with the existing connected
// addresses. If the address Balancer notified is not in the existing connected
// addresses, gRPC starts to connect the address. If an address in the existing
// connected addresses is not in the notification list, the corresponding connection
// is shutdown gracefully. Otherwise, no action is taken. Note that
// the Address slice must be the full list of Addresses to which connections
// should be maintained. It is NOT a delta.
Notify() <-chan []Address
// Close shuts down the balancer.
Close() error
}
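// Illustrative sketch (not part of this file): a minimal Balancer serving a
// single fixed address. staticBalancer is hypothetical; the sketch only
// demonstrates the contract above: Notify carries the full address list (not
// a delta), Up reports connectivity, and Get hands an address to gRPC
// internals.
//
//	type staticBalancer struct {
//		addr   Address
//		notify chan []Address
//	}
//
//	func (b *staticBalancer) Start(target string, _ BalancerConfig) error {
//		b.addr = Address{Addr: target}
//		b.notify = make(chan []Address, 1)
//		b.notify <- []Address{b.addr} // the full list, not a delta
//		return nil
//	}
//
//	func (b *staticBalancer) Up(Address) func(error)   { return func(error) {} }
//	func (b *staticBalancer) Notify() <-chan []Address { return b.notify }
//	func (b *staticBalancer) Close() error             { return nil }
//
//	func (b *staticBalancer) Get(ctx context.Context, _ BalancerGetOptions) (Address, func(), error) {
//		// The single address is assumed connected; a real Balancer must honor
//		// opts.BlockingWait and ctx cancellation here.
//		return b.addr, func() {}, nil
//	}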
// RoundRobin returns a Balancer that selects addresses round-robin. It uses r to watch
// the name resolution updates and updates the addresses available correspondingly.
//
// Deprecated: please use package balancer/roundrobin. May be removed in a future 1.x release.
func RoundRobin(r naming.Resolver) Balancer {
return &roundRobin{r: r}
}
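// Example use (sketch): pair RoundRobin with a naming.Resolver and pass it to
// Dial via the (likewise deprecated) WithBalancer option:
//
//	r, err := naming.NewDNSResolver()
//	if err != nil {
//		// handle error
//	}
//	cc, err := grpc.Dial("foo.example.com:443", grpc.WithBalancer(grpc.RoundRobin(r)), grpc.WithInsecure())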
type addrInfo struct {
addr Address
connected bool
}
type roundRobin struct {
r naming.Resolver
w naming.Watcher
addrs []*addrInfo // all the addresses the client should potentially connect to
mu sync.Mutex
addrCh chan []Address // the channel to notify gRPC internals the list of addresses the client should connect to.
next int // index of the next address to return for Get()
waitCh chan struct{} // the channel to block when there is no connected address available
done bool // The Balancer is closed.
}
func (rr *roundRobin) watchAddrUpdates() error {
updates, err := rr.w.Next()
if err != nil {
grpclog.Warningf("grpc: the naming watcher stops working due to %v.", err)
return err
}
rr.mu.Lock()
defer rr.mu.Unlock()
for _, update := range updates {
addr := Address{
Addr: update.Addr,
Metadata: update.Metadata,
}
switch update.Op {
case naming.Add:
var exist bool
for _, v := range rr.addrs {
if addr == v.addr {
exist = true
grpclog.Infoln("grpc: The name resolver wanted to add an existing address: ", addr)
break
}
}
if exist {
continue
}
rr.addrs = append(rr.addrs, &addrInfo{addr: addr})
case naming.Delete:
for i, v := range rr.addrs {
if addr == v.addr {
copy(rr.addrs[i:], rr.addrs[i+1:])
rr.addrs = rr.addrs[:len(rr.addrs)-1]
break
}
}
default:
grpclog.Errorln("Unknown update.Op ", update.Op)
}
}
// Make a copy of rr.addrs and write it onto rr.addrCh so that gRPC internals gets notified.
open := make([]Address, len(rr.addrs))
for i, v := range rr.addrs {
open[i] = v.addr
}
if rr.done {
return ErrClientConnClosing
}
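// Drain any stale, unconsumed address list first so that the send below
// cannot block: rr.addrCh has capacity 1 and should only hold the newest
// snapshot.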
select {
case <-rr.addrCh:
default:
}
rr.addrCh <- open
return nil
}
func (rr *roundRobin) Start(target string, config BalancerConfig) error {
rr.mu.Lock()
defer rr.mu.Unlock()
if rr.done {
return ErrClientConnClosing
}
if rr.r == nil {
// If there is no name resolver installed, there is no need to do
// name resolution. In this case, target is added to rr.addrs as
// the only available address, and rr.addrCh stays nil.
rr.addrs = append(rr.addrs, &addrInfo{addr: Address{Addr: target}})
return nil
}
w, err := rr.r.Resolve(target)
if err != nil {
return err
}
rr.w = w
rr.addrCh = make(chan []Address, 1)
go func() {
for {
if err := rr.watchAddrUpdates(); err != nil {
return
}
}
}()
return nil
}
// Up sets the connected state of addr and sends notification if there are pending
// Get() calls.
func (rr *roundRobin) Up(addr Address) func(error) {
rr.mu.Lock()
defer rr.mu.Unlock()
var cnt int
for _, a := range rr.addrs {
if a.addr == addr {
if a.connected {
return nil
}
a.connected = true
}
if a.connected {
cnt++
}
}
// addr is the only address that is connected. Notify the blocked Get() callers.
if cnt == 1 && rr.waitCh != nil {
close(rr.waitCh)
rr.waitCh = nil
}
return func(err error) {
rr.down(addr, err)
}
}
// down unsets the connected state of addr.
func (rr *roundRobin) down(addr Address, err error) {
rr.mu.Lock()
defer rr.mu.Unlock()
for _, a := range rr.addrs {
if addr == a.addr {
a.connected = false
break
}
}
}
// Get returns the next addr in the rotation.
func (rr *roundRobin) Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) {
var ch chan struct{}
rr.mu.Lock()
if rr.done {
rr.mu.Unlock()
err = ErrClientConnClosing
return
}
if len(rr.addrs) > 0 {
if rr.next >= len(rr.addrs) {
rr.next = 0
}
next := rr.next
for {
a := rr.addrs[next]
next = (next + 1) % len(rr.addrs)
if a.connected {
addr = a.addr
rr.next = next
rr.mu.Unlock()
return
}
if next == rr.next {
// Iterated over all the possible addresses but none is connected.
break
}
}
}
if !opts.BlockingWait {
if len(rr.addrs) == 0 {
rr.mu.Unlock()
err = status.Errorf(codes.Unavailable, "there is no address available")
return
}
// Return the next addr in rr.addrs for fail-fast RPCs.
addr = rr.addrs[rr.next].addr
rr.next++
rr.mu.Unlock()
return
}
// Wait on rr.waitCh for non-failfast RPCs.
if rr.waitCh == nil {
ch = make(chan struct{})
rr.waitCh = ch
} else {
ch = rr.waitCh
}
rr.mu.Unlock()
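// Block until some address becomes connected (Up closes ch) or ctx is done,
// then rescan rr.addrs for a connected address.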
for {
select {
case <-ctx.Done():
err = ctx.Err()
return
case <-ch:
rr.mu.Lock()
if rr.done {
rr.mu.Unlock()
err = ErrClientConnClosing
return
}
if len(rr.addrs) > 0 {
if rr.next >= len(rr.addrs) {
rr.next = 0
}
next := rr.next
for {
a := rr.addrs[next]
next = (next + 1) % len(rr.addrs)
if a.connected {
addr = a.addr
rr.next = next
rr.mu.Unlock()
return
}
if next == rr.next {
// Iterated over all the possible addresses but none is connected.
break
}
}
}
// The newly added addr got removed by Down() again.
if rr.waitCh == nil {
ch = make(chan struct{})
rr.waitCh = ch
} else {
ch = rr.waitCh
}
rr.mu.Unlock()
}
}
}
func (rr *roundRobin) Notify() <-chan []Address {
return rr.addrCh
}
func (rr *roundRobin) Close() error {
rr.mu.Lock()
defer rr.mu.Unlock()
if rr.done {
return errBalancerClosed
}
rr.done = true
if rr.w != nil {
rr.w.Close()
}
if rr.waitCh != nil {
close(rr.waitCh)
rr.waitCh = nil
}
if rr.addrCh != nil {
close(rr.addrCh)
}
return nil
}
// pickFirst is used to test multiple addresses in one addrConn, where all addresses share the same addrConn.
// It is a wrapper around the roundRobin balancer. The logic of all methods works because balancer.Get()
// returns the only address brought Up by resetTransport().
type pickFirst struct {
*roundRobin
}

balancer/balancer.go

@@ -33,7 +33,6 @@ import (
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/status"
)
var (
@@ -124,7 +123,7 @@ type State struct {
// determine the state of the ClientConn.
ConnectivityState connectivity.State
// Picker is used to choose connections (SubConns) for RPCs.
Picker Picker
Picker V2Picker
}
// ClientConn represents a gRPC ClientConn.
@@ -142,11 +141,20 @@ type ClientConn interface {
// The SubConn will be shutdown.
RemoveSubConn(SubConn)
// UpdateBalancerState is called by balancer to notify gRPC that some internal
// state in balancer has changed.
//
// gRPC will update the connectivity state of the ClientConn, and will call pick
// on the new picker to pick new SubConn.
//
// Deprecated: use UpdateState instead
UpdateBalancerState(s connectivity.State, p Picker)
// UpdateState notifies gRPC that the balancer's internal state has
// changed.
//
// gRPC will update the connectivity state of the ClientConn, and will call
// pick on the new picker to pick new SubConns.
// gRPC will update the connectivity state of the ClientConn, and will call pick
// on the new picker to pick new SubConns.
UpdateState(State)
// ResolveNow is called by balancer to notify gRPC to do a name resolving.
@@ -229,13 +237,56 @@ type DoneInfo struct {
var (
// ErrNoSubConnAvailable indicates no SubConn is available for pick().
// gRPC will block the RPC until a new picker is available via UpdateState().
// gRPC will block the RPC until a new picker is available via UpdateBalancerState().
ErrNoSubConnAvailable = errors.New("no SubConn is available")
// ErrTransientFailure indicates all SubConns are in TransientFailure.
// WaitForReady RPCs will block, non-WaitForReady RPCs will fail.
ErrTransientFailure = errors.New("all SubConns are in TransientFailure")
ErrTransientFailure = TransientFailureError(errors.New("all SubConns are in TransientFailure"))
)
// Picker is used by gRPC to pick a SubConn to send an RPC.
// Balancer is expected to generate a new picker from its snapshot every time its
// internal state has changed.
//
// The pickers used by gRPC can be updated by ClientConn.UpdateBalancerState().
//
// Deprecated: use V2Picker instead
type Picker interface {
// Pick returns the SubConn to be used to send the RPC.
// The returned SubConn must be one returned by NewSubConn().
//
// This function is expected to return:
// - a SubConn that is known to be READY;
// - ErrNoSubConnAvailable if no SubConn is available, but progress is being
// made (for example, some SubConn is in CONNECTING mode);
// - other errors if no connection attempt is in progress (for example, all
// SubConns are in TRANSIENT_FAILURE mode).
//
// If a SubConn is returned:
// - If it is READY, gRPC will send the RPC on it;
// - If it is not ready, or becomes not ready after it's returned, gRPC will
// block until UpdateBalancerState() is called and will call pick on the
// new picker. The done function returned from Pick(), if not nil, will be
// called with nil error, no bytes sent and no bytes received.
//
// If the returned error is not nil:
// - If the error is ErrNoSubConnAvailable, gRPC will block until UpdateBalancerState() is called;
// - If the error is ErrTransientFailure or implements IsTransientFailure()
// bool, returning true:
// - If the RPC is wait-for-ready, gRPC will block until UpdateBalancerState()
// is called to pick again;
// - Otherwise, RPC will fail with unavailable error.
// - Else (error is other non-nil error):
// - The RPC will fail with the error's status code, or Unknown if it is
// not a status error.
//
// The returned done() function will be called once the RPC has finished,
// with the final status of that RPC. If the SubConn returned is not a
// valid SubConn type, done may not be called. done may be nil if the
// balancer doesn't care about the RPC status.
Pick(ctx context.Context, info PickInfo) (conn SubConn, done func(DoneInfo), err error)
}
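// A minimal sketch (not part of this diff) of the deprecated Picker contract
// above; singlePicker is hypothetical. It returns one fixed READY SubConn and
// a no-op done callback, or ErrNoSubConnAvailable when it has none.
//
//	type singlePicker struct {
//		sc SubConn // assumed READY; nil when no SubConn is available
//	}
//
//	func (p *singlePicker) Pick(ctx context.Context, info PickInfo) (SubConn, func(DoneInfo), error) {
//		if p.sc == nil {
//			return nil, nil, ErrNoSubConnAvailable // gRPC blocks until a new picker arrives
//		}
//		return p.sc, func(DoneInfo) {}, nil
//	}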
// PickResult contains information related to a connection chosen for an RPC.
type PickResult struct {
// SubConn is the connection to use for this pick, if its state is Ready.
@@ -251,36 +302,24 @@ type PickResult struct {
Done func(DoneInfo)
}
type dropRPCError struct {
type transientFailureError struct {
error
status *status.Status
}
func (e *dropRPCError) DropRPC() bool { return true }
func (e *transientFailureError) IsTransientFailure() bool { return true }
func (e *dropRPCError) GRPCStatus() *status.Status { return e.status }
// DropRPCError wraps err in an error implementing DropRPC() bool, returning
// true. If err is not a status error, it will be converted to one with code
// Unknown and the message containing the err.Error() result.
func DropRPCError(err error) error {
st := status.Convert(err)
return &dropRPCError{error: st.Err(), status: st}
// TransientFailureError wraps err in an error implementing
// IsTransientFailure() bool, returning true.
func TransientFailureError(err error) error {
return &transientFailureError{error: err}
}
// TransientFailureError returns e. It exists for backward compatibility and
// will be deleted soon.
//
// Deprecated: no longer necessary, picker errors are treated this way by
// default.
func TransientFailureError(e error) error { return e }
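// Sketch: under the reverted API, a V2Picker signals transient failure by
// wrapping its error, so wait-for-ready RPCs keep waiting while others fail
// with code Unavailable (error text hypothetical):
//
//	return PickResult{}, TransientFailureError(errors.New("all backends unreachable"))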
// Picker is used by gRPC to pick a SubConn to send an RPC.
// V2Picker is used by gRPC to pick a SubConn to send an RPC.
// Balancer is expected to generate a new picker from its snapshot every time its
// internal state has changed.
//
// The pickers used by gRPC can be updated by ClientConn.UpdateState().
type Picker interface {
// The pickers used by gRPC can be updated by ClientConn.UpdateBalancerState().
type V2Picker interface {
// Pick returns the connection to use for this RPC and related information.
//
// Pick should not block. If the balancer needs to do I/O or any blocking
@@ -293,14 +332,14 @@ type Picker interface {
// - If the error is ErrNoSubConnAvailable, gRPC will block until a new
// Picker is provided by the balancer (using ClientConn.UpdateState).
//
// - If the error implements DropRPC() bool, returning true, gRPC will
// terminate all RPCs with the code and message provided. If the error
// is not a status error, it will be converted by gRPC to a status error
// with code Unknown.
// - If the error implements IsTransientFailure() bool, returning true,
// wait for ready RPCs will wait, but non-wait for ready RPCs will be
// terminated with this error's Error() string and status code
// Unavailable.
//
// - For all other errors, wait for ready RPCs will wait, but non-wait for
// ready RPCs will be terminated with this error's Error() string and
// status code Unavailable.
// - Any other errors terminate all RPCs with the code and message
// provided. If the error is not a status error, it will be converted by
// gRPC to a status error with code Unknown.
Pick(info PickInfo) (PickResult, error)
}
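// A minimal sketch (not part of this diff) of a V2Picker satisfying the
// contract above; rrPicker is hypothetical. It round-robins over a fixed
// slice of READY SubConns without blocking.
//
//	type rrPicker struct {
//		mu   sync.Mutex
//		scs  []SubConn
//		next int
//	}
//
//	func (p *rrPicker) Pick(PickInfo) (PickResult, error) {
//		p.mu.Lock()
//		defer p.mu.Unlock()
//		if len(p.scs) == 0 {
//			return PickResult{}, ErrNoSubConnAvailable
//		}
//		sc := p.scs[p.next]
//		p.next = (p.next + 1) % len(p.scs)
//		return PickResult{SubConn: sc}, nil
//	}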
@@ -309,36 +348,34 @@ type Picker interface {
//
// It also generates and updates the Picker used by gRPC to pick SubConns for RPCs.
//
// UpdateClientConnState, ResolverError, UpdateSubConnState, and Close are
// guaranteed to be called synchronously from the same goroutine. There's no
// guarantee on picker.Pick, it may be called anytime.
// HandleSubConnStateChange, HandleResolvedAddrs and Close are guaranteed
// to be called synchronously from the same goroutine.
// There's no guarantee on picker.Pick, it may be called anytime.
type Balancer interface {
// UpdateClientConnState is called by gRPC when the state of the ClientConn
// changes. If the error returned is ErrBadResolverState, the ClientConn
// will begin calling ResolveNow on the active name resolver with
// exponential backoff until a subsequent call to UpdateClientConnState
// returns a nil error. Any other errors are currently ignored.
UpdateClientConnState(ClientConnState) error
// ResolverError is called by gRPC when the name resolver reports an error.
ResolverError(error)
// UpdateSubConnState is called by gRPC when the state of a SubConn
// changes.
UpdateSubConnState(SubConn, SubConnState)
// HandleSubConnStateChange is called by gRPC when the connectivity state
// of sc has changed.
// The Balancer is expected to aggregate the states of all its SubConns and
// report that back to gRPC.
// The Balancer should also generate and update Pickers when its internal
// state changes as a result of the new SubConn state.
//
// Deprecated: if V2Balancer is implemented by the Balancer,
// UpdateSubConnState will be called instead.
HandleSubConnStateChange(sc SubConn, state connectivity.State)
// HandleResolvedAddrs is called by gRPC to send updated resolved addresses to
// balancers.
// The Balancer can create new SubConns or remove SubConns with the addresses.
// An empty address slice and a non-nil error will be passed if the resolver
// returns a non-nil error to gRPC.
//
// Deprecated: if V2Balancer is implemented by the Balancer,
// UpdateClientConnState will be called instead.
HandleResolvedAddrs([]resolver.Address, error)
// Close closes the balancer. The balancer is not required to call
// ClientConn.RemoveSubConn for its existing SubConns.
Close()
}
// V2Balancer is temporarily defined for backward compatibility reasons.
//
// Deprecated: use Balancer directly instead.
type V2Balancer = Balancer
// V2Picker is temporarily defined for backward compatibility reasons.
//
// Deprecated: use Picker directly instead.
type V2Picker = Picker
// SubConnState describes the state of a SubConn.
type SubConnState struct {
// ConnectivityState is the connectivity state of the SubConn.
@@ -361,6 +398,27 @@ type ClientConnState struct {
// problem with the provided name resolver data.
var ErrBadResolverState = errors.New("bad resolver state")
// V2Balancer is defined for documentation purposes. If a Balancer also
// implements V2Balancer, its UpdateClientConnState method will be called
// instead of HandleResolvedAddrs and its UpdateSubConnState will be called
// instead of HandleSubConnStateChange.
type V2Balancer interface {
// UpdateClientConnState is called by gRPC when the state of the ClientConn
// changes. If the error returned is ErrBadResolverState, the ClientConn
// will begin calling ResolveNow on the active name resolver with
// exponential backoff until a subsequent call to UpdateClientConnState
// returns a nil error. Any other errors are currently ignored.
UpdateClientConnState(ClientConnState) error
// ResolverError is called by gRPC when the name resolver reports an error.
ResolverError(error)
// UpdateSubConnState is called by gRPC when the state of a SubConn
// changes.
UpdateSubConnState(SubConn, SubConnState)
// Close closes the balancer. The balancer is not required to call
// ClientConn.RemoveSubConn for its existing SubConns.
Close()
}
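// A compile-time assertion is the usual way to guarantee a balancer satisfies
// V2Balancer; the base and grpclb balancers in this commit use exactly this
// idiom (myBalancer is hypothetical):
//
//	var _ V2Balancer = (*myBalancer)(nil)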
// ConnectivityStateEvaluator takes the connectivity states of multiple SubConns
// and returns one aggregated connectivity state.
//

balancer/base/balancer.go

@@ -19,6 +19,7 @@
package base
import (
"context"
"errors"
"google.golang.org/grpc/balancer"
@@ -30,6 +31,7 @@ import (
type baseBuilder struct {
name string
pickerBuilder PickerBuilder
v2PickerBuilder V2PickerBuilder
config Config
}
@@ -37,6 +39,7 @@ func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions)
bal := &baseBalancer{
cc: cc,
pickerBuilder: bb.pickerBuilder,
v2PickerBuilder: bb.v2PickerBuilder,
subConns: make(map[resolver.Address]balancer.SubConn),
scStates: make(map[balancer.SubConn]connectivity.State),
@@ -48,6 +51,8 @@ func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions)
// may call UpdateState with this picker.
if bb.pickerBuilder != nil {
bal.picker = NewErrPicker(balancer.ErrNoSubConnAvailable)
} else {
bal.v2Picker = NewErrPickerV2(balancer.ErrNoSubConnAvailable)
}
return bal
}
@@ -56,9 +61,12 @@ func (bb *baseBuilder) Name() string {
return bb.name
}
var _ balancer.V2Balancer = (*baseBalancer)(nil) // Assert that we implement V2Balancer
type baseBalancer struct {
cc balancer.ClientConn
pickerBuilder PickerBuilder
v2PickerBuilder V2PickerBuilder
csEvltr *balancer.ConnectivityStateEvaluator
state connectivity.State
@@ -66,19 +74,27 @@ type baseBalancer struct {
subConns map[resolver.Address]balancer.SubConn
scStates map[balancer.SubConn]connectivity.State
picker balancer.Picker
v2Picker balancer.V2Picker
config Config
}
func (b *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
panic("not implemented")
}
func (b *baseBalancer) ResolverError(err error) {
switch b.state {
case connectivity.TransientFailure, connectivity.Idle, connectivity.Connecting:
if b.picker != nil {
b.picker = NewErrPicker(err)
} else {
b.v2Picker = NewErrPickerV2(err)
}
}
}
func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
// TODO: handle s.ResolverState.Err (log if not nil) once implemented.
// TODO: handle s.ResolverState.ServiceConfig?
if grpclog.V(2) {
grpclog.Infoln("base.baseBalancer: got new ClientConn state: ", s)
@@ -105,7 +121,7 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
b.cc.RemoveSubConn(sc)
delete(b.subConns, a)
// Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
// The entry will be deleted in UpdateSubConnState.
// The entry will be deleted in HandleSubConnStateChange.
}
}
return nil
@@ -117,17 +133,32 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
// - built by the pickerBuilder with all READY SubConns otherwise.
func (b *baseBalancer) regeneratePicker(err error) {
if b.state == connectivity.TransientFailure {
if b.pickerBuilder != nil {
b.picker = NewErrPicker(balancer.ErrTransientFailure)
} else {
if err != nil {
b.picker = NewErrPicker(err)
b.v2Picker = NewErrPickerV2(balancer.TransientFailureError(err))
} else {
// This means the last subchannel transition was not to
// TransientFailure (otherwise err must be set), but the
// aggregate state of the balancer is TransientFailure, meaning
// there are no other addresses.
b.picker = NewErrPicker(errors.New("resolver returned no addresses"))
b.v2Picker = NewErrPickerV2(balancer.TransientFailureError(errors.New("resolver returned no addresses")))
}
}
return
}
if b.pickerBuilder != nil {
readySCs := make(map[resolver.Address]balancer.SubConn)
// Pick out all ready SCs from the full subConn map.
for addr, sc := range b.subConns {
if st, ok := b.scStates[sc]; ok && st == connectivity.Ready {
readySCs[addr] = sc
}
}
b.picker = b.pickerBuilder.Build(readySCs)
} else {
readySCs := make(map[balancer.SubConn]SubConnInfo)
// Pick out all ready SCs from the full subConn map.
@@ -136,7 +167,12 @@ func (b *baseBalancer) regeneratePicker(err error) {
readySCs[sc] = SubConnInfo{Address: addr}
}
}
b.picker = b.pickerBuilder.Build(PickerBuildInfo{ReadySCs: readySCs})
b.v2Picker = b.v2PickerBuilder.Build(PickerBuildInfo{ReadySCs: readySCs})
}
}
func (b *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
panic("not implemented")
}
func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
@@ -174,7 +210,11 @@ func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.Su
b.regeneratePicker(state.ConnectionError)
}
b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
if b.picker != nil {
b.cc.UpdateBalancerState(b.state, b.picker)
} else {
b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.v2Picker})
}
}
// Close is a nop because base balancer doesn't have internal state to clean up,
@@ -182,20 +222,28 @@ func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.Su
func (b *baseBalancer) Close() {
}
// NewErrPicker returns a Picker that always returns err on Pick().
// NewErrPicker returns a picker that always returns err on Pick().
func NewErrPicker(err error) balancer.Picker {
return &errPicker{err: err}
}
// NewErrPickerV2 is temporarily defined for backward compatibility reasons.
//
// Deprecated: use NewErrPicker instead.
var NewErrPickerV2 = NewErrPicker
type errPicker struct {
err error // Pick() always returns this err.
}
func (p *errPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
func (p *errPicker) Pick(context.Context, balancer.PickInfo) (balancer.SubConn, func(balancer.DoneInfo), error) {
return nil, nil, p.err
}
// NewErrPickerV2 returns a V2Picker that always returns err on Pick().
func NewErrPickerV2(err error) balancer.V2Picker {
return &errPickerV2{err: err}
}
type errPickerV2 struct {
err error // Pick() always returns this err.
}
func (p *errPickerV2) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
return balancer.PickResult{}, p.err
}

balancer/base/base.go

@@ -37,8 +37,15 @@ import (
// PickerBuilder creates balancer.Picker.
type PickerBuilder interface {
// Build takes a slice of ready SubConns, and returns a picker that will be
// used by gRPC to pick a SubConn.
Build(readySCs map[resolver.Address]balancer.SubConn) balancer.Picker
}
// V2PickerBuilder creates balancer.V2Picker.
type V2PickerBuilder interface {
// Build returns a picker that will be used by gRPC to pick a SubConn.
Build(info PickerBuildInfo) balancer.Picker
Build(info PickerBuildInfo) balancer.V2Picker
}
// PickerBuildInfo contains information needed by the picker builder to
@@ -55,14 +62,20 @@ type SubConnInfo struct {
Address resolver.Address // the address used to create this SubConn
}
// NewBalancerBuilder returns a balancer builder. The balancers
// built by this builder will use the picker builder to build pickers.
func NewBalancerBuilder(name string, pb PickerBuilder) balancer.Builder {
return NewBalancerBuilderWithConfig(name, pb, Config{})
}
// Config contains the config info about the base balancer builder.
type Config struct {
// HealthCheck indicates whether health checking should be enabled for this specific balancer.
HealthCheck bool
}
// NewBalancerBuilder returns a base balancer builder configured by the provided config.
func NewBalancerBuilder(name string, pb PickerBuilder, config Config) balancer.Builder {
// NewBalancerBuilderWithConfig returns a base balancer builder configured by the provided config.
func NewBalancerBuilderWithConfig(name string, pb PickerBuilder, config Config) balancer.Builder {
return &baseBuilder{
name: name,
pickerBuilder: pb,
@@ -70,13 +83,11 @@ func NewBalancerBuilder(name string, pb PickerBuilder, config Config) balancer.B
}
}
// NewBalancerBuilderV2 is temporarily defined for backward compatibility
// reasons.
//
// Deprecated: use NewBalancerBuilder instead.
var NewBalancerBuilderV2 = NewBalancerBuilder
// V2PickerBuilder is temporarily defined for backward compatibility reasons.
//
// Deprecated: use PickerBuilder instead.
type V2PickerBuilder = PickerBuilder
// NewBalancerBuilderV2 returns a base balancer builder configured by the provided config.
func NewBalancerBuilderV2(name string, pb V2PickerBuilder, config Config) balancer.Builder {
return &baseBuilder{
name: name,
v2PickerBuilder: pb,
config: config,
}
}
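// Sketch (not part of this diff): a custom balancer package would register
// itself through the v2 builder path above; all names here are hypothetical.
// The picker simply uses the first READY SubConn it finds.
//
//	type firstReadyPickerBuilder struct{}
//
//	func (*firstReadyPickerBuilder) Build(info PickerBuildInfo) balancer.V2Picker {
//		for sc := range info.ReadySCs {
//			return &firstReadyPicker{sc: sc}
//		}
//		return NewErrPickerV2(balancer.ErrNoSubConnAvailable)
//	}
//
//	type firstReadyPicker struct{ sc balancer.SubConn }
//
//	func (p *firstReadyPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
//		return balancer.PickResult{SubConn: p.sc}, nil
//	}
//
//	func init() {
//		balancer.Register(NewBalancerBuilderV2("first_ready_sketch", &firstReadyPickerBuilder{}, Config{}))
//	}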

balancer/grpclb/grpclb.go

@@ -163,6 +163,8 @@ func (b *lbBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) bal
return lb
}
var _ balancer.V2Balancer = (*lbBalancer)(nil) // Assert that we implement V2Balancer
type lbBalancer struct {
cc *lbCacheClientConn
target string
@@ -212,7 +214,7 @@ type lbBalancer struct {
state connectivity.State
subConns map[resolver.Address]balancer.SubConn // Used to new/remove SubConn.
scStates map[balancer.SubConn]connectivity.State // Used to filter READY SubConns.
picker balancer.Picker
picker balancer.V2Picker
// Support fallback to resolved backend addresses if there's no response
// from remote balancer within fallbackTimeout.
remoteBalancerConnected bool
@@ -310,6 +312,10 @@ func (lb *lbBalancer) aggregateSubConnStates() connectivity.State {
return connectivity.TransientFailure
}
func (lb *lbBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
panic("not used")
}
func (lb *lbBalancer) UpdateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) {
s := scs.ConnectivityState
if grpclog.V(2) {
@@ -387,6 +393,13 @@ func (lb *lbBalancer) fallbackToBackendsAfter(fallbackTimeout time.Duration) {
lb.mu.Unlock()
}
// HandleResolvedAddrs sends the updated remoteLB addresses to remoteLB
// clientConn. The remoteLB clientConn will handle creating/removing remoteLB
// connections.
func (lb *lbBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
panic("not used")
}
func (lb *lbBalancer) handleServiceConfig(gc *grpclbServiceConfig) {
lb.mu.Lock()
defer lb.mu.Unlock()

balancer/grpclb/grpclb_picker.go

@@ -172,7 +172,7 @@ func (p *lbPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
// If it's a drop, return an error and fail the RPC.
if s.Drop {
p.stats.drop(s.LoadBalanceToken)
return balancer.PickResult{}, balancer.DropRPCError(status.Errorf(codes.Unavailable, "request dropped by grpclb"))
return balancer.PickResult{}, status.Errorf(codes.Unavailable, "request dropped by grpclb")
}
// Not a drop, but there are no ready subConns.

balancer/roundrobin/roundrobin.go

@@ -35,7 +35,7 @@ const Name = "round_robin"
// newBuilder creates a new roundrobin balancer builder.
func newBuilder() balancer.Builder {
return base.NewBalancerBuilder(Name, &rrPickerBuilder{}, base.Config{HealthCheck: true})
return base.NewBalancerBuilderV2(Name, &rrPickerBuilder{}, base.Config{HealthCheck: true})
}
func init() {
@@ -44,10 +44,10 @@ func init() {
type rrPickerBuilder struct{}
func (*rrPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
func (*rrPickerBuilder) Build(info base.PickerBuildInfo) balancer.V2Picker {
grpclog.Infof("roundrobinPicker: newPicker called with info: %v", info)
if len(info.ReadySCs) == 0 {
return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
return base.NewErrPickerV2(balancer.ErrNoSubConnAvailable)
}
var scs []balancer.SubConn
for sc := range info.ReadySCs {

balancer_conn_wrappers.go

@@ -74,7 +74,11 @@ func (ccb *ccBalancerWrapper) watcher() {
}
ccb.balancerMu.Lock()
su := t.(*scStateUpdate)
ccb.balancer.UpdateSubConnState(su.sc, balancer.SubConnState{ConnectivityState: su.state, ConnectionError: su.err})
if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
ub.UpdateSubConnState(su.sc, balancer.SubConnState{ConnectivityState: su.state, ConnectionError: su.err})
} else {
ccb.balancer.HandleSubConnStateChange(su.sc, su.state)
}
ccb.balancerMu.Unlock()
case <-ccb.done.Done():
}
@@ -119,13 +123,19 @@ func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s co
func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
ccb.balancerMu.Lock()
defer ccb.balancerMu.Unlock()
return ccb.balancer.UpdateClientConnState(*ccs)
if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
return ub.UpdateClientConnState(*ccs)
}
ccb.balancer.HandleResolvedAddrs(ccs.ResolverState.Addresses, nil)
return nil
}
func (ccb *ccBalancerWrapper) resolverError(err error) {
if ub, ok := ccb.balancer.(balancer.V2Balancer); ok {
ccb.balancerMu.Lock()
ccb.balancer.ResolverError(err)
ub.ResolverError(err)
ccb.balancerMu.Unlock()
}
}
func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
@@ -163,6 +173,21 @@ func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) {
ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
}
func (ccb *ccBalancerWrapper) UpdateBalancerState(s connectivity.State, p balancer.Picker) {
ccb.mu.Lock()
defer ccb.mu.Unlock()
if ccb.subConns == nil {
return
}
// Update picker before updating state. Even though the ordering here does
// not matter, it can lead to multiple calls of Pick in the common start-up
// case where we wait for ready and then perform an RPC. If the picker is
// updated later, we could call the "connecting" picker when the state is
// updated, and then call the "ready" picker after the picker gets updated.
ccb.cc.blockingpicker.updatePicker(p)
ccb.cc.csMgr.updateState(s)
}
func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
ccb.mu.Lock()
defer ccb.mu.Unlock()
@@ -174,7 +199,7 @@ func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
// case where we wait for ready and then perform an RPC. If the picker is
// updated later, we could call the "connecting" picker when the state is
// updated, and then call the "ready" picker after the picker gets updated.
ccb.cc.blockingpicker.updatePicker(s.Picker)
ccb.cc.blockingpicker.updatePickerV2(s.Picker)
ccb.cc.csMgr.updateState(s.ConnectivityState)
}


@@ -23,19 +23,50 @@ import (
"testing"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
)
var _ balancer.V2Balancer = &funcBalancer{}
type funcBalancer struct {
updateClientConnState func(s balancer.ClientConnState) error
}
func (*funcBalancer) HandleSubConnStateChange(balancer.SubConn, connectivity.State) {
panic("unimplemented") // v1 API
}
func (*funcBalancer) HandleResolvedAddrs([]resolver.Address, error) {
panic("unimplemented") // v1 API
}
func (b *funcBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
return b.updateClientConnState(s)
}
func (*funcBalancer) ResolverError(error) {}
func (*funcBalancer) UpdateSubConnState(balancer.SubConn, balancer.SubConnState) {
panic("unimplemented") // we never have sub-conns
}
func (*funcBalancer) Close() {}
type funcBalancerBuilder struct {
name string
instance *funcBalancer
}
func (b *funcBalancerBuilder) Build(balancer.ClientConn, balancer.BuildOptions) balancer.Balancer {
return b.instance
}
func (b *funcBalancerBuilder) Name() string { return b.name }
// TestBalancerErrorResolverPolling injects balancer errors and verifies
// ResolveNow is called on the resolver with the appropriate backoff strategy
// being consulted between ResolveNow calls.
func (s) TestBalancerErrorResolverPolling(t *testing.T) {
// The test balancer will return ErrBadResolverState iff the
// ClientConnState contains no addresses.
bf := stub.BalancerFuncs{
UpdateClientConnState: func(_ *stub.BalancerData, s balancer.ClientConnState) error {
fb := &funcBalancer{
updateClientConnState: func(s balancer.ClientConnState) error {
if len(s.ResolverState.Addresses) == 0 {
return balancer.ErrBadResolverState
}
@@ -43,7 +74,7 @@ func (s) TestBalancerErrorResolverPolling(t *testing.T) {
},
}
const balName = "BalancerErrorResolverPolling"
stub.Register(balName, bf)
balancer.Register(&funcBalancerBuilder{name: balName, instance: fb})
testResolverErrorPolling(t,
func(r *manual.Resolver) {

balancer_switching_test.go

@@ -27,6 +27,7 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
@@ -47,13 +48,9 @@ func (b *magicalLB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) ba
return b
}
func (b *magicalLB) ResolverError(error) {}
func (b *magicalLB) HandleSubConnStateChange(balancer.SubConn, connectivity.State) {}
func (b *magicalLB) UpdateSubConnState(balancer.SubConn, balancer.SubConnState) {}
func (b *magicalLB) UpdateClientConnState(balancer.ClientConnState) error {
return nil
}
func (b *magicalLB) HandleResolvedAddrs([]resolver.Address, error) {}
func (b *magicalLB) Close() {}
@@ -61,21 +58,6 @@ func init() {
balancer.Register(&magicalLB{})
}
func startServers(t *testing.T, numServers int, maxStreams uint32) ([]*server, func()) {
var servers []*server
for i := 0; i < numServers; i++ {
s := newTestServer()
servers = append(servers, s)
go s.start(t, 0, maxStreams)
s.wait(t, 2*time.Second)
}
return servers, func() {
for i := 0; i < numServers; i++ {
servers[i].stop()
}
}
}
func checkPickFirst(cc *ClientConn, servers []*server) error {
var (
req = "port"
@@ -151,7 +133,7 @@ func (s) TestSwitchBalancer(t *testing.T) {
defer rcleanup()
const numServers = 2
servers, scleanup := startServers(t, numServers, math.MaxInt32)
servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
defer scleanup()
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
@@ -183,7 +165,7 @@ func (s) TestBalancerDialOption(t *testing.T) {
defer rcleanup()
const numServers = 2
servers, scleanup := startServers(t, numServers, math.MaxInt32)
servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
defer scleanup()
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}), WithBalancerName(roundrobin.Name))
@@ -499,7 +481,7 @@ func (s) TestSwitchBalancerGRPCLBWithGRPCLBNotRegistered(t *testing.T) {
defer rcleanup()
const numServers = 3
servers, scleanup := startServers(t, numServers, math.MaxInt32)
servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
defer scleanup()
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))

balancer_test.go (new file)

@@ -0,0 +1,789 @@
/*
*
* Copyright 2016 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpc
import (
"context"
"fmt"
"math"
"strconv"
"sync"
"testing"
"time"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/naming"
"google.golang.org/grpc/status"
)
func pickFirstBalancerV1(r naming.Resolver) Balancer {
return &pickFirst{&roundRobin{r: r}}
}
type testWatcher struct {
// the channel to receive name resolution updates
update chan *naming.Update
// the side channel used to learn how many updates are in a batch
side chan int
// the channel to notify the update injector that update reading is done
readDone chan int
}
func (w *testWatcher) Next() (updates []*naming.Update, err error) {
n := <-w.side
if n == 0 {
return nil, fmt.Errorf("w.side is closed")
}
for i := 0; i < n; i++ {
u := <-w.update
if u != nil {
updates = append(updates, u)
}
}
w.readDone <- 0
return
}
func (w *testWatcher) Close() {
close(w.side)
}
// Inject naming resolution updates to the testWatcher.
func (w *testWatcher) inject(updates []*naming.Update) {
w.side <- len(updates)
for _, u := range updates {
w.update <- u
}
<-w.readDone
}
type testNameResolver struct {
w *testWatcher
addr string
}
func (r *testNameResolver) Resolve(target string) (naming.Watcher, error) {
r.w = &testWatcher{
update: make(chan *naming.Update, 1),
side: make(chan int, 1),
readDone: make(chan int),
}
r.w.side <- 1
r.w.update <- &naming.Update{
Op: naming.Add,
Addr: r.addr,
}
go func() {
<-r.w.readDone
}()
return r.w, nil
}
func startServers(t *testing.T, numServers int, maxStreams uint32) ([]*server, *testNameResolver, func()) {
var servers []*server
for i := 0; i < numServers; i++ {
s := newTestServer()
servers = append(servers, s)
go s.start(t, 0, maxStreams)
s.wait(t, 2*time.Second)
}
// Point to server[0]
addr := "localhost:" + servers[0].port
return servers, &testNameResolver{
addr: addr,
}, func() {
for i := 0; i < numServers; i++ {
servers[i].stop()
}
}
}
func (s) TestNameDiscovery(t *testing.T) {
// Start 2 servers on 2 ports.
numServers := 2
servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
defer cleanup()
cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("Failed to create ClientConn: %v", err)
}
defer cc.Close()
req := "port"
var reply string
if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port {
t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port)
}
// Inject the name resolution change to remove servers[0] and add servers[1].
var updates []*naming.Update
updates = append(updates, &naming.Update{
Op: naming.Delete,
Addr: "localhost:" + servers[0].port,
})
updates = append(updates, &naming.Update{
Op: naming.Add,
Addr: "localhost:" + servers[1].port,
})
r.w.inject(updates)
// Loop until the in-flight RPCs talk to servers[1].
for {
if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
break
}
time.Sleep(10 * time.Millisecond)
}
}
func (s) TestEmptyAddrs(t *testing.T) {
servers, r, cleanup := startServers(t, 1, math.MaxUint32)
defer cleanup()
cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("Failed to create ClientConn: %v", err)
}
defer cc.Close()
var reply string
if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply); err != nil || reply != expectedResponse {
t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, reply = %q, want %q, <nil>", err, reply, expectedResponse)
}
// Inject name resolution change to remove the server so that there is no address
// available after that.
u := &naming.Update{
Op: naming.Delete,
Addr: "localhost:" + servers[0].port,
}
r.w.inject([]*naming.Update{u})
// Loop until the above updates apply.
for {
time.Sleep(10 * time.Millisecond)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply); err != nil {
cancel()
break
}
cancel()
}
}
func (s) TestRoundRobin(t *testing.T) {
// Start 3 servers on 3 ports.
numServers := 3
servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
defer cleanup()
cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("Failed to create ClientConn: %v", err)
}
defer cc.Close()
// Add servers[1] to the service discovery.
u := &naming.Update{
Op: naming.Add,
Addr: "localhost:" + servers[1].port,
}
r.w.inject([]*naming.Update{u})
req := "port"
var reply string
// Loop until servers[1] is up
for {
if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
break
}
time.Sleep(10 * time.Millisecond)
}
// Add servers[2] to the service discovery.
u = &naming.Update{
Op: naming.Add,
Addr: "localhost:" + servers[2].port,
}
r.w.inject([]*naming.Update{u})
// Loop until servers[2] is up.
for {
if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[2].port {
break
}
time.Sleep(10 * time.Millisecond)
}
// Check that the incoming RPCs are served in a round-robin manner.
for i := 0; i < 10; i++ {
if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[i%numServers].port {
t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", i, err, servers[i%numServers].port)
}
}
}
func (s) TestCloseWithPendingRPC(t *testing.T) {
servers, r, cleanup := startServers(t, 1, math.MaxUint32)
defer cleanup()
cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("Failed to create ClientConn: %v", err)
}
defer cc.Close()
var reply string
if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err != nil {
t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port)
}
// Remove the server.
updates := []*naming.Update{{
Op: naming.Delete,
Addr: "localhost:" + servers[0].port,
}}
r.w.inject(updates)
// Loop until the above update applies.
for {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); status.Code(err) == codes.DeadlineExceeded {
cancel()
break
}
time.Sleep(10 * time.Millisecond)
cancel()
}
// Issue 2 RPCs which should be completed with error status once cc is closed.
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
var reply string
if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err == nil {
t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
}
}()
go func() {
defer wg.Done()
var reply string
time.Sleep(5 * time.Millisecond)
if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err == nil {
t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
}
}()
time.Sleep(5 * time.Millisecond)
cc.Close()
wg.Wait()
}
func (s) TestGetOnWaitChannel(t *testing.T) {
servers, r, cleanup := startServers(t, 1, math.MaxUint32)
defer cleanup()
cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("Failed to create ClientConn: %v", err)
}
defer cc.Close()
// Remove all servers so that all upcoming RPCs will block on waitCh.
updates := []*naming.Update{{
Op: naming.Delete,
Addr: "localhost:" + servers[0].port,
}}
r.w.inject(updates)
for {
var reply string
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); status.Code(err) == codes.DeadlineExceeded {
cancel()
break
}
cancel()
time.Sleep(10 * time.Millisecond)
}
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
var reply string
if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err != nil {
t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want <nil>", err)
}
}()
// Add a connected server to get the above RPC through.
updates = []*naming.Update{{
Op: naming.Add,
Addr: "localhost:" + servers[0].port,
}}
r.w.inject(updates)
// Wait until the above RPC succeeds.
wg.Wait()
}
func (s) TestOneServerDown(t *testing.T) {
// Start 2 servers.
numServers := 2
servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
defer cleanup()
cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("Failed to create ClientConn: %v", err)
}
defer cc.Close()
// Add servers[1] to the service discovery.
var updates []*naming.Update
updates = append(updates, &naming.Update{
Op: naming.Add,
Addr: "localhost:" + servers[1].port,
})
r.w.inject(updates)
req := "port"
var reply string
// Loop until servers[1] is up
for {
if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
break
}
time.Sleep(10 * time.Millisecond)
}
var wg sync.WaitGroup
numRPC := 100
sleepDuration := 10 * time.Millisecond
wg.Add(1)
go func() {
time.Sleep(sleepDuration)
// After sleepDuration, kill server[0].
servers[0].stop()
wg.Done()
}()
// All non-failfast RPCs should not block because there's at least one connection available.
for i := 0; i < numRPC; i++ {
wg.Add(1)
go func() {
time.Sleep(sleepDuration)
// After sleepDuration, invoke RPC.
// server[0] is killed around the same time to make it racy between balancer and gRPC internals.
cc.Invoke(context.Background(), "/foo/bar", &req, &reply, WaitForReady(true))
wg.Done()
}()
}
wg.Wait()
}
func (s) TestOneAddressRemoval(t *testing.T) {
// Start 2 servers.
numServers := 2
servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
defer cleanup()
cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("Failed to create ClientConn: %v", err)
}
defer cc.Close()
// Add servers[1] to the service discovery.
var updates []*naming.Update
updates = append(updates, &naming.Update{
Op: naming.Add,
Addr: "localhost:" + servers[1].port,
})
r.w.inject(updates)
req := "port"
var reply string
// Loop until servers[1] is up
for {
if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
break
}
time.Sleep(10 * time.Millisecond)
}
var wg sync.WaitGroup
numRPC := 100
sleepDuration := 10 * time.Millisecond
wg.Add(1)
go func() {
time.Sleep(sleepDuration)
// After sleepDuration, delete server[0].
var updates []*naming.Update
updates = append(updates, &naming.Update{
Op: naming.Delete,
Addr: "localhost:" + servers[0].port,
})
r.w.inject(updates)
wg.Done()
}()
// All non-failfast RPCs should not fail because there's at least one connection available.
for i := 0; i < numRPC; i++ {
wg.Add(1)
go func() {
var reply string
time.Sleep(sleepDuration)
// After sleepDuration, invoke RPC.
// server[0] is removed around the same time to make it racy between balancer and gRPC internals.
if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err != nil {
t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want nil", err)
}
wg.Done()
}()
}
wg.Wait()
}
func checkServerUp(t *testing.T, currentServer *server) {
req := "port"
port := currentServer.port
cc, err := Dial("passthrough:///localhost:"+port, WithBlock(), WithInsecure(), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("Failed to create ClientConn: %v", err)
}
defer cc.Close()
var reply string
for {
if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == port {
break
}
time.Sleep(10 * time.Millisecond)
}
}
func (s) TestPickFirstEmptyAddrs(t *testing.T) {
servers, r, cleanup := startServers(t, 1, math.MaxUint32)
defer cleanup()
cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("Failed to create ClientConn: %v", err)
}
defer cc.Close()
var reply string
if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply); err != nil || reply != expectedResponse {
t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, reply = %q, want %q, <nil>", err, reply, expectedResponse)
}
// Inject name resolution change to remove the server so that there is no address
// available after that.
u := &naming.Update{
Op: naming.Delete,
Addr: "localhost:" + servers[0].port,
}
r.w.inject([]*naming.Update{u})
// Loop until the above updates apply.
for {
time.Sleep(10 * time.Millisecond)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply); err != nil {
cancel()
break
}
cancel()
}
}
func (s) TestPickFirstCloseWithPendingRPC(t *testing.T) {
servers, r, cleanup := startServers(t, 1, math.MaxUint32)
defer cleanup()
cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("Failed to create ClientConn: %v", err)
}
defer cc.Close()
var reply string
if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err != nil {
t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port)
}
// Remove the server.
updates := []*naming.Update{{
Op: naming.Delete,
Addr: "localhost:" + servers[0].port,
}}
r.w.inject(updates)
// Loop until the above update applies.
for {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
if err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); status.Code(err) == codes.DeadlineExceeded {
cancel()
break
}
time.Sleep(10 * time.Millisecond)
cancel()
}
// Issue 2 RPCs which should be completed with error status once cc is closed.
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
var reply string
if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err == nil {
t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
}
}()
go func() {
defer wg.Done()
var reply string
time.Sleep(5 * time.Millisecond)
if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err == nil {
t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
}
}()
time.Sleep(5 * time.Millisecond)
cc.Close()
wg.Wait()
}
func (s) TestPickFirstOrderAllServerUp(t *testing.T) {
// Start 3 servers on 3 ports.
numServers := 3
servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
defer cleanup()
cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("Failed to create ClientConn: %v", err)
}
defer cc.Close()
// Add servers[1] and [2] to the service discovery.
u := &naming.Update{
Op: naming.Add,
Addr: "localhost:" + servers[1].port,
}
r.w.inject([]*naming.Update{u})
u = &naming.Update{
Op: naming.Add,
Addr: "localhost:" + servers[2].port,
}
r.w.inject([]*naming.Update{u})
// Loop until all 3 servers are up
checkServerUp(t, servers[0])
checkServerUp(t, servers[1])
checkServerUp(t, servers[2])
// Check that the incoming RPCs are served by server[0]
req := "port"
var reply string
for i := 0; i < 20; i++ {
if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port {
t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
}
time.Sleep(10 * time.Millisecond)
}
// Delete server[0] in the balancer; the incoming RPCs should then be served by server[1].
// (To test the addrConn, server[0] could be closed instead.)
u = &naming.Update{
Op: naming.Delete,
Addr: "localhost:" + servers[0].port,
}
r.w.inject([]*naming.Update{u})
// Loop until it changes to server[1]
for {
if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
break
}
time.Sleep(10 * time.Millisecond)
}
for i := 0; i < 20; i++ {
if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port {
t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
}
time.Sleep(10 * time.Millisecond)
}
// Add server[0] back to the balancer; incoming RPCs are still served by server[1].
// Add is an append operation, so the Notify order is now {server[1].port, server[2].port, server[0].port}.
u = &naming.Update{
Op: naming.Add,
Addr: "localhost:" + servers[0].port,
}
r.w.inject([]*naming.Update{u})
for i := 0; i < 20; i++ {
if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port {
t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
}
time.Sleep(10 * time.Millisecond)
}
// Delete server[1] from the balancer; incoming RPCs should now be served by server[2].
u = &naming.Update{
Op: naming.Delete,
Addr: "localhost:" + servers[1].port,
}
r.w.inject([]*naming.Update{u})
for {
if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[2].port {
break
}
time.Sleep(1 * time.Second)
}
for i := 0; i < 20; i++ {
if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[2].port {
t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 2, err, servers[2].port)
}
time.Sleep(10 * time.Millisecond)
}
// Delete server[2] from the balancer; incoming RPCs should now be served by server[0].
u = &naming.Update{
Op: naming.Delete,
Addr: "localhost:" + servers[2].port,
}
r.w.inject([]*naming.Update{u})
for {
if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port {
break
}
time.Sleep(1 * time.Second)
}
for i := 0; i < 20; i++ {
if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port {
t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
}
time.Sleep(10 * time.Millisecond)
}
}
func (s) TestPickFirstOrderOneServerDown(t *testing.T) {
// Start 3 servers on 3 ports.
numServers := 3
servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
defer cleanup()
cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("Failed to create ClientConn: %v", err)
}
defer cc.Close()
// Add servers[1] and [2] to the service discovery.
u := &naming.Update{
Op: naming.Add,
Addr: "localhost:" + servers[1].port,
}
r.w.inject([]*naming.Update{u})
u = &naming.Update{
Op: naming.Add,
Addr: "localhost:" + servers[2].port,
}
r.w.inject([]*naming.Update{u})
// Loop until all 3 servers are up
checkServerUp(t, servers[0])
checkServerUp(t, servers[1])
checkServerUp(t, servers[2])
// Check that incoming RPCs are served by server[0].
req := "port"
var reply string
for i := 0; i < 20; i++ {
if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port {
t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
}
time.Sleep(10 * time.Millisecond)
}
// server[0] is down, so incoming RPCs are served by server[1], but the Notify order
// remains {server[0] server[1] server[2]}.
servers[0].stop()
// Loop until RPCs switch to server[1].
for {
if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
break
}
time.Sleep(10 * time.Millisecond)
}
for i := 0; i < 20; i++ {
if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port {
t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
}
time.Sleep(10 * time.Millisecond)
}
// Bring server[0] back up; incoming RPCs are still served by server[1].
p, _ := strconv.Atoi(servers[0].port)
servers[0] = newTestServer()
go servers[0].start(t, p, math.MaxUint32)
defer servers[0].stop()
servers[0].wait(t, 2*time.Second)
checkServerUp(t, servers[0])
for i := 0; i < 20; i++ {
if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[1].port {
t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 1, err, servers[1].port)
}
time.Sleep(10 * time.Millisecond)
}
// Delete server[1] from the balancer; incoming RPCs should now be served by server[0].
u = &naming.Update{
Op: naming.Delete,
Addr: "localhost:" + servers[1].port,
}
r.w.inject([]*naming.Update{u})
for {
if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[0].port {
break
}
time.Sleep(1 * time.Second)
}
for i := 0; i < 20; i++ {
if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[0].port {
t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", 0, err, servers[0].port)
}
time.Sleep(10 * time.Millisecond)
}
}
func (s) TestPickFirstOneAddressRemoval(t *testing.T) {
// Start 2 servers.
numServers := 2
servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
defer cleanup()
cc, err := Dial("passthrough:///localhost:"+servers[0].port, WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
if err != nil {
t.Fatalf("Failed to create ClientConn: %v", err)
}
defer cc.Close()
// Add servers[1] to the service discovery.
var updates []*naming.Update
updates = append(updates, &naming.Update{
Op: naming.Add,
Addr: "localhost:" + servers[1].port,
})
r.w.inject(updates)
// Loop until servers[0] and servers[1] are up.
checkServerUp(t, servers[0])
checkServerUp(t, servers[1])
var wg sync.WaitGroup
numRPC := 100
sleepDuration := 10 * time.Millisecond
wg.Add(1)
go func() {
time.Sleep(sleepDuration)
// After sleepDuration, delete server[0].
var updates []*naming.Update
updates = append(updates, &naming.Update{
Op: naming.Delete,
Addr: "localhost:" + servers[0].port,
})
r.w.inject(updates)
wg.Done()
}()
// All non-failfast RPCs should not fail because there's at least one connection available.
for i := 0; i < numRPC; i++ {
wg.Add(1)
go func() {
var reply string
time.Sleep(sleepDuration)
// After sleepDuration, invoke RPC.
// server[0] is removed around the same time to make it racy between balancer and gRPC internals.
if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err != nil {
t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want nil", err)
}
wg.Done()
}()
}
wg.Wait()
}

334
balancer_v1_wrapper.go Normal file
View File

@ -0,0 +1,334 @@
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpc
import (
"sync"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/resolver"
)
type balancerWrapperBuilder struct {
b Balancer // The v1 balancer.
}
func (bwb *balancerWrapperBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
bwb.b.Start(opts.Target.Endpoint, BalancerConfig{
DialCreds: opts.DialCreds,
Dialer: opts.Dialer,
})
_, pickfirst := bwb.b.(*pickFirst)
bw := &balancerWrapper{
balancer: bwb.b,
pickfirst: pickfirst,
cc: cc,
targetAddr: opts.Target.Endpoint,
startCh: make(chan struct{}),
conns: make(map[resolver.Address]balancer.SubConn),
connSt: make(map[balancer.SubConn]*scState),
csEvltr: &balancer.ConnectivityStateEvaluator{},
state: connectivity.Idle,
}
cc.UpdateState(balancer.State{ConnectivityState: connectivity.Idle, Picker: bw})
go bw.lbWatcher()
return bw
}
func (bwb *balancerWrapperBuilder) Name() string {
return "wrapper"
}
type scState struct {
addr Address // The v1 address type.
s connectivity.State
down func(error)
}
type balancerWrapper struct {
balancer Balancer // The v1 balancer.
pickfirst bool
cc balancer.ClientConn
targetAddr string // Target without the scheme.
mu sync.Mutex
conns map[resolver.Address]balancer.SubConn
connSt map[balancer.SubConn]*scState
// This channel is closed when the first resolver result is handled.
// lbWatcher blocks until then, to avoid a race where cc creates a
// NewSubConn and wants to notify the balancer of state changes before
// Build has returned and cc has access to the balancer.
startCh chan struct{}
// To aggregate the connectivity state.
csEvltr *balancer.ConnectivityStateEvaluator
state connectivity.State
}
// lbWatcher watches the Notify channel of the balancer and manages
// connections accordingly.
func (bw *balancerWrapper) lbWatcher() {
<-bw.startCh
notifyCh := bw.balancer.Notify()
if notifyCh == nil {
// There's no resolver in the balancer. Connect directly.
a := resolver.Address{
Addr: bw.targetAddr,
Type: resolver.Backend,
}
sc, err := bw.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{})
if err != nil {
grpclog.Warningf("Error creating connection to %v. Err: %v", a, err)
} else {
bw.mu.Lock()
bw.conns[a] = sc
bw.connSt[sc] = &scState{
addr: Address{Addr: bw.targetAddr},
s: connectivity.Idle,
}
bw.mu.Unlock()
sc.Connect()
}
return
}
for addrs := range notifyCh {
grpclog.Infof("balancerWrapper: got update addr from Notify: %v", addrs)
if bw.pickfirst {
var (
oldA resolver.Address
oldSC balancer.SubConn
)
bw.mu.Lock()
for oldA, oldSC = range bw.conns {
break
}
bw.mu.Unlock()
if len(addrs) == 0 {
if oldSC != nil {
// Teardown old sc.
bw.mu.Lock()
delete(bw.conns, oldA)
delete(bw.connSt, oldSC)
bw.mu.Unlock()
bw.cc.RemoveSubConn(oldSC)
}
continue
}
var newAddrs []resolver.Address
for _, a := range addrs {
newAddr := resolver.Address{
Addr: a.Addr,
Type: resolver.Backend, // All addresses from the balancer are backends.
ServerName: "",
Metadata: a.Metadata,
}
newAddrs = append(newAddrs, newAddr)
}
if oldSC == nil {
// Create new sc.
sc, err := bw.cc.NewSubConn(newAddrs, balancer.NewSubConnOptions{})
if err != nil {
grpclog.Warningf("Error creating connection to %v. Err: %v", newAddrs, err)
} else {
bw.mu.Lock()
// For pickfirst, there should be only one SubConn, so the
// address doesn't matter. All state updates (up and down)
// and picking should happen on that single SubConn.
bw.conns[resolver.Address{}] = sc
bw.connSt[sc] = &scState{
addr: addrs[0], // Use the first address.
s: connectivity.Idle,
}
bw.mu.Unlock()
sc.Connect()
}
} else {
bw.mu.Lock()
bw.connSt[oldSC].addr = addrs[0]
bw.mu.Unlock()
oldSC.UpdateAddresses(newAddrs)
}
} else {
var (
add []resolver.Address // Addresses that need new connections.
del []balancer.SubConn // Connections that need to be torn down.
)
resAddrs := make(map[resolver.Address]Address)
for _, a := range addrs {
resAddrs[resolver.Address{
Addr: a.Addr,
Type: resolver.Backend, // All addresses from the balancer are backends.
ServerName: "",
Metadata: a.Metadata,
}] = a
}
bw.mu.Lock()
for a := range resAddrs {
if _, ok := bw.conns[a]; !ok {
add = append(add, a)
}
}
for a, c := range bw.conns {
if _, ok := resAddrs[a]; !ok {
del = append(del, c)
delete(bw.conns, a)
// Keep the state of this sc in bw.connSt until its state becomes Shutdown.
}
}
bw.mu.Unlock()
for _, a := range add {
sc, err := bw.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{})
if err != nil {
grpclog.Warningf("Error creating connection to %v. Err: %v", a, err)
} else {
bw.mu.Lock()
bw.conns[a] = sc
bw.connSt[sc] = &scState{
addr: resAddrs[a],
s: connectivity.Idle,
}
bw.mu.Unlock()
sc.Connect()
}
}
for _, c := range del {
bw.cc.RemoveSubConn(c)
}
}
}
}
func (bw *balancerWrapper) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
bw.mu.Lock()
defer bw.mu.Unlock()
scSt, ok := bw.connSt[sc]
if !ok {
return
}
if s == connectivity.Idle {
sc.Connect()
}
oldS := scSt.s
scSt.s = s
if oldS != connectivity.Ready && s == connectivity.Ready {
scSt.down = bw.balancer.Up(scSt.addr)
} else if oldS == connectivity.Ready && s != connectivity.Ready {
if scSt.down != nil {
scSt.down(errConnClosing)
}
}
sa := bw.csEvltr.RecordTransition(oldS, s)
if bw.state != sa {
bw.state = sa
}
bw.cc.UpdateState(balancer.State{ConnectivityState: bw.state, Picker: bw})
if s == connectivity.Shutdown {
// Remove state for this sc.
delete(bw.connSt, sc)
}
}
func (bw *balancerWrapper) HandleResolvedAddrs([]resolver.Address, error) {
bw.mu.Lock()
defer bw.mu.Unlock()
select {
case <-bw.startCh:
default:
close(bw.startCh)
}
// There should be a resolver inside the balancer.
// All updates here, if any, are ignored.
}
func (bw *balancerWrapper) Close() {
bw.mu.Lock()
defer bw.mu.Unlock()
select {
case <-bw.startCh:
default:
close(bw.startCh)
}
bw.balancer.Close()
}
// The picker is the balancerWrapper itself.
// It either blocks or returns an error, consistent with the v1 balancer's Get().
func (bw *balancerWrapper) Pick(info balancer.PickInfo) (result balancer.PickResult, err error) {
failfast := true // Default failfast is true.
if ss, ok := rpcInfoFromContext(info.Ctx); ok {
failfast = ss.failfast
}
a, p, err := bw.balancer.Get(info.Ctx, BalancerGetOptions{BlockingWait: !failfast})
if err != nil {
return balancer.PickResult{}, toRPCErr(err)
}
if p != nil {
result.Done = func(balancer.DoneInfo) { p() }
defer func() {
if err != nil {
p()
}
}()
}
bw.mu.Lock()
defer bw.mu.Unlock()
if bw.pickfirst {
// Get the first sc in conns.
for _, result.SubConn = range bw.conns {
return result, nil
}
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}
var ok1 bool
result.SubConn, ok1 = bw.conns[resolver.Address{
Addr: a.Addr,
Type: resolver.Backend,
ServerName: "",
Metadata: a.Metadata,
}]
s, ok2 := bw.connSt[result.SubConn]
if !ok1 || !ok2 {
// This can only happen due to a race where Get() returned an address
// that was subsequently removed by Notify. In this case we should
// always retry.
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}
switch s.s {
case connectivity.Ready, connectivity.Idle:
return result, nil
case connectivity.Shutdown, connectivity.TransientFailure:
// If the returned sc has been shut down or is in transient failure,
// return error, and this RPC will fail or wait for another picker (if
// non-failfast).
return balancer.PickResult{}, balancer.ErrTransientFailure
default:
// For other states (connecting or unknown), the v1 balancer would
// traditionally wait until ready and then issue the RPC. Returning
// ErrNoSubConnAvailable will be a slight improvement in that it will
// allow the balancer to choose another address in case others are
// connected.
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}
}
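For orientation, here is a minimal sketch of a custom v1 Balancer driving the wrapper above; staticBalancer and its address are illustrative, not part of this diff. Build calls Start, lbWatcher consumes Notify, and Pick delegates to Get.
// newStaticBalancer returns a hypothetical v1 Balancer that always reports a
// single fixed backend.
func newStaticBalancer(addr string) Balancer {
	ch := make(chan []Address, 1)
	ch <- []Address{{Addr: addr}} // lbWatcher reads this from Notify().
	return &staticBalancer{addr: addr, ch: ch}
}
type staticBalancer struct {
	addr string
	ch   chan []Address
}
func (b *staticBalancer) Start(target string, config BalancerConfig) error { return nil }
func (b *staticBalancer) Up(addr Address) func(error)                      { return func(error) {} }
func (b *staticBalancer) Get(ctx context.Context, opts BalancerGetOptions) (Address, func(), error) {
	return Address{Addr: b.addr}, nil, nil
}
func (b *staticBalancer) Notify() <-chan []Address { return b.ch }
func (b *staticBalancer) Close() error             { close(b.ch); return nil }
// Usage: Dial("passthrough:///ignored", WithInsecure(),
//	WithBalancer(newStaticBalancer("localhost:50051")))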

View File

@ -68,6 +68,8 @@ var (
errConnDrain = errors.New("grpc: the connection is drained")
// errConnClosing indicates that the connection is closing.
errConnClosing = errors.New("grpc: the connection is closing")
// errBalancerClosed indicates that the balancer is closed.
errBalancerClosed = errors.New("grpc: balancer is closed")
// invalidDefaultServiceConfigErrPrefix is used to prefix the json parsing error for the default
// service config.
invalidDefaultServiceConfigErrPrefix = "grpc: the provided default service config is invalid"

View File

@ -449,9 +449,9 @@ type stateRecordingBalancer struct {
balancer.Balancer
}
func (b *stateRecordingBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) {
b.notifier <- s.ConnectivityState
b.Balancer.UpdateSubConnState(sc, s)
func (b *stateRecordingBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
b.notifier <- s
b.Balancer.HandleSubConnStateChange(sc, s)
}
func (b *stateRecordingBalancer) ResetNotifier(r chan<- connectivity.State) {

View File

@ -36,11 +36,21 @@ import (
internalbackoff "google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/naming"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/testdata"
)
func assertState(wantState connectivity.State, cc *ClientConn) (connectivity.State, bool) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
var state connectivity.State
for state = cc.GetState(); state != wantState && cc.WaitForStateChange(ctx, state); state = cc.GetState() {
}
return state, state == wantState
}
func (s) TestDialWithTimeout(t *testing.T) {
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
@ -350,6 +360,42 @@ func (s) TestBackoffWhenNoServerPrefaceReceived(t *testing.T) {
}
func (s) TestConnectivityStates(t *testing.T) {
servers, resolver, cleanup := startServers(t, 2, math.MaxUint32)
defer cleanup()
cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(resolver)), WithInsecure())
if err != nil {
t.Fatalf("Dial(\"foo.bar.com\", WithBalancer(_)) = _, %v, want _ <nil>", err)
}
defer cc.Close()
wantState := connectivity.Ready
if state, ok := assertState(wantState, cc); !ok {
t.Fatalf("asserState(%s) = %s, false, want %s, true", wantState, state, wantState)
}
// Send an update to delete the server connection (tearDown addrConn).
update := []*naming.Update{
{
Op: naming.Delete,
Addr: "localhost:" + servers[0].port,
},
}
resolver.w.inject(update)
wantState = connectivity.TransientFailure
if state, ok := assertState(wantState, cc); !ok {
t.Fatalf("asserState(%s) = %s, false, want %s, true", wantState, state, wantState)
}
update[0] = &naming.Update{
Op: naming.Add,
Addr: "localhost:" + servers[1].port,
}
resolver.w.inject(update)
wantState = connectivity.Ready
if state, ok := assertState(wantState, cc); !ok {
t.Fatalf("asserState(%s) = %s, false, want %s, true", wantState, state, wantState)
}
}
func (s) TestWithTimeout(t *testing.T) {
conn, err := Dial("passthrough:///Non-Existent.Server:80", WithTimeout(time.Millisecond), WithBlock(), WithInsecure())
if err == nil {
@ -545,6 +591,43 @@ func (s) TestDialContextFailFast(t *testing.T) {
}
}
// blockingBalancer mimics the behavior of balancers whose initialization takes a long time.
// In this test, reading from blockingBalancer.Notify() blocks forever.
type blockingBalancer struct {
ch chan []Address
}
func newBlockingBalancer() Balancer {
return &blockingBalancer{ch: make(chan []Address)}
}
func (b *blockingBalancer) Start(target string, config BalancerConfig) error {
return nil
}
func (b *blockingBalancer) Up(addr Address) func(error) {
return nil
}
func (b *blockingBalancer) Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) {
return Address{}, nil, nil
}
func (b *blockingBalancer) Notify() <-chan []Address {
return b.ch
}
func (b *blockingBalancer) Close() error {
close(b.ch)
return nil
}
func (s) TestDialWithBlockingBalancer(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
dialDone := make(chan struct{})
go func() {
DialContext(ctx, "Non-Existent.Server:80", WithBlock(), WithInsecure(), WithBalancer(newBlockingBalancer()))
close(dialDone)
}()
cancel()
<-dialDone
}
// securePerRPCCredentials always requires transport security.
type securePerRPCCredentials struct{}
@ -640,6 +723,50 @@ func (s) TestConnectParamsWithMinConnectTimeout(t *testing.T) {
}
}
// emptyBalancer returns an empty set of servers.
type emptyBalancer struct {
ch chan []Address
}
func newEmptyBalancer() Balancer {
return &emptyBalancer{ch: make(chan []Address, 1)}
}
func (b *emptyBalancer) Start(_ string, _ BalancerConfig) error {
b.ch <- nil
return nil
}
func (b *emptyBalancer) Up(_ Address) func(error) {
return nil
}
func (b *emptyBalancer) Get(_ context.Context, _ BalancerGetOptions) (Address, func(), error) {
return Address{}, nil, nil
}
func (b *emptyBalancer) Notify() <-chan []Address {
return b.ch
}
func (b *emptyBalancer) Close() error {
close(b.ch)
return nil
}
func (s) TestNonblockingDialWithEmptyBalancer(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dialDone := make(chan error)
go func() {
dialDone <- func() error {
conn, err := DialContext(ctx, "Non-Existent.Server:80", WithInsecure(), WithBalancer(newEmptyBalancer()))
if err != nil {
return err
}
return conn.Close()
}()
}()
if err := <-dialDone; err != nil {
t.Fatalf("unexpected error dialing connection: %s", err)
}
}
func (s) TestResolverServiceConfigBeforeAddressNotPanic(t *testing.T) {
r, rcleanup := manual.GenerateAndRegisterManualResolver()
defer rcleanup()

View File

@ -199,6 +199,19 @@ func WithDecompressor(dc Decompressor) DialOption {
})
}
// WithBalancer returns a DialOption which sets a load balancer with the v1 API.
// Name resolver will be ignored if this DialOption is specified.
//
// Deprecated: use the new balancer APIs in balancer package and
// WithBalancerName. Will be removed in a future 1.x release.
func WithBalancer(b Balancer) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.balancerBuilder = &balancerWrapperBuilder{
b: b,
}
})
}
// WithBalancerName sets the balancer that the ClientConn will be initialized
// with. Balancer registered with balancerName will be used. This function
// panics if no balancer was registered by balancerName.

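A usage sketch of the deprecated option, matching how the tests above exercise it; newResolver is a hypothetical stand-in for any v1 naming.Resolver implementation and is not part of this diff.

r := newResolver() // hypothetical: any v1 naming.Resolver
cc, err := grpc.Dial("passthrough:///foo.bar.com",
	grpc.WithInsecure(),
	grpc.WithBalancer(grpc.RoundRobin(r))) // wrapped by balancerWrapperBuilder
if err != nil {
	log.Fatalf("Dial failed: %v", err)
}
defer cc.Close()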
View File

@ -1,94 +0,0 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package stub implements a balancer for testing purposes.
package stub
import "google.golang.org/grpc/balancer"
// BalancerFuncs contains all balancer.Balancer functions with a preceding
// *BalancerData parameter for passing additional instance information. Any
// nil functions will never be called.
type BalancerFuncs struct {
// Init is called after ClientConn and BuildOptions are set in
// BalancerData. It may be used to initialize BalancerData.Data.
Init func(*BalancerData)
UpdateClientConnState func(*BalancerData, balancer.ClientConnState) error
ResolverError func(*BalancerData, error)
UpdateSubConnState func(*BalancerData, balancer.SubConn, balancer.SubConnState)
Close func(*BalancerData)
}
// BalancerData contains data relevant to a stub balancer.
type BalancerData struct {
// ClientConn is set by the builder.
ClientConn balancer.ClientConn
// BuildOptions is set by the builder.
BuildOptions balancer.BuildOptions
// Data may be used to store arbitrary user data.
Data interface{}
}
type bal struct {
bf BalancerFuncs
bd *BalancerData
}
func (b *bal) UpdateClientConnState(c balancer.ClientConnState) error {
if b.bf.UpdateClientConnState != nil {
return b.bf.UpdateClientConnState(b.bd, c)
}
return nil
}
func (b *bal) ResolverError(e error) {
if b.bf.ResolverError != nil {
b.bf.ResolverError(b.bd, e)
}
}
func (b *bal) UpdateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) {
if b.bf.UpdateSubConnState != nil {
b.bf.UpdateSubConnState(b.bd, sc, scs)
}
}
func (b *bal) Close() {
if b.bf.Close != nil {
b.bf.Close(b.bd)
}
}
type bb struct {
name string
bf BalancerFuncs
}
func (bb bb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
b := &bal{bf: bb.bf, bd: &BalancerData{ClientConn: cc, BuildOptions: opts}}
if b.bf.Init != nil {
b.bf.Init(b.bd)
}
return b
}
func (bb bb) Name() string { return bb.name }
// Register registers a stub balancer builder which will call the provided
// functions. The name used should be unique.
func Register(name string, bf BalancerFuncs) {
balancer.Register(bb{name: name, bf: bf})
}
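A hedged usage sketch of this package as it existed before the revert: a test registers a named stub whose callbacks close over whatever state it needs. The name "record_ccs" and the channel-in-Data idiom are illustrative.
func init() {
	stub.Register("record_ccs", stub.BalancerFuncs{
		Init: func(bd *stub.BalancerData) {
			// bd.ClientConn and bd.BuildOptions are already populated here;
			// stash a channel in Data so the test can observe updates.
			bd.Data = make(chan balancer.ClientConnState, 1)
		},
		UpdateClientConnState: func(bd *stub.BalancerData, s balancer.ClientConnState) error {
			// Buffered send; a real test would drain this channel.
			bd.Data.(chan balancer.ClientConnState) <- s
			return nil
		},
	})
}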

View File

@ -20,6 +20,7 @@ package grpc
import (
"context"
"fmt"
"io"
"sync"
@ -31,13 +32,33 @@ import (
"google.golang.org/grpc/status"
)
// v2PickerWrapper wraps a balancer.Picker while providing the
// balancer.V2Picker API. It requires a pickerWrapper to generate errors
// including the latest connectionError. To be deleted when balancer.Picker is
// updated to the balancer.V2Picker API.
type v2PickerWrapper struct {
picker balancer.Picker
connErr *connErr
}
func (v *v2PickerWrapper) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
sc, done, err := v.picker.Pick(info.Ctx, info)
if err != nil {
if err == balancer.ErrTransientFailure {
return balancer.PickResult{}, balancer.TransientFailureError(fmt.Errorf("%v, latest connection error: %v", err, v.connErr.connectionError()))
}
return balancer.PickResult{}, err
}
return balancer.PickResult{SubConn: sc, Done: done}, nil
}
// pickerWrapper is a wrapper of balancer.Picker. It blocks on certain pick
// actions and unblocks when there's a picker update.
type pickerWrapper struct {
mu sync.Mutex
done bool
blockingCh chan struct{}
picker balancer.Picker
picker balancer.V2Picker
// The latest connection error. TODO: remove when V1 picker is deprecated;
// balancer should be responsible for providing the error.
@ -68,6 +89,11 @@ func newPickerWrapper() *pickerWrapper {
// updatePicker is called by UpdateBalancerState. It unblocks all blocked picks.
func (pw *pickerWrapper) updatePicker(p balancer.Picker) {
pw.updatePickerV2(&v2PickerWrapper{picker: p, connErr: pw.connErr})
}
// updatePickerV2 is the V2Picker counterpart of updatePicker, which wraps v1
// pickers and delegates here. It unblocks all blocked picks.
func (pw *pickerWrapper) updatePickerV2(p balancer.V2Picker) {
pw.mu.Lock()
if pw.done {
pw.mu.Unlock()
@ -154,19 +180,19 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
if err == balancer.ErrNoSubConnAvailable {
continue
}
if drop, ok := err.(interface{ DropRPC() bool }); ok && drop.DropRPC() {
// End the RPC unconditionally. If not a status error, Convert
// will transform it into one with code Unknown.
return nil, nil, status.Convert(err).Err()
}
// For all other errors, wait-for-ready RPCs should block and other
// RPCs should fail.
if tfe, ok := err.(interface{ IsTransientFailure() bool }); ok && tfe.IsTransientFailure() {
if !failfast {
lastPickErr = err
continue
}
return nil, nil, status.Error(codes.Unavailable, err.Error())
}
if _, ok := status.FromError(err); ok {
return nil, nil, err
}
// err is some other error.
return nil, nil, status.Error(codes.Unknown, err.Error())
}
acw, ok := pickResult.SubConn.(*acBalancerWrapper)
if !ok {

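The branch logic above amounts to: ErrNoSubConnAvailable waits for the next picker; transient-failure errors block wait-for-ready RPCs and fail others with codes.Unavailable; status errors pass through unchanged; anything else becomes codes.Unknown. A hedged sketch of a V2 picker that exercises each branch; flakyPicker is illustrative, assumes the usual errors/codes/status imports, and is not concurrency-safe as real pickers must be.

type flakyPicker struct{ n int }

func (p *flakyPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
	p.n++
	switch p.n {
	case 1:
		// pick() loops and blocks until a new picker is pushed.
		return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
	case 2:
		// Wait-for-ready RPCs keep blocking; fail-fast RPCs get codes.Unavailable.
		return balancer.PickResult{}, balancer.TransientFailureError(errors.New("backend down"))
	case 3:
		// A status error is returned to the RPC as-is.
		return balancer.PickResult{}, status.Error(codes.ResourceExhausted, "quota")
	default:
		// Anything else is converted to codes.Unknown.
		return balancer.PickResult{}, errors.New("boom")
	}
}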
View File

@ -55,14 +55,14 @@ type testingPicker struct {
maxCalled int64
}
func (p *testingPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
func (p *testingPicker) Pick(ctx context.Context, info balancer.PickInfo) (balancer.SubConn, func(balancer.DoneInfo), error) {
if atomic.AddInt64(&p.maxCalled, -1) < 0 {
return balancer.PickResult{}, fmt.Errorf("pick called too many times (> goroutineCount)")
return nil, nil, fmt.Errorf("pick called too many times (> goroutineCount)")
}
if p.err != nil {
return balancer.PickResult{}, p.err
return nil, nil, p.err
}
return balancer.PickResult{SubConn: p.sc}, nil
return p.sc, nil, nil
}
func (s) TestBlockingPickTimeout(t *testing.T) {

View File

@ -25,6 +25,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/status"
)
@ -51,6 +52,20 @@ type pickfirstBalancer struct {
sc balancer.SubConn
}
var _ balancer.V2Balancer = &pickfirstBalancer{} // Assert we implement v2
func (b *pickfirstBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
if err != nil {
b.ResolverError(err)
return
}
b.UpdateClientConnState(balancer.ClientConnState{ResolverState: resolver.State{Addresses: addrs}}) // Ignore error
}
func (b *pickfirstBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
b.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: s})
}
func (b *pickfirstBalancer) ResolverError(err error) {
switch b.state {
case connectivity.TransientFailure, connectivity.Idle, connectivity.Connecting:
@ -114,9 +129,15 @@ func (b *pickfirstBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.S
case connectivity.Connecting:
b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &picker{err: balancer.ErrNoSubConnAvailable}})
case connectivity.TransientFailure:
err := balancer.ErrTransientFailure
// TODO: this can be unconditional after the V1 API is removed, as
// SubConnState will always contain a connection error.
if s.ConnectionError != nil {
err = balancer.TransientFailureError(s.ConnectionError)
}
b.cc.UpdateState(balancer.State{
ConnectivityState: s.ConnectivityState,
Picker: &picker{err: s.ConnectionError},
Picker: &picker{err: err},
})
}
}

View File

@ -43,7 +43,7 @@ func (s) TestOneBackendPickfirst(t *testing.T) {
defer rcleanup()
numServers := 1
servers, scleanup := startServers(t, numServers, math.MaxInt32)
servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
defer scleanup()
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
@ -76,7 +76,7 @@ func (s) TestBackendsPickfirst(t *testing.T) {
defer rcleanup()
numServers := 2
servers, scleanup := startServers(t, numServers, math.MaxInt32)
servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
defer scleanup()
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
@ -109,7 +109,7 @@ func (s) TestNewAddressWhileBlockingPickfirst(t *testing.T) {
defer rcleanup()
numServers := 1
servers, scleanup := startServers(t, numServers, math.MaxInt32)
servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
defer scleanup()
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
@ -145,7 +145,7 @@ func (s) TestCloseWithPendingRPCPickfirst(t *testing.T) {
defer rcleanup()
numServers := 1
_, scleanup := startServers(t, numServers, math.MaxInt32)
_, _, scleanup := startServers(t, numServers, math.MaxInt32)
defer scleanup()
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
@ -181,7 +181,7 @@ func (s) TestOneServerDownPickfirst(t *testing.T) {
defer rcleanup()
numServers := 2
servers, scleanup := startServers(t, numServers, math.MaxInt32)
servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
defer scleanup()
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
@ -222,7 +222,7 @@ func (s) TestAllServersDownPickfirst(t *testing.T) {
defer rcleanup()
numServers := 2
servers, scleanup := startServers(t, numServers, math.MaxInt32)
servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
defer scleanup()
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))
@ -265,7 +265,7 @@ func (s) TestAddressesRemovedPickfirst(t *testing.T) {
defer rcleanup()
numServers := 3
servers, scleanup := startServers(t, numServers, math.MaxInt32)
servers, _, scleanup := startServers(t, numServers, math.MaxInt32)
defer scleanup()
cc, err := Dial(r.Scheme()+":///test.server", WithInsecure(), WithCodec(testCodec{}))

View File

@ -29,7 +29,6 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/serviceconfig"
@ -185,12 +184,12 @@ const happyBalancerName = "happy balancer"
func init() {
// Register a balancer that never returns an error from
// UpdateClientConnState, and doesn't do anything else either.
bf := stub.BalancerFuncs{
UpdateClientConnState: func(*stub.BalancerData, balancer.ClientConnState) error {
fb := &funcBalancer{
updateClientConnState: func(s balancer.ClientConnState) error {
return nil
},
}
stub.Register(happyBalancerName, bf)
balancer.Register(&funcBalancerBuilder{name: happyBalancerName, instance: fb})
}
// TestResolverErrorPolling injects resolver errors and verifies ResolveNow is

View File

@ -20,17 +20,12 @@ package test
import (
"context"
"errors"
"fmt"
"net"
"reflect"
"testing"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
@ -39,7 +34,6 @@ import (
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/status"
testpb "google.golang.org/grpc/test/grpc_testing"
"google.golang.org/grpc/testdata"
)
@ -49,7 +43,7 @@ const testBalancerName = "testbalancer"
// testBalancer creates one subconn with the first address from resolved
// addresses.
//
// It's used to test options for NewSubConn are applied correctly.
// It's used to test that options for NewSubConn are applied correctly.
type testBalancer struct {
cc balancer.ClientConn
sc balancer.SubConn
@ -68,43 +62,37 @@ func (*testBalancer) Name() string {
return testBalancerName
}
func (*testBalancer) ResolverError(err error) {
panic("not implemented")
}
func (b *testBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
func (b *testBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
// Only create a subconn the first time.
if b.sc == nil {
var err error
b.sc, err = b.cc.NewSubConn(state.ResolverState.Addresses, b.newSubConnOptions)
if err == nil && b.sc == nil {
b.sc, err = b.cc.NewSubConn(addrs, b.newSubConnOptions)
if err != nil {
grpclog.Errorf("testBalancer: failed to NewSubConn: %v", err)
return nil
return
}
b.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: &picker{sc: b.sc, bal: b}})
b.sc.Connect()
}
return nil
}
func (b *testBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) {
grpclog.Infof("testBalancer: UpdateSubConnState: %p, %v", sc, s)
func (b *testBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
grpclog.Infof("testBalancer: HandleSubConnStateChange: %p, %v", sc, s)
if b.sc != sc {
grpclog.Infof("testBalancer: ignored state change because sc is not recognized")
return
}
if s.ConnectivityState == connectivity.Shutdown {
if s == connectivity.Shutdown {
b.sc = nil
return
}
switch s.ConnectivityState {
switch s {
case connectivity.Ready, connectivity.Idle:
b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &picker{sc: sc, bal: b}})
b.cc.UpdateState(balancer.State{ConnectivityState: s, Picker: &picker{sc: sc, bal: b}})
case connectivity.Connecting:
b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &picker{err: balancer.ErrNoSubConnAvailable, bal: b}})
b.cc.UpdateState(balancer.State{ConnectivityState: s, Picker: &picker{err: balancer.ErrNoSubConnAvailable, bal: b}})
case connectivity.TransientFailure:
b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &picker{err: balancer.ErrTransientFailure, bal: b}})
b.cc.UpdateState(balancer.State{ConnectivityState: s, Picker: &picker{err: balancer.ErrTransientFailure, bal: b}})
}
}
@ -291,10 +279,6 @@ func newTestBalancerKeepAddresses() *testBalancerKeepAddresses {
}
}
func (testBalancerKeepAddresses) ResolverError(err error) {
panic("not implemented")
}
func (b *testBalancerKeepAddresses) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
return b
}
@ -303,12 +287,11 @@ func (*testBalancerKeepAddresses) Name() string {
return testBalancerKeepAddressesName
}
func (b *testBalancerKeepAddresses) UpdateClientConnState(state balancer.ClientConnState) error {
b.addrsChan <- state.ResolverState.Addresses
return nil
func (b *testBalancerKeepAddresses) HandleResolvedAddrs(addrs []resolver.Address, err error) {
b.addrsChan <- addrs
}
func (testBalancerKeepAddresses) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) {
func (testBalancerKeepAddresses) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
panic("not used")
}
@ -363,220 +346,3 @@ func (s) TestNonGRPCLBBalancerGetsNoGRPCLBAddress(t *testing.T) {
t.Fatalf("With both backend and grpclb addresses, balancer got addresses %v, want %v", got, nonGRPCLBAddresses)
}
}
// TestServersSwap creates two servers and verifies the client switches between
// them when the name resolver reports the first and then the second.
func (s) TestServersSwap(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Initialize servers
reg := func(username string) (addr string, cleanup func()) {
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Error while listening. Err: %v", err)
}
s := grpc.NewServer()
ts := &funcServer{
unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{Username: username}, nil
},
}
testpb.RegisterTestServiceServer(s, ts)
go s.Serve(lis)
return lis.Addr().String(), s.Stop
}
const one = "1"
addr1, cleanup := reg(one)
defer cleanup()
const two = "2"
addr2, cleanup := reg(two)
defer cleanup()
// Initialize client
r, cleanup := manual.GenerateAndRegisterManualResolver()
defer cleanup()
r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: addr1}}})
cc, err := grpc.DialContext(ctx, r.Scheme()+":///", grpc.WithInsecure())
if err != nil {
t.Fatalf("Error creating client: %v", err)
}
defer cc.Close()
client := testpb.NewTestServiceClient(cc)
// Confirm we are connected to the first server
if res, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil || res.Username != one {
t.Fatalf("UnaryCall(_) = %v, %v; want {Username: %q}, nil", res, err, one)
}
// Update resolver to report only the second server
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: addr2}}})
// Loop until new RPCs talk to server two.
for i := 0; i < 2000; i++ {
if res, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err)
} else if res.Username == two {
break // pass
}
time.Sleep(5 * time.Millisecond)
}
}
// TestEmptyAddrs verifies client behavior when a working connection is
// removed. In pick first, it will continue using the old connection but in
// round robin it will not.
func (s) TestEmptyAddrs(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Initialize server
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Error while listening. Err: %v", err)
}
s := grpc.NewServer()
defer s.Stop()
const one = "1"
ts := &funcServer{
unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{Username: one}, nil
},
}
testpb.RegisterTestServiceServer(s, ts)
go s.Serve(lis)
// Initialize pickfirst client
pfr, cleanup := manual.GenerateAndRegisterManualResolver()
rn := make(chan struct{})
pfr.ResolveNowCallback = func(resolver.ResolveNowOptions) {
rn <- struct{}{}
}
defer cleanup()
pfr.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: lis.Addr().String()}}})
pfcc, err := grpc.DialContext(ctx, pfr.Scheme()+":///", grpc.WithInsecure())
if err != nil {
t.Fatalf("Error creating client: %v", err)
}
defer pfcc.Close()
pfclient := testpb.NewTestServiceClient(pfcc)
// Confirm we are connected to the server
if res, err := pfclient.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil || res.Username != one {
t.Fatalf("UnaryCall(_) = %v, %v; want {Username: %q}, nil", res, err, one)
}
// Remove all addresses.
pfr.UpdateState(resolver.State{})
// Wait for a ResolveNow call on the pick first client's resolver.
select {
case <-rn:
case <-ctx.Done():
t.Fatalf("ResolveNow() never invoked after providing empty addresses")
}
// Initialize roundrobin client
rrr, cleanup := manual.GenerateAndRegisterManualResolver()
defer cleanup()
rrr.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: lis.Addr().String()}}})
rrcc, err := grpc.DialContext(ctx, rrr.Scheme()+":///", grpc.WithInsecure(),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{ "loadBalancingConfig": [{"%v": {}}] }`, roundrobin.Name)))
if err != nil {
t.Fatalf("Error creating client: %v", err)
}
defer rrcc.Close()
rrclient := testpb.NewTestServiceClient(rrcc)
// Confirm we are connected to the server
if res, err := rrclient.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil || res.Username != one {
t.Fatalf("UnaryCall(_) = %v, %v; want {Username: %q}, nil", res, err, one)
}
// Remove all addresses.
rrr.UpdateState(resolver.State{})
// Confirm RPCs eventually fail on round robin.
for {
if _, err := rrclient.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
break
}
time.Sleep(5 * time.Millisecond)
}
if ctx.Err() != nil {
t.Fatalf("round robin client did not fail after 5 seconds")
}
// Confirm several new RPCs succeed on pick first. Because rr already failed
// above, and pf was initialized and updated first, we should not need to
// poll this success case for as long.
for i := 0; i < 10; i++ {
if _, err := pfclient.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err)
}
time.Sleep(5 * time.Millisecond)
}
}
func (s) TestWaitForReady(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Initialize server
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Error while listening. Err: %v", err)
}
s := grpc.NewServer()
defer s.Stop()
const one = "1"
ts := &funcServer{
unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{Username: one}, nil
},
}
testpb.RegisterTestServiceServer(s, ts)
go s.Serve(lis)
// Initialize client
r, cleanup := manual.GenerateAndRegisterManualResolver()
defer cleanup()
cc, err := grpc.DialContext(ctx, r.Scheme()+":///", grpc.WithInsecure())
if err != nil {
t.Fatalf("Error creating client: %v", err)
}
defer cc.Close()
client := testpb.NewTestServiceClient(cc)
// Report an error so non-WFR RPCs will give up early.
r.CC.ReportError(errors.New("fake resolver error"))
// Ensure the client is not connected to anything and fails non-WFR RPCs.
if res, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}); status.Code(err) != codes.Unavailable {
t.Fatalf("UnaryCall(_) = %v, %v; want _, Code()=%v", res, err, codes.Unavailable)
}
errChan := make(chan error, 1)
go func() {
if res, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}, grpc.WaitForReady(true)); err != nil || res.Username != one {
errChan <- fmt.Errorf("UnaryCall(_) = %v, %v; want {Username: %q}, nil", res, err, one)
}
close(errChan)
}()
select {
case err := <-errChan:
t.Errorf("unexpected receive from errChan before addresses provided")
t.Fatal(err.Error())
case <-time.After(5 * time.Millisecond):
}
// Resolve the server. The WFR RPC should unblock and use it.
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: lis.Addr().String()}}})
if err := <-errChan; err != nil {
t.Fatal(err.Error())
}
}

View File

@ -66,7 +66,7 @@ func (c *testCredsBundle) NewWithMode(mode string) (credentials.Bundle, error) {
}
func (s) TestCredsBundleBoth(t *testing.T) {
te := newTest(t, env{name: "creds-bundle", network: "tcp", security: "empty"})
te := newTest(t, env{name: "creds-bundle", network: "tcp", balancer: "v1", security: "empty"})
te.tapHandle = authHandle
te.customDialOptions = []grpc.DialOption{
grpc.WithCredentialsBundle(&testCredsBundle{t: t}),
@ -89,7 +89,7 @@ func (s) TestCredsBundleBoth(t *testing.T) {
}
func (s) TestCredsBundleTransportCredentials(t *testing.T) {
te := newTest(t, env{name: "creds-bundle", network: "tcp", security: "empty"})
te := newTest(t, env{name: "creds-bundle", network: "tcp", balancer: "v1", security: "empty"})
te.customDialOptions = []grpc.DialOption{
grpc.WithCredentialsBundle(&testCredsBundle{t: t, mode: bundleTLSOnly}),
}
@ -111,7 +111,7 @@ func (s) TestCredsBundleTransportCredentials(t *testing.T) {
}
func (s) TestCredsBundlePerRPCCredentials(t *testing.T) {
te := newTest(t, env{name: "creds-bundle", network: "tcp", security: "empty"})
te := newTest(t, env{name: "creds-bundle", network: "tcp", balancer: "v1", security: "empty"})
te.tapHandle = authHandle
te.customDialOptions = []grpc.DialOption{
grpc.WithCredentialsBundle(&testCredsBundle{t: t, mode: bundlePerRPCOnly}),

View File

@ -50,6 +50,7 @@ import (
"golang.org/x/net/http2/hpack"
spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
@ -420,7 +421,7 @@ type env struct {
network string // The type of network such as tcp, unix, etc.
security string // The security protocol such as TLS, SSH, etc.
httpHandler bool // whether to use the http.Handler ServerTransport; requires TLS
balancer string // One of "round_robin", "pick_first", or "".
balancer string // One of "round_robin", "pick_first", "v1", or "".
customDialer func(string, string, time.Duration) (net.Conn, error)
}
@ -439,8 +440,8 @@ func (e env) dialer(addr string, timeout time.Duration) (net.Conn, error) {
}
var (
tcpClearEnv = env{name: "tcp-clear-v1-balancer", network: "tcp"}
tcpTLSEnv = env{name: "tcp-tls-v1-balancer", network: "tcp", security: "tls"}
tcpClearEnv = env{name: "tcp-clear-v1-balancer", network: "tcp", balancer: "v1"}
tcpTLSEnv = env{name: "tcp-tls-v1-balancer", network: "tcp", security: "tls", balancer: "v1"}
tcpClearRREnv = env{name: "tcp-clear", network: "tcp", balancer: "round_robin"}
tcpTLSRREnv = env{name: "tcp-tls", network: "tcp", security: "tls", balancer: "round_robin"}
handlerEnv = env{name: "handler-tls", network: "tcp", security: "tls", httpHandler: true, balancer: "round_robin"}
@ -837,8 +838,11 @@ func (te *test) configDial(opts ...grpc.DialOption) ([]grpc.DialOption, string)
} else {
scheme = te.resolverScheme + ":///"
}
if te.e.balancer != "" {
opts = append(opts, grpc.WithBalancerName(te.e.balancer))
switch te.e.balancer {
case "v1":
opts = append(opts, grpc.WithBalancer(grpc.RoundRobin(nil)))
case "round_robin":
opts = append(opts, grpc.WithBalancerName(roundrobin.Name))
}
if te.clientInitialWindowSize > 0 {
opts = append(opts, grpc.WithInitialWindowSize(te.clientInitialWindowSize))
@ -4762,7 +4766,7 @@ func (c *clientTimeoutCreds) OverrideServerName(s string) error {
}
func (s) TestNonFailFastRPCSucceedOnTimeoutCreds(t *testing.T) {
te := newTest(t, env{name: "timeout-cred", network: "tcp", security: "clientTimeoutCreds"})
te := newTest(t, env{name: "timeout-cred", network: "tcp", security: "clientTimeoutCreds", balancer: "v1"})
te.userAgent = testAppUA
te.startServer(&testServer{security: te.e.security})
defer te.tearDown()

5
vet.sh
View File

@ -121,11 +121,14 @@ staticcheck -go 1.9 -checks 'inherit,-ST1015,-SA6002' ./... > "${SC_OUT}" || tru
# Error if anything other than deprecation warnings are printed.
(! grep -v "is deprecated:.*SA1019" "${SC_OUT}")
# Only ignore the following deprecated types/fields/functions.
(! grep -Fv '.HeaderMap
(! grep -Fv '.HandleResolvedAddrs
.HandleSubConnStateChange
.HeaderMap
.NewAddress
.NewServiceConfig
.Metadata is deprecated: use Attributes
.Type is deprecated: use Attributes
.UpdateBalancerState
balancer.Picker
grpc.CallCustomCodec
grpc.Code

View File

@ -25,6 +25,7 @@ import (
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/buffer"
"google.golang.org/grpc/resolver"
@ -45,7 +46,7 @@ var (
// newEDSBalancer is a helper function to build a new edsBalancer and will be
// overridden in unittests.
newEDSBalancer = func(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
newEDSBalancer = func(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.V2Balancer {
builder := balancer.Get(edsName)
if builder == nil {
grpclog.Errorf("xds: no balancer builder with name %v", edsName)
@ -54,7 +55,7 @@ var (
// We directly pass the parent clientConn to the
// underlying edsBalancer because the cdsBalancer does
// not deal with subConns.
return builder.Build(cc, opts)
return builder.Build(cc, opts).(balancer.V2Balancer)
}
)
@ -149,7 +150,7 @@ type cdsBalancer struct {
updateCh *buffer.Unbounded
client xdsClientInterface
cancelWatch func()
edsLB balancer.Balancer
edsLB balancer.V2Balancer
clusterToWatch string
// The only thing protected by this mutex is the closed boolean. This is
@ -339,3 +340,11 @@ func (b *cdsBalancer) isClosed() bool {
b.mu.Unlock()
return closed
}
func (b *cdsBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
grpclog.Error("UpdateSubConnState should be called instead of HandleSubConnStateChange")
}
func (b *cdsBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
grpclog.Error("UpdateClientConnState should be called instead of HandleResolvedAddrs")
}

View File

@ -209,11 +209,11 @@ func edsCCS(service string, enableLRS bool, xdsClient interface{}) balancer.Clie
func setup() (*cdsBalancer, *testEDSBalancer, func()) {
builder := cdsBB{}
tcc := &testClientConn{}
cdsB := builder.Build(tcc, balancer.BuildOptions{})
cdsB := builder.Build(tcc, balancer.BuildOptions{}).(balancer.V2Balancer)
edsB := newTestEDSBalancer()
oldEDSBalancerBuilder := newEDSBalancer
newEDSBalancer = func(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
newEDSBalancer = func(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.V2Balancer {
return edsB
}

View File

@ -66,6 +66,10 @@ type subBalancerWithConfig struct {
balancer balancer.Balancer
}
func (sbc *subBalancerWithConfig) UpdateBalancerState(state connectivity.State, picker balancer.Picker) {
grpclog.Fatalln("not implemented")
}
// UpdateState overrides balancer.ClientConn, to keep state and picker.
func (sbc *subBalancerWithConfig) UpdateState(state balancer.State) {
sbc.mu.Lock()
@ -91,7 +95,11 @@ func (sbc *subBalancerWithConfig) updateBalancerStateWithCachedPicker() {
func (sbc *subBalancerWithConfig) startBalancer() {
b := sbc.builder.Build(sbc, balancer.BuildOptions{})
sbc.balancer = b
b.UpdateClientConnState(balancer.ClientConnState{ResolverState: resolver.State{Addresses: sbc.addrs}})
if ub, ok := b.(balancer.V2Balancer); ok {
ub.UpdateClientConnState(balancer.ClientConnState{ResolverState: resolver.State{Addresses: sbc.addrs}})
} else {
b.HandleResolvedAddrs(sbc.addrs, nil)
}
}
func (sbc *subBalancerWithConfig) handleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
@ -103,7 +111,11 @@ func (sbc *subBalancerWithConfig) handleSubConnStateChange(sc balancer.SubConn,
// happen.
return
}
b.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: state})
if ub, ok := b.(balancer.V2Balancer); ok {
ub.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: state})
} else {
b.HandleSubConnStateChange(sc, state)
}
}
func (sbc *subBalancerWithConfig) updateAddrs(addrs []resolver.Address) {
@ -120,7 +132,11 @@ func (sbc *subBalancerWithConfig) updateAddrs(addrs []resolver.Address) {
// it's the lower priority, but it can still get address updates.
return
}
b.UpdateClientConnState(balancer.ClientConnState{ResolverState: resolver.State{Addresses: addrs}})
if ub, ok := b.(balancer.V2Balancer); ok {
ub.UpdateClientConnState(balancer.ClientConnState{ResolverState: resolver.State{Addresses: addrs}})
} else {
b.HandleResolvedAddrs(addrs, nil)
}
}
func (sbc *subBalancerWithConfig) stopBalancer() {
@ -130,7 +146,7 @@ func (sbc *subBalancerWithConfig) stopBalancer() {
type pickerState struct {
weight uint32
picker balancer.Picker
picker balancer.V2Picker
state connectivity.State
}
@ -541,7 +557,7 @@ func buildPickerAndState(m map[internal.Locality]*pickerState) balancer.State {
aggregatedState = connectivity.TransientFailure
}
if aggregatedState == connectivity.TransientFailure {
return balancer.State{aggregatedState, base.NewErrPicker(balancer.ErrTransientFailure)}
return balancer.State{aggregatedState, base.NewErrPickerV2(balancer.ErrTransientFailure)}
}
return balancer.State{aggregatedState, newPickerGroup(readyPickerWithWeights)}
}
@ -577,7 +593,7 @@ func (pg *pickerGroup) Pick(info balancer.PickInfo) (balancer.PickResult, error)
if pg.length <= 0 {
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}
p := pg.w.Next().(balancer.Picker)
p := pg.w.Next().(balancer.V2Picker)
return p.Pick(info)
}
@ -587,13 +603,13 @@ const (
)
type loadReportPicker struct {
p balancer.Picker
p balancer.V2Picker
id internal.Locality
loadStore lrs.Store
}
func newLoadReportPicker(p balancer.Picker, id internal.Locality, loadStore lrs.Store) *loadReportPicker {
func newLoadReportPicker(p balancer.V2Picker, id internal.Locality, loadStore lrs.Store) *loadReportPicker {
return &loadReportPicker{
p: p,
id: id,

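The V2Balancer type assertion above is repeated in startBalancer, handleSubConnStateChange, and updateAddrs; a hypothetical helper (forwardAddrs is an illustrative name, not code from this diff) showing the address-update half of that dispatch once:

// forwardAddrs pushes an address list into b, preferring the V2 API and
// falling back to the deprecated v1 entry point.
func forwardAddrs(b balancer.Balancer, addrs []resolver.Address) error {
	if ub, ok := b.(balancer.V2Balancer); ok {
		return ub.UpdateClientConnState(balancer.ClientConnState{
			ResolverState: resolver.State{Addresses: addrs},
		})
	}
	b.HandleResolvedAddrs(addrs, nil)
	return nil
}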
View File

@ -48,7 +48,7 @@ func init() {
defaultSubBalancerCloseTimeout = time.Millisecond
}
func subConnFromPicker(p balancer.Picker) func() balancer.SubConn {
func subConnFromPicker(p balancer.V2Picker) func() balancer.SubConn {
return func() balancer.SubConn {
scst, _ := p.Pick(balancer.PickInfo{})
return scst.SubConn

View File

@ -29,6 +29,8 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/xds/internal/balancer/lrs"
xdsclient "google.golang.org/grpc/xds/internal/client"
@ -96,6 +98,8 @@ type edsBalancerImplInterface interface {
Close()
}
var _ balancer.V2Balancer = (*edsBalancer)(nil) // Assert that we implement V2Balancer
// edsBalancer manages xdsClient and the actual EDS balancer implementation that
// does load balancing.
//
@ -192,6 +196,14 @@ type subConnStateUpdate struct {
state balancer.SubConnState
}
func (x *edsBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
grpclog.Error("UpdateSubConnState should be called instead of HandleSubConnStateChange")
}
func (x *edsBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
grpclog.Error("UpdateResolverState should be called instead of HandleResolvedAddrs")
}
func (x *edsBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
update := &subConnStateUpdate{
sc: sc,

View File

@ -392,6 +392,9 @@ type edsBalancerWrapperCC struct {
func (ebwcc *edsBalancerWrapperCC) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
return ebwcc.parent.newSubConn(ebwcc.priority, addrs, opts)
}
func (ebwcc *edsBalancerWrapperCC) UpdateBalancerState(state connectivity.State, picker balancer.Picker) {
grpclog.Fatalln("not implemented")
}
func (ebwcc *edsBalancerWrapperCC) UpdateState(state balancer.State) {
ebwcc.parent.updateState(ebwcc.priority, state)
}
@ -418,11 +421,11 @@ func (edsImpl *edsBalancerImpl) Close() {
type dropPicker struct {
drops []*dropper
p balancer.Picker
p balancer.V2Picker
loadStore lrs.Store
}
func newDropPicker(p balancer.Picker, drops []*dropper, loadStore lrs.Store) *dropPicker {
func newDropPicker(p balancer.V2Picker, drops []*dropper, loadStore lrs.Store) *dropPicker {
return &dropPicker{
drops: drops,
p: p,
@ -446,7 +449,7 @@ func (d *dropPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
if d.loadStore != nil {
d.loadStore.CallDropped(category)
}
return balancer.PickResult{}, balancer.DropRPCError(status.Errorf(codes.Unavailable, "RPC is dropped"))
return balancer.PickResult{}, status.Errorf(codes.Unavailable, "RPC is dropped")
}
// TODO: (eds) don't drop unless the inner picker is READY. Similar to
// https://github.com/grpc/grpc-go/issues/2622.

View File

@ -45,7 +45,7 @@ func (edsImpl *edsBalancerImpl) handlePriorityChange() {
// Everything was removed by EDS.
if !edsImpl.priorityLowest.isSet() {
edsImpl.priorityInUse = newPriorityTypeUnset()
edsImpl.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPicker(balancer.ErrTransientFailure)})
edsImpl.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: base.NewErrPickerV2(balancer.ErrTransientFailure)})
return
}
@ -69,7 +69,7 @@ func (edsImpl *edsBalancerImpl) handlePriorityChange() {
// We don't have an old state to send to parent, but we also don't
// want parent to keep using picker from old_priorityInUse. Send an
// update to trigger block picks until a new picker is ready.
edsImpl.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable)})
edsImpl.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: base.NewErrPickerV2(balancer.ErrNoSubConnAvailable)})
}
return
}

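base.NewErrPickerV2, used above, returns a V2 picker that fails every Pick with the supplied error; a minimal sketch of the resulting behavior:

// Until a real picker replaces this one, every Pick fails with the stored
// error; with ErrNoSubConnAvailable, wait-for-ready RPCs keep blocking.
p := base.NewErrPickerV2(balancer.ErrNoSubConnAvailable)
if _, err := p.Pick(balancer.PickInfo{}); err == balancer.ErrNoSubConnAvailable {
	// Expected: no SubConn is available yet.
}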
View File

@ -28,6 +28,7 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal"
xdsclient "google.golang.org/grpc/xds/internal/client"
)
@ -384,20 +385,15 @@ type testConstBalancer struct {
cc balancer.ClientConn
}
func (tb *testConstBalancer) ResolverError(error) {
panic("not implemented")
}
func (tb *testConstBalancer) UpdateSubConnState(balancer.SubConn, balancer.SubConnState) {
func (tb *testConstBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
tb.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Ready, Picker: &testConstPicker{err: errTestConstPicker}})
}
func (tb *testConstBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
a := s.ResolverState.Addresses
if len(a) > 0 {
tb.cc.NewSubConn(a, balancer.NewSubConnOptions{})
func (tb *testConstBalancer) HandleResolvedAddrs(a []resolver.Address, err error) {
if len(a) == 0 {
return
}
return nil
tb.cc.NewSubConn(a, balancer.NewSubConnOptions{})
}
func (*testConstBalancer) Close() {

View File

@ -274,15 +274,11 @@ type fakeBalancer struct {
cc balancer.ClientConn
}
func (b *fakeBalancer) ResolverError(error) {
func (b *fakeBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
panic("implement me")
}
func (b *fakeBalancer) UpdateClientConnState(balancer.ClientConnState) error {
panic("implement me")
}
func (b *fakeBalancer) UpdateSubConnState(balancer.SubConn, balancer.SubConnState) {
func (b *fakeBalancer) HandleSubConnStateChange(sc balancer.SubConn, state connectivity.State) {
panic("implement me")
}

View File

@ -65,7 +65,7 @@ type testClientConn struct {
newSubConnCh chan balancer.SubConn // The last 10 subconn created.
removeSubConnCh chan balancer.SubConn // The last 10 subconn removed.
newPickerCh chan balancer.Picker // The last picker updated.
newPickerCh chan balancer.V2Picker // The last picker updated.
newStateCh chan connectivity.State // The last state.
subConnIdx int
@ -79,7 +79,7 @@ func newTestClientConn(t *testing.T) *testClientConn {
newSubConnCh: make(chan balancer.SubConn, 10),
removeSubConnCh: make(chan balancer.SubConn, 10),
newPickerCh: make(chan balancer.Picker, 1),
newPickerCh: make(chan balancer.V2Picker, 1),
newStateCh: make(chan connectivity.State, 1),
}
}
@ -110,6 +110,10 @@ func (tcc *testClientConn) RemoveSubConn(sc balancer.SubConn) {
}
}
func (tcc *testClientConn) UpdateBalancerState(s connectivity.State, p balancer.Picker) {
tcc.t.Fatal("not implemented")
}
func (tcc *testClientConn) UpdateState(bs balancer.State) {
tcc.t.Logf("testClientConn: UpdateState(%v)", bs)
select {