balancer/base: consider an empty address list an error (#3361)
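This change makes the base balancer (which backs round_robin) track the last resolver error and the last connection error. A resolver update carrying zero addresses is now treated as an error: UpdateClientConnState reports it through ResolverError and returns balancer.ErrBadResolverState, which causes gRPC to call ResolveNow on the resolver with backoff, and while the balancer is in TransientFailure its error picker carries the merged connection and resolver errors instead of a bare ErrTransientFailure.

Below is a minimal, hypothetical sketch (not part of this commit) of the client-visible effect, based on the updated tests in the diff; the "zeroaddr" scheme, the "example.target" endpoint, and the service-config literal are made up for illustration.

    package main

    import (
        "context"
        "fmt"
        "time"

        "google.golang.org/grpc"
        _ "google.golang.org/grpc/balancer/roundrobin" // ensure round_robin is registered
        "google.golang.org/grpc/connectivity"
        "google.golang.org/grpc/resolver"
        "google.golang.org/grpc/resolver/manual"
    )

    func main() {
        // A manual resolver that reports an empty address list ("zeroaddr" is a
        // made-up scheme for this sketch).
        r := manual.NewBuilderWithScheme("zeroaddr")
        r.InitialState(resolver.State{}) // zero addresses
        resolver.Register(r)

        // round_robin is built on balancer/base, so it picks up the new behavior.
        cc, err := grpc.Dial("zeroaddr:///example.target",
            grpc.WithInsecure(),
            grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`))
        if err != nil {
            panic(err)
        }
        defer cc.Close()

        // With this change the balancer reports the empty list as an error, so the
        // channel is expected to reach TRANSIENT_FAILURE and RPCs to fail with a
        // status mentioning "produced zero addresses" instead of hanging.
        cc.Connect()
        ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
        defer cancel()
        for state := cc.GetState(); state != connectivity.TransientFailure; state = cc.GetState() {
            if !cc.WaitForStateChange(ctx, state) {
                break // timed out waiting; report whatever state we are in
            }
        }
        fmt.Println("channel state:", cc.GetState())
    }

Previously, TestAddressesRemoved expected RPCs to hang until DeadlineExceeded after the address list was emptied; the updated test below expects existing connections to keep working and, once the channel leaves READY, the "produced zero addresses" error to surface to the user.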
@@ -21,6 +21,7 @@ package base
 import (
 	"context"
 	"errors"
+	"fmt"
 
 	"google.golang.org/grpc/balancer"
 	"google.golang.org/grpc/connectivity"
@@ -76,6 +77,9 @@ type baseBalancer struct {
 	picker   balancer.Picker
 	v2Picker balancer.V2Picker
 	config   Config
+
+	resolverErr error // the last error reported by the resolver; cleared on successful resolution
+	connErr     error // the last connection error; cleared upon leaving TransientFailure
 }
 
 func (b *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
@@ -83,13 +87,23 @@ func (b *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error)
 }
 
 func (b *baseBalancer) ResolverError(err error) {
-	switch b.state {
-	case connectivity.TransientFailure, connectivity.Idle, connectivity.Connecting:
-		if b.picker != nil {
-			b.picker = NewErrPicker(err)
-		} else {
-			b.v2Picker = NewErrPickerV2(err)
-		}
-	}
+	b.resolverErr = err
+	if len(b.subConns) == 0 {
+		b.state = connectivity.TransientFailure
+	}
+	if b.state != connectivity.TransientFailure {
+		// The picker will not change since the balancer does not currently
+		// report an error.
+		return
+	}
+	b.regeneratePicker()
+	if b.picker != nil {
+		b.cc.UpdateBalancerState(b.state, b.picker)
+	} else {
+		b.cc.UpdateState(balancer.State{
+			ConnectivityState: b.state,
+			Picker:            b.v2Picker,
+		})
+	}
 }
 
@@ -99,6 +113,12 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
 	if grpclog.V(2) {
 		grpclog.Infoln("base.baseBalancer: got new ClientConn state: ", s)
 	}
+	if len(s.ResolverState.Addresses) == 0 {
+		b.ResolverError(errors.New("produced zero addresses"))
+		return balancer.ErrBadResolverState
+	}
+	// Successful resolution; clear resolver error and ensure we return nil.
+	b.resolverErr = nil
 	// addrsSet is the set converted from addrs, it's used for quick lookup of an address.
 	addrsSet := make(map[resolver.Address]struct{})
 	for _, a := range s.ResolverState.Addresses {
@@ -127,24 +147,30 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
 	return nil
 }
 
+// mergeErrors builds an error from the last connection error and the last
+// resolver error. Must only be called if b.state is TransientFailure.
+func (b *baseBalancer) mergeErrors() error {
+	// connErr must always be non-nil unless there are no SubConns, in which
+	// case resolverErr must be non-nil.
+	if b.connErr == nil {
+		return fmt.Errorf("last resolver error: %v", b.resolverErr)
+	}
+	if b.resolverErr == nil {
+		return fmt.Errorf("last connection error: %v", b.connErr)
+	}
+	return fmt.Errorf("last connection error: %v; last resolver error: %v", b.connErr, b.resolverErr)
+}
+
 // regeneratePicker takes a snapshot of the balancer, and generates a picker
 // from it. The picker is
-// - errPicker with ErrTransientFailure if the balancer is in TransientFailure,
+// - errPicker if the balancer is in TransientFailure,
 // - built by the pickerBuilder with all READY SubConns otherwise.
-func (b *baseBalancer) regeneratePicker(err error) {
+func (b *baseBalancer) regeneratePicker() {
 	if b.state == connectivity.TransientFailure {
 		if b.pickerBuilder != nil {
 			b.picker = NewErrPicker(balancer.ErrTransientFailure)
 		} else {
-			if err != nil {
-				b.v2Picker = NewErrPickerV2(balancer.TransientFailureError(err))
-			} else {
-				// This means the last subchannel transition was not to
-				// TransientFailure (otherwise err must be set), but the
-				// aggregate state of the balancer is TransientFailure, meaning
-				// there are no other addresses.
-				b.v2Picker = NewErrPickerV2(balancer.TransientFailureError(errors.New("resolver returned no addresses")))
-			}
+			b.v2Picker = NewErrPickerV2(balancer.TransientFailureError(b.mergeErrors()))
 		}
 		return
 	}
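A side note, not part of the diff: while the balancer stays in TransientFailure, picks fail with the merged error built by mergeErrors above, so a client should see both causes in a single message. A tiny illustration with made-up error values:

    package main

    import (
        "errors"
        "fmt"
    )

    func main() {
        // Made-up error values, formatted the way mergeErrors formats them.
        connErr := errors.New("connection refused")
        resolverErr := errors.New("produced zero addresses")
        fmt.Println(fmt.Errorf("last connection error: %v; last resolver error: %v", connErr, resolverErr))
        // Output: last connection error: connection refused; last resolver error: produced zero addresses
    }
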
@@ -200,6 +226,9 @@ func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.Su
 	oldAggrState := b.state
 	b.state = b.csEvltr.RecordTransition(oldS, s)
 
+	// Set or clear the last connection error accordingly.
+	b.connErr = state.ConnectionError
+
 	// Regenerate picker when one of the following happens:
 	// - this sc became ready from not-ready
 	// - this sc became not-ready from ready
@@ -207,7 +236,7 @@ func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.Su
 	// - the aggregated state of balancer became non-TransientFailure from TransientFailure
 	if (s == connectivity.Ready) != (oldS == connectivity.Ready) ||
 		(b.state == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) {
-		b.regeneratePicker(state.ConnectionError)
+		b.regeneratePicker()
 	}
 
 	if b.picker != nil {
@@ -22,6 +22,7 @@ import (
 	"context"
 	"fmt"
 	"net"
+	"strings"
 	"sync"
 	"testing"
 	"time"
@@ -29,6 +30,7 @@ import (
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/balancer/roundrobin"
 	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/connectivity"
 	_ "google.golang.org/grpc/grpclog/glogger"
 	"google.golang.org/grpc/internal/leakcheck"
 	"google.golang.org/grpc/peer"
@@ -210,15 +212,31 @@ func TestAddressesRemoved(t *testing.T) {
 	}
 
 	r.UpdateState(resolver.State{Addresses: []resolver.Address{}})
-	for i := 0; i < 1000; i++ {
-		ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
-		defer cancel()
-		if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); status.Code(err) == codes.DeadlineExceeded {
-			return
-		}
-		time.Sleep(time.Millisecond)
-	}
-	t.Fatalf("No RPC failed after removing all addresses, want RPC to fail with DeadlineExceeded")
+	// Removing addresses results in an error reported to the clientconn, but
+	// the existing connections remain. RPCs should still succeed.
+	ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+	if _, err := testc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
+		t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
+	}
+
+	// Stop the server to bring the channel state into transient failure.
+	test.cleanup()
+	// Wait for not-ready.
+	for src := cc.GetState(); src == connectivity.Ready; src = cc.GetState() {
+		if !cc.WaitForStateChange(ctx, src) {
+			t.Fatalf("timed out waiting for state change. got %v; want !%v", src, connectivity.Ready)
+		}
+	}
+	// Report an empty server list again; because the state is not ready, the
+	// empty address list error should surface to the user.
+	r.UpdateState(resolver.State{Addresses: []resolver.Address{}})
+
+	const msgWant = "produced zero addresses"
+	if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || !strings.Contains(status.Convert(err).Message(), msgWant) {
+		t.Fatalf("EmptyCall() = _, %v, want _, Contains(Message(), %q)", err, msgWant)
+	}
+
 }
 
 func TestCloseWithPendingRPC(t *testing.T) {
@@ -20,9 +20,11 @@ package grpc
 
 import (
 	"fmt"
+	"net"
 	"testing"
 
 	"google.golang.org/grpc/balancer"
+	"google.golang.org/grpc/balancer/roundrobin"
 	"google.golang.org/grpc/connectivity"
 	"google.golang.org/grpc/resolver"
 	"google.golang.org/grpc/resolver/manual"
@@ -88,3 +90,32 @@ func (s) TestBalancerErrorResolverPolling(t *testing.T) {
 		},
 		WithDefaultServiceConfig(fmt.Sprintf(`{ "loadBalancingConfig": [{"%v": {}}] }`, balName)))
 }
+
+// TestRoundRobinZeroAddressesResolverPolling reports no addresses to the round
+// robin balancer and verifies ResolveNow is called on the resolver with the
+// appropriate backoff strategy being consulted between ResolveNow calls.
+func (s) TestRoundRobinZeroAddressesResolverPolling(t *testing.T) {
+	// We need to start a real server or else the connecting loop will call
+	// ResolveNow after every iteration, even after a valid resolver result is
+	// returned.
+	lis, err := net.Listen("tcp", "localhost:0")
+	if err != nil {
+		t.Fatalf("Error while listening. Err: %v", err)
+	}
+	defer lis.Close()
+	s := NewServer()
+	defer s.Stop()
+	go s.Serve(lis)
+
+	testResolverErrorPolling(t,
+		func(r *manual.Resolver) {
+			// No addresses so the balancer will fail.
+			r.CC.UpdateState(resolver.State{})
+		}, func(r *manual.Resolver) {
+			// UpdateState will block if ResolveNow is being called (which
+			// blocks on rn), so call it in a goroutine. Include a valid
+			// address so the balancer will be happy.
+			go r.CC.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: lis.Addr().String()}}})
+		},
+		WithDefaultServiceConfig(fmt.Sprintf(`{ "loadBalancingConfig": [{"%v": {}}] }`, roundrobin.Name)))
+}
@@ -1515,15 +1515,29 @@ func (s) TestCZSubChannelTraceCreationDeletion(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	r.UpdateState(resolver.State{Addresses: []resolver.Address{}})
+	// Wait for ready
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+	for src := te.cc.GetState(); src != connectivity.Ready; src = te.cc.GetState() {
+		if !te.cc.WaitForStateChange(ctx, src) {
+			t.Fatalf("timed out waiting for state change. got %v; want %v", src, connectivity.Ready)
+		}
+	}
+	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "fake address"}}})
+	// Wait for not-ready.
+	for src := te.cc.GetState(); src == connectivity.Ready; src = te.cc.GetState() {
+		if !te.cc.WaitForStateChange(ctx, src) {
+			t.Fatalf("timed out waiting for state change. got %v; want !%v", src, connectivity.Ready)
+		}
+	}
 
 	if err := verifyResultWithDelay(func() (bool, error) {
 		tcs, _ := channelz.GetTopChannels(0, 0)
 		if len(tcs) != 1 {
 			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
 		}
-		if len(tcs[0].SubChans) != 0 {
-			return false, fmt.Errorf("there should be 0 subchannel not %d", len(tcs[0].SubChans))
+		if len(tcs[0].SubChans) != 1 {
+			return false, fmt.Errorf("there should be 1 subchannel not %d", len(tcs[0].SubChans))
 		}
 		scm := channelz.GetSubChannel(subConn)
 		if scm == nil {
@@ -1770,7 +1784,7 @@ func (s) TestCZSubChannelConnectivityState(t *testing.T) {
 			return false, fmt.Errorf("transient failure has not happened on SubChannel yet")
 		}
 		transient = 0
-		r.UpdateState(resolver.State{Addresses: []resolver.Address{}})
+		r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "fake address"}}})
 		for _, e := range scm.Trace.Events {
 			if e.Desc == fmt.Sprintf("Subchannel Connectivity change to %v", connectivity.Ready) {
 				ready++
@@ -1866,7 +1880,7 @@ func (s) TestCZTraceOverwriteChannelDeletion(t *testing.T) {
 	czCleanup := channelz.NewChannelzStorage()
 	defer czCleanupWrapper(czCleanup, t)
 	e := tcpClearRREnv
-	// avoid calling API to set balancer type, which will void service config's change of balancer.
+	// avoid newTest using WithBalancer, which would override service config's change of balancer below.
 	e.balancer = ""
 	te := newTest(t, e)
 	channelz.SetMaxTraceEntry(1)
@@ -1956,19 +1970,20 @@ func (s) TestCZTraceOverwriteSubChannelDeletion(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	r.UpdateState(resolver.State{Addresses: []resolver.Address{}})
-
-	if err := verifyResultWithDelay(func() (bool, error) {
-		tcs, _ := channelz.GetTopChannels(0, 0)
-		if len(tcs) != 1 {
-			return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
-		}
-		if len(tcs[0].SubChans) != 0 {
-			return false, fmt.Errorf("there should be 0 subchannel not %d", len(tcs[0].SubChans))
-		}
-		return true, nil
-	}); err != nil {
-		t.Fatal(err)
-	}
+	// Wait for ready
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+	for src := te.cc.GetState(); src != connectivity.Ready; src = te.cc.GetState() {
+		if !te.cc.WaitForStateChange(ctx, src) {
+			t.Fatalf("timed out waiting for state change. got %v; want %v", src, connectivity.Ready)
+		}
+	}
+	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "fake address"}}})
+	// Wait for not-ready.
+	for src := te.cc.GetState(); src == connectivity.Ready; src = te.cc.GetState() {
+		if !te.cc.WaitForStateChange(ctx, src) {
+			t.Fatalf("timed out waiting for state change. got %v; want !%v", src, connectivity.Ready)
+		}
+	}
 
 	// verify that the subchannel no longer exist due to trace referencing it got overwritten.
@@ -507,7 +507,7 @@ func (s) TestHealthCheckWithAddrConnDrain(t *testing.T) {
 	default:
 	}
 	// trigger teardown of the ac
-	r.UpdateState(resolver.State{Addresses: []resolver.Address{}, ServiceConfig: sc})
+	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "fake address"}}, ServiceConfig: sc})
 
 	select {
 	case <-hcExitChan:
@@ -653,7 +653,7 @@ func (s) TestHealthCheckWithoutSetConnectivityStateCalledAddrConnShutDown(t *tes
 		t.Fatal("Health check function has not been invoked after 5s.")
 	}
 	// trigger teardown of the ac, ac in SHUTDOWN state
-	r.UpdateState(resolver.State{Addresses: []resolver.Address{}, ServiceConfig: sc})
+	r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "fake address"}}, ServiceConfig: sc})
 
 	// The health check func should exit without calling the setConnectivityState func, as server hasn't sent
 	// any response.