package resource import ( "context" "crypto/tls" "encoding/base64" "encoding/json" "fmt" "log/slog" "net/http" "github.com/fullstorydev/grpchan" "github.com/fullstorydev/grpchan/inprocgrpc" "github.com/go-jose/go-jose/v3/jwt" grpcAuth "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/auth" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/trace" "google.golang.org/grpc" authnlib "github.com/grafana/authlib/authn" "github.com/grafana/authlib/grpcutils" "github.com/grafana/authlib/types" "github.com/grafana/grafana/pkg/apimachinery/identity" authnGrpcUtils "github.com/grafana/grafana/pkg/services/authn/grpcutils" "github.com/grafana/grafana/pkg/services/featuremgmt" "github.com/grafana/grafana/pkg/setting" grpcUtils "github.com/grafana/grafana/pkg/storage/unified/resource/grpc" "github.com/grafana/grafana/pkg/storage/unified/resourcepb" ) type ResourceClient interface { resourcepb.ResourceStoreClient resourcepb.ResourceIndexClient resourcepb.ManagedObjectIndexClient resourcepb.BulkStoreClient resourcepb.BlobStoreClient resourcepb.DiagnosticsClient } // Internal implementation type resourceClient struct { resourcepb.ResourceStoreClient resourcepb.ResourceIndexClient resourcepb.ManagedObjectIndexClient resourcepb.BulkStoreClient resourcepb.BlobStoreClient resourcepb.DiagnosticsClient } func NewResourceClient(conn, indexConn grpc.ClientConnInterface, cfg *setting.Cfg, features featuremgmt.FeatureToggles, tracer trace.Tracer) (ResourceClient, error) { if !features.IsEnabledGlobally(featuremgmt.FlagAppPlatformGrpcClientAuth) { return NewLegacyResourceClient(conn, indexConn), nil } clientCfg := authnGrpcUtils.ReadGrpcClientConfig(cfg) return NewRemoteResourceClient(tracer, conn, indexConn, RemoteResourceClientConfig{ Token: clientCfg.Token, TokenExchangeURL: clientCfg.TokenExchangeURL, Audiences: []string{"resourceStore"}, Namespace: clientCfg.TokenNamespace, AllowInsecure: cfg.Env == setting.Dev, }) } func newResourceClient(storageCc grpc.ClientConnInterface, indexCc grpc.ClientConnInterface) ResourceClient { return &resourceClient{ ResourceStoreClient: resourcepb.NewResourceStoreClient(storageCc), ResourceIndexClient: resourcepb.NewResourceIndexClient(indexCc), ManagedObjectIndexClient: resourcepb.NewManagedObjectIndexClient(indexCc), BulkStoreClient: resourcepb.NewBulkStoreClient(storageCc), BlobStoreClient: resourcepb.NewBlobStoreClient(storageCc), DiagnosticsClient: resourcepb.NewDiagnosticsClient(storageCc), } } func NewAuthlessResourceClient(cc grpc.ClientConnInterface) ResourceClient { return newResourceClient(cc, cc) } func NewLegacyResourceClient(channel grpc.ClientConnInterface, indexChannel grpc.ClientConnInterface) ResourceClient { cc := grpchan.InterceptClientConn(channel, grpcUtils.UnaryClientInterceptor, grpcUtils.StreamClientInterceptor) cci := grpchan.InterceptClientConn(indexChannel, grpcUtils.UnaryClientInterceptor, grpcUtils.StreamClientInterceptor) return newResourceClient(cc, cci) } func NewLocalResourceClient(server ResourceServer) ResourceClient { // scenario: local in-proc channel := &inprocgrpc.Channel{} tracer := otel.Tracer("github.com/grafana/grafana/pkg/storage/unified/resource") grpcAuthInt := grpcutils.NewUnsafeAuthenticator(tracer) for _, desc := range []*grpc.ServiceDesc{ &resourcepb.ResourceStore_ServiceDesc, &resourcepb.ResourceIndex_ServiceDesc, &resourcepb.ManagedObjectIndex_ServiceDesc, &resourcepb.BlobStore_ServiceDesc, &resourcepb.BulkStore_ServiceDesc, &resourcepb.Diagnostics_ServiceDesc, } { channel.RegisterService( grpchan.InterceptServer( desc, grpcAuth.UnaryServerInterceptor(grpcAuthInt), grpcAuth.StreamServerInterceptor(grpcAuthInt), ), server, ) } clientInt := authnlib.NewGrpcClientInterceptor( ProvideInProcExchanger(), authnlib.WithClientInterceptorIDTokenExtractor(idTokenExtractor), ) cc := grpchan.InterceptClientConn(channel, clientInt.UnaryClientInterceptor, clientInt.StreamClientInterceptor) return newResourceClient(cc, cc) } type RemoteResourceClientConfig struct { Token string TokenExchangeURL string Audiences []string Namespace string AllowInsecure bool } func NewRemoteResourceClient(tracer trace.Tracer, conn grpc.ClientConnInterface, indexConn grpc.ClientConnInterface, cfg RemoteResourceClientConfig) (ResourceClient, error) { exchangeOpts := []authnlib.ExchangeClientOpts{} if cfg.AllowInsecure { exchangeOpts = append(exchangeOpts, authnlib.WithHTTPClient(&http.Client{Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}})) } tc, err := authnlib.NewTokenExchangeClient(authnlib.TokenExchangeConfig{ Token: cfg.Token, TokenExchangeURL: cfg.TokenExchangeURL, }, exchangeOpts...) if err != nil { return nil, err } clientInt := authnlib.NewGrpcClientInterceptor( tc, authnlib.WithClientInterceptorTracer(tracer), authnlib.WithClientInterceptorNamespace(cfg.Namespace), authnlib.WithClientInterceptorAudience(cfg.Audiences), authnlib.WithClientInterceptorIDTokenExtractor(idTokenExtractor), ) cc := grpchan.InterceptClientConn(conn, clientInt.UnaryClientInterceptor, clientInt.StreamClientInterceptor) cci := grpchan.InterceptClientConn(indexConn, clientInt.UnaryClientInterceptor, clientInt.StreamClientInterceptor) return newResourceClient(cc, cci), nil } var authLogger = slog.Default().With("logger", "resource-client-auth-interceptor") func idTokenExtractor(ctx context.Context) (string, error) { if identity.IsServiceIdentity(ctx) { return "", nil } info, ok := types.AuthInfoFrom(ctx) if !ok { return "", fmt.Errorf("no claims found") } if token := info.GetIDToken(); len(token) != 0 { return token, nil } if !types.IsIdentityType(info.GetIdentityType(), types.TypeAccessPolicy) { authLogger.Warn( "calling resource store as the service without id token or marking it as the service identity", "subject", info.GetSubject(), "uid", info.GetUID(), ) } return "", nil } func ProvideInProcExchanger() authnlib.StaticTokenExchanger { token, err := createInProcToken() if err != nil { panic(err) } return authnlib.NewStaticTokenExchanger(token) } func createInProcToken() (string, error) { claims := authnlib.Claims[authnlib.AccessTokenClaims]{ Claims: jwt.Claims{ Issuer: "grafana", Subject: types.NewTypeID(types.TypeAccessPolicy, "grafana"), Audience: []string{"resourceStore"}, }, Rest: authnlib.AccessTokenClaims{ Namespace: "*", Permissions: identity.ServiceIdentityClaims.Rest.Permissions, DelegatedPermissions: identity.ServiceIdentityClaims.Rest.DelegatedPermissions, }, } header, err := json.Marshal(map[string]string{ "alg": "none", "typ": authnlib.TokenTypeAccess, }) if err != nil { return "", err } payload, err := json.Marshal(claims) if err != nil { return "", err } return base64.RawURLEncoding.EncodeToString(header) + "." + base64.RawURLEncoding.EncodeToString(payload) + ".", nil }