mirror of
https://github.com/grafana/grafana.git
synced 2025-08-03 02:42:22 +08:00
Feature/unified storage search dual reader (#108291)
* Add UnifiedStorageSearchDualReaderEnabled feature flag Signed-off-by: Maicon Costa <maiconscosta@gmail.com> * Refactor UniSearch Dual Reader Signed-off-by: Maicon Costa <maiconscosta@gmail.com> * Run make gen-feature-toggles Signed-off-by: Maicon Costa <maiconscosta@gmail.com> * fix: unit tests search_client Signed-off-by: Bruno Abrantes <bruno@brunoabrantes.com> * feat: cancels shadow search requests after 500ms Signed-off-by: Bruno Abrantes <bruno@brunoabrantes.com> --------- Signed-off-by: Maicon Costa <maiconscosta@gmail.com> Signed-off-by: Bruno Abrantes <bruno@brunoabrantes.com> Co-authored-by: Will Assis <william@williamassis.com> Co-authored-by: Bruno Abrantes <bruno@brunoabrantes.com>
This commit is contained in:
@ -1055,4 +1055,8 @@ export interface FeatureToggles {
|
||||
* @default false
|
||||
*/
|
||||
pluginAssetProvider?: boolean;
|
||||
/**
|
||||
* Enable dual reader for unified storage search
|
||||
*/
|
||||
unifiedStorageSearchDualReaderEnabled?: boolean;
|
||||
}
|
||||
|
@ -122,7 +122,7 @@ func RegisterAPIService(
|
||||
dbp := legacysql.NewDatabaseProvider(sql)
|
||||
namespacer := request.GetNamespaceMapper(cfg)
|
||||
legacyDashboardSearcher := legacysearcher.NewDashboardSearchClient(dashStore, sorter)
|
||||
folderClient := client.NewK8sHandler(dual, request.GetNamespaceMapper(cfg), folders.FolderResourceInfo.GroupVersionResource(), restConfigProvider.GetRestConfig, dashStore, userService, unified, sorter)
|
||||
folderClient := client.NewK8sHandler(dual, request.GetNamespaceMapper(cfg), folders.FolderResourceInfo.GroupVersionResource(), restConfigProvider.GetRestConfig, dashStore, userService, unified, sorter, features)
|
||||
builder := &DashboardsAPIBuilder{
|
||||
log: log.New("grafana-apiserver.dashboards"),
|
||||
|
||||
|
@ -43,7 +43,7 @@ type SearchHandler struct {
|
||||
}
|
||||
|
||||
func NewSearchHandler(tracer trace.Tracer, dual dualwrite.Service, legacyDashboardSearcher resourcepb.ResourceIndexClient, resourceClient resource.ResourceClient, features featuremgmt.FeatureToggles) *SearchHandler {
|
||||
searchClient := resource.NewSearchClient(dualwrite.NewSearchAdapter(dual), dashboardv0alpha1.DashboardResourceInfo.GroupResource(), resourceClient, legacyDashboardSearcher)
|
||||
searchClient := resource.NewSearchClient(dualwrite.NewSearchAdapter(dual), dashboardv0alpha1.DashboardResourceInfo.GroupResource(), resourceClient, legacyDashboardSearcher, features)
|
||||
return &SearchHandler{
|
||||
client: searchClient,
|
||||
log: log.New("grafana-apiserver.dashboards.search"),
|
||||
|
@ -15,6 +15,7 @@ import (
|
||||
"github.com/grafana/grafana/pkg/registry/apis/dashboard/legacysearcher"
|
||||
"github.com/grafana/grafana/pkg/services/apiserver/endpoints/request"
|
||||
"github.com/grafana/grafana/pkg/services/dashboards"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
"github.com/grafana/grafana/pkg/services/search/sort"
|
||||
"github.com/grafana/grafana/pkg/services/user"
|
||||
"github.com/grafana/grafana/pkg/storage/legacysql/dualwrite"
|
||||
@ -46,9 +47,9 @@ type k8sHandler struct {
|
||||
}
|
||||
|
||||
func NewK8sHandler(dual dualwrite.Service, namespacer request.NamespaceMapper, gvr schema.GroupVersionResource,
|
||||
restConfig func(context.Context) (*rest.Config, error), dashStore dashboards.Store, userSvc user.Service, resourceClient resource.ResourceClient, sorter sort.Service) K8sHandler {
|
||||
restConfig func(context.Context) (*rest.Config, error), dashStore dashboards.Store, userSvc user.Service, resourceClient resource.ResourceClient, sorter sort.Service, features featuremgmt.FeatureToggles) K8sHandler {
|
||||
legacySearcher := legacysearcher.NewDashboardSearchClient(dashStore, sorter)
|
||||
searchClient := resource.NewSearchClient(dualwrite.NewSearchAdapter(dual), gvr.GroupResource(), resourceClient, legacySearcher)
|
||||
searchClient := resource.NewSearchClient(dualwrite.NewSearchAdapter(dual), gvr.GroupResource(), resourceClient, legacySearcher, features)
|
||||
|
||||
return &k8sHandler{
|
||||
namespacer: namespacer,
|
||||
|
@ -5,7 +5,6 @@ import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
@ -18,11 +17,13 @@ import (
|
||||
"github.com/grafana/grafana/pkg/services/apiserver/client"
|
||||
"github.com/grafana/grafana/pkg/services/apiserver/endpoints/request"
|
||||
"github.com/grafana/grafana/pkg/services/dashboards"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
"github.com/grafana/grafana/pkg/services/search/sort"
|
||||
"github.com/grafana/grafana/pkg/services/user"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
"github.com/grafana/grafana/pkg/storage/legacysql/dualwrite"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resource"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
type K8sClientFactory func(ctx context.Context, version string) client.K8sHandler
|
||||
@ -44,8 +45,9 @@ func NewK8sClientWithFallback(
|
||||
sorter sort.Service,
|
||||
dual dualwrite.Service,
|
||||
reg prometheus.Registerer,
|
||||
features featuremgmt.FeatureToggles,
|
||||
) *K8sClientWithFallback {
|
||||
newClientFunc := newK8sClientFactory(cfg, restConfigProvider, dashboardStore, userService, resourceClient, sorter, dual)
|
||||
newClientFunc := newK8sClientFactory(cfg, restConfigProvider, dashboardStore, userService, resourceClient, sorter, dual, features)
|
||||
return &K8sClientWithFallback{
|
||||
K8sHandler: newClientFunc(context.Background(), dashboardv0.VERSION),
|
||||
newClientFunc: newClientFunc,
|
||||
@ -112,6 +114,7 @@ func newK8sClientFactory(
|
||||
resourceClient resource.ResourceClient,
|
||||
sorter sort.Service,
|
||||
dual dualwrite.Service,
|
||||
features featuremgmt.FeatureToggles,
|
||||
) K8sClientFactory {
|
||||
clientCache := make(map[string]client.K8sHandler)
|
||||
cacheMutex := &sync.RWMutex{}
|
||||
@ -149,7 +152,7 @@ func newK8sClientFactory(
|
||||
}
|
||||
|
||||
span.AddEvent("Creating new client")
|
||||
newClient := client.NewK8sHandler(dual, request.GetNamespaceMapper(cfg), gvr, restConfigProvider.GetRestConfig, dashboardStore, userService, resourceClient, sorter)
|
||||
newClient := client.NewK8sHandler(dual, request.GetNamespaceMapper(cfg), gvr, restConfigProvider.GetRestConfig, dashboardStore, userService, resourceClient, sorter, features)
|
||||
clientCache[version] = newClient
|
||||
|
||||
return newClient
|
||||
|
@ -387,7 +387,7 @@ func ProvideDashboardServiceImpl(
|
||||
serverLockService *serverlock.ServerLockService,
|
||||
kvstore kvstore.KVStore,
|
||||
) (*DashboardServiceImpl, error) {
|
||||
k8sclient := dashboardclient.NewK8sClientWithFallback(cfg, restConfigProvider, dashboardStore, userService, resourceClient, sorter, dual, r)
|
||||
k8sclient := dashboardclient.NewK8sClientWithFallback(cfg, restConfigProvider, dashboardStore, userService, resourceClient, sorter, dual, r, features)
|
||||
dashSvc := &DashboardServiceImpl{
|
||||
cfg: cfg,
|
||||
log: log.New("dashboard-service"),
|
||||
|
@ -61,6 +61,7 @@ func ProvideService(cfg *setting.Cfg, db db.DB, dashboardService dashboards.Dash
|
||||
userService,
|
||||
unified,
|
||||
sorter,
|
||||
features,
|
||||
),
|
||||
dashSvc: dashboardService,
|
||||
log: log.New("dashboard-version"),
|
||||
|
@ -1819,6 +1819,14 @@ var (
|
||||
Expression: "false",
|
||||
RequiresRestart: true,
|
||||
},
|
||||
{
|
||||
Name: "unifiedStorageSearchDualReaderEnabled",
|
||||
Description: "Enable dual reader for unified storage search",
|
||||
Stage: FeatureStageExperimental,
|
||||
Owner: grafanaSearchAndStorageSquad,
|
||||
HideFromAdminPage: true,
|
||||
HideFromDocs: true,
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -235,3 +235,4 @@ enablePluginImporter,experimental,@grafana/plugins-platform-backend,false,false,
|
||||
otelLogsFormatting,experimental,@grafana/observability-logs,false,false,true
|
||||
alertingNotificationHistory,experimental,@grafana/alerting-squad,false,false,false
|
||||
pluginAssetProvider,experimental,@grafana/plugins-platform-backend,false,true,false
|
||||
unifiedStorageSearchDualReaderEnabled,experimental,@grafana/search-and-storage,false,false,false
|
||||
|
|
@ -950,4 +950,8 @@ const (
|
||||
// FlagPluginAssetProvider
|
||||
// Allows decoupled core plugins to load from the Grafana CDN
|
||||
FlagPluginAssetProvider = "pluginAssetProvider"
|
||||
|
||||
// FlagUnifiedStorageSearchDualReaderEnabled
|
||||
// Enable dual reader for unified storage search
|
||||
FlagUnifiedStorageSearchDualReaderEnabled = "unifiedStorageSearchDualReaderEnabled"
|
||||
)
|
||||
|
@ -2450,6 +2450,22 @@
|
||||
"requiresRestart": true
|
||||
}
|
||||
},
|
||||
{
|
||||
"metadata": {
|
||||
"name": "pluginAssetProvider",
|
||||
"resourceVersion": "1752486584712",
|
||||
"creationTimestamp": "2025-07-14T09:49:44Z"
|
||||
},
|
||||
"spec": {
|
||||
"description": "Allows decoupled core plugins to load from the Grafana CDN",
|
||||
"stage": "experimental",
|
||||
"codeowner": "@grafana/plugins-platform-backend",
|
||||
"requiresRestart": true,
|
||||
"hideFromAdminPage": true,
|
||||
"hideFromDocs": true,
|
||||
"expression": "false"
|
||||
}
|
||||
},
|
||||
{
|
||||
"metadata": {
|
||||
"name": "pluginLoadingRefactor",
|
||||
@ -2467,22 +2483,6 @@
|
||||
"expression": "false"
|
||||
}
|
||||
},
|
||||
{
|
||||
"metadata": {
|
||||
"name": "pluginAssetProvider",
|
||||
"resourceVersion": "1752486584712",
|
||||
"creationTimestamp": "2025-07-14T09:49:44Z"
|
||||
},
|
||||
"spec": {
|
||||
"description": "Allows decoupled core plugins to load from the Grafana CDN",
|
||||
"stage": "experimental",
|
||||
"codeowner": "@grafana/plugins-platform-backend",
|
||||
"requiresRestart": true,
|
||||
"hideFromAdminPage": true,
|
||||
"hideFromDocs": true,
|
||||
"expression": "false"
|
||||
}
|
||||
},
|
||||
{
|
||||
"metadata": {
|
||||
"name": "pluginProxyPreserveTrailingSlash",
|
||||
@ -3350,6 +3350,20 @@
|
||||
"hideFromDocs": true
|
||||
}
|
||||
},
|
||||
{
|
||||
"metadata": {
|
||||
"name": "unifiedStorageSearchDualReaderEnabled",
|
||||
"resourceVersion": "1752500336818",
|
||||
"creationTimestamp": "2025-07-14T13:38:56Z"
|
||||
},
|
||||
"spec": {
|
||||
"description": "Enable dual reader for unified storage search",
|
||||
"stage": "experimental",
|
||||
"codeowner": "@grafana/search-and-storage",
|
||||
"hideFromAdminPage": true,
|
||||
"hideFromDocs": true
|
||||
}
|
||||
},
|
||||
{
|
||||
"metadata": {
|
||||
"name": "unifiedStorageSearchPermissionFiltering",
|
||||
@ -3436,4 +3450,4 @@
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
@ -122,6 +122,7 @@ func ProvideService(
|
||||
userService,
|
||||
resourceClient,
|
||||
sorter,
|
||||
features,
|
||||
)
|
||||
|
||||
unifiedStore := ProvideUnifiedStore(k8sHandler, userService, tracer)
|
||||
@ -140,6 +141,7 @@ func ProvideService(
|
||||
userService,
|
||||
resourceClient,
|
||||
sorter,
|
||||
features,
|
||||
)
|
||||
srv.dashboardK8sClient = dashHandler
|
||||
}
|
||||
|
@ -199,7 +199,7 @@ func TestIntegrationFolderServiceViaUnifiedStorage(t *testing.T) {
|
||||
|
||||
tracer := noop.NewTracerProvider().Tracer("TestIntegrationFolderServiceViaUnifiedStorage")
|
||||
dashboardStore := dashboards.NewFakeDashboardStore(t)
|
||||
k8sCli := client.NewK8sHandler(dualwrite.ProvideTestService(), request.GetNamespaceMapper(cfg), folderv1.FolderResourceInfo.GroupVersionResource(), restCfgProvider.GetRestConfig, dashboardStore, userService, nil, sort.ProvideService())
|
||||
k8sCli := client.NewK8sHandler(dualwrite.ProvideTestService(), request.GetNamespaceMapper(cfg), folderv1.FolderResourceInfo.GroupVersionResource(), restCfgProvider.GetRestConfig, dashboardStore, userService, nil, sort.ProvideService(), nil)
|
||||
unifiedStore := ProvideUnifiedStore(k8sCli, userService, tracer)
|
||||
|
||||
ctx := context.Background()
|
||||
|
@ -2,26 +2,36 @@ package resource
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
||||
)
|
||||
|
||||
const (
|
||||
// backgroundRequestTimeout is the timeout for background shadow traffic requests
|
||||
backgroundRequestTimeout = 500 * time.Millisecond
|
||||
)
|
||||
|
||||
type DualWriter interface {
|
||||
IsEnabled(schema.GroupResource) bool
|
||||
ReadFromUnified(context.Context, schema.GroupResource) (bool, error)
|
||||
}
|
||||
|
||||
func NewSearchClient(dual DualWriter, gr schema.GroupResource, unifiedClient resourcepb.ResourceIndexClient,
|
||||
legacyClient resourcepb.ResourceIndexClient) resourcepb.ResourceIndexClient {
|
||||
legacyClient resourcepb.ResourceIndexClient, features featuremgmt.FeatureToggles) resourcepb.ResourceIndexClient {
|
||||
if dual.IsEnabled(gr) {
|
||||
return &searchWrapper{
|
||||
dual: dual,
|
||||
groupResource: gr,
|
||||
unifiedClient: unifiedClient,
|
||||
legacyClient: legacyClient,
|
||||
features: features,
|
||||
logger: log.New("unified-storage.search-client"),
|
||||
}
|
||||
}
|
||||
//nolint:errcheck
|
||||
@ -37,6 +47,8 @@ type searchWrapper struct {
|
||||
|
||||
unifiedClient resourcepb.ResourceIndexClient
|
||||
legacyClient resourcepb.ResourceIndexClient
|
||||
features featuremgmt.FeatureToggles
|
||||
logger log.Logger
|
||||
}
|
||||
|
||||
func (s *searchWrapper) GetStats(ctx context.Context, in *resourcepb.ResourceStatsRequest,
|
||||
@ -49,6 +61,26 @@ func (s *searchWrapper) GetStats(ctx context.Context, in *resourcepb.ResourceSta
|
||||
if unified {
|
||||
client = s.unifiedClient
|
||||
}
|
||||
|
||||
// If dual reader feature flag is enabled, and legacy is the main storage,
|
||||
// make a background call to unified
|
||||
if s.features != nil && s.features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorageSearchDualReaderEnabled) && !unified {
|
||||
// Create background context with timeout but ignore parent cancelation
|
||||
ctxBg := context.WithoutCancel(ctx)
|
||||
ctxBgWithTimeout, cancel := context.WithTimeout(ctxBg, backgroundRequestTimeout)
|
||||
|
||||
// Make background call without blocking the main request
|
||||
go func() {
|
||||
defer cancel() // Ensure we clean up the context
|
||||
_, bgErr := s.unifiedClient.GetStats(ctxBgWithTimeout, in, opts...)
|
||||
if bgErr != nil {
|
||||
s.logger.Error("Background GetStats call to unified failed", "error", bgErr, "timeout", backgroundRequestTimeout)
|
||||
} else {
|
||||
s.logger.Debug("Background GetStats call to unified succeeded", "timeout", backgroundRequestTimeout)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
return client.GetStats(ctx, in, opts...)
|
||||
}
|
||||
|
||||
@ -62,5 +94,25 @@ func (s *searchWrapper) Search(ctx context.Context, in *resourcepb.ResourceSearc
|
||||
if unified {
|
||||
client = s.unifiedClient
|
||||
}
|
||||
|
||||
// If dual reader feature flag is enabled, and legacy is the main storage,
|
||||
// make a background call to unified
|
||||
if s.features != nil && s.features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorageSearchDualReaderEnabled) && !unified {
|
||||
// Create background context with timeout but ignore parent cancelation
|
||||
ctxBg := context.WithoutCancel(ctx)
|
||||
ctxBgWithTimeout, cancel := context.WithTimeout(ctxBg, backgroundRequestTimeout)
|
||||
|
||||
// Make background call without blocking the main request
|
||||
go func() {
|
||||
defer cancel() // Ensure we clean up the context
|
||||
_, bgErr := s.unifiedClient.Search(ctxBgWithTimeout, in, opts...)
|
||||
if bgErr != nil {
|
||||
s.logger.Error("Background Search call to unified failed", "error", bgErr, "timeout", backgroundRequestTimeout)
|
||||
} else {
|
||||
s.logger.Debug("Background Search call to unified succeeded", "timeout", backgroundRequestTimeout)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
return client.Search(ctx, in, opts...)
|
||||
}
|
||||
|
445
pkg/storage/unified/resource/search_client_test.go
Normal file
445
pkg/storage/unified/resource/search_client_test.go
Normal file
@ -0,0 +1,445 @@
|
||||
package resource
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
"google.golang.org/grpc"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
||||
"github.com/grafana/grafana/pkg/util/testutil"
|
||||
)
|
||||
|
||||
// Mock DualWriter
|
||||
type MockDualWriter struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
func (m *MockDualWriter) IsEnabled(gr schema.GroupResource) bool {
|
||||
args := m.Called(gr)
|
||||
return args.Bool(0)
|
||||
}
|
||||
|
||||
func (m *MockDualWriter) ReadFromUnified(ctx context.Context, gr schema.GroupResource) (bool, error) {
|
||||
args := m.Called(ctx, gr)
|
||||
return args.Bool(0), args.Error(1)
|
||||
}
|
||||
|
||||
// Mock ResourceIndexClient with enhanced timeout testing capabilities
|
||||
type MockResourceIndexClient struct {
|
||||
mock.Mock
|
||||
searchCalled chan struct{}
|
||||
statsCalled chan struct{}
|
||||
searchDelay time.Duration
|
||||
statsDelay time.Duration
|
||||
contextCanceled chan context.Context
|
||||
}
|
||||
|
||||
func NewMockResourceIndexClient() *MockResourceIndexClient {
|
||||
return &MockResourceIndexClient{
|
||||
searchCalled: make(chan struct{}, 1),
|
||||
statsCalled: make(chan struct{}, 1),
|
||||
contextCanceled: make(chan context.Context, 10), // Buffer for multiple calls
|
||||
}
|
||||
}
|
||||
|
||||
func (m *MockResourceIndexClient) SetSearchDelay(delay time.Duration) {
|
||||
m.searchDelay = delay
|
||||
}
|
||||
|
||||
func (m *MockResourceIndexClient) SetStatsDelay(delay time.Duration) {
|
||||
m.statsDelay = delay
|
||||
}
|
||||
|
||||
func (m *MockResourceIndexClient) Search(ctx context.Context, in *resourcepb.ResourceSearchRequest, opts ...grpc.CallOption) (*resourcepb.ResourceSearchResponse, error) {
|
||||
args := m.Called(ctx, in, opts)
|
||||
|
||||
// Simulate delay if configured
|
||||
if m.searchDelay > 0 {
|
||||
select {
|
||||
case <-time.After(m.searchDelay):
|
||||
// Delay completed normally
|
||||
case <-ctx.Done():
|
||||
// Context was canceled during delay
|
||||
m.contextCanceled <- ctx
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
// Signal that Search was called
|
||||
select {
|
||||
case m.searchCalled <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
|
||||
return args.Get(0).(*resourcepb.ResourceSearchResponse), args.Error(1)
|
||||
}
|
||||
|
||||
func (m *MockResourceIndexClient) GetStats(ctx context.Context, in *resourcepb.ResourceStatsRequest, opts ...grpc.CallOption) (*resourcepb.ResourceStatsResponse, error) {
|
||||
args := m.Called(ctx, in, opts)
|
||||
|
||||
// Simulate delay if configured
|
||||
if m.statsDelay > 0 {
|
||||
select {
|
||||
case <-time.After(m.statsDelay):
|
||||
// Delay completed normally
|
||||
case <-ctx.Done():
|
||||
// Context was canceled during delay
|
||||
m.contextCanceled <- ctx
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
// Signal that GetStats was called
|
||||
select {
|
||||
case m.statsCalled <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
|
||||
return args.Get(0).(*resourcepb.ResourceStatsResponse), args.Error(1)
|
||||
}
|
||||
|
||||
func setupTestSearchClient(t *testing.T) (schema.GroupResource, *MockResourceIndexClient, *MockResourceIndexClient, featuremgmt.FeatureToggles) {
|
||||
t.Helper()
|
||||
gr := schema.GroupResource{Group: "test", Resource: "items"}
|
||||
unifiedClient := NewMockResourceIndexClient()
|
||||
legacyClient := NewMockResourceIndexClient()
|
||||
features := featuremgmt.WithFeatures()
|
||||
return gr, unifiedClient, legacyClient, features
|
||||
}
|
||||
|
||||
func setupTestSearchWrapper(t *testing.T, dual *MockDualWriter, unifiedClient, legacyClient *MockResourceIndexClient, features featuremgmt.FeatureToggles, gr schema.GroupResource) *searchWrapper {
|
||||
t.Helper()
|
||||
return &searchWrapper{
|
||||
dual: dual,
|
||||
groupResource: gr,
|
||||
unifiedClient: unifiedClient,
|
||||
legacyClient: legacyClient,
|
||||
features: features,
|
||||
logger: log.NewNopLogger(),
|
||||
}
|
||||
}
|
||||
|
||||
func TestSearchClient_NewSearchClient(t *testing.T) {
|
||||
gr, unifiedClient, legacyClient, features := setupTestSearchClient(t)
|
||||
|
||||
t.Run("returns wrapper when dual writer is enabled", func(t *testing.T) {
|
||||
dual := &MockDualWriter{} // Create fresh mock for this test
|
||||
dual.On("IsEnabled", gr).Return(true)
|
||||
|
||||
client := NewSearchClient(dual, gr, unifiedClient, legacyClient, features)
|
||||
|
||||
wrapper, ok := client.(*searchWrapper)
|
||||
require.True(t, ok)
|
||||
assert.Equal(t, dual, wrapper.dual)
|
||||
assert.Equal(t, gr, wrapper.groupResource)
|
||||
assert.Equal(t, unifiedClient, wrapper.unifiedClient)
|
||||
assert.Equal(t, legacyClient, wrapper.legacyClient)
|
||||
|
||||
dual.AssertExpectations(t)
|
||||
})
|
||||
|
||||
t.Run("returns unified client when dual writer disabled but read from unified", func(t *testing.T) {
|
||||
dual := &MockDualWriter{} // Create fresh mock for this test
|
||||
dual.On("IsEnabled", gr).Return(false)
|
||||
dual.On("ReadFromUnified", mock.Anything, gr).Return(true, nil)
|
||||
|
||||
client := NewSearchClient(dual, gr, unifiedClient, legacyClient, features)
|
||||
|
||||
assert.Equal(t, unifiedClient, client)
|
||||
dual.AssertExpectations(t)
|
||||
})
|
||||
|
||||
t.Run("returns legacy client when dual writer disabled and not reading from unified", func(t *testing.T) {
|
||||
dual := &MockDualWriter{} // Create fresh mock for this test
|
||||
dual.On("IsEnabled", gr).Return(false)
|
||||
dual.On("ReadFromUnified", mock.Anything, gr).Return(false, nil)
|
||||
|
||||
client := NewSearchClient(dual, gr, unifiedClient, legacyClient, features)
|
||||
|
||||
assert.Equal(t, legacyClient, client)
|
||||
dual.AssertExpectations(t)
|
||||
})
|
||||
}
|
||||
|
||||
func TestSearchWrapper_Search(t *testing.T) {
|
||||
gr, unifiedClient, legacyClient, features := setupTestSearchClient(t)
|
||||
req := &resourcepb.ResourceSearchRequest{Query: "test"}
|
||||
expectedResponse := &resourcepb.ResourceSearchResponse{TotalHits: 0}
|
||||
|
||||
t.Run("uses unified client when reading from unified", func(t *testing.T) {
|
||||
ctx := testutil.NewDefaultTestContext(t)
|
||||
dual := &MockDualWriter{}
|
||||
|
||||
dual.On("ReadFromUnified", mock.Anything, gr).Return(true, nil)
|
||||
unifiedClient.On("Search", mock.Anything, req, mock.Anything).Return(expectedResponse, nil)
|
||||
|
||||
wrapper := setupTestSearchWrapper(t, dual, unifiedClient, legacyClient, features, gr)
|
||||
|
||||
resp, err := wrapper.Search(ctx, req)
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, expectedResponse, resp)
|
||||
|
||||
dual.AssertExpectations(t)
|
||||
unifiedClient.AssertExpectations(t)
|
||||
legacyClient.AssertNotCalled(t, "Search")
|
||||
})
|
||||
|
||||
t.Run("uses legacy client when not reading from unified", func(t *testing.T) {
|
||||
ctx := testutil.NewDefaultTestContext(t)
|
||||
dual := &MockDualWriter{}
|
||||
|
||||
dual.On("ReadFromUnified", mock.Anything, gr).Return(false, nil)
|
||||
legacyClient.On("Search", mock.Anything, req, mock.Anything).Return(expectedResponse, nil)
|
||||
|
||||
wrapper := setupTestSearchWrapper(t, dual, unifiedClient, legacyClient, features, gr)
|
||||
|
||||
resp, err := wrapper.Search(ctx, req)
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, expectedResponse, resp)
|
||||
|
||||
dual.AssertExpectations(t)
|
||||
legacyClient.AssertExpectations(t)
|
||||
unifiedClient.AssertNotCalled(t, "Search")
|
||||
})
|
||||
|
||||
t.Run("makes background call to unified when feature flag enabled and using legacy", func(t *testing.T) {
|
||||
ctx := testutil.NewDefaultTestContext(t)
|
||||
dual := &MockDualWriter{}
|
||||
featuresWithFlag := featuremgmt.WithFeatures(featuremgmt.FlagUnifiedStorageSearchDualReaderEnabled)
|
||||
|
||||
dual.On("ReadFromUnified", mock.Anything, gr).Return(false, nil)
|
||||
legacyClient.On("Search", mock.Anything, req, mock.Anything).Return(expectedResponse, nil)
|
||||
|
||||
// Expect background call to unified client
|
||||
unifiedBgResponse := &resourcepb.ResourceSearchResponse{TotalHits: 0}
|
||||
unifiedClient.On("Search", mock.Anything, req, mock.Anything).Return(unifiedBgResponse, nil)
|
||||
|
||||
wrapper := setupTestSearchWrapper(t, dual, unifiedClient, legacyClient, featuresWithFlag, gr)
|
||||
|
||||
resp, err := wrapper.Search(ctx, req)
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, expectedResponse, resp)
|
||||
|
||||
// Wait for background goroutine to complete
|
||||
select {
|
||||
case <-unifiedClient.searchCalled:
|
||||
// Background call was made
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Fatal("Background unified client call was not made within timeout")
|
||||
}
|
||||
|
||||
dual.AssertExpectations(t)
|
||||
legacyClient.AssertExpectations(t)
|
||||
unifiedClient.AssertExpectations(t)
|
||||
})
|
||||
|
||||
t.Run("handles background call error gracefully", func(t *testing.T) {
|
||||
ctx := testutil.NewDefaultTestContext(t)
|
||||
dual := &MockDualWriter{}
|
||||
featuresWithFlag := featuremgmt.WithFeatures(featuremgmt.FlagUnifiedStorageSearchDualReaderEnabled)
|
||||
|
||||
dual.On("ReadFromUnified", mock.Anything, gr).Return(false, nil)
|
||||
legacyClient.On("Search", mock.Anything, req, mock.Anything).Return(expectedResponse, nil)
|
||||
|
||||
// Background call returns error - should be handled gracefully
|
||||
unifiedClient.On("Search", mock.Anything, req, mock.Anything).Return((*resourcepb.ResourceSearchResponse)(nil), assert.AnError)
|
||||
|
||||
wrapper := setupTestSearchWrapper(t, dual, unifiedClient, legacyClient, featuresWithFlag, gr)
|
||||
|
||||
resp, err := wrapper.Search(ctx, req)
|
||||
|
||||
// Main request should still succeed despite background error
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, expectedResponse, resp)
|
||||
|
||||
// Wait for background goroutine to complete
|
||||
select {
|
||||
case <-unifiedClient.searchCalled:
|
||||
// Background call was made (even though it failed)
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Fatal("Background unified client call was not made within timeout")
|
||||
}
|
||||
|
||||
dual.AssertExpectations(t)
|
||||
legacyClient.AssertExpectations(t)
|
||||
unifiedClient.AssertExpectations(t)
|
||||
})
|
||||
|
||||
t.Run("background request times out after 500ms", func(t *testing.T) {
|
||||
ctx := testutil.NewDefaultTestContext(t)
|
||||
dual := &MockDualWriter{}
|
||||
featuresWithFlag := featuremgmt.WithFeatures(featuremgmt.FlagUnifiedStorageSearchDualReaderEnabled)
|
||||
|
||||
dual.On("ReadFromUnified", mock.Anything, gr).Return(false, nil)
|
||||
legacyClient.On("Search", mock.Anything, req, mock.Anything).Return(expectedResponse, nil)
|
||||
|
||||
// Configure unified client to take longer than the 500ms timeout
|
||||
unifiedClient.SetSearchDelay(600 * time.Millisecond) // Longer than 500ms timeout
|
||||
unifiedClient.On("Search", mock.Anything, req, mock.Anything).Return((*resourcepb.ResourceSearchResponse)(nil), context.DeadlineExceeded)
|
||||
|
||||
wrapper := setupTestSearchWrapper(t, dual, unifiedClient, legacyClient, featuresWithFlag, gr)
|
||||
|
||||
start := time.Now()
|
||||
resp, err := wrapper.Search(ctx, req)
|
||||
mainRequestDuration := time.Since(start)
|
||||
|
||||
// Main request should succeed quickly despite background timeout
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, expectedResponse, resp)
|
||||
assert.Less(t, mainRequestDuration, 50*time.Millisecond, "Main request should not be blocked by background timeout")
|
||||
|
||||
// Wait for background context to be canceled
|
||||
select {
|
||||
case canceledCtx := <-unifiedClient.contextCanceled:
|
||||
assert.Error(t, canceledCtx.Err(), "Background context should be canceled")
|
||||
assert.Equal(t, context.DeadlineExceeded, canceledCtx.Err())
|
||||
case <-time.After(700 * time.Millisecond):
|
||||
t.Fatal("Background request should have been canceled due to timeout")
|
||||
}
|
||||
|
||||
dual.AssertExpectations(t)
|
||||
legacyClient.AssertExpectations(t)
|
||||
unifiedClient.AssertExpectations(t)
|
||||
})
|
||||
|
||||
t.Run("background request completes successfully when within timeout", func(t *testing.T) {
|
||||
ctx := testutil.NewDefaultTestContext(t)
|
||||
dual := &MockDualWriter{}
|
||||
featuresWithFlag := featuremgmt.WithFeatures(featuremgmt.FlagUnifiedStorageSearchDualReaderEnabled)
|
||||
|
||||
dual.On("ReadFromUnified", mock.Anything, gr).Return(false, nil)
|
||||
legacyClient.On("Search", mock.Anything, req, mock.Anything).Return(expectedResponse, nil)
|
||||
|
||||
// Configure unified client to respond within the 500ms timeout
|
||||
unifiedClient.SetSearchDelay(100 * time.Millisecond) // Well within 500ms timeout
|
||||
unifiedClient.On("Search", mock.Anything, req, mock.Anything).Return(&resourcepb.ResourceSearchResponse{TotalHits: 0}, nil)
|
||||
|
||||
wrapper := setupTestSearchWrapper(t, dual, unifiedClient, legacyClient, featuresWithFlag, gr)
|
||||
|
||||
start := time.Now()
|
||||
resp, err := wrapper.Search(ctx, req)
|
||||
mainRequestDuration := time.Since(start)
|
||||
|
||||
// Main request should succeed quickly
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, expectedResponse, resp)
|
||||
assert.Less(t, mainRequestDuration, 50*time.Millisecond, "Main request should not be blocked")
|
||||
|
||||
// Wait for successful background call
|
||||
select {
|
||||
case <-unifiedClient.searchCalled:
|
||||
// Background call completed successfully
|
||||
case <-time.After(200 * time.Millisecond):
|
||||
t.Fatal("Expected successful background call")
|
||||
}
|
||||
|
||||
dual.AssertExpectations(t)
|
||||
legacyClient.AssertExpectations(t)
|
||||
unifiedClient.AssertExpectations(t)
|
||||
})
|
||||
}
|
||||
|
||||
func TestSearchWrapper_GetStats(t *testing.T) {
|
||||
gr, unifiedClient, legacyClient, features := setupTestSearchClient(t)
|
||||
req := &resourcepb.ResourceStatsRequest{Namespace: "test"}
|
||||
expectedResponse := &resourcepb.ResourceStatsResponse{Stats: []*resourcepb.ResourceStatsResponse_Stats{{Count: 100}}}
|
||||
|
||||
t.Run("uses unified client when reading from unified", func(t *testing.T) {
|
||||
ctx := testutil.NewDefaultTestContext(t)
|
||||
dual := &MockDualWriter{}
|
||||
|
||||
dual.On("ReadFromUnified", mock.Anything, gr).Return(true, nil)
|
||||
unifiedClient.On("GetStats", mock.Anything, req, mock.Anything).Return(expectedResponse, nil)
|
||||
|
||||
wrapper := setupTestSearchWrapper(t, dual, unifiedClient, legacyClient, features, gr)
|
||||
|
||||
resp, err := wrapper.GetStats(ctx, req)
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, expectedResponse, resp)
|
||||
|
||||
dual.AssertExpectations(t)
|
||||
unifiedClient.AssertExpectations(t)
|
||||
legacyClient.AssertNotCalled(t, "GetStats")
|
||||
})
|
||||
|
||||
t.Run("makes background call to unified when feature flag enabled and using legacy", func(t *testing.T) {
|
||||
ctx := testutil.NewDefaultTestContext(t)
|
||||
dual := &MockDualWriter{}
|
||||
featuresWithFlag := featuremgmt.WithFeatures(featuremgmt.FlagUnifiedStorageSearchDualReaderEnabled)
|
||||
|
||||
dual.On("ReadFromUnified", mock.Anything, gr).Return(false, nil)
|
||||
legacyClient.On("GetStats", mock.Anything, req, mock.Anything).Return(expectedResponse, nil)
|
||||
|
||||
// Expect background call to unified client
|
||||
unifiedBgResponse := &resourcepb.ResourceStatsResponse{Stats: []*resourcepb.ResourceStatsResponse_Stats{{Count: 50}}}
|
||||
unifiedClient.On("GetStats", mock.Anything, req, mock.Anything).Return(unifiedBgResponse, nil)
|
||||
|
||||
wrapper := setupTestSearchWrapper(t, dual, unifiedClient, legacyClient, featuresWithFlag, gr)
|
||||
|
||||
resp, err := wrapper.GetStats(ctx, req)
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, expectedResponse, resp)
|
||||
|
||||
// Wait for background goroutine to complete
|
||||
select {
|
||||
case <-unifiedClient.statsCalled:
|
||||
// Background call was made
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Fatal("Background unified client GetStats call was not made within timeout")
|
||||
}
|
||||
|
||||
dual.AssertExpectations(t)
|
||||
legacyClient.AssertExpectations(t)
|
||||
unifiedClient.AssertExpectations(t)
|
||||
})
|
||||
|
||||
t.Run("background GetStats request times out after 500ms", func(t *testing.T) {
|
||||
ctx := testutil.NewDefaultTestContext(t)
|
||||
dual := &MockDualWriter{}
|
||||
featuresWithFlag := featuremgmt.WithFeatures(featuremgmt.FlagUnifiedStorageSearchDualReaderEnabled)
|
||||
|
||||
dual.On("ReadFromUnified", mock.Anything, gr).Return(false, nil)
|
||||
legacyClient.On("GetStats", mock.Anything, req, mock.Anything).Return(expectedResponse, nil)
|
||||
|
||||
// Configure unified client to take longer than the 500ms timeout
|
||||
unifiedClient.SetStatsDelay(600 * time.Millisecond) // Longer than 500ms timeout
|
||||
unifiedClient.On("GetStats", mock.Anything, req, mock.Anything).Return((*resourcepb.ResourceStatsResponse)(nil), context.DeadlineExceeded)
|
||||
|
||||
wrapper := setupTestSearchWrapper(t, dual, unifiedClient, legacyClient, featuresWithFlag, gr)
|
||||
|
||||
start := time.Now()
|
||||
resp, err := wrapper.GetStats(ctx, req)
|
||||
mainRequestDuration := time.Since(start)
|
||||
|
||||
// Main request should succeed quickly despite background timeout
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, expectedResponse, resp)
|
||||
assert.Less(t, mainRequestDuration, 50*time.Millisecond, "Main request should not be blocked by background timeout")
|
||||
|
||||
// Wait for background context to be canceled
|
||||
select {
|
||||
case canceledCtx := <-unifiedClient.contextCanceled:
|
||||
assert.Error(t, canceledCtx.Err(), "Background context should be canceled")
|
||||
assert.Equal(t, context.DeadlineExceeded, canceledCtx.Err())
|
||||
case <-time.After(700 * time.Millisecond):
|
||||
t.Fatal("Background request should have been canceled due to timeout")
|
||||
}
|
||||
|
||||
dual.AssertExpectations(t)
|
||||
legacyClient.AssertExpectations(t)
|
||||
unifiedClient.AssertExpectations(t)
|
||||
})
|
||||
}
|
Reference in New Issue
Block a user