Unified Storage: Don't read before create (#102906)

* Unified Storage: Don't read before create

* test: use the existing test infra

* fix: support pq

We use pgx, but it seems to be wrapped in a pq driver shim, causing the errors to be remapped to pq's type. Weird
situation.

* feat: support CDK backend

* revert: there is a postgres_tests block

* fix(CDK): only check existence on ADDED updates

* fix(CDK): use ReadResource to deal with deleted files
This commit is contained in:
Mariell Hoversholm
2025-03-31 15:06:31 +02:00
committed by GitHub
parent 827d86a985
commit f0a6327edc
7 changed files with 93 additions and 18 deletions

View File

@ -2456,6 +2456,21 @@ func RunTestGuaranteedUpdateChecksStoredData(ctx context.Context, t *testing.T,
} }
} }
func RunTestValidUpdate(ctx context.Context, t *testing.T, store storage.Interface) {
pod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test-ns"}}
key, pod := testPropagateStore(ctx, t, store, pod)
err := store.GuaranteedUpdate(ctx, key, &example.Pod{}, false, nil,
storage.SimpleUpdate(func(o runtime.Object) (runtime.Object, error) {
pod := o.(*example.Pod)
pod.Spec.Hostname = "example"
return pod, nil
}), pod)
if err != nil {
t.Errorf("got error on update: %v", err)
}
}
func RunTestGuaranteedUpdateWithConflict(ctx context.Context, t *testing.T, store storage.Interface) { func RunTestGuaranteedUpdateWithConflict(ctx context.Context, t *testing.T, store storage.Interface) {
key, _ := testPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test-ns"}}) key, _ := testPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "test-ns"}})

View File

@ -117,6 +117,13 @@ func TestCreateWithKeyExist(t *testing.T) {
storagetesting.RunTestCreateWithKeyExist(ctx, t, store) storagetesting.RunTestCreateWithKeyExist(ctx, t, store)
} }
func TestValidUpdate(t *testing.T) {
ctx, store, destroyFunc, err := testSetup(t)
defer destroyFunc()
assert.NoError(t, err)
storagetesting.RunTestValidUpdate(ctx, t, store)
}
func TestGet(t *testing.T) { func TestGet(t *testing.T) {
ctx, store, destroyFunc, err := testSetup(t) ctx, store, destroyFunc, err := testSetup(t)
defer destroyFunc() defer destroyFunc()

View File

@ -23,6 +23,7 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"github.com/grafana/grafana/pkg/apimachinery/utils" "github.com/grafana/grafana/pkg/apimachinery/utils"
"github.com/grafana/grafana/pkg/storage/unified/backend"
) )
type CDKBackendOptions struct { type CDKBackendOptions struct {
@ -116,6 +117,17 @@ func (s *cdkBackend) GetResourceStats(ctx context.Context, namespace string, min
} }
func (s *cdkBackend) WriteEvent(ctx context.Context, event WriteEvent) (rv int64, err error) { func (s *cdkBackend) WriteEvent(ctx context.Context, event WriteEvent) (rv int64, err error) {
if event.Type == WatchEvent_ADDED {
// ReadResource deals with deleted values (i.e. a file exists but has generation -999).
resp := s.ReadResource(ctx, &ReadRequest{Key: event.Key})
if resp.Error != nil && resp.Error.Code != http.StatusNotFound {
return 0, GetError(resp.Error)
}
if resp.Value != nil {
return 0, backend.ErrResourceAlreadyExists
}
}
// Scope the lock // Scope the lock
{ {
s.mutex.Lock() s.mutex.Lock()

View File

@ -43,8 +43,8 @@ func AsErrorResult(err error) *ErrorResult {
return nil return nil
} }
apistatus, ok := err.(apierrors.APIStatus) var apistatus apierrors.APIStatus
if ok { if errors.As(err, &apistatus) {
s := apistatus.Status() s := apistatus.Status()
res := &ErrorResult{ res := &ErrorResult{
Message: s.Message, Message: s.Message,

View File

@ -476,22 +476,14 @@ func (s *server) Create(ctx context.Context, req *CreateRequest) (*CreateRespons
return rsp, nil return rsp, nil
} }
found := s.backend.ReadResource(ctx, &ReadRequest{Key: req.Key})
if found != nil && len(found.Value) > 0 {
rsp.Error = &ErrorResult{
Code: http.StatusConflict,
Reason: string(metav1.StatusReasonAlreadyExists),
Message: "key already exists", // TODO?? soft delete replace?
}
return rsp, nil
}
event, e := s.newEvent(ctx, user, req.Key, req.Value, nil) event, e := s.newEvent(ctx, user, req.Key, req.Value, nil)
if e != nil { if e != nil {
rsp.Error = e rsp.Error = e
return rsp, nil return rsp, nil
} }
// If the resource already exists, the create will return an already exists error that is remapped appropriately by AsErrorResult.
// This also benefits from ACID behaviours on our databases, so we avoid race conditions.
var err error var err error
rsp.ResourceVersion, err = s.backend.WriteEvent(ctx, *event) rsp.ResourceVersion, err = s.backend.WriteEvent(ctx, *event)
if err != nil { if err != nil {

View File

@ -13,6 +13,7 @@ import (
"github.com/google/uuid" "github.com/google/uuid"
unifiedbackend "github.com/grafana/grafana/pkg/storage/unified/backend" unifiedbackend "github.com/grafana/grafana/pkg/storage/unified/backend"
"github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgconn"
"github.com/lib/pq"
"github.com/mattn/go-sqlite3" "github.com/mattn/go-sqlite3"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
@ -397,6 +398,12 @@ func isRowAlreadyExistsError(err error) bool {
return pg.Code == "23505" // unique_violation return pg.Code == "23505" // unique_violation
} }
var pqerr *pq.Error
if errors.As(err, &pqerr) {
// https://www.postgresql.org/docs/current/errcodes-appendix.html
return pqerr.Code == "23505" // unique_violation
}
var mysqlerr *mysql.MySQLError var mysqlerr *mysql.MySQLError
if errors.As(err, &mysqlerr) { if errors.As(err, &mysqlerr) {
// https://dev.mysql.com/doc/mysql-errors/8.0/en/server-error-reference.html // https://dev.mysql.com/doc/mysql-errors/8.0/en/server-error-reference.html

View File

@ -3,6 +3,7 @@ package test
import ( import (
"context" "context"
"fmt" "fmt"
"net/http"
"slices" "slices"
"strings" "strings"
"testing" "testing"
@ -10,8 +11,11 @@ import (
"github.com/go-jose/go-jose/v3/jwt" "github.com/go-jose/go-jose/v3/jwt"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apiserver/pkg/endpoints/request"
"github.com/grafana/authlib/authn" "github.com/grafana/authlib/authn"
"github.com/grafana/authlib/types" "github.com/grafana/authlib/types"
@ -22,12 +26,13 @@ import (
// Test names for the storage backend test suite // Test names for the storage backend test suite
const ( const (
TestHappyPath = "happy path" TestHappyPath = "happy path"
TestWatchWriteEvents = "watch write events from latest" TestWatchWriteEvents = "watch write events from latest"
TestList = "list" TestList = "list"
TestBlobSupport = "blob support" TestBlobSupport = "blob support"
TestGetResourceStats = "get resource stats" TestGetResourceStats = "get resource stats"
TestListHistory = "list history" TestListHistory = "list history"
TestCreateNewResource = "create new resource"
) )
type NewBackendFunc func(ctx context.Context) resource.StorageBackend type NewBackendFunc func(ctx context.Context) resource.StorageBackend
@ -70,6 +75,7 @@ func RunStorageBackendTest(t *testing.T, newBackend NewBackendFunc, opts *TestOp
{TestBlobSupport, runTestIntegrationBlobSupport}, {TestBlobSupport, runTestIntegrationBlobSupport},
{TestGetResourceStats, runTestIntegrationBackendGetResourceStats}, {TestGetResourceStats, runTestIntegrationBackendGetResourceStats},
{TestListHistory, runTestIntegrationBackendListHistory}, {TestListHistory, runTestIntegrationBackendListHistory},
{TestCreateNewResource, runTestIntegrationBackendCreateNewResource},
} }
for _, tc := range cases { for _, tc := range cases {
@ -907,6 +913,42 @@ func runTestIntegrationBlobSupport(t *testing.T, backend resource.StorageBackend
}) })
} }
func runTestIntegrationBackendCreateNewResource(t *testing.T, backend resource.StorageBackend, nsPrefix string) {
ctx := types.WithAuthInfo(t.Context(), authn.NewAccessTokenAuthInfo(authn.Claims[authn.AccessTokenClaims]{
Claims: jwt.Claims{
Subject: "testuser",
},
Rest: authn.AccessTokenClaims{},
}))
server := newServer(t, backend)
ns := nsPrefix + "-create-resource"
ctx = request.WithNamespace(ctx, ns)
request := &resource.CreateRequest{
Key: &resource.ResourceKey{
Namespace: "default",
Group: "test.grafana",
Resource: "Test",
Name: "test",
},
Value: []byte(`{"apiVersion":"test.grafana/v0alpha1","kind":"Test","metadata":{"name":"test","namespace":"default"}}`),
}
response, err := server.Create(ctx, request)
require.NoError(t, err, "create resource")
require.Nil(t, response.Error, "create resource response.Error")
t.Run("gracefully handles resource already exists error", func(t *testing.T) {
response, err := server.Create(ctx, request)
require.NoError(t, err, "create resource")
require.NotNil(t, response.GetError(), "create resource response.Error")
assert.Equal(t, int32(http.StatusConflict), response.GetError().GetCode(), "create resource response.Error.Code")
assert.Equal(t, string(metav1.StatusReasonAlreadyExists), response.GetError().GetReason(), "create resource response.Error.Reason")
t.Logf("Error: %v", response.GetError()) // only prints on failure, so this is fine
})
}
// WriteEventOption is a function that modifies WriteEventOptions // WriteEventOption is a function that modifies WriteEventOptions
type WriteEventOption func(*WriteEventOptions) type WriteEventOption func(*WriteEventOptions)