roundrobin: strip attributes from addresses (#4024)
This commit is contained in:
@ -20,6 +20,8 @@ package attributes_test
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"reflect"
|
||||||
|
"testing"
|
||||||
|
|
||||||
"google.golang.org/grpc/attributes"
|
"google.golang.org/grpc/attributes"
|
||||||
)
|
)
|
||||||
@ -46,3 +48,13 @@ func ExampleAttributes_WithValues() {
|
|||||||
// Key one: 1
|
// Key one: 1
|
||||||
// Key two: two
|
// Key two: two
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Test that two attributes with the same content are `reflect.DeepEqual`.
|
||||||
|
func TestDeepEqual(t *testing.T) {
|
||||||
|
type keyOne struct{}
|
||||||
|
a1 := attributes.New(keyOne{}, 1)
|
||||||
|
a2 := attributes.New(keyOne{}, 1)
|
||||||
|
if !reflect.DeepEqual(a1, a2) {
|
||||||
|
t.Fatalf("reflect.DeepEqual(%+v, %+v), want true, got false", a1, a2)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -64,7 +64,7 @@ type baseBalancer struct {
|
|||||||
csEvltr *balancer.ConnectivityStateEvaluator
|
csEvltr *balancer.ConnectivityStateEvaluator
|
||||||
state connectivity.State
|
state connectivity.State
|
||||||
|
|
||||||
subConns map[resolver.Address]balancer.SubConn
|
subConns map[resolver.Address]balancer.SubConn // `attributes` is stripped from the keys of this map (the addresses)
|
||||||
scStates map[balancer.SubConn]connectivity.State
|
scStates map[balancer.SubConn]connectivity.State
|
||||||
picker balancer.Picker
|
picker balancer.Picker
|
||||||
config Config
|
config Config
|
||||||
@ -101,17 +101,41 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
|
|||||||
// addrsSet is the set converted from addrs, it's used for quick lookup of an address.
|
// addrsSet is the set converted from addrs, it's used for quick lookup of an address.
|
||||||
addrsSet := make(map[resolver.Address]struct{})
|
addrsSet := make(map[resolver.Address]struct{})
|
||||||
for _, a := range s.ResolverState.Addresses {
|
for _, a := range s.ResolverState.Addresses {
|
||||||
addrsSet[a] = struct{}{}
|
// Strip attributes from addresses before using them as map keys. So
|
||||||
if _, ok := b.subConns[a]; !ok {
|
// that when two addresses only differ in attributes pointers (but with
|
||||||
|
// the same attribute content), they are considered the same address.
|
||||||
|
//
|
||||||
|
// Note that this doesn't handle the case where the attribute content is
|
||||||
|
// different. So if users want to set different attributes to create
|
||||||
|
// duplicate connections to the same backend, it doesn't work. This is
|
||||||
|
// fine for now, because duplicate is done by setting Metadata today.
|
||||||
|
//
|
||||||
|
// TODO: read attributes to handle duplicate connections.
|
||||||
|
aNoAttrs := a
|
||||||
|
aNoAttrs.Attributes = nil
|
||||||
|
addrsSet[aNoAttrs] = struct{}{}
|
||||||
|
if sc, ok := b.subConns[aNoAttrs]; !ok {
|
||||||
// a is a new address (not existing in b.subConns).
|
// a is a new address (not existing in b.subConns).
|
||||||
|
//
|
||||||
|
// When creating SubConn, the original address with attributes is
|
||||||
|
// passed through. So that connection configurations in attributes
|
||||||
|
// (like creds) will be used.
|
||||||
sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: b.config.HealthCheck})
|
sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: b.config.HealthCheck})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Warningf("base.baseBalancer: failed to create new SubConn: %v", err)
|
logger.Warningf("base.baseBalancer: failed to create new SubConn: %v", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
b.subConns[a] = sc
|
b.subConns[aNoAttrs] = sc
|
||||||
b.scStates[sc] = connectivity.Idle
|
b.scStates[sc] = connectivity.Idle
|
||||||
sc.Connect()
|
sc.Connect()
|
||||||
|
} else {
|
||||||
|
// Always update the subconn's address in case the attributes
|
||||||
|
// changed.
|
||||||
|
//
|
||||||
|
// The SubConn does a reflect.DeepEqual of the new and old
|
||||||
|
// addresses. So this is a noop if the current address is the same
|
||||||
|
// as the old one (including attributes).
|
||||||
|
sc.UpdateAddresses([]resolver.Address{a})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for a, sc := range b.subConns {
|
for a, sc := range b.subConns {
|
||||||
|
70
balancer/base/balancer_test.go
Normal file
70
balancer/base/balancer_test.go
Normal file
@ -0,0 +1,70 @@
|
|||||||
|
/*
|
||||||
|
*
|
||||||
|
* Copyright 2020 gRPC authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
|
||||||
|
package base
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"google.golang.org/grpc/attributes"
|
||||||
|
"google.golang.org/grpc/balancer"
|
||||||
|
"google.golang.org/grpc/resolver"
|
||||||
|
)
|
||||||
|
|
||||||
|
type testClientConn struct {
|
||||||
|
balancer.ClientConn
|
||||||
|
newSubConn func([]resolver.Address, balancer.NewSubConnOptions) (balancer.SubConn, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *testClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
|
||||||
|
return c.newSubConn(addrs, opts)
|
||||||
|
}
|
||||||
|
|
||||||
|
type testSubConn struct{}
|
||||||
|
|
||||||
|
func (sc *testSubConn) UpdateAddresses(addresses []resolver.Address) {}
|
||||||
|
|
||||||
|
func (sc *testSubConn) Connect() {}
|
||||||
|
|
||||||
|
func TestBaseBalancerStripAttributes(t *testing.T) {
|
||||||
|
b := (&baseBuilder{}).Build(&testClientConn{
|
||||||
|
newSubConn: func(addrs []resolver.Address, _ balancer.NewSubConnOptions) (balancer.SubConn, error) {
|
||||||
|
for _, addr := range addrs {
|
||||||
|
if addr.Attributes == nil {
|
||||||
|
t.Errorf("in NewSubConn, got address %+v with nil attributes, want not nil", addr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return &testSubConn{}, nil
|
||||||
|
},
|
||||||
|
}, balancer.BuildOptions{}).(*baseBalancer)
|
||||||
|
|
||||||
|
b.UpdateClientConnState(balancer.ClientConnState{
|
||||||
|
ResolverState: resolver.State{
|
||||||
|
Addresses: []resolver.Address{
|
||||||
|
{Addr: "1.1.1.1", Attributes: &attributes.Attributes{}},
|
||||||
|
{Addr: "2.2.2.2", Attributes: &attributes.Attributes{}},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
for addr := range b.subConns {
|
||||||
|
if addr.Attributes != nil {
|
||||||
|
t.Errorf("in b.subConns, got address %+v with nil attributes, want not nil", addr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -32,6 +32,8 @@ import (
|
|||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
"google.golang.org/grpc/connectivity"
|
"google.golang.org/grpc/connectivity"
|
||||||
"google.golang.org/grpc/internal/grpctest"
|
"google.golang.org/grpc/internal/grpctest"
|
||||||
|
imetadata "google.golang.org/grpc/internal/metadata"
|
||||||
|
"google.golang.org/grpc/metadata"
|
||||||
"google.golang.org/grpc/peer"
|
"google.golang.org/grpc/peer"
|
||||||
"google.golang.org/grpc/resolver"
|
"google.golang.org/grpc/resolver"
|
||||||
"google.golang.org/grpc/resolver/manual"
|
"google.golang.org/grpc/resolver/manual"
|
||||||
@ -39,6 +41,10 @@ import (
|
|||||||
testpb "google.golang.org/grpc/test/grpc_testing"
|
testpb "google.golang.org/grpc/test/grpc_testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
testMDKey = "test-md"
|
||||||
|
)
|
||||||
|
|
||||||
type s struct {
|
type s struct {
|
||||||
grpctest.Tester
|
grpctest.Tester
|
||||||
}
|
}
|
||||||
@ -49,9 +55,23 @@ func Test(t *testing.T) {
|
|||||||
|
|
||||||
type testServer struct {
|
type testServer struct {
|
||||||
testpb.UnimplementedTestServiceServer
|
testpb.UnimplementedTestServiceServer
|
||||||
|
|
||||||
|
testMDChan chan []string
|
||||||
|
}
|
||||||
|
|
||||||
|
func newTestServer() *testServer {
|
||||||
|
return &testServer{testMDChan: make(chan []string, 1)}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
|
func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
|
||||||
|
md, ok := metadata.FromIncomingContext(ctx)
|
||||||
|
if ok && len(md[testMDKey]) != 0 {
|
||||||
|
select {
|
||||||
|
case s.testMDChan <- md[testMDKey]:
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil, ctx.Err()
|
||||||
|
}
|
||||||
|
}
|
||||||
return &testpb.Empty{}, nil
|
return &testpb.Empty{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -60,8 +80,9 @@ func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServ
|
|||||||
}
|
}
|
||||||
|
|
||||||
type test struct {
|
type test struct {
|
||||||
servers []*grpc.Server
|
servers []*grpc.Server
|
||||||
addresses []string
|
serverImpls []*testServer
|
||||||
|
addresses []string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *test) cleanup() {
|
func (t *test) cleanup() {
|
||||||
@ -85,8 +106,10 @@ func startTestServers(count int) (_ *test, err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
s := grpc.NewServer()
|
s := grpc.NewServer()
|
||||||
testpb.RegisterTestServiceServer(s, &testServer{})
|
sImpl := newTestServer()
|
||||||
|
testpb.RegisterTestServiceServer(s, sImpl)
|
||||||
t.servers = append(t.servers, s)
|
t.servers = append(t.servers, s)
|
||||||
|
t.serverImpls = append(t.serverImpls, sImpl)
|
||||||
t.addresses = append(t.addresses, lis.Addr().String())
|
t.addresses = append(t.addresses, lis.Addr().String())
|
||||||
|
|
||||||
go func(s *grpc.Server, l net.Listener) {
|
go func(s *grpc.Server, l net.Listener) {
|
||||||
@ -473,3 +496,54 @@ func (s) TestAllServersDown(t *testing.T) {
|
|||||||
}
|
}
|
||||||
t.Fatalf("Failfast RPCs didn't fail with Unavailable after all servers are stopped")
|
t.Fatalf("Failfast RPCs didn't fail with Unavailable after all servers are stopped")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s) TestUpdateAddressAttributes(t *testing.T) {
|
||||||
|
r := manual.NewBuilderWithScheme("whatever")
|
||||||
|
|
||||||
|
test, err := startTestServers(1)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to start servers: %v", err)
|
||||||
|
}
|
||||||
|
defer test.cleanup()
|
||||||
|
|
||||||
|
cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithResolvers(r), grpc.WithBalancerName(roundrobin.Name))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to dial: %v", err)
|
||||||
|
}
|
||||||
|
defer cc.Close()
|
||||||
|
testc := testpb.NewTestServiceClient(cc)
|
||||||
|
// The first RPC should fail because there's no address.
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
|
||||||
|
defer cancel()
|
||||||
|
if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded {
|
||||||
|
t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: test.addresses[0]}}})
|
||||||
|
// The second RPC should succeed.
|
||||||
|
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
|
||||||
|
t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
|
||||||
|
}
|
||||||
|
// The second RPC should not set metadata, so there's no md in the channel.
|
||||||
|
select {
|
||||||
|
case md1 := <-test.serverImpls[0].testMDChan:
|
||||||
|
t.Fatalf("got md: %v, want empty metadata", md1)
|
||||||
|
case <-time.After(time.Microsecond * 100):
|
||||||
|
}
|
||||||
|
|
||||||
|
const testMDValue = "test-md-value"
|
||||||
|
// Update metadata in address.
|
||||||
|
r.UpdateState(resolver.State{Addresses: []resolver.Address{
|
||||||
|
imetadata.Set(resolver.Address{Addr: test.addresses[0]}, metadata.Pairs(testMDKey, testMDValue)),
|
||||||
|
}})
|
||||||
|
// The third RPC should succeed.
|
||||||
|
if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil {
|
||||||
|
t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// The third RPC should send metadata with it.
|
||||||
|
md2 := <-test.serverImpls[0].testMDChan
|
||||||
|
if len(md2) == 0 || md2[0] != testMDValue {
|
||||||
|
t.Fatalf("got md: %v, want %v", md2, []string{testMDValue})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -56,8 +56,8 @@ type TestSubConn struct {
|
|||||||
id string
|
id string
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpdateAddresses panics.
|
// UpdateAddresses is a no-op.
|
||||||
func (tsc *TestSubConn) UpdateAddresses([]resolver.Address) { panic("not implemented") }
|
func (tsc *TestSubConn) UpdateAddresses([]resolver.Address) {}
|
||||||
|
|
||||||
// Connect is a no-op.
|
// Connect is a no-op.
|
||||||
func (tsc *TestSubConn) Connect() {}
|
func (tsc *TestSubConn) Connect() {}
|
||||||
|
Reference in New Issue
Block a user