Storage: store uploaded files in SQL rather than on the disk (#49034)

* #48259: set up storages per org id

* #48259: migrate to storage_sql
This commit is contained in:
Artur Wierzbicki
2022-05-21 16:55:11 -07:00
committed by GitHub
parent 313d203a87
commit 03fe1435a0
7 changed files with 220 additions and 45 deletions

View File

@ -32,7 +32,8 @@ type StorageGitConfig struct {
} }
type StorageSQLConfig struct { type StorageSQLConfig struct {
// no custom settings // SQLStorage will prefix all paths with orgId for isolation between orgs
orgId int64
} }
type StorageS3Config struct { type StorageS3Config struct {

View File

@ -6,14 +6,13 @@ import (
"io/ioutil" "io/ioutil"
"mime/multipart" "mime/multipart"
"net/http" "net/http"
"os"
"path/filepath"
"github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/infra/filestorage" "github.com/grafana/grafana/pkg/infra/filestorage"
"github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/registry" "github.com/grafana/grafana/pkg/registry"
ac "github.com/grafana/grafana/pkg/services/accesscontrol"
"github.com/grafana/grafana/pkg/services/featuremgmt" "github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/services/sqlstore" "github.com/grafana/grafana/pkg/services/sqlstore"
"github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/setting"
@ -51,7 +50,7 @@ type Response struct {
} }
func ProvideService(sql *sqlstore.SQLStore, features featuremgmt.FeatureToggles, cfg *setting.Cfg) StorageService { func ProvideService(sql *sqlstore.SQLStore, features featuremgmt.FeatureToggles, cfg *setting.Cfg) StorageService {
roots := []storageRuntime{ globalRoots := []storageRuntime{
newDiskStorage(RootPublicStatic, "Public static files", &StorageLocalDiskConfig{ newDiskStorage(RootPublicStatic, "Public static files", &StorageLocalDiskConfig{
Path: cfg.StaticRootPath, Path: cfg.StaticRootPath,
Roots: []string{ Roots: []string{
@ -65,27 +64,27 @@ func ProvideService(sql *sqlstore.SQLStore, features featuremgmt.FeatureToggles,
}).setReadOnly(true).setBuiltin(true), }).setReadOnly(true).setBuiltin(true),
} }
storage := filepath.Join(cfg.DataPath, "storage") initializeOrgStorages := func(orgId int64) []storageRuntime {
_ = os.MkdirAll(storage, 0700) storages := make([]storageRuntime, 0)
if features.IsEnabled(featuremgmt.FlagStorageLocalUpload) {
if features.IsEnabled(featuremgmt.FlagStorageLocalUpload) { config := &StorageSQLConfig{orgId: orgId}
upload := filepath.Join(storage, "upload") storages = append(storages, newSQLStorage("upload", "Local file upload", config, sql).setBuiltin(true))
_ = os.MkdirAll(upload, 0700) }
roots = append(roots, newDiskStorage("upload", "Local file upload", &StorageLocalDiskConfig{ return storages
Path: upload,
Roots: []string{
"/",
},
}).setBuiltin(true))
} }
s := newStandardStorageService(roots)
s := newStandardStorageService(globalRoots, initializeOrgStorages)
s.sql = sql s.sql = sql
return s return s
} }
func newStandardStorageService(roots []storageRuntime) *standardStorageService { func newStandardStorageService(globalRoots []storageRuntime, initializeOrgStorages func(orgId int64) []storageRuntime) *standardStorageService {
rootsByOrgId := make(map[int64][]storageRuntime)
rootsByOrgId[ac.GlobalOrgID] = globalRoots
res := &nestedTree{ res := &nestedTree{
roots: roots, initializeOrgStorages: initializeOrgStorages,
rootsByOrgId: rootsByOrgId,
} }
res.init() res.init()
return &standardStorageService{ return &standardStorageService{
@ -98,14 +97,22 @@ func (s *standardStorageService) Run(ctx context.Context) error {
return nil return nil
} }
func getOrgId(user *models.SignedInUser) int64 {
if user == nil {
return ac.GlobalOrgID
}
return user.OrgId
}
func (s *standardStorageService) List(ctx context.Context, user *models.SignedInUser, path string) (*data.Frame, error) { func (s *standardStorageService) List(ctx context.Context, user *models.SignedInUser, path string) (*data.Frame, error) {
// apply access control here // apply access control here
return s.tree.ListFolder(ctx, path) return s.tree.ListFolder(ctx, getOrgId(user), path)
} }
func (s *standardStorageService) Read(ctx context.Context, user *models.SignedInUser, path string) (*filestorage.File, error) { func (s *standardStorageService) Read(ctx context.Context, user *models.SignedInUser, path string) (*filestorage.File, error) {
// TODO: permission check! // TODO: permission check!
return s.tree.GetFile(ctx, path) return s.tree.GetFile(ctx, getOrgId(user), path)
} }
func isFileTypeValid(filetype string) bool { func isFileTypeValid(filetype string) bool {
@ -119,7 +126,7 @@ func (s *standardStorageService) Upload(ctx context.Context, user *models.Signed
response := Response{ response := Response{
path: "upload", path: "upload",
} }
upload, _ := s.tree.getRoot("upload") upload, _ := s.tree.getRoot(getOrgId(user), "upload")
if upload == nil { if upload == nil {
response.statusCode = 404 response.statusCode = 404
response.message = "upload feature is not enabled" response.message = "upload feature is not enabled"
@ -179,7 +186,7 @@ func (s *standardStorageService) Upload(ctx context.Context, user *models.Signed
} }
func (s *standardStorageService) Delete(ctx context.Context, user *models.SignedInUser, path string) error { func (s *standardStorageService) Delete(ctx context.Context, user *models.SignedInUser, path string) error {
upload, _ := s.tree.getRoot("upload") upload, _ := s.tree.getRoot(getOrgId(user), "upload")
if upload == nil { if upload == nil {
return fmt.Errorf("upload feature is not enabled") return fmt.Errorf("upload feature is not enabled")
} }

View File

@ -10,6 +10,7 @@ import (
"testing" "testing"
"github.com/grafana/grafana-plugin-sdk-go/experimental" "github.com/grafana/grafana-plugin-sdk-go/experimental"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/services/featuremgmt" "github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/tsdb/testdatasource" "github.com/grafana/grafana/pkg/tsdb/testdatasource"
@ -17,6 +18,10 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
var (
dummyUser = &models.SignedInUser{OrgId: 1}
)
func TestListFiles(t *testing.T) { func TestListFiles(t *testing.T) {
publicRoot, err := filepath.Abs("../../../public") publicRoot, err := filepath.Abs("../../../public")
require.NoError(t, err) require.NoError(t, err)
@ -34,14 +39,16 @@ func TestListFiles(t *testing.T) {
}).setReadOnly(true).setBuiltin(true), }).setReadOnly(true).setBuiltin(true),
} }
store := newStandardStorageService(roots) store := newStandardStorageService(roots, func(orgId int64) []storageRuntime {
frame, err := store.List(context.Background(), nil, "public/testdata") return make([]storageRuntime, 0)
})
frame, err := store.List(context.Background(), dummyUser, "public/testdata")
require.NoError(t, err) require.NoError(t, err)
err = experimental.CheckGoldenFrame(path.Join("testdata", "public_testdata.golden.txt"), frame, true) err = experimental.CheckGoldenFrame(path.Join("testdata", "public_testdata.golden.txt"), frame, true)
require.NoError(t, err) require.NoError(t, err)
file, err := store.Read(context.Background(), nil, "public/testdata/js_libraries.csv") file, err := store.Read(context.Background(), dummyUser, "public/testdata/js_libraries.csv")
require.NoError(t, err) require.NoError(t, err)
require.NotNil(t, file) require.NotNil(t, file)
@ -61,7 +68,7 @@ func TestUpload(t *testing.T) {
Value: map[string][]string{}, Value: map[string][]string{},
File: map[string][]*multipart.FileHeader{}, File: map[string][]*multipart.FileHeader{},
} }
res, err := s.Upload(context.Background(), nil, testForm) res, err := s.Upload(context.Background(), dummyUser, testForm)
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, res.path, "upload") assert.Equal(t, res.path, "upload")
} }

View File

@ -0,0 +1,82 @@
package store
import (
"context"
"fmt"
"strings"
"github.com/grafana/grafana/pkg/infra/filestorage"
"github.com/grafana/grafana/pkg/services/sqlstore"
"github.com/grafana/grafana-plugin-sdk-go/data"
)
const rootStorageTypeSQL = "sql"
type rootStorageSQL struct {
baseStorageRuntime
settings *StorageSQLConfig
}
// getDbRootFolder creates a DB path prefix for a given storage name and orgId.
// example:
// orgId: 5
// storageName: "upload"
// => prefix: "/5/upload/"
func getDbStoragePathPrefix(orgId int64, storageName string) string {
return filestorage.Join(fmt.Sprintf("%d", orgId), storageName+filestorage.Delimiter)
}
func newSQLStorage(prefix string, name string, cfg *StorageSQLConfig, sql *sqlstore.SQLStore) *rootStorageSQL {
if cfg == nil {
cfg = &StorageSQLConfig{}
}
meta := RootStorageMeta{
Config: RootStorageConfig{
Type: rootStorageTypeSQL,
Prefix: prefix,
Name: name,
SQL: cfg,
},
}
if prefix == "" {
meta.Notice = append(meta.Notice, data.Notice{
Severity: data.NoticeSeverityError,
Text: "Missing prefix",
})
}
s := &rootStorageSQL{}
s.store = filestorage.NewDbStorage(
grafanaStorageLogger,
sql, nil, getDbStoragePathPrefix(cfg.orgId, prefix))
meta.Ready = true
s.meta = meta
s.settings = cfg
return s
}
func (s *rootStorageSQL) Write(ctx context.Context, cmd *WriteValueRequest) (*WriteValueResponse, error) {
byteAray := []byte(cmd.Body)
path := cmd.Path
if !strings.HasPrefix(path, filestorage.Delimiter) {
path = filestorage.Delimiter + path
}
err := s.store.Upsert(ctx, &filestorage.UpsertFileCommand{
Path: path,
Contents: byteAray,
})
if err != nil {
return nil, err
}
return &WriteValueResponse{Code: 200}, nil
}
func (s *rootStorageSQL) Sync() error {
return nil // already in sync
}

View File

@ -0,0 +1,27 @@
package store
import (
"fmt"
"testing"
"github.com/stretchr/testify/assert"
)
func TestGetDbStoragePathPrefix(t *testing.T) {
tests := []struct {
orgId int64
storageName string
expected string
}{
{
orgId: 124,
storageName: "long-storage-name",
expected: "/124/long-storage-name/",
},
}
for _, tt := range tests {
t.Run(fmt.Sprintf("orgId: %d, storageName: %s", tt.orgId, tt.storageName), func(t *testing.T) {
assert.Equal(t, tt.expected, getDbStoragePathPrefix(tt.orgId, tt.storageName))
})
}
}

View File

@ -2,66 +2,117 @@ package store
import ( import (
"context" "context"
"sync"
"github.com/grafana/grafana-plugin-sdk-go/data" "github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/infra/filestorage" "github.com/grafana/grafana/pkg/infra/filestorage"
ac "github.com/grafana/grafana/pkg/services/accesscontrol"
) )
type nestedTree struct { type nestedTree struct {
roots []storageRuntime rootsByOrgId map[int64][]storageRuntime
lookup map[string]filestorage.FileStorage lookup map[int64]map[string]filestorage.FileStorage
orgInitMutex sync.Mutex
initializeOrgStorages func(orgId int64) []storageRuntime
} }
var ( var (
_ storageTree = (*nestedTree)(nil) _ storageTree = (*nestedTree)(nil)
) )
func asNameToFileStorageMap(storages []storageRuntime) map[string]filestorage.FileStorage {
lookup := make(map[string]filestorage.FileStorage)
for _, storage := range storages {
lookup[storage.Meta().Config.Prefix] = storage.Store()
}
return lookup
}
func (t *nestedTree) init() { func (t *nestedTree) init() {
t.lookup = make(map[string]filestorage.FileStorage, len(t.roots)) t.orgInitMutex.Lock()
for _, root := range t.roots { defer t.orgInitMutex.Unlock()
t.lookup[root.Meta().Config.Prefix] = root.Store()
t.lookup = make(map[int64]map[string]filestorage.FileStorage, len(t.rootsByOrgId))
for orgId, storages := range t.rootsByOrgId {
t.lookup[orgId] = asNameToFileStorageMap(storages)
} }
} }
func (t *nestedTree) getRoot(path string) (filestorage.FileStorage, string) { func (t *nestedTree) assureOrgIsInitialized(orgId int64) {
t.orgInitMutex.Lock()
defer t.orgInitMutex.Unlock()
if _, ok := t.rootsByOrgId[orgId]; !ok {
orgStorages := t.initializeOrgStorages(orgId)
t.rootsByOrgId[orgId] = orgStorages
t.lookup[orgId] = asNameToFileStorageMap(orgStorages)
}
}
func (t *nestedTree) getRoot(orgId int64, path string) (filestorage.FileStorage, string) {
t.assureOrgIsInitialized(orgId)
if path == "" { if path == "" {
return nil, "" return nil, ""
} }
rootKey, path := splitFirstSegment(path) rootKey, path := splitFirstSegment(path)
root, ok := t.lookup[rootKey] root, ok := t.lookup[orgId][rootKey]
if !ok || root == nil { if ok && root != nil {
return nil, path // not found or not ready return root, filestorage.Delimiter + path
} }
return root, filestorage.Delimiter + path
if orgId != ac.GlobalOrgID {
globalRoot, ok := t.lookup[ac.GlobalOrgID][rootKey]
if ok && globalRoot != nil {
return globalRoot, filestorage.Delimiter + path
}
}
return nil, path // not found or not ready
} }
func (t *nestedTree) GetFile(ctx context.Context, path string) (*filestorage.File, error) { func (t *nestedTree) GetFile(ctx context.Context, orgId int64, path string) (*filestorage.File, error) {
if path == "" { if path == "" {
return nil, nil // not found return nil, nil // not found
} }
root, path := t.getRoot(path) root, path := t.getRoot(orgId, path)
if root == nil { if root == nil {
return nil, nil // not found (or not ready) return nil, nil // not found (or not ready)
} }
return root.Get(ctx, path) return root.Get(ctx, path)
} }
func (t *nestedTree) ListFolder(ctx context.Context, path string) (*data.Frame, error) { func (t *nestedTree) ListFolder(ctx context.Context, orgId int64, path string) (*data.Frame, error) {
if path == "" || path == "/" { if path == "" || path == "/" {
count := len(t.roots) t.assureOrgIsInitialized(orgId)
count := len(t.rootsByOrgId[ac.GlobalOrgID])
if orgId != ac.GlobalOrgID {
count += len(t.rootsByOrgId[orgId])
}
title := data.NewFieldFromFieldType(data.FieldTypeString, count) title := data.NewFieldFromFieldType(data.FieldTypeString, count)
names := data.NewFieldFromFieldType(data.FieldTypeString, count) names := data.NewFieldFromFieldType(data.FieldTypeString, count)
mtype := data.NewFieldFromFieldType(data.FieldTypeString, count) mtype := data.NewFieldFromFieldType(data.FieldTypeString, count)
title.Name = "title" title.Name = "title"
names.Name = "name" names.Name = "name"
mtype.Name = "mediaType" mtype.Name = "mediaType"
for i, f := range t.roots { for i, f := range t.rootsByOrgId[ac.GlobalOrgID] {
names.Set(i, f.Meta().Config.Prefix) names.Set(i, f.Meta().Config.Prefix)
title.Set(i, f.Meta().Config.Name) title.Set(i, f.Meta().Config.Name)
mtype.Set(i, "directory") mtype.Set(i, "directory")
} }
if orgId != ac.GlobalOrgID {
for i, f := range t.rootsByOrgId[orgId] {
names.Set(i, f.Meta().Config.Prefix)
title.Set(i, f.Meta().Config.Name)
mtype.Set(i, "directory")
}
}
frame := data.NewFrame("", names, title, mtype) frame := data.NewFrame("", names, title, mtype)
frame.SetMeta(&data.FrameMeta{ frame.SetMeta(&data.FrameMeta{
Type: data.FrameTypeDirectoryListing, Type: data.FrameTypeDirectoryListing,
@ -69,7 +120,7 @@ func (t *nestedTree) ListFolder(ctx context.Context, path string) (*data.Frame,
return frame, nil return frame, nil
} }
root, path := t.getRoot(path) root, path := t.getRoot(orgId, path)
if root == nil { if root == nil {
return nil, nil // not found (or not ready) return nil, nil // not found (or not ready)
} }

View File

@ -29,8 +29,8 @@ type WriteValueResponse struct {
} }
type storageTree interface { type storageTree interface {
GetFile(ctx context.Context, path string) (*filestorage.File, error) GetFile(ctx context.Context, orgId int64, path string) (*filestorage.File, error)
ListFolder(ctx context.Context, path string) (*data.Frame, error) ListFolder(ctx context.Context, orgId int64, path string) (*data.Frame, error)
} }
//------------------------------------------- //-------------------------------------------