balancer: set RPC metadata in address attributes, instead of Metadata field (#4041)
This metadata will be sent with all RPCs on the created SubConn
This commit is contained in:
@ -39,6 +39,7 @@ import (
|
||||
"google.golang.org/grpc/internal/balancerload"
|
||||
"google.golang.org/grpc/internal/grpcsync"
|
||||
"google.golang.org/grpc/internal/grpcutil"
|
||||
imetadata "google.golang.org/grpc/internal/metadata"
|
||||
"google.golang.org/grpc/internal/testutils"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/resolver"
|
||||
@ -543,6 +544,76 @@ func (s) TestAddressAttributesInNewSubConn(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestMetadataInAddressAttributes verifies that the metadata added to
|
||||
// address.Attributes will be sent with the RPCs.
|
||||
func (s) TestMetadataInAddressAttributes(t *testing.T) {
|
||||
const (
|
||||
testMDKey = "test-md"
|
||||
testMDValue = "test-md-value"
|
||||
mdBalancerName = "metadata-balancer"
|
||||
)
|
||||
|
||||
// Register a stub balancer which adds metadata to the first address that it
|
||||
// receives and then calls NewSubConn on it.
|
||||
bf := stub.BalancerFuncs{
|
||||
UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
|
||||
addrs := ccs.ResolverState.Addresses
|
||||
if len(addrs) == 0 {
|
||||
return nil
|
||||
}
|
||||
// Only use the first address.
|
||||
sc, err := bd.ClientConn.NewSubConn([]resolver.Address{
|
||||
imetadata.Set(addrs[0], metadata.Pairs(testMDKey, testMDValue)),
|
||||
}, balancer.NewSubConnOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sc.Connect()
|
||||
return nil
|
||||
},
|
||||
UpdateSubConnState: func(bd *stub.BalancerData, sc balancer.SubConn, state balancer.SubConnState) {
|
||||
bd.ClientConn.UpdateState(balancer.State{ConnectivityState: state.ConnectivityState, Picker: &aiPicker{result: balancer.PickResult{SubConn: sc}, err: state.ConnectionError}})
|
||||
},
|
||||
}
|
||||
stub.Register(mdBalancerName, bf)
|
||||
t.Logf("Registered balancer %s...", mdBalancerName)
|
||||
|
||||
testMDChan := make(chan []string, 1)
|
||||
ss := &stubServer{
|
||||
emptyCall: func(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) {
|
||||
md, ok := metadata.FromIncomingContext(ctx)
|
||||
if ok {
|
||||
select {
|
||||
case testMDChan <- md[testMDKey]:
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
}
|
||||
return &testpb.Empty{}, nil
|
||||
},
|
||||
}
|
||||
if err := ss.Start(nil, grpc.WithDefaultServiceConfig(
|
||||
fmt.Sprintf(`{ "loadBalancingConfig": [{"%v": {}}] }`, mdBalancerName),
|
||||
)); err != nil {
|
||||
t.Fatalf("Error starting endpoint server: %v", err)
|
||||
}
|
||||
defer ss.Stop()
|
||||
|
||||
// The RPC should succeed with the expected md.
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
if _, err := ss.client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
|
||||
t.Fatalf("EmptyCall() = _, %v, want _, <nil>", err)
|
||||
}
|
||||
t.Log("Made an RPC which succeeded...")
|
||||
|
||||
// The server should receive the test metadata.
|
||||
md1 := <-testMDChan
|
||||
if len(md1) == 0 || md1[0] != testMDValue {
|
||||
t.Fatalf("got md: %v, want %v", md1, []string{testMDValue})
|
||||
}
|
||||
}
|
||||
|
||||
// TestServersSwap creates two servers and verifies the client switches between
|
||||
// them when the name resolver reports the first and then the second.
|
||||
func (s) TestServersSwap(t *testing.T) {
|
||||
|
Reference in New Issue
Block a user