mirror of
https://github.com/grafana/grafana.git
synced 2025-07-31 06:12:49 +08:00
222 lines
7.0 KiB
Go
222 lines
7.0 KiB
Go
package resource
|
|
|
|
import (
|
|
"context"
|
|
"crypto/tls"
|
|
"encoding/base64"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log/slog"
|
|
"net/http"
|
|
|
|
"github.com/fullstorydev/grpchan"
|
|
"github.com/fullstorydev/grpchan/inprocgrpc"
|
|
"github.com/go-jose/go-jose/v3/jwt"
|
|
grpcAuth "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/auth"
|
|
"go.opentelemetry.io/otel"
|
|
"go.opentelemetry.io/otel/trace"
|
|
"google.golang.org/grpc"
|
|
|
|
authnlib "github.com/grafana/authlib/authn"
|
|
"github.com/grafana/authlib/grpcutils"
|
|
"github.com/grafana/authlib/types"
|
|
|
|
"github.com/grafana/grafana/pkg/apimachinery/identity"
|
|
authnGrpcUtils "github.com/grafana/grafana/pkg/services/authn/grpcutils"
|
|
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
|
"github.com/grafana/grafana/pkg/setting"
|
|
grpcUtils "github.com/grafana/grafana/pkg/storage/unified/resource/grpc"
|
|
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
|
)
|
|
|
|
type ResourceClient interface {
|
|
resourcepb.ResourceStoreClient
|
|
resourcepb.ResourceIndexClient
|
|
resourcepb.ManagedObjectIndexClient
|
|
resourcepb.BulkStoreClient
|
|
resourcepb.BlobStoreClient
|
|
resourcepb.DiagnosticsClient
|
|
}
|
|
|
|
// Internal implementation
|
|
type resourceClient struct {
|
|
resourcepb.ResourceStoreClient
|
|
resourcepb.ResourceIndexClient
|
|
resourcepb.ManagedObjectIndexClient
|
|
resourcepb.BulkStoreClient
|
|
resourcepb.BlobStoreClient
|
|
resourcepb.DiagnosticsClient
|
|
}
|
|
|
|
func NewResourceClient(conn, indexConn grpc.ClientConnInterface, cfg *setting.Cfg, features featuremgmt.FeatureToggles, tracer trace.Tracer) (ResourceClient, error) {
|
|
if !features.IsEnabledGlobally(featuremgmt.FlagAppPlatformGrpcClientAuth) {
|
|
return NewLegacyResourceClient(conn, indexConn), nil
|
|
}
|
|
|
|
clientCfg := authnGrpcUtils.ReadGrpcClientConfig(cfg)
|
|
|
|
return NewRemoteResourceClient(tracer, conn, indexConn, RemoteResourceClientConfig{
|
|
Token: clientCfg.Token,
|
|
TokenExchangeURL: clientCfg.TokenExchangeURL,
|
|
Audiences: []string{"resourceStore"},
|
|
Namespace: clientCfg.TokenNamespace,
|
|
AllowInsecure: cfg.Env == setting.Dev,
|
|
})
|
|
}
|
|
|
|
func newResourceClient(storageCc grpc.ClientConnInterface, indexCc grpc.ClientConnInterface) ResourceClient {
|
|
return &resourceClient{
|
|
ResourceStoreClient: resourcepb.NewResourceStoreClient(storageCc),
|
|
ResourceIndexClient: resourcepb.NewResourceIndexClient(indexCc),
|
|
ManagedObjectIndexClient: resourcepb.NewManagedObjectIndexClient(indexCc),
|
|
BulkStoreClient: resourcepb.NewBulkStoreClient(storageCc),
|
|
BlobStoreClient: resourcepb.NewBlobStoreClient(storageCc),
|
|
DiagnosticsClient: resourcepb.NewDiagnosticsClient(storageCc),
|
|
}
|
|
}
|
|
|
|
func NewAuthlessResourceClient(cc grpc.ClientConnInterface) ResourceClient {
|
|
return newResourceClient(cc, cc)
|
|
}
|
|
|
|
func NewLegacyResourceClient(channel grpc.ClientConnInterface, indexChannel grpc.ClientConnInterface) ResourceClient {
|
|
cc := grpchan.InterceptClientConn(channel, grpcUtils.UnaryClientInterceptor, grpcUtils.StreamClientInterceptor)
|
|
cci := grpchan.InterceptClientConn(indexChannel, grpcUtils.UnaryClientInterceptor, grpcUtils.StreamClientInterceptor)
|
|
return newResourceClient(cc, cci)
|
|
}
|
|
|
|
func NewLocalResourceClient(server ResourceServer) ResourceClient {
|
|
// scenario: local in-proc
|
|
channel := &inprocgrpc.Channel{}
|
|
tracer := otel.Tracer("github.com/grafana/grafana/pkg/storage/unified/resource")
|
|
|
|
grpcAuthInt := grpcutils.NewUnsafeAuthenticator(tracer)
|
|
for _, desc := range []*grpc.ServiceDesc{
|
|
&resourcepb.ResourceStore_ServiceDesc,
|
|
&resourcepb.ResourceIndex_ServiceDesc,
|
|
&resourcepb.ManagedObjectIndex_ServiceDesc,
|
|
&resourcepb.BlobStore_ServiceDesc,
|
|
&resourcepb.BulkStore_ServiceDesc,
|
|
&resourcepb.Diagnostics_ServiceDesc,
|
|
} {
|
|
channel.RegisterService(
|
|
grpchan.InterceptServer(
|
|
desc,
|
|
grpcAuth.UnaryServerInterceptor(grpcAuthInt),
|
|
grpcAuth.StreamServerInterceptor(grpcAuthInt),
|
|
),
|
|
server,
|
|
)
|
|
}
|
|
|
|
clientInt := authnlib.NewGrpcClientInterceptor(
|
|
ProvideInProcExchanger(),
|
|
authnlib.WithClientInterceptorIDTokenExtractor(idTokenExtractor),
|
|
)
|
|
|
|
cc := grpchan.InterceptClientConn(channel, clientInt.UnaryClientInterceptor, clientInt.StreamClientInterceptor)
|
|
return newResourceClient(cc, cc)
|
|
}
|
|
|
|
type RemoteResourceClientConfig struct {
|
|
Token string
|
|
TokenExchangeURL string
|
|
Audiences []string
|
|
Namespace string
|
|
AllowInsecure bool
|
|
}
|
|
|
|
func NewRemoteResourceClient(tracer trace.Tracer, conn grpc.ClientConnInterface, indexConn grpc.ClientConnInterface, cfg RemoteResourceClientConfig) (ResourceClient, error) {
|
|
exchangeOpts := []authnlib.ExchangeClientOpts{}
|
|
|
|
if cfg.AllowInsecure {
|
|
exchangeOpts = append(exchangeOpts, authnlib.WithHTTPClient(&http.Client{Transport: &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}}))
|
|
}
|
|
|
|
tc, err := authnlib.NewTokenExchangeClient(authnlib.TokenExchangeConfig{
|
|
Token: cfg.Token,
|
|
TokenExchangeURL: cfg.TokenExchangeURL,
|
|
}, exchangeOpts...)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
clientInt := authnlib.NewGrpcClientInterceptor(
|
|
tc,
|
|
authnlib.WithClientInterceptorTracer(tracer),
|
|
authnlib.WithClientInterceptorNamespace(cfg.Namespace),
|
|
authnlib.WithClientInterceptorAudience(cfg.Audiences),
|
|
authnlib.WithClientInterceptorIDTokenExtractor(idTokenExtractor),
|
|
)
|
|
|
|
cc := grpchan.InterceptClientConn(conn, clientInt.UnaryClientInterceptor, clientInt.StreamClientInterceptor)
|
|
cci := grpchan.InterceptClientConn(indexConn, clientInt.UnaryClientInterceptor, clientInt.StreamClientInterceptor)
|
|
return newResourceClient(cc, cci), nil
|
|
}
|
|
|
|
var authLogger = slog.Default().With("logger", "resource-client-auth-interceptor")
|
|
|
|
func idTokenExtractor(ctx context.Context) (string, error) {
|
|
if identity.IsServiceIdentity(ctx) {
|
|
return "", nil
|
|
}
|
|
|
|
info, ok := types.AuthInfoFrom(ctx)
|
|
if !ok {
|
|
return "", fmt.Errorf("no claims found")
|
|
}
|
|
|
|
if token := info.GetIDToken(); len(token) != 0 {
|
|
return token, nil
|
|
}
|
|
|
|
if !types.IsIdentityType(info.GetIdentityType(), types.TypeAccessPolicy) {
|
|
authLogger.Warn(
|
|
"calling resource store as the service without id token or marking it as the service identity",
|
|
"subject", info.GetSubject(),
|
|
"uid", info.GetUID(),
|
|
)
|
|
}
|
|
|
|
return "", nil
|
|
}
|
|
|
|
func ProvideInProcExchanger() authnlib.StaticTokenExchanger {
|
|
token, err := createInProcToken()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
return authnlib.NewStaticTokenExchanger(token)
|
|
}
|
|
|
|
func createInProcToken() (string, error) {
|
|
claims := authnlib.Claims[authnlib.AccessTokenClaims]{
|
|
Claims: jwt.Claims{
|
|
Issuer: "grafana",
|
|
Subject: types.NewTypeID(types.TypeAccessPolicy, "grafana"),
|
|
Audience: []string{"resourceStore"},
|
|
},
|
|
Rest: authnlib.AccessTokenClaims{
|
|
Namespace: "*",
|
|
Permissions: identity.ServiceIdentityClaims.Rest.Permissions,
|
|
DelegatedPermissions: identity.ServiceIdentityClaims.Rest.DelegatedPermissions,
|
|
},
|
|
}
|
|
|
|
header, err := json.Marshal(map[string]string{
|
|
"alg": "none",
|
|
"typ": authnlib.TokenTypeAccess,
|
|
})
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
payload, err := json.Marshal(claims)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
return base64.RawURLEncoding.EncodeToString(header) + "." + base64.RawURLEncoding.EncodeToString(payload) + ".", nil
|
|
}
|