balancer: fix connectivity state aggregation algorithm to follow the spec (#5473)
This commit is contained in:

committed by
GitHub

parent
0d04c6f5d4
commit
03fee09e68
@ -371,56 +371,3 @@ type ClientConnState struct {
|
||||
// ErrBadResolverState may be returned by UpdateClientConnState to indicate a
|
||||
// problem with the provided name resolver data.
|
||||
var ErrBadResolverState = errors.New("bad resolver state")
|
||||
|
||||
// ConnectivityStateEvaluator takes the connectivity states of multiple SubConns
|
||||
// and returns one aggregated connectivity state.
|
||||
//
|
||||
// It's not thread safe.
|
||||
type ConnectivityStateEvaluator struct {
|
||||
numReady uint64 // Number of addrConns in ready state.
|
||||
numConnecting uint64 // Number of addrConns in connecting state.
|
||||
numTransientFailure uint64 // Number of addrConns in transient failure state.
|
||||
numIdle uint64 // Number of addrConns in idle state.
|
||||
}
|
||||
|
||||
// RecordTransition records state change happening in subConn and based on that
|
||||
// it evaluates what aggregated state should be.
|
||||
//
|
||||
// - If at least one SubConn in Ready, the aggregated state is Ready;
|
||||
// - Else if at least one SubConn in Connecting, the aggregated state is Connecting;
|
||||
// - Else if at least one SubConn is TransientFailure, the aggregated state is Transient Failure;
|
||||
// - Else if at least one SubConn is Idle, the aggregated state is Idle;
|
||||
// - Else there are no subconns and the aggregated state is Transient Failure
|
||||
//
|
||||
// Shutdown is not considered.
|
||||
func (cse *ConnectivityStateEvaluator) RecordTransition(oldState, newState connectivity.State) connectivity.State {
|
||||
// Update counters.
|
||||
for idx, state := range []connectivity.State{oldState, newState} {
|
||||
updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new.
|
||||
switch state {
|
||||
case connectivity.Ready:
|
||||
cse.numReady += updateVal
|
||||
case connectivity.Connecting:
|
||||
cse.numConnecting += updateVal
|
||||
case connectivity.TransientFailure:
|
||||
cse.numTransientFailure += updateVal
|
||||
case connectivity.Idle:
|
||||
cse.numIdle += updateVal
|
||||
}
|
||||
}
|
||||
|
||||
// Evaluate.
|
||||
if cse.numReady > 0 {
|
||||
return connectivity.Ready
|
||||
}
|
||||
if cse.numConnecting > 0 {
|
||||
return connectivity.Connecting
|
||||
}
|
||||
if cse.numTransientFailure > 0 {
|
||||
return connectivity.TransientFailure
|
||||
}
|
||||
if cse.numIdle > 0 {
|
||||
return connectivity.Idle
|
||||
}
|
||||
return connectivity.TransientFailure
|
||||
}
|
||||
|
70
balancer/conn_state_evaluator.go
Normal file
70
balancer/conn_state_evaluator.go
Normal file
@ -0,0 +1,70 @@
|
||||
/*
|
||||
*
|
||||
* Copyright 2022 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package balancer
|
||||
|
||||
import "google.golang.org/grpc/connectivity"
|
||||
|
||||
// ConnectivityStateEvaluator takes the connectivity states of multiple SubConns
|
||||
// and returns one aggregated connectivity state.
|
||||
//
|
||||
// It's not thread safe.
|
||||
type ConnectivityStateEvaluator struct {
|
||||
numReady uint64 // Number of addrConns in ready state.
|
||||
numConnecting uint64 // Number of addrConns in connecting state.
|
||||
numTransientFailure uint64 // Number of addrConns in transient failure state.
|
||||
numIdle uint64 // Number of addrConns in idle state.
|
||||
}
|
||||
|
||||
// RecordTransition records state change happening in subConn and based on that
|
||||
// it evaluates what aggregated state should be.
|
||||
//
|
||||
// - If at least one SubConn in Ready, the aggregated state is Ready;
|
||||
// - Else if at least one SubConn in Connecting, the aggregated state is Connecting;
|
||||
// - Else if at least one SubConn is Idle, the aggregated state is Idle;
|
||||
// - Else if at least one SubConn is TransientFailure (or there are no SubConns), the aggregated state is Transient Failure.
|
||||
//
|
||||
// Shutdown is not considered.
|
||||
func (cse *ConnectivityStateEvaluator) RecordTransition(oldState, newState connectivity.State) connectivity.State {
|
||||
// Update counters.
|
||||
for idx, state := range []connectivity.State{oldState, newState} {
|
||||
updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new.
|
||||
switch state {
|
||||
case connectivity.Ready:
|
||||
cse.numReady += updateVal
|
||||
case connectivity.Connecting:
|
||||
cse.numConnecting += updateVal
|
||||
case connectivity.TransientFailure:
|
||||
cse.numTransientFailure += updateVal
|
||||
case connectivity.Idle:
|
||||
cse.numIdle += updateVal
|
||||
}
|
||||
}
|
||||
|
||||
// Evaluate.
|
||||
if cse.numReady > 0 {
|
||||
return connectivity.Ready
|
||||
}
|
||||
if cse.numConnecting > 0 {
|
||||
return connectivity.Connecting
|
||||
}
|
||||
if cse.numIdle > 0 {
|
||||
return connectivity.Idle
|
||||
}
|
||||
return connectivity.TransientFailure
|
||||
}
|
245
balancer/conn_state_evaluator_test.go
Normal file
245
balancer/conn_state_evaluator_test.go
Normal file
@ -0,0 +1,245 @@
|
||||
/*
|
||||
*
|
||||
* Copyright 2022 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package balancer
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"google.golang.org/grpc/connectivity"
|
||||
"google.golang.org/grpc/internal/grpctest"
|
||||
)
|
||||
|
||||
type s struct {
|
||||
grpctest.Tester
|
||||
}
|
||||
|
||||
func Test(t *testing.T) {
|
||||
grpctest.RunSubTests(t, s{})
|
||||
}
|
||||
|
||||
// TestRecordTransition_FirstStateChange tests the first call to
|
||||
// RecordTransition where the `oldState` is usually set to `Shutdown` (a state
|
||||
// that the ConnectivityStateEvaluator is set to ignore).
|
||||
func (s) TestRecordTransition_FirstStateChange(t *testing.T) {
|
||||
tests := []struct {
|
||||
newState connectivity.State
|
||||
wantState connectivity.State
|
||||
}{
|
||||
{
|
||||
newState: connectivity.Idle,
|
||||
wantState: connectivity.Idle,
|
||||
},
|
||||
{
|
||||
newState: connectivity.Connecting,
|
||||
wantState: connectivity.Connecting,
|
||||
},
|
||||
{
|
||||
newState: connectivity.Ready,
|
||||
wantState: connectivity.Ready,
|
||||
},
|
||||
{
|
||||
newState: connectivity.TransientFailure,
|
||||
wantState: connectivity.TransientFailure,
|
||||
},
|
||||
{
|
||||
newState: connectivity.Shutdown,
|
||||
wantState: connectivity.TransientFailure,
|
||||
},
|
||||
}
|
||||
for _, test := range tests {
|
||||
cse := &ConnectivityStateEvaluator{}
|
||||
if gotState := cse.RecordTransition(connectivity.Shutdown, test.newState); gotState != test.wantState {
|
||||
t.Fatalf("RecordTransition(%v, %v) = %v, want %v", connectivity.Shutdown, test.newState, gotState, test.wantState)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestRecordTransition_SameState tests the scenario where state transitions to
|
||||
// the same state are recorded multiple times.
|
||||
func (s) TestRecordTransition_SameState(t *testing.T) {
|
||||
tests := []struct {
|
||||
newState connectivity.State
|
||||
wantState connectivity.State
|
||||
}{
|
||||
{
|
||||
newState: connectivity.Idle,
|
||||
wantState: connectivity.Idle,
|
||||
},
|
||||
{
|
||||
newState: connectivity.Connecting,
|
||||
wantState: connectivity.Connecting,
|
||||
},
|
||||
{
|
||||
newState: connectivity.Ready,
|
||||
wantState: connectivity.Ready,
|
||||
},
|
||||
{
|
||||
newState: connectivity.TransientFailure,
|
||||
wantState: connectivity.TransientFailure,
|
||||
},
|
||||
{
|
||||
newState: connectivity.Shutdown,
|
||||
wantState: connectivity.TransientFailure,
|
||||
},
|
||||
}
|
||||
const numStateChanges = 5
|
||||
for _, test := range tests {
|
||||
cse := &ConnectivityStateEvaluator{}
|
||||
var prevState, gotState connectivity.State
|
||||
prevState = connectivity.Shutdown
|
||||
for i := 0; i < numStateChanges; i++ {
|
||||
gotState = cse.RecordTransition(prevState, test.newState)
|
||||
prevState = test.newState
|
||||
}
|
||||
if gotState != test.wantState {
|
||||
t.Fatalf("RecordTransition() = %v, want %v", gotState, test.wantState)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestRecordTransition_SingleSubConn_DifferentStates tests some common
|
||||
// connectivity state change scenarios, on a single subConn.
|
||||
func (s) TestRecordTransition_SingleSubConn_DifferentStates(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
states []connectivity.State
|
||||
wantState connectivity.State
|
||||
}{
|
||||
{
|
||||
name: "regular transition to ready",
|
||||
states: []connectivity.State{connectivity.Idle, connectivity.Connecting, connectivity.Ready},
|
||||
wantState: connectivity.Ready,
|
||||
},
|
||||
{
|
||||
name: "regular transition to transient failure",
|
||||
states: []connectivity.State{connectivity.Idle, connectivity.Connecting, connectivity.TransientFailure},
|
||||
wantState: connectivity.TransientFailure,
|
||||
},
|
||||
{
|
||||
name: "regular transition to ready",
|
||||
states: []connectivity.State{connectivity.Idle, connectivity.Connecting, connectivity.Ready, connectivity.Idle},
|
||||
wantState: connectivity.Idle,
|
||||
},
|
||||
{
|
||||
name: "transition from ready to transient failure",
|
||||
states: []connectivity.State{connectivity.Idle, connectivity.Connecting, connectivity.Ready, connectivity.TransientFailure},
|
||||
wantState: connectivity.TransientFailure,
|
||||
},
|
||||
{
|
||||
name: "transition from transient failure back to ready",
|
||||
states: []connectivity.State{connectivity.Idle, connectivity.Connecting, connectivity.Ready, connectivity.TransientFailure, connectivity.Ready},
|
||||
wantState: connectivity.Ready,
|
||||
},
|
||||
{
|
||||
// This state transition is usually suppressed at the LB policy level, by
|
||||
// not calling RecordTransition.
|
||||
name: "transition from transient failure back to idle",
|
||||
states: []connectivity.State{connectivity.Idle, connectivity.Connecting, connectivity.Ready, connectivity.TransientFailure, connectivity.Idle},
|
||||
wantState: connectivity.Idle,
|
||||
},
|
||||
{
|
||||
// This state transition is usually suppressed at the LB policy level, by
|
||||
// not calling RecordTransition.
|
||||
name: "transition from transient failure back to connecting",
|
||||
states: []connectivity.State{connectivity.Idle, connectivity.Connecting, connectivity.Ready, connectivity.TransientFailure, connectivity.Connecting},
|
||||
wantState: connectivity.Connecting,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
cse := &ConnectivityStateEvaluator{}
|
||||
var prevState, gotState connectivity.State
|
||||
prevState = connectivity.Shutdown
|
||||
for _, newState := range test.states {
|
||||
gotState = cse.RecordTransition(prevState, newState)
|
||||
prevState = newState
|
||||
}
|
||||
if gotState != test.wantState {
|
||||
t.Fatalf("RecordTransition() = %v, want %v", gotState, test.wantState)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestRecordTransition_MultipleSubConns_DifferentStates tests state transitions
|
||||
// among multiple subConns, and verifies that the connectivity state aggregation
|
||||
// algorithm produces the expected aggregate connectivity state.
|
||||
func (s) TestRecordTransition_MultipleSubConns_DifferentStates(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
// Each entry in this slice corresponds to the state changes happening on an
|
||||
// individual subConn.
|
||||
subConnStates [][]connectivity.State
|
||||
wantState connectivity.State
|
||||
}{
|
||||
{
|
||||
name: "atleast one ready",
|
||||
subConnStates: [][]connectivity.State{
|
||||
{connectivity.Idle, connectivity.Connecting, connectivity.Ready},
|
||||
{connectivity.Idle},
|
||||
{connectivity.Idle, connectivity.Connecting},
|
||||
{connectivity.Idle, connectivity.Connecting, connectivity.TransientFailure},
|
||||
},
|
||||
wantState: connectivity.Ready,
|
||||
},
|
||||
{
|
||||
name: "atleast one connecting",
|
||||
subConnStates: [][]connectivity.State{
|
||||
{connectivity.Idle, connectivity.Connecting, connectivity.Ready, connectivity.Connecting},
|
||||
{connectivity.Idle},
|
||||
{connectivity.Idle, connectivity.Connecting, connectivity.TransientFailure},
|
||||
},
|
||||
wantState: connectivity.Connecting,
|
||||
},
|
||||
{
|
||||
name: "atleast one idle",
|
||||
subConnStates: [][]connectivity.State{
|
||||
{connectivity.Idle, connectivity.Connecting, connectivity.Ready, connectivity.Idle},
|
||||
{connectivity.Idle, connectivity.Connecting, connectivity.TransientFailure},
|
||||
},
|
||||
wantState: connectivity.Idle,
|
||||
},
|
||||
{
|
||||
name: "atleast one transient failure",
|
||||
subConnStates: [][]connectivity.State{
|
||||
{connectivity.Idle, connectivity.Connecting, connectivity.Ready, connectivity.TransientFailure},
|
||||
{connectivity.TransientFailure},
|
||||
},
|
||||
wantState: connectivity.TransientFailure,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
cse := &ConnectivityStateEvaluator{}
|
||||
var prevState, gotState connectivity.State
|
||||
for _, scStates := range test.subConnStates {
|
||||
prevState = connectivity.Shutdown
|
||||
for _, newState := range scStates {
|
||||
gotState = cse.RecordTransition(prevState, newState)
|
||||
prevState = newState
|
||||
}
|
||||
}
|
||||
if gotState != test.wantState {
|
||||
t.Fatalf("RecordTransition() = %v, want %v", gotState, test.wantState)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user