xdsclient: resource type agnostic WatchResource() API (#5777)

This commit is contained in:
Easwar Swaminathan
2022-11-21 12:42:50 -08:00
committed by GitHub
parent 3011eaf70e
commit 0abb6f9b69
9 changed files with 875 additions and 14 deletions

View File

@ -36,6 +36,20 @@ type XDSClient interface {
WatchEndpoints(string, func(xdsresource.EndpointsUpdate, error)) func()
ReportLoad(*bootstrap.ServerConfig) (*load.Store, func())
// WatchResource uses xDS to discover the resource associated with the
// provided resource name. The resource type implementation determines how
// xDS requests are sent out and how responses are deserialized and
// validated. Upon receipt of a response from the management server, an
// appropriate callback on the watcher is invoked.
//
// Most callers will not have a need to use this API directly. They will
// instead use a resource-type-specific wrapper API provided by the relevant
// resource type implementation.
//
// TODO: Once this generic client API is fully implemented and integrated,
// delete the resource type specific watch APIs on this interface.
WatchResource(rType xdsresource.Type, resourceName string, watcher xdsresource.ResourceWatcher) (cancel func())
DumpLDS() map[string]xdsresource.UpdateWithMD
DumpRDS() map[string]xdsresource.UpdateWithMD
DumpCDS() map[string]xdsresource.UpdateWithMD

View File

@ -20,6 +20,7 @@ package xdsclient
import (
"bytes"
"context"
"encoding/json"
"fmt"
"sync"
@ -55,10 +56,14 @@ func NewWithConfig(config *bootstrap.Config) (XDSClient, error) {
// newWithConfig returns a new xdsClient with the given config.
func newWithConfig(config *bootstrap.Config, watchExpiryTimeout time.Duration, idleAuthorityDeleteTimeout time.Duration) (*clientImpl, error) {
ctx, cancel := context.WithCancel(context.Background())
c := &clientImpl{
done: grpcsync.NewEvent(),
config: config,
watchExpiryTimeout: watchExpiryTimeout,
serializer: newCallbackSerializer(ctx),
serializerClose: cancel,
resourceTypes: newResourceTypeRegistry(),
authorities: make(map[string]*authority),
idleAuthorities: cache.NewTimeoutCache(idleAuthorityDeleteTimeout),
}

View File

@ -32,14 +32,14 @@ var _ XDSClient = &clientImpl{}
// clientImpl is the real implementation of the xds client. The exported Client
// is a wrapper of this struct with a ref count.
//
// Implements UpdateHandler interface.
// TODO(easwars): Make a wrapper struct which implements this interface in the
// style of ccBalancerWrapper so that the Client type does not implement these
// exported methods.
type clientImpl struct {
done *grpcsync.Event
config *bootstrap.Config
done *grpcsync.Event
config *bootstrap.Config
logger *grpclog.PrefixLogger
watchExpiryTimeout time.Duration
serializer *callbackSerializer
serializerClose func()
resourceTypes *resourceTypeRegistry
// authorityMu protects the authority fields. It's necessary because an
// authority is created when it's used.
@ -60,9 +60,6 @@ type clientImpl struct {
// An authority is either in authorities, or idleAuthorities,
// never both.
idleAuthorities *cache.TimeoutCache
logger *grpclog.PrefixLogger
watchExpiryTimeout time.Duration
}
// BootstrapConfig returns the configuration read from the bootstrap file.
@ -72,6 +69,9 @@ func (c *clientImpl) BootstrapConfig() *bootstrap.Config {
}
// Close closes the gRPC connection to the management server.
//
// TODO: ensure that all underlying transports are closed before this function
// returns.
func (c *clientImpl) Close() {
if c.done.HasFired() {
return
@ -80,16 +80,13 @@ func (c *clientImpl) Close() {
// TODO: Should we invoke the registered callbacks here with an error that
// the client is closed?
// Note that Close needs to check for nils even if some of them are always
// set in the constructor. This is because the constructor defers Close() in
// error cases, and the fields might not be set when the error happens.
c.authorityMu.Lock()
for _, a := range c.authorities {
a.close()
}
c.idleAuthorities.Clear(true)
c.authorityMu.Unlock()
c.serializerClose()
c.logger.Infof("Shutdown")
}

View File

@ -18,6 +18,10 @@
package xdsclient
import (
"context"
"fmt"
"sync"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)
@ -103,3 +107,67 @@ func (c *clientImpl) WatchEndpoints(clusterName string, cb func(xdsresource.Endp
unref()
}
}
// WatchResource uses xDS to discover the resource associated with the provided
// resource name. The resource type implementation determines how xDS requests
// are sent out and how responses are deserialized and validated. Upon receipt
// of a response from the management server, an appropriate callback on the
// watcher is invoked.
func (c *clientImpl) WatchResource(rType xdsresource.Type, resourceName string, watcher xdsresource.ResourceWatcher) (cancel func()) {
// Return early if the client is already closed.
//
// The client returned from the top-level API is a ref-counted client which
// contains a pointer to `clientImpl`. When all references are released, the
// ref-counted client sets its pointer to `nil`. And if any watch APIs are
// made on such a closed client, we will get here with a `nil` receiver.
if c == nil || c.done.HasFired() {
logger.Warningf("Watch registered for name %q of type %q, but client is closed", rType.TypeEnum().String(), resourceName)
return func() {}
}
if err := c.resourceTypes.maybeRegister(rType); err != nil {
c.serializer.Schedule(func(context.Context) { watcher.OnError(err) })
return func() {}
}
// TODO: replace this with the code does the following when we have
// implemented generic watch API on the authority:
// - Parse the resource name and extract the authority.
// - Locate the corresponding authority object and acquire a reference to
// it. If the authority is not found, error out.
// - Call the watchResource() method on the authority.
// - Return a cancel function to cancel the watch on the authority and to
// release the reference.
return func() {}
}
// A registry of xdsresource.Type implementations indexed by their corresponding
// type URLs. Registration of an xdsresource.Type happens the first time a watch
// for a resource of that type is invoked.
type resourceTypeRegistry struct {
mu sync.Mutex
types map[string]xdsresource.Type
}
func newResourceTypeRegistry() *resourceTypeRegistry {
return &resourceTypeRegistry{types: make(map[string]xdsresource.Type)}
}
func (r *resourceTypeRegistry) maybeRegister(rType xdsresource.Type) error {
r.mu.Lock()
defer r.mu.Unlock()
urls := []string{rType.V2TypeURL(), rType.V3TypeURL()}
for _, u := range urls {
if u == "" {
// Silently ignore unsupported versions of the resource.
continue
}
typ, ok := r.types[u]
if ok && typ != rType {
return fmt.Errorf("attempt to re-register a resource type implementation for %v", rType.TypeEnum())
}
r.types[u] = rType
}
return nil
}

View File

@ -0,0 +1,149 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package xdsresource
import (
"google.golang.org/grpc/internal/pretty"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
)
var (
// Compile time interface checks.
_ Type = clusterResourceType{}
_ ResourceData = &ClusterResourceData{}
// Singleton instantiation of the resource type implementation.
clusterType = clusterResourceType{
resourceTypeState: resourceTypeState{
v2TypeURL: "type.googleapis.com/envoy.api.v2.Cluster",
v3TypeURL: "type.googleapis.com/envoy.config.cluster.v3.Cluster",
typeEnum: ClusterResource,
allResourcesRequiredInSotW: true,
},
}
)
// clusterResourceType provides the resource-type specific functionality for a
// Cluster resource.
//
// Implements the Type interface.
type clusterResourceType struct {
resourceTypeState
}
// Decode deserializes and validates an xDS resource serialized inside the
// provided `Any` proto, as received from the xDS management server.
func (clusterResourceType) Decode(opts *DecodeOptions, resource *anypb.Any) (*DecodeResult, error) {
name, cluster, err := unmarshalClusterResource(resource, nil, opts.Logger)
switch {
case name == "":
// Name is unset only when protobuf deserialization fails.
return nil, err
case err != nil:
// Protobuf deserialization succeeded, but resource validation failed.
return &DecodeResult{Name: name, Resource: &ClusterResourceData{Resource: ClusterUpdate{}}}, err
}
// Perform extra validation here.
if err := securityConfigValidator(opts.BootstrapConfig, cluster.SecurityCfg); err != nil {
return &DecodeResult{Name: name, Resource: &ClusterResourceData{Resource: ClusterUpdate{}}}, err
}
return &DecodeResult{Name: name, Resource: &ClusterResourceData{Resource: cluster}}, nil
}
// ClusterResourceData wraps the configuration of a Cluster resource as received
// from the management server.
//
// Implements the ResourceData interface.
type ClusterResourceData struct {
ResourceData
// TODO: We have always stored update structs by value. See if this can be
// switched to a pointer?
Resource ClusterUpdate
}
// Equal returns true if other is equal to r.
func (c *ClusterResourceData) Equal(other ResourceData) bool {
if c == nil && other == nil {
return true
}
if (c == nil) != (other == nil) {
return false
}
return proto.Equal(c.Resource.Raw, other.Raw())
}
// ToJSON returns a JSON string representation of the resource data.
func (c *ClusterResourceData) ToJSON() string {
return pretty.ToJSON(c.Resource)
}
// Raw returns the underlying raw protobuf form of the cluster resource.
func (c *ClusterResourceData) Raw() *anypb.Any {
return c.Resource.Raw
}
// ClusterWatcher wraps the callbacks to be invoked for different events
// corresponding to the cluster resource being watched.
type ClusterWatcher interface {
// OnUpdate is invoked to report an update for the resource being watched.
OnUpdate(*ClusterResourceData)
// OnError is invoked under different error conditions including but not
// limited to the following:
// - authority mentioned in the resource is not found
// - resource name parsing error
// - resource deserialization error
// - resource validation error
// - ADS stream failure
// - connection failure
OnError(error)
// OnResourceDoesNotExist is invoked for a specific error condition where
// the requested resource is not found on the xDS management server.
OnResourceDoesNotExist()
}
type delegatingClusterWatcher struct {
watcher ClusterWatcher
}
func (d *delegatingClusterWatcher) OnUpdate(data ResourceData) {
c := data.(*ClusterResourceData)
d.watcher.OnUpdate(c)
}
func (d *delegatingClusterWatcher) OnError(err error) {
d.watcher.OnError(err)
}
func (d *delegatingClusterWatcher) OnResourceDoesNotExist() {
d.watcher.OnResourceDoesNotExist()
}
// WatchCluster uses xDS to discover the configuration associated with the
// provided cluster resource name.
func WatchCluster(p Producer, name string, w ClusterWatcher) (cancel func()) {
delegator := &delegatingClusterWatcher{watcher: w}
return p.WatchResource(clusterType, name, delegator)
}

View File

@ -0,0 +1,144 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package xdsresource
import (
"google.golang.org/grpc/internal/pretty"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
)
var (
// Compile time interface checks.
_ Type = endpointsResourceType{}
_ ResourceData = &EndpointsResourceData{}
// Singleton instantiation of the resource type implementation.
endpointsType = endpointsResourceType{
resourceTypeState: resourceTypeState{
v2TypeURL: "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment",
v3TypeURL: "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment",
typeEnum: EndpointsResource,
allResourcesRequiredInSotW: false,
},
}
)
// endpointsResourceType provides the resource-type specific functionality for a
// ClusterLoadAssignment (or Endpoints) resource.
//
// Implements the Type interface.
type endpointsResourceType struct {
resourceTypeState
}
// Decode deserializes and validates an xDS resource serialized inside the
// provided `Any` proto, as received from the xDS management server.
func (endpointsResourceType) Decode(opts *DecodeOptions, resource *anypb.Any) (*DecodeResult, error) {
name, rc, err := unmarshalEndpointsResource(resource, opts.Logger)
switch {
case name == "":
// Name is unset only when protobuf deserialization fails.
return nil, err
case err != nil:
// Protobuf deserialization succeeded, but resource validation failed.
return &DecodeResult{Name: name, Resource: &EndpointsResourceData{Resource: EndpointsUpdate{}}}, err
}
return &DecodeResult{Name: name, Resource: &EndpointsResourceData{Resource: rc}}, nil
}
// EndpointsResourceData wraps the configuration of an Endpoints resource as
// received from the management server.
//
// Implements the ResourceData interface.
type EndpointsResourceData struct {
ResourceData
// TODO: We have always stored update structs by value. See if this can be
// switched to a pointer?
Resource EndpointsUpdate
}
// Equal returns true if other is equal to r.
func (e *EndpointsResourceData) Equal(other ResourceData) bool {
if e == nil && other == nil {
return true
}
if (e == nil) != (other == nil) {
return false
}
return proto.Equal(e.Resource.Raw, other.Raw())
}
// ToJSON returns a JSON string representation of the resource data.
func (e *EndpointsResourceData) ToJSON() string {
return pretty.ToJSON(e.Resource)
}
// Raw returns the underlying raw protobuf form of the listener resource.
func (e *EndpointsResourceData) Raw() *anypb.Any {
return e.Resource.Raw
}
// EndpointsWatcher wraps the callbacks to be invoked for different
// events corresponding to the endpoints resource being watched.
type EndpointsWatcher interface {
// OnUpdate is invoked to report an update for the resource being watched.
OnUpdate(*EndpointsResourceData)
// OnError is invoked under different error conditions including but not
// limited to the following:
// - authority mentioned in the resource is not found
// - resource name parsing error
// - resource deserialization error
// - resource validation error
// - ADS stream failure
// - connection failure
OnError(error)
// OnResourceDoesNotExist is invoked for a specific error condition where
// the requested resource is not found on the xDS management server.
OnResourceDoesNotExist()
}
type delegatingEndpointsWatcher struct {
watcher EndpointsWatcher
}
func (d *delegatingEndpointsWatcher) OnUpdate(data ResourceData) {
e := data.(*EndpointsResourceData)
d.watcher.OnUpdate(e)
}
func (d *delegatingEndpointsWatcher) OnError(err error) {
d.watcher.OnError(err)
}
func (d *delegatingEndpointsWatcher) OnResourceDoesNotExist() {
d.watcher.OnResourceDoesNotExist()
}
// WatchEndpoints uses xDS to discover the configuration associated with the
// provided endpoints resource name.
func WatchEndpoints(p Producer, name string, w EndpointsWatcher) (cancel func()) {
delegator := &delegatingEndpointsWatcher{watcher: w}
return p.WatchResource(endpointsType, name, delegator)
}

View File

@ -0,0 +1,181 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package xdsresource
import (
"fmt"
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
)
var (
// Compile time interface checks.
_ Type = listenerResourceType{}
_ ResourceData = &ListenerResourceData{}
// Singleton instantiation of the resource type implementation.
listenerType = listenerResourceType{
resourceTypeState: resourceTypeState{
v2TypeURL: "type.googleapis.com/envoy.api.v2.Listener",
v3TypeURL: "type.googleapis.com/envoy.config.listener.v3.Listener",
typeEnum: ListenerResource,
allResourcesRequiredInSotW: true,
},
}
)
// listenerResourceType provides the resource-type specific functionality for a
// Listener resource.
//
// Implements the Type interface.
type listenerResourceType struct {
resourceTypeState
}
func securityConfigValidator(bc *bootstrap.Config, sc *SecurityConfig) error {
if sc == nil {
return nil
}
if sc.IdentityInstanceName != "" {
if _, ok := bc.CertProviderConfigs[sc.IdentityInstanceName]; !ok {
return fmt.Errorf("identitiy certificate provider instance name %q missing in bootstrap configuration", sc.IdentityInstanceName)
}
}
if sc.RootInstanceName != "" {
if _, ok := bc.CertProviderConfigs[sc.RootInstanceName]; !ok {
return fmt.Errorf("root certificate provider instance name %q missing in bootstrap configuration", sc.RootInstanceName)
}
}
return nil
}
func listenerValidator(bc *bootstrap.Config, lis ListenerUpdate) error {
if lis.InboundListenerCfg == nil || lis.InboundListenerCfg.FilterChains == nil {
return nil
}
return lis.InboundListenerCfg.FilterChains.Validate(func(fc *FilterChain) error {
if fc == nil {
return nil
}
return securityConfigValidator(bc, fc.SecurityCfg)
})
}
// Decode deserializes and validates an xDS resource serialized inside the
// provided `Any` proto, as received from the xDS management server.
func (listenerResourceType) Decode(opts *DecodeOptions, resource *anypb.Any) (*DecodeResult, error) {
name, listener, err := unmarshalListenerResource(resource, nil, opts.Logger)
switch {
case name == "":
// Name is unset only when protobuf deserialization fails.
return nil, err
case err != nil:
// Protobuf deserialization succeeded, but resource validation failed.
return &DecodeResult{Name: name, Resource: &ListenerResourceData{Resource: ListenerUpdate{}}}, err
}
// Perform extra validation here.
if err := listenerValidator(opts.BootstrapConfig, listener); err != nil {
return &DecodeResult{Name: name, Resource: &ListenerResourceData{Resource: ListenerUpdate{}}}, err
}
return &DecodeResult{Name: name, Resource: &ListenerResourceData{Resource: listener}}, nil
}
// ListenerResourceData wraps the configuration of a Listener resource as
// received from the management server.
//
// Implements the ResourceData interface.
type ListenerResourceData struct {
ResourceData
// TODO: We have always stored update structs by value. See if this can be
// switched to a pointer?
Resource ListenerUpdate
}
// Equal returns true if other is equal to l.
func (l *ListenerResourceData) Equal(other ResourceData) bool {
if l == nil && other == nil {
return true
}
if (l == nil) != (other == nil) {
return false
}
return proto.Equal(l.Resource.Raw, other.Raw())
}
// ToJSON returns a JSON string representation of the resource data.
func (l *ListenerResourceData) ToJSON() string {
return pretty.ToJSON(l.Resource)
}
// Raw returns the underlying raw protobuf form of the listener resource.
func (l *ListenerResourceData) Raw() *anypb.Any {
return l.Resource.Raw
}
// ListenerWatcher wraps the callbacks to be invoked for different
// events corresponding to the listener resource being watched.
type ListenerWatcher interface {
// OnUpdate is invoked to report an update for the resource being watched.
OnUpdate(*ListenerResourceData)
// OnError is invoked under different error conditions including but not
// limited to the following:
// - authority mentioned in the resource is not found
// - resource name parsing error
// - resource deserialization error
// - resource validation error
// - ADS stream failure
// - connection failure
OnError(error)
// OnResourceDoesNotExist is invoked for a specific error condition where
// the requested resource is not found on the xDS management server.
OnResourceDoesNotExist()
}
type delegatingListenerWatcher struct {
watcher ListenerWatcher
}
func (d *delegatingListenerWatcher) OnUpdate(data ResourceData) {
l := data.(*ListenerResourceData)
d.watcher.OnUpdate(l)
}
func (d *delegatingListenerWatcher) OnError(err error) {
d.watcher.OnError(err)
}
func (d *delegatingListenerWatcher) OnResourceDoesNotExist() {
d.watcher.OnResourceDoesNotExist()
}
// WatchListener uses xDS to discover the configuration associated with the
// provided listener resource name.
func WatchListener(p Producer, name string, w ListenerWatcher) (cancel func()) {
delegator := &delegatingListenerWatcher{watcher: w}
return p.WatchResource(listenerType, name, delegator)
}

View File

@ -0,0 +1,158 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package xdsresource
import (
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/protobuf/types/known/anypb"
)
// Producer contains a single method to discover resource configuration from a
// remote management server using xDS APIs.
//
// The xdsclient package provides a concrete implementation of this interface.
type Producer interface {
// WatchResource uses xDS to discover the resource associated with the
// provided resource name. The resource type implementation determines how
// xDS requests are sent out and how responses are deserialized and
// validated. Upon receipt of a response from the management server, an
// appropriate callback on the watcher is invoked.
WatchResource(rType Type, resourceName string, watcher ResourceWatcher) (cancel func())
}
// ResourceWatcher wraps the callbacks to be invoked for different events
// corresponding to the resource being watched.
type ResourceWatcher interface {
// OnUpdate is invoked to report an update for the resource being watched.
// The ResourceData parameter needs to be type asserted to the appropriate
// type for the resource being watched.
OnUpdate(ResourceData)
// OnError is invoked under different error conditions including but not
// limited to the following:
// - authority mentioned in the resource is not found
// - resource name parsing error
// - resource deserialization error
// - resource validation error
// - ADS stream failure
// - connection failure
OnError(error)
// OnResourceDoesNotExist is invoked for a specific error condition where
// the requested resource is not found on the xDS management server.
OnResourceDoesNotExist()
}
// TODO: Once the implementation is complete, rename this interface as
// ResourceType and get rid of the existing ResourceType enum.
// Type wraps all resource-type specific functionality. Each supported resource
// type will provide an implementation of this interface.
type Type interface {
// V2TypeURL is the xDS type URL of this resource type for v2 transport.
V2TypeURL() string
// V3TypeURL is the xDS type URL of this resource type for v3 transport.
V3TypeURL() string
// TypeEnum is an enumerated value for this resource type. This can be used
// for logging/debugging purposes, as well in cases where the resource type
// is to be uniquely identified but the actual functionality provided by the
// resource type is not required.
//
// TODO: once Type is renamed to ResourceType, rename ResourceType to
// ResourceTypeEnum.
TypeEnum() ResourceType
// AllResourcesRequiredInSotW indicates whether this resource type requires
// that all resources be present in every SotW response from the server. If
// true, a response that does not include a previously seen resource will be
// interpreted as a deletion of that resource.
AllResourcesRequiredInSotW() bool
// Decode deserializes and validates an xDS resource serialized inside the
// provided `Any` proto, as received from the xDS management server.
//
// If protobuf deserialization fails or resource validation fails,
// returns a non-nil error. Otherwise, returns a fully populated
// DecodeResult.
Decode(*DecodeOptions, *anypb.Any) (*DecodeResult, error)
}
// ResourceData contains the configuration data sent by the xDS management
// server, associated with the resource being watched. Every resource type must
// provide an implementation of this interface to represent the configuration
// received from the xDS management server.
type ResourceData interface {
isResourceData()
// Equal returns true if the passed in resource data is equal to that of the
// receiver.
Equal(ResourceData) bool
// ToJSON returns a JSON string representation of the resource data.
ToJSON() string
Raw() *anypb.Any
}
// DecodeOptions wraps the options required by ResourceType implementation for
// decoding configuration received from the xDS management server.
type DecodeOptions struct {
// BootstrapConfig contains the bootstrap configuration passed to the
// top-level xdsClient. This contains useful data for resource validation.
BootstrapConfig *bootstrap.Config
// Logger is to be used for emitting logs during the Decode operation.
Logger *grpclog.PrefixLogger
}
// DecodeResult is the result of a decode operation.
type DecodeResult struct {
// Name is the name of the resource being watched.
Name string
// Resource contains the configuration associated with the resource being
// watched.
Resource ResourceData
}
// resourceTypeState wraps the static state associated with concrete resource
// type implementations, which can then embed this struct and get the methods
// implemented here for free.
type resourceTypeState struct {
v2TypeURL string
v3TypeURL string
typeEnum ResourceType
allResourcesRequiredInSotW bool
}
func (r resourceTypeState) V2TypeURL() string {
return r.v2TypeURL
}
func (r resourceTypeState) V3TypeURL() string {
return r.v3TypeURL
}
func (r resourceTypeState) TypeEnum() ResourceType {
return r.typeEnum
}
func (r resourceTypeState) AllResourcesRequiredInSotW() bool {
return r.allResourcesRequiredInSotW
}

View File

@ -0,0 +1,145 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package xdsresource
import (
"google.golang.org/grpc/internal/pretty"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
)
var (
// Compile time interface checks.
_ Type = routeConfigResourceType{}
_ ResourceData = &RouteConfigResourceData{}
// Singleton instantiation of the resource type implementation.
routeConfigType = routeConfigResourceType{
resourceTypeState: resourceTypeState{
v2TypeURL: "type.googleapis.com/envoy.api.v2.RouteConfiguration",
v3TypeURL: "type.googleapis.com/envoy.config.route.v3.RouteConfiguration",
typeEnum: RouteConfigResource,
allResourcesRequiredInSotW: false,
},
}
)
// routeConfigResourceType provides the resource-type specific functionality for
// a RouteConfiguration resource.
//
// Implements the Type interface.
type routeConfigResourceType struct {
resourceTypeState
}
// Decode deserializes and validates an xDS resource serialized inside the
// provided `Any` proto, as received from the xDS management server.
func (routeConfigResourceType) Decode(opts *DecodeOptions, resource *anypb.Any) (*DecodeResult, error) {
name, rc, err := unmarshalRouteConfigResource(resource, opts.Logger)
switch {
case name == "":
// Name is unset only when protobuf deserialization fails.
return nil, err
case err != nil:
// Protobuf deserialization succeeded, but resource validation failed.
return &DecodeResult{Name: name, Resource: &RouteConfigResourceData{Resource: RouteConfigUpdate{}}}, err
}
return &DecodeResult{Name: name, Resource: &RouteConfigResourceData{Resource: rc}}, nil
}
// RouteConfigResourceData wraps the configuration of a RouteConfiguration
// resource as received from the management server.
//
// Implements the ResourceData interface.
type RouteConfigResourceData struct {
ResourceData
// TODO: We have always stored update structs by value. See if this can be
// switched to a pointer?
Resource RouteConfigUpdate
}
// Equal returns true if other is equal to r.
func (r *RouteConfigResourceData) Equal(other ResourceData) bool {
if r == nil && other == nil {
return true
}
if (r == nil) != (other == nil) {
return false
}
return proto.Equal(r.Resource.Raw, other.Raw())
}
// ToJSON returns a JSON string representation of the resource data.
func (r *RouteConfigResourceData) ToJSON() string {
return pretty.ToJSON(r.Resource)
}
// Raw returns the underlying raw protobuf form of the route configuration
// resource.
func (r *RouteConfigResourceData) Raw() *anypb.Any {
return r.Resource.Raw
}
// RouteConfigWatcher wraps the callbacks to be invoked for different
// events corresponding to the route configuration resource being watched.
type RouteConfigWatcher interface {
// OnUpdate is invoked to report an update for the resource being watched.
OnUpdate(*RouteConfigResourceData)
// OnError is invoked under different error conditions including but not
// limited to the following:
// - authority mentioned in the resource is not found
// - resource name parsing error
// - resource deserialization error
// - resource validation error
// - ADS stream failure
// - connection failure
OnError(error)
// OnResourceDoesNotExist is invoked for a specific error condition where
// the requested resource is not found on the xDS management server.
OnResourceDoesNotExist()
}
type delegatingRouteConfigWatcher struct {
watcher RouteConfigWatcher
}
func (d *delegatingRouteConfigWatcher) OnUpdate(data ResourceData) {
rc := data.(*RouteConfigResourceData)
d.watcher.OnUpdate(rc)
}
func (d *delegatingRouteConfigWatcher) OnError(err error) {
d.watcher.OnError(err)
}
func (d *delegatingRouteConfigWatcher) OnResourceDoesNotExist() {
d.watcher.OnResourceDoesNotExist()
}
// WatchRouteConfig uses xDS to discover the configuration associated with the
// provided route configuration resource name.
func WatchRouteConfig(p Producer, name string, w RouteConfigWatcher) (cancel func()) {
delegator := &delegatingRouteConfigWatcher{watcher: w}
return p.WatchResource(routeConfigType, name, delegator)
}