mirror of
https://github.com/grafana/grafana.git
synced 2025-07-30 14:52:46 +08:00
336 lines
8.4 KiB
Go
336 lines
8.4 KiB
Go
package resource
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/trace"
|
|
"google.golang.org/grpc/metadata"
|
|
|
|
authlib "github.com/grafana/authlib/types"
|
|
|
|
"github.com/grafana/grafana/pkg/apimachinery/utils"
|
|
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
|
)
|
|
|
|
const grpcMetaKeyCollection = "x-gf-batch-collection"
|
|
const grpcMetaKeyRebuildCollection = "x-gf-batch-rebuild-collection"
|
|
const grpcMetaKeySkipValidation = "x-gf-batch-skip-validation"
|
|
|
|
// Logged in trace.
|
|
var metadataKeys = []string{
|
|
grpcMetaKeyCollection,
|
|
grpcMetaKeyRebuildCollection,
|
|
grpcMetaKeySkipValidation,
|
|
}
|
|
|
|
func grpcMetaValueIsTrue(vals []string) bool {
|
|
return len(vals) == 1 && vals[0] == "true"
|
|
}
|
|
|
|
type BulkRequestIterator interface {
|
|
Next() bool
|
|
|
|
// The next event we should process
|
|
Request() *resourcepb.BulkRequest
|
|
|
|
// Rollback requested
|
|
RollbackRequested() bool
|
|
}
|
|
|
|
type BulkProcessingBackend interface {
|
|
ProcessBulk(ctx context.Context, setting BulkSettings, iter BulkRequestIterator) *resourcepb.BulkResponse
|
|
}
|
|
|
|
type BulkResourceWriter interface {
|
|
io.Closer
|
|
|
|
Write(ctx context.Context, key *resourcepb.ResourceKey, value []byte) error
|
|
|
|
// Called when finished writing
|
|
CloseWithResults() (*resourcepb.BulkResponse, error)
|
|
}
|
|
|
|
type BulkSettings struct {
|
|
// All requests will be within this namespace/group/resource
|
|
Collection []*resourcepb.ResourceKey
|
|
|
|
// The batch will include everything from the collection
|
|
// - all existing values will be removed/replaced if the batch completes successfully
|
|
RebuildCollection bool
|
|
|
|
// The byte[] payload and folder has already been validated - no need to decode and verify
|
|
SkipValidation bool
|
|
}
|
|
|
|
func (x *BulkSettings) ToMD() metadata.MD {
|
|
md := make(metadata.MD)
|
|
if len(x.Collection) > 0 {
|
|
for _, v := range x.Collection {
|
|
md[grpcMetaKeyCollection] = append(md[grpcMetaKeyCollection], SearchID(v))
|
|
}
|
|
}
|
|
if x.RebuildCollection {
|
|
md[grpcMetaKeyRebuildCollection] = []string{"true"}
|
|
}
|
|
if x.SkipValidation {
|
|
md[grpcMetaKeySkipValidation] = []string{"true"}
|
|
}
|
|
return md
|
|
}
|
|
|
|
func NewBulkSettings(md metadata.MD) (BulkSettings, error) {
|
|
settings := BulkSettings{}
|
|
for k, v := range md {
|
|
switch k {
|
|
case grpcMetaKeyCollection:
|
|
for _, c := range v {
|
|
key := &resourcepb.ResourceKey{}
|
|
err := ReadSearchID(key, c)
|
|
if err != nil {
|
|
return settings, fmt.Errorf("error reading collection metadata: %s / %w", c, err)
|
|
}
|
|
settings.Collection = append(settings.Collection, key)
|
|
}
|
|
case grpcMetaKeyRebuildCollection:
|
|
settings.RebuildCollection = grpcMetaValueIsTrue(v)
|
|
case grpcMetaKeySkipValidation:
|
|
settings.SkipValidation = grpcMetaValueIsTrue(v)
|
|
}
|
|
}
|
|
return settings, nil
|
|
}
|
|
|
|
// BulkWrite implements ResourceServer.
|
|
// All requests must be to the same NAMESPACE/GROUP/RESOURCE
|
|
func (s *server) BulkProcess(stream resourcepb.BulkStore_BulkProcessServer) error {
|
|
ctx := stream.Context()
|
|
ctx, span := s.tracer.Start(ctx, "resource.server.BulkProcess")
|
|
defer span.End()
|
|
|
|
sendAndClose := func(rsp *resourcepb.BulkResponse) error {
|
|
span.AddEvent("sendAndClose", trace.WithAttributes(attribute.String("msg", rsp.String())))
|
|
return stream.SendAndClose(rsp)
|
|
}
|
|
|
|
user, ok := authlib.AuthInfoFrom(ctx)
|
|
if !ok || user == nil {
|
|
return sendAndClose(&resourcepb.BulkResponse{
|
|
Error: &resourcepb.ErrorResult{
|
|
Message: "no user found in context",
|
|
Code: http.StatusUnauthorized,
|
|
},
|
|
})
|
|
}
|
|
|
|
md, ok := metadata.FromIncomingContext(ctx)
|
|
if !ok {
|
|
return sendAndClose(&resourcepb.BulkResponse{
|
|
Error: &resourcepb.ErrorResult{
|
|
Message: "unable to read metadata gRPC request",
|
|
Code: http.StatusPreconditionFailed,
|
|
},
|
|
})
|
|
}
|
|
|
|
// Add relevant metadata into span.
|
|
for _, k := range metadataKeys {
|
|
meta := md.Get(k)
|
|
if len(meta) > 0 {
|
|
span.SetAttributes(attribute.StringSlice(k, meta))
|
|
}
|
|
}
|
|
|
|
runner := &batchRunner{
|
|
checker: make(map[string]authlib.ItemChecker), // Can create
|
|
stream: stream,
|
|
span: span,
|
|
}
|
|
settings, err := NewBulkSettings(md)
|
|
if err != nil {
|
|
return sendAndClose(&resourcepb.BulkResponse{
|
|
Error: &resourcepb.ErrorResult{
|
|
Message: "error reading settings",
|
|
Reason: err.Error(),
|
|
Code: http.StatusPreconditionFailed,
|
|
},
|
|
})
|
|
}
|
|
|
|
if len(settings.Collection) < 1 {
|
|
return sendAndClose(&resourcepb.BulkResponse{
|
|
Error: &resourcepb.ErrorResult{
|
|
Message: "Missing target collection(s) in request header",
|
|
Code: http.StatusBadRequest,
|
|
},
|
|
})
|
|
}
|
|
|
|
if settings.RebuildCollection {
|
|
for _, k := range settings.Collection {
|
|
// Can we delete the whole collection
|
|
rsp, err := s.access.Check(ctx, user, authlib.CheckRequest{
|
|
Namespace: k.Namespace,
|
|
Group: k.Group,
|
|
Resource: k.Resource,
|
|
Verb: utils.VerbDeleteCollection,
|
|
})
|
|
if err != nil || !rsp.Allowed {
|
|
return sendAndClose(&resourcepb.BulkResponse{
|
|
Error: &resourcepb.ErrorResult{
|
|
Message: fmt.Sprintf("Requester must be able to: %s", utils.VerbDeleteCollection),
|
|
Code: http.StatusForbidden,
|
|
},
|
|
})
|
|
}
|
|
|
|
// This will be called for each request -- with the folder ID
|
|
runner.checker[NSGR(k)], err = s.access.Compile(ctx, user, authlib.ListRequest{
|
|
Namespace: k.Namespace,
|
|
Group: k.Group,
|
|
Resource: k.Resource,
|
|
Verb: utils.VerbCreate,
|
|
})
|
|
if err != nil {
|
|
return sendAndClose(&resourcepb.BulkResponse{
|
|
Error: &resourcepb.ErrorResult{
|
|
Message: "Unable to check `create` permission",
|
|
Code: http.StatusForbidden,
|
|
},
|
|
})
|
|
}
|
|
}
|
|
} else {
|
|
return sendAndClose(&resourcepb.BulkResponse{
|
|
Error: &resourcepb.ErrorResult{
|
|
Message: "Bulk currently only supports RebuildCollection",
|
|
Code: http.StatusBadRequest,
|
|
},
|
|
})
|
|
}
|
|
|
|
backend, ok := s.backend.(BulkProcessingBackend)
|
|
if !ok {
|
|
return sendAndClose(&resourcepb.BulkResponse{
|
|
Error: &resourcepb.ErrorResult{
|
|
Message: "The server backend does not support batch processing",
|
|
Code: http.StatusNotImplemented,
|
|
},
|
|
})
|
|
}
|
|
|
|
// BulkProcess requests
|
|
rsp := backend.ProcessBulk(ctx, settings, runner)
|
|
if rsp == nil {
|
|
rsp = &resourcepb.BulkResponse{
|
|
Error: &resourcepb.ErrorResult{
|
|
Code: http.StatusInternalServerError,
|
|
Message: "Nothing returned from process batch",
|
|
},
|
|
}
|
|
}
|
|
if runner.err != nil {
|
|
rsp.Error = AsErrorResult(runner.err)
|
|
}
|
|
|
|
if rsp.Error == nil && s.search != nil {
|
|
// Rebuild any changed indexes
|
|
for _, summary := range rsp.Summary {
|
|
_, _, err := s.search.build(ctx, NamespacedResource{
|
|
Namespace: summary.Namespace,
|
|
Group: summary.Group,
|
|
Resource: summary.Resource,
|
|
}, summary.Count, summary.ResourceVersion)
|
|
if err != nil {
|
|
s.log.Warn("error building search index after batch load", "err", err)
|
|
rsp.Error = &resourcepb.ErrorResult{
|
|
Code: http.StatusInternalServerError,
|
|
Message: "err building search index: " + summary.Resource,
|
|
Reason: err.Error(),
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return sendAndClose(rsp)
|
|
}
|
|
|
|
var (
|
|
_ BulkRequestIterator = (*batchRunner)(nil)
|
|
)
|
|
|
|
type batchRunner struct {
|
|
stream resourcepb.BulkStore_BulkProcessServer
|
|
rollback bool
|
|
request *resourcepb.BulkRequest
|
|
err error
|
|
checker map[string]authlib.ItemChecker
|
|
span trace.Span
|
|
}
|
|
|
|
// Next implements BulkRequestIterator.
|
|
func (b *batchRunner) Next() bool {
|
|
if b.rollback {
|
|
return true
|
|
}
|
|
|
|
b.request, b.err = b.stream.Recv()
|
|
if errors.Is(b.err, io.EOF) {
|
|
b.err = nil
|
|
b.rollback = false
|
|
b.request = nil
|
|
return false
|
|
}
|
|
|
|
if b.err != nil {
|
|
b.rollback = true
|
|
b.span.AddEvent("next", trace.WithAttributes(attribute.String("error", b.err.Error())))
|
|
return true
|
|
}
|
|
|
|
if b.request != nil {
|
|
key := b.request.Key
|
|
k := NSGR(key)
|
|
checker, ok := b.checker[k]
|
|
if !ok {
|
|
b.err = fmt.Errorf("missing access control for: %s", k)
|
|
b.rollback = true
|
|
} else if !checker(key.Name, b.request.Folder) {
|
|
b.err = fmt.Errorf("not allowed to create resource")
|
|
b.rollback = true
|
|
}
|
|
|
|
// Mention resource in the span.
|
|
attrs := []attribute.KeyValue{
|
|
attribute.String("key", nsgrWithName(key)),
|
|
}
|
|
if b.err != nil {
|
|
attrs = append(attrs, attribute.String("error", b.err.Error()))
|
|
}
|
|
|
|
b.span.AddEvent("next", trace.WithAttributes(attrs...))
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
// Request implements BulkRequestIterator.
|
|
func (b *batchRunner) Request() *resourcepb.BulkRequest {
|
|
if b.rollback {
|
|
return nil
|
|
}
|
|
return b.request
|
|
}
|
|
|
|
// RollbackRequested implements BulkRequestIterator.
|
|
func (b *batchRunner) RollbackRequested() bool {
|
|
if b.rollback {
|
|
b.rollback = false // break iterator
|
|
return true
|
|
}
|
|
return false
|
|
}
|