xds: de-experimentalize xDS apis required for psm security (#4753)

This commit is contained in:
Easwar Swaminathan
2021-09-15 14:05:59 -07:00
committed by GitHub
parent c84a5de064
commit 4c5f7fb0ee
8 changed files with 83 additions and 92 deletions

View File

@ -18,7 +18,6 @@
// Package connectivity defines connectivity semantics.
// For details, see https://github.com/grpc/grpc/blob/master/doc/connectivity-semantics-and-api.md.
// All APIs in this package are experimental.
package connectivity
import (
@ -45,7 +44,7 @@ func (s State) String() string {
return "SHUTDOWN"
default:
logger.Errorf("unknown connectivity state: %d", s)
return "Invalid-State"
return "INVALID_STATE"
}
}
@ -61,3 +60,35 @@ const (
// Shutdown indicates the ClientConn has started shutting down.
Shutdown
)
// ServingMode indicates the current mode of operation of the server.
//
// Only xDS enabled gRPC servers currently report their serving mode.
type ServingMode int
const (
// ServingModeStarting indicates that the server is starting up.
ServingModeStarting ServingMode = iota
// ServingModeServing indicates that the server contains all required
// configuration and is serving RPCs.
ServingModeServing
// ServingModeNotServing indicates that the server is not accepting new
// connections. Existing connections will be closed gracefully, allowing
// in-progress RPCs to complete. A server enters this mode when it does not
// contain the required configuration to serve RPCs.
ServingModeNotServing
)
func (s ServingMode) String() string {
switch s {
case ServingModeStarting:
return "STARTING"
case ServingModeServing:
return "SERVING"
case ServingModeNotServing:
return "NOT_SERVING"
default:
logger.Errorf("unknown serving mode: %d", s)
return "INVALID_MODE"
}
}

View File

@ -18,11 +18,6 @@
// Package xds provides a transport credentials implementation where the
// security configuration is pushed by a management server using xDS APIs.
//
// Experimental
//
// Notice: All APIs in this package are EXPERIMENTAL and may be removed in a
// later release.
package xds
import (

View File

@ -30,6 +30,7 @@ import (
"unsafe"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
internalbackoff "google.golang.org/grpc/internal/backoff"
internalgrpclog "google.golang.org/grpc/internal/grpclog"
@ -51,41 +52,11 @@ var (
backoffFunc = bs.Backoff
)
// ServingMode indicates the current mode of operation of the server.
//
// This API exactly mirrors the one in the public xds package. We have to
// redefine it here to avoid a cyclic dependency.
type ServingMode int
const (
// ServingModeStarting indicates that the serving is starting up.
ServingModeStarting ServingMode = iota
// ServingModeServing indicates the the server contains all required xDS
// configuration is serving RPCs.
ServingModeServing
// ServingModeNotServing indicates that the server is not accepting new
// connections. Existing connections will be closed gracefully, allowing
// in-progress RPCs to complete. A server enters this mode when it does not
// contain the required xDS configuration to serve RPCs.
ServingModeNotServing
)
func (s ServingMode) String() string {
switch s {
case ServingModeNotServing:
return "not-serving"
case ServingModeServing:
return "serving"
default:
return "starting"
}
}
// ServingModeCallback is the callback that users can register to get notified
// about the server's serving mode changes. The callback is invoked with the
// address of the listener and its new mode. The err parameter is set to a
// non-nil error if the server has transitioned into not-serving mode.
type ServingModeCallback func(addr net.Addr, mode ServingMode, err error)
type ServingModeCallback func(addr net.Addr, mode connectivity.ServingMode, err error)
// DrainCallback is the callback that an xDS-enabled server registers to get
// notified about updates to the Listener configuration. The server is expected
@ -208,7 +179,7 @@ type listenerWrapper struct {
// get a Listener resource update).
mu sync.RWMutex
// Current serving mode.
mode ServingMode
mode connectivity.ServingMode
// Filter chains received as part of the last good update.
filterChains *xdsclient.FilterChainManager
@ -267,7 +238,7 @@ func (l *listenerWrapper) Accept() (net.Conn, error) {
}
l.mu.RLock()
if l.mode == ServingModeNotServing {
if l.mode == connectivity.ServingModeNotServing {
// Close connections as soon as we accept them when we are in
// "not-serving" mode. Since we accept a net.Listener from the user
// in Serve(), we cannot close the listener when we move to
@ -390,7 +361,7 @@ func (l *listenerWrapper) handleRDSUpdate(update rdsHandlerUpdate) {
if update.err != nil {
l.logger.Warningf("Received error for rds names specified in resource %q: %+v", l.name, update.err)
if xdsclient.ErrType(update.err) == xdsclient.ErrorTypeResourceNotFound {
l.switchMode(nil, ServingModeNotServing, update.err)
l.switchMode(nil, connectivity.ServingModeNotServing, update.err)
}
// For errors which are anything other than "resource-not-found", we
// continue to use the old configuration.
@ -398,7 +369,7 @@ func (l *listenerWrapper) handleRDSUpdate(update rdsHandlerUpdate) {
}
atomic.StorePointer(&l.rdsUpdates, unsafe.Pointer(&update.updates))
l.switchMode(l.filterChains, ServingModeServing, nil)
l.switchMode(l.filterChains, connectivity.ServingModeServing, nil)
l.goodUpdate.Fire()
}
@ -406,7 +377,7 @@ func (l *listenerWrapper) handleLDSUpdate(update ldsUpdateWithError) {
if update.err != nil {
l.logger.Warningf("Received error for resource %q: %+v", l.name, update.err)
if xdsclient.ErrType(update.err) == xdsclient.ErrorTypeResourceNotFound {
l.switchMode(nil, ServingModeNotServing, update.err)
l.switchMode(nil, connectivity.ServingModeNotServing, update.err)
}
// For errors which are anything other than "resource-not-found", we
// continue to use the old configuration.
@ -428,7 +399,7 @@ func (l *listenerWrapper) handleLDSUpdate(update ldsUpdateWithError) {
// what we have decided to do. See gRPC A36 for more details.
ilc := update.update.InboundListenerCfg
if ilc.Address != l.addr || ilc.Port != l.port {
l.switchMode(nil, ServingModeNotServing, fmt.Errorf("address (%s:%s) in Listener update does not match listening address: (%s:%s)", ilc.Address, ilc.Port, l.addr, l.port))
l.switchMode(nil, connectivity.ServingModeNotServing, fmt.Errorf("address (%s:%s) in Listener update does not match listening address: (%s:%s)", ilc.Address, ilc.Port, l.addr, l.port))
return
}
@ -447,12 +418,12 @@ func (l *listenerWrapper) handleLDSUpdate(update ldsUpdateWithError) {
// from the management server, this listener has all the configuration
// needed, and is ready to serve.
if len(ilc.FilterChains.RouteConfigNames) == 0 {
l.switchMode(ilc.FilterChains, ServingModeServing, nil)
l.switchMode(ilc.FilterChains, connectivity.ServingModeServing, nil)
l.goodUpdate.Fire()
}
}
func (l *listenerWrapper) switchMode(fcs *xdsclient.FilterChainManager, newMode ServingMode, err error) {
func (l *listenerWrapper) switchMode(fcs *xdsclient.FilterChainManager, newMode connectivity.ServingMode, err error) {
l.mu.Lock()
defer l.mu.Unlock()

View File

@ -30,6 +30,7 @@ import (
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
xdscreds "google.golang.org/grpc/credentials/xds"
testpb "google.golang.org/grpc/test/grpc_testing"
@ -64,8 +65,8 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) {
}
// Create a couple of channels on which mode updates will be pushed.
updateCh1 := make(chan xds.ServingMode, 1)
updateCh2 := make(chan xds.ServingMode, 1)
updateCh1 := make(chan connectivity.ServingMode, 1)
updateCh2 := make(chan connectivity.ServingMode, 1)
// Create a server option to get notified about serving mode changes, and
// push the updated mode on the channels created above.
@ -124,16 +125,16 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) {
case <-ctx.Done():
t.Fatalf("timed out waiting for a mode change update: %v", err)
case mode := <-updateCh1:
if mode != xds.ServingModeServing {
t.Errorf("listener received new mode %v, want %v", mode, xds.ServingModeServing)
if mode != connectivity.ServingModeServing {
t.Errorf("listener received new mode %v, want %v", mode, connectivity.ServingModeServing)
}
}
select {
case <-ctx.Done():
t.Fatalf("timed out waiting for a mode change update: %v", err)
case mode := <-updateCh2:
if mode != xds.ServingModeServing {
t.Errorf("listener received new mode %v, want %v", mode, xds.ServingModeServing)
if mode != connectivity.ServingModeServing {
t.Errorf("listener received new mode %v, want %v", mode, connectivity.ServingModeServing)
}
}
@ -169,16 +170,16 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) {
case <-ctx.Done():
t.Fatalf("timed out waiting for a mode change update: %v", err)
case mode := <-updateCh1:
if mode != xds.ServingModeServing {
t.Errorf("listener received new mode %v, want %v", mode, xds.ServingModeServing)
if mode != connectivity.ServingModeServing {
t.Errorf("listener received new mode %v, want %v", mode, connectivity.ServingModeServing)
}
}
select {
case <-ctx.Done():
t.Fatalf("timed out waiting for a mode change update: %v", err)
case mode := <-updateCh2:
if mode != xds.ServingModeNotServing {
t.Errorf("listener received new mode %v, want %v", mode, xds.ServingModeNotServing)
if mode != connectivity.ServingModeNotServing {
t.Errorf("listener received new mode %v, want %v", mode, connectivity.ServingModeNotServing)
}
}
@ -203,8 +204,8 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) {
case <-ctx.Done():
t.Fatalf("timed out waiting for a mode change update: %v", err)
case mode := <-updateCh1:
if mode != xds.ServingModeNotServing {
t.Errorf("listener received new mode %v, want %v", mode, xds.ServingModeNotServing)
if mode != connectivity.ServingModeNotServing {
t.Errorf("listener received new mode %v, want %v", mode, connectivity.ServingModeNotServing)
}
}
@ -233,16 +234,16 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) {
case <-ctx.Done():
t.Fatalf("timed out waiting for a mode change update: %v", err)
case mode := <-updateCh1:
if mode != xds.ServingModeServing {
t.Errorf("listener received new mode %v, want %v", mode, xds.ServingModeServing)
if mode != connectivity.ServingModeServing {
t.Errorf("listener received new mode %v, want %v", mode, connectivity.ServingModeServing)
}
}
select {
case <-ctx.Done():
t.Fatalf("timed out waiting for a mode change update: %v", err)
case mode := <-updateCh2:
if mode != xds.ServingModeServing {
t.Errorf("listener received new mode %v, want %v", mode, xds.ServingModeServing)
if mode != connectivity.ServingModeServing {
t.Errorf("listener received new mode %v, want %v", mode, connectivity.ServingModeServing)
}
}

View File

@ -28,6 +28,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
@ -231,7 +232,7 @@ func (s *GRPCServer) Serve(lis net.Listener) error {
ListenerResourceName: name,
XDSCredsInUse: s.xdsCredsInUse,
XDSClient: s.xdsC,
ModeCallback: func(addr net.Addr, mode server.ServingMode, err error) {
ModeCallback: func(addr net.Addr, mode connectivity.ServingMode, err error) {
modeUpdateCh.Put(&modeChangeArgs{
addr: addr,
mode: mode,
@ -261,7 +262,7 @@ func (s *GRPCServer) Serve(lis net.Listener) error {
// modeChangeArgs wraps argument required for invoking mode change callback.
type modeChangeArgs struct {
addr net.Addr
mode server.ServingMode
mode connectivity.ServingMode
err error
}
@ -278,7 +279,7 @@ func (s *GRPCServer) handleServingModeChanges(updateCh *buffer.Unbounded) {
case u := <-updateCh.Get():
updateCh.Load()
args := u.(*modeChangeArgs)
if args.mode == ServingModeNotServing {
if args.mode == connectivity.ServingModeNotServing {
// We type assert our underlying gRPC server to the real
// grpc.Server here before trying to initiate the drain
// operation. This approach avoids performing the same type

View File

@ -22,7 +22,7 @@ import (
"net"
"google.golang.org/grpc"
iserver "google.golang.org/grpc/xds/internal/server"
"google.golang.org/grpc/connectivity"
)
type serverOptions struct {
@ -41,20 +41,6 @@ func ServingModeCallback(cb ServingModeCallbackFunc) grpc.ServerOption {
return &serverOption{apply: func(o *serverOptions) { o.modeCallback = cb }}
}
// ServingMode indicates the current mode of operation of the server.
type ServingMode = iserver.ServingMode
const (
// ServingModeServing indicates the the server contains all required xDS
// configuration is serving RPCs.
ServingModeServing = iserver.ServingModeServing
// ServingModeNotServing indicates that the server is not accepting new
// connections. Existing connections will be closed gracefully, allowing
// in-progress RPCs to complete. A server enters this mode when it does not
// contain the required xDS configuration to serve RPCs.
ServingModeNotServing = iserver.ServingModeNotServing
)
// ServingModeCallbackFunc is the callback that users can register to get
// notified about the server's serving mode changes. The callback is invoked
// with the address of the listener and its new mode.
@ -66,7 +52,7 @@ type ServingModeCallbackFunc func(addr net.Addr, args ServingModeChangeArgs)
// function.
type ServingModeChangeArgs struct {
// Mode is the new serving mode of the server listener.
Mode ServingMode
Mode connectivity.ServingMode
// Err is set to a non-nil error if the server has transitioned into
// not-serving mode.
Err error
@ -80,6 +66,11 @@ type ServingModeChangeArgs struct {
//
// This function should ONLY be used for testing and may not work with some
// other features, including the CSDS service.
//
// Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func BootstrapContentsForTesting(contents []byte) grpc.ServerOption {
return &serverOption{apply: func(o *serverOptions) { o.bootstrapContents = contents }}
}

View File

@ -35,6 +35,7 @@ import (
v3tlspb "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3"
wrapperspb "github.com/golang/protobuf/ptypes/wrappers"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/credentials/tls/certprovider"
"google.golang.org/grpc/credentials/xds"
@ -435,8 +436,8 @@ func (s) TestServeSuccess(t *testing.T) {
if err != nil {
t.Fatalf("error when waiting for serving mode to change: %v", err)
}
if mode := v.(ServingMode); mode != ServingModeNotServing {
t.Fatalf("server mode is %q, want %q", mode, ServingModeNotServing)
if mode := v.(connectivity.ServingMode); mode != connectivity.ServingModeNotServing {
t.Fatalf("server mode is %q, want %q", mode, connectivity.ServingModeNotServing)
}
// Push a good LDS response, and wait for Serve() to be invoked on the
@ -463,8 +464,8 @@ func (s) TestServeSuccess(t *testing.T) {
if err != nil {
t.Fatalf("error when waiting for serving mode to change: %v", err)
}
if mode := v.(ServingMode); mode != ServingModeServing {
t.Fatalf("server mode is %q, want %q", mode, ServingModeServing)
if mode := v.(connectivity.ServingMode); mode != connectivity.ServingModeServing {
t.Fatalf("server mode is %q, want %q", mode, connectivity.ServingModeServing)
}
// Push an update to the registered listener watch callback with a Listener
@ -489,8 +490,8 @@ func (s) TestServeSuccess(t *testing.T) {
if err != nil {
t.Fatalf("error when waiting for serving mode to change: %v", err)
}
if mode := v.(ServingMode); mode != ServingModeNotServing {
t.Fatalf("server mode is %q, want %q", mode, ServingModeNotServing)
if mode := v.(connectivity.ServingMode); mode != connectivity.ServingModeNotServing {
t.Fatalf("server mode is %q, want %q", mode, connectivity.ServingModeNotServing)
}
}

View File

@ -25,11 +25,6 @@
//
// See https://github.com/grpc/grpc-go/tree/master/examples/features/xds for
// example.
//
// Experimental
//
// Notice: All APIs in this package are experimental and may be removed in a
// later release.
package xds
import (
@ -87,6 +82,11 @@ func init() {
//
// This function should ONLY be used for testing and may not work with some
// other features, including the CSDS service.
//
// Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func NewXDSResolverWithConfigForTesting(bootstrapConfig []byte) (resolver.Builder, error) {
return xdsresolver.NewBuilder(bootstrapConfig)
}