Files
Will Assis f4ee58db50 unified-storage: split resource index client (#106297)
* split resource server and index server grpc connection if defined in config
2025-06-27 08:15:52 -04:00

222 lines
7.0 KiB
Go

package resource
import (
"context"
"crypto/tls"
"encoding/base64"
"encoding/json"
"fmt"
"log/slog"
"net/http"
"github.com/fullstorydev/grpchan"
"github.com/fullstorydev/grpchan/inprocgrpc"
"github.com/go-jose/go-jose/v3/jwt"
grpcAuth "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/auth"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
authnlib "github.com/grafana/authlib/authn"
"github.com/grafana/authlib/grpcutils"
"github.com/grafana/authlib/types"
"github.com/grafana/grafana/pkg/apimachinery/identity"
authnGrpcUtils "github.com/grafana/grafana/pkg/services/authn/grpcutils"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/setting"
grpcUtils "github.com/grafana/grafana/pkg/storage/unified/resource/grpc"
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
)
type ResourceClient interface {
resourcepb.ResourceStoreClient
resourcepb.ResourceIndexClient
resourcepb.ManagedObjectIndexClient
resourcepb.BulkStoreClient
resourcepb.BlobStoreClient
resourcepb.DiagnosticsClient
}
// Internal implementation
type resourceClient struct {
resourcepb.ResourceStoreClient
resourcepb.ResourceIndexClient
resourcepb.ManagedObjectIndexClient
resourcepb.BulkStoreClient
resourcepb.BlobStoreClient
resourcepb.DiagnosticsClient
}
func NewResourceClient(conn, indexConn grpc.ClientConnInterface, cfg *setting.Cfg, features featuremgmt.FeatureToggles, tracer trace.Tracer) (ResourceClient, error) {
if !features.IsEnabledGlobally(featuremgmt.FlagAppPlatformGrpcClientAuth) {
return NewLegacyResourceClient(conn, indexConn), nil
}
clientCfg := authnGrpcUtils.ReadGrpcClientConfig(cfg)
return NewRemoteResourceClient(tracer, conn, indexConn, RemoteResourceClientConfig{
Token: clientCfg.Token,
TokenExchangeURL: clientCfg.TokenExchangeURL,
Audiences: []string{"resourceStore"},
Namespace: clientCfg.TokenNamespace,
AllowInsecure: cfg.Env == setting.Dev,
})
}
func newResourceClient(storageCc grpc.ClientConnInterface, indexCc grpc.ClientConnInterface) ResourceClient {
return &resourceClient{
ResourceStoreClient: resourcepb.NewResourceStoreClient(storageCc),
ResourceIndexClient: resourcepb.NewResourceIndexClient(indexCc),
ManagedObjectIndexClient: resourcepb.NewManagedObjectIndexClient(indexCc),
BulkStoreClient: resourcepb.NewBulkStoreClient(storageCc),
BlobStoreClient: resourcepb.NewBlobStoreClient(storageCc),
DiagnosticsClient: resourcepb.NewDiagnosticsClient(storageCc),
}
}
func NewAuthlessResourceClient(cc grpc.ClientConnInterface) ResourceClient {
return newResourceClient(cc, cc)
}
func NewLegacyResourceClient(channel grpc.ClientConnInterface, indexChannel grpc.ClientConnInterface) ResourceClient {
cc := grpchan.InterceptClientConn(channel, grpcUtils.UnaryClientInterceptor, grpcUtils.StreamClientInterceptor)
cci := grpchan.InterceptClientConn(indexChannel, grpcUtils.UnaryClientInterceptor, grpcUtils.StreamClientInterceptor)
return newResourceClient(cc, cci)
}
func NewLocalResourceClient(server ResourceServer) ResourceClient {
// scenario: local in-proc
channel := &inprocgrpc.Channel{}
tracer := otel.Tracer("github.com/grafana/grafana/pkg/storage/unified/resource")
grpcAuthInt := grpcutils.NewUnsafeAuthenticator(tracer)
for _, desc := range []*grpc.ServiceDesc{
&resourcepb.ResourceStore_ServiceDesc,
&resourcepb.ResourceIndex_ServiceDesc,
&resourcepb.ManagedObjectIndex_ServiceDesc,
&resourcepb.BlobStore_ServiceDesc,
&resourcepb.BulkStore_ServiceDesc,
&resourcepb.Diagnostics_ServiceDesc,
} {
channel.RegisterService(
grpchan.InterceptServer(
desc,
grpcAuth.UnaryServerInterceptor(grpcAuthInt),
grpcAuth.StreamServerInterceptor(grpcAuthInt),
),
server,
)
}
clientInt := authnlib.NewGrpcClientInterceptor(
ProvideInProcExchanger(),
authnlib.WithClientInterceptorIDTokenExtractor(idTokenExtractor),
)
cc := grpchan.InterceptClientConn(channel, clientInt.UnaryClientInterceptor, clientInt.StreamClientInterceptor)
return newResourceClient(cc, cc)
}
type RemoteResourceClientConfig struct {
Token string
TokenExchangeURL string
Audiences []string
Namespace string
AllowInsecure bool
}
func NewRemoteResourceClient(tracer trace.Tracer, conn grpc.ClientConnInterface, indexConn grpc.ClientConnInterface, cfg RemoteResourceClientConfig) (ResourceClient, error) {
exchangeOpts := []authnlib.ExchangeClientOpts{}
if cfg.AllowInsecure {
exchangeOpts = append(exchangeOpts, authnlib.WithHTTPClient(&http.Client{Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}}))
}
tc, err := authnlib.NewTokenExchangeClient(authnlib.TokenExchangeConfig{
Token: cfg.Token,
TokenExchangeURL: cfg.TokenExchangeURL,
}, exchangeOpts...)
if err != nil {
return nil, err
}
clientInt := authnlib.NewGrpcClientInterceptor(
tc,
authnlib.WithClientInterceptorTracer(tracer),
authnlib.WithClientInterceptorNamespace(cfg.Namespace),
authnlib.WithClientInterceptorAudience(cfg.Audiences),
authnlib.WithClientInterceptorIDTokenExtractor(idTokenExtractor),
)
cc := grpchan.InterceptClientConn(conn, clientInt.UnaryClientInterceptor, clientInt.StreamClientInterceptor)
cci := grpchan.InterceptClientConn(indexConn, clientInt.UnaryClientInterceptor, clientInt.StreamClientInterceptor)
return newResourceClient(cc, cci), nil
}
var authLogger = slog.Default().With("logger", "resource-client-auth-interceptor")
func idTokenExtractor(ctx context.Context) (string, error) {
if identity.IsServiceIdentity(ctx) {
return "", nil
}
info, ok := types.AuthInfoFrom(ctx)
if !ok {
return "", fmt.Errorf("no claims found")
}
if token := info.GetIDToken(); len(token) != 0 {
return token, nil
}
if !types.IsIdentityType(info.GetIdentityType(), types.TypeAccessPolicy) {
authLogger.Warn(
"calling resource store as the service without id token or marking it as the service identity",
"subject", info.GetSubject(),
"uid", info.GetUID(),
)
}
return "", nil
}
func ProvideInProcExchanger() authnlib.StaticTokenExchanger {
token, err := createInProcToken()
if err != nil {
panic(err)
}
return authnlib.NewStaticTokenExchanger(token)
}
func createInProcToken() (string, error) {
claims := authnlib.Claims[authnlib.AccessTokenClaims]{
Claims: jwt.Claims{
Issuer: "grafana",
Subject: types.NewTypeID(types.TypeAccessPolicy, "grafana"),
Audience: []string{"resourceStore"},
},
Rest: authnlib.AccessTokenClaims{
Namespace: "*",
Permissions: identity.ServiceIdentityClaims.Rest.Permissions,
DelegatedPermissions: identity.ServiceIdentityClaims.Rest.DelegatedPermissions,
},
}
header, err := json.Marshal(map[string]string{
"alg": "none",
"typ": authnlib.TokenTypeAccess,
})
if err != nil {
return "", err
}
payload, err := json.Marshal(claims)
if err != nil {
return "", err
}
return base64.RawURLEncoding.EncodeToString(header) + "." + base64.RawURLEncoding.EncodeToString(payload) + ".", nil
}