mirror of
https://github.com/grafana/grafana.git
synced 2025-07-30 19:13:07 +08:00
209 lines
6.5 KiB
Go
209 lines
6.5 KiB
Go
package resource
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
"github.com/prometheus/client_golang/prometheus/promauto"
|
|
"go.opentelemetry.io/otel/codes"
|
|
"go.opentelemetry.io/otel/trace"
|
|
"gocloud.dev/blob"
|
|
)
|
|
|
|
// CDKBucket is an abstraction that provides the same functionality as gocloud.dev/blob.Bucket
|
|
// It can be used to wrap gocloud.dev/blob.Bucket with some useful things as o11y.
|
|
type CDKBucket interface {
|
|
Attributes(context.Context, string) (*blob.Attributes, error)
|
|
List(*blob.ListOptions) *blob.ListIterator
|
|
ListPage(context.Context, []byte, int, *blob.ListOptions) ([]*blob.ListObject, []byte, error)
|
|
WriteAll(context.Context, string, []byte, *blob.WriterOptions) error
|
|
ReadAll(context.Context, string) ([]byte, error)
|
|
Delete(context.Context, string) error
|
|
SignedURL(context.Context, string, *blob.SignedURLOptions) (string, error)
|
|
}
|
|
|
|
var _ CDKBucket = (*blob.Bucket)(nil)
|
|
var _ CDKBucket = (*InstrumentedBucket)(nil)
|
|
|
|
const (
|
|
cdkBucketOperationLabel = "operation"
|
|
cdkBucketStatusLabel = "status"
|
|
cdkBucketStatusSuccess = "success"
|
|
cdkBucketStatusError = "error"
|
|
)
|
|
|
|
type InstrumentedBucket struct {
|
|
requests *prometheus.CounterVec
|
|
latency *prometheus.HistogramVec
|
|
tracer trace.Tracer
|
|
bucket CDKBucket
|
|
}
|
|
|
|
func NewInstrumentedBucket(bucket CDKBucket, reg prometheus.Registerer, tracer trace.Tracer) *InstrumentedBucket {
|
|
b := &InstrumentedBucket{
|
|
bucket: bucket,
|
|
tracer: tracer,
|
|
}
|
|
b.initMetrics(reg)
|
|
return b
|
|
}
|
|
|
|
func (b *InstrumentedBucket) initMetrics(reg prometheus.Registerer) {
|
|
b.requests = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
|
|
Name: "cdk_blobstorage_requests_total",
|
|
}, []string{
|
|
cdkBucketOperationLabel,
|
|
cdkBucketStatusLabel,
|
|
})
|
|
b.latency = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
|
|
Name: "cdk_blobstorage_latency_seconds",
|
|
Buckets: prometheus.ExponentialBuckets(0.008, 4, 7),
|
|
}, []string{
|
|
cdkBucketOperationLabel,
|
|
cdkBucketStatusLabel,
|
|
})
|
|
}
|
|
|
|
func (b *InstrumentedBucket) Attributes(ctx context.Context, key string) (*blob.Attributes, error) {
|
|
ctx, span := b.tracer.Start(ctx, "InstrumentedBucket/Attributes")
|
|
defer span.End()
|
|
start := time.Now()
|
|
retVal, err := b.bucket.Attributes(ctx, key)
|
|
end := time.Since(start).Seconds()
|
|
labels := prometheus.Labels{
|
|
cdkBucketOperationLabel: "Attributes",
|
|
}
|
|
if err != nil {
|
|
labels[cdkBucketStatusLabel] = cdkBucketStatusError
|
|
b.requests.With(labels).Inc()
|
|
b.latency.With(labels).Observe(end)
|
|
span.RecordError(err)
|
|
span.SetStatus(codes.Error, err.Error())
|
|
return retVal, err
|
|
}
|
|
labels[cdkBucketStatusLabel] = cdkBucketStatusSuccess
|
|
b.requests.With(labels).Inc()
|
|
b.latency.With(labels).Observe(end)
|
|
return retVal, err
|
|
}
|
|
|
|
func (b *InstrumentedBucket) List(opts *blob.ListOptions) *blob.ListIterator {
|
|
// List just returns an iterator struct based on the provided options. No need for extended telemetry.
|
|
return b.bucket.List(opts)
|
|
}
|
|
|
|
func (b *InstrumentedBucket) ListPage(ctx context.Context, pageToken []byte, pageSize int, opts *blob.ListOptions) ([]*blob.ListObject, []byte, error) {
|
|
ctx, span := b.tracer.Start(ctx, "InstrumentedBucket/ListPage")
|
|
defer span.End()
|
|
start := time.Now()
|
|
retVal, nextPageToken, err := b.bucket.ListPage(ctx, pageToken, pageSize, opts)
|
|
end := time.Since(start).Seconds()
|
|
labels := prometheus.Labels{
|
|
cdkBucketOperationLabel: "ListPage",
|
|
}
|
|
if err != nil {
|
|
labels[cdkBucketStatusLabel] = cdkBucketStatusError
|
|
b.latency.With(labels).Observe(end)
|
|
b.requests.With(labels).Inc()
|
|
span.RecordError(err)
|
|
span.SetStatus(codes.Error, err.Error())
|
|
return retVal, nextPageToken, err
|
|
}
|
|
labels[cdkBucketStatusLabel] = cdkBucketStatusSuccess
|
|
b.requests.With(labels).Inc()
|
|
return retVal, nextPageToken, err
|
|
}
|
|
|
|
func (b *InstrumentedBucket) ReadAll(ctx context.Context, key string) ([]byte, error) {
|
|
ctx, span := b.tracer.Start(ctx, "InstrumentedBucket/ReadAll")
|
|
defer span.End()
|
|
start := time.Now()
|
|
retVal, err := b.bucket.ReadAll(ctx, key)
|
|
end := time.Since(start).Seconds()
|
|
labels := prometheus.Labels{
|
|
cdkBucketOperationLabel: "ReadAll",
|
|
}
|
|
if err != nil {
|
|
labels[cdkBucketStatusLabel] = cdkBucketStatusError
|
|
b.requests.With(labels).Inc()
|
|
b.latency.With(labels).Observe(end)
|
|
span.RecordError(err)
|
|
span.SetStatus(codes.Error, err.Error())
|
|
return retVal, err
|
|
}
|
|
labels[cdkBucketStatusLabel] = cdkBucketStatusSuccess
|
|
b.requests.With(labels).Inc()
|
|
b.latency.With(labels).Observe(end)
|
|
return retVal, err
|
|
}
|
|
|
|
func (b *InstrumentedBucket) WriteAll(ctx context.Context, key string, p []byte, opts *blob.WriterOptions) error {
|
|
ctx, span := b.tracer.Start(ctx, "InstrumentedBucket/WriteAll")
|
|
defer span.End()
|
|
start := time.Now()
|
|
err := b.bucket.WriteAll(ctx, key, p, opts)
|
|
end := time.Since(start).Seconds()
|
|
labels := prometheus.Labels{
|
|
cdkBucketOperationLabel: "WriteAll",
|
|
}
|
|
if err != nil {
|
|
labels[cdkBucketStatusLabel] = cdkBucketStatusError
|
|
b.requests.With(labels).Inc()
|
|
b.latency.With(labels).Observe(end)
|
|
span.RecordError(err)
|
|
span.SetStatus(codes.Error, err.Error())
|
|
return err
|
|
}
|
|
labels[cdkBucketStatusLabel] = cdkBucketStatusSuccess
|
|
b.requests.With(labels).Inc()
|
|
b.latency.With(labels).Observe(end)
|
|
return err
|
|
}
|
|
|
|
func (b *InstrumentedBucket) Delete(ctx context.Context, key string) error {
|
|
ctx, span := b.tracer.Start(ctx, "InstrumentedBucket/Delete")
|
|
defer span.End()
|
|
start := time.Now()
|
|
err := b.bucket.Delete(ctx, key)
|
|
end := time.Since(start).Seconds()
|
|
labels := prometheus.Labels{
|
|
cdkBucketOperationLabel: "Delete",
|
|
}
|
|
if err != nil {
|
|
labels[cdkBucketStatusLabel] = cdkBucketStatusError
|
|
b.requests.With(labels).Inc()
|
|
b.latency.With(labels).Observe(end)
|
|
span.RecordError(err)
|
|
span.SetStatus(codes.Error, err.Error())
|
|
return err
|
|
}
|
|
labels[cdkBucketStatusLabel] = cdkBucketStatusSuccess
|
|
b.requests.With(labels).Inc()
|
|
b.latency.With(labels).Observe(end)
|
|
return nil
|
|
}
|
|
|
|
func (b *InstrumentedBucket) SignedURL(ctx context.Context, key string, opts *blob.SignedURLOptions) (string, error) {
|
|
ctx, span := b.tracer.Start(ctx, "InstrumentedBucket/SignedURL")
|
|
defer span.End()
|
|
start := time.Now()
|
|
retVal, err := b.bucket.SignedURL(ctx, key, opts)
|
|
end := time.Since(start).Seconds()
|
|
labels := prometheus.Labels{
|
|
cdkBucketOperationLabel: "SignedURL",
|
|
}
|
|
if err != nil {
|
|
labels[cdkBucketStatusLabel] = cdkBucketStatusError
|
|
b.requests.With(labels).Inc()
|
|
b.latency.With(labels).Observe(end)
|
|
span.RecordError(err)
|
|
span.SetStatus(codes.Error, err.Error())
|
|
return retVal, err
|
|
}
|
|
labels[cdkBucketStatusLabel] = cdkBucketStatusSuccess
|
|
b.requests.With(labels).Inc()
|
|
b.latency.With(labels).Observe(end)
|
|
return retVal, err
|
|
}
|