mirror of
https://github.com/grafana/grafana.git
synced 2025-07-30 00:42:27 +08:00
176 lines
5.2 KiB
Go
176 lines
5.2 KiB
Go
package resource
|
|
|
|
import (
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/mock"
|
|
|
|
"github.com/grafana/grafana/pkg/storage/unified/resourcepb"
|
|
)
|
|
|
|
func TestNewIndexQueueProcessor(t *testing.T) {
|
|
mockIndex := &MockResourceIndex{}
|
|
mockBuilder := &MockDocumentBuilder{}
|
|
nsr := NamespacedResource{Resource: "test"}
|
|
|
|
resChan := make(chan *IndexEvent)
|
|
|
|
processor := newIndexQueueProcessor(mockIndex, nsr, 10, mockBuilder, resChan)
|
|
|
|
assert.NotNil(t, processor)
|
|
assert.Equal(t, 10, processor.batchSize)
|
|
assert.NotNil(t, processor.queue)
|
|
}
|
|
|
|
func TestIndexQueueProcessor_SingleEvent(t *testing.T) {
|
|
mockIndex := &MockResourceIndex{}
|
|
mockBuilder := &MockDocumentBuilder{}
|
|
nsr := NamespacedResource{Resource: "test"}
|
|
|
|
resChan := make(chan *IndexEvent)
|
|
|
|
processor := newIndexQueueProcessor(mockIndex, nsr, 10, mockBuilder, resChan)
|
|
|
|
// Test data
|
|
key := resourcepb.ResourceKey{Resource: "test", Name: "obj1", Namespace: "default"}
|
|
evt := &WrittenEvent{
|
|
Key: &key,
|
|
ResourceVersion: time.Now().UnixMicro(),
|
|
Type: resourcepb.WatchEvent_ADDED,
|
|
Value: []byte(`{"test": "data"}`),
|
|
}
|
|
|
|
// Setup expectations
|
|
mockBuilder.On("BuildDocument", mock.Anything, &key, evt.ResourceVersion, evt.Value).Return(&IndexableDocument{Key: &key}, nil)
|
|
mockIndex.On("BulkIndex", mock.MatchedBy(func(req *BulkIndexRequest) bool {
|
|
return len(req.Items) == 1 && req.Items[0].Action == ActionIndex
|
|
})).Return(nil)
|
|
|
|
// Start processor and wait for the document to be indexed
|
|
processor.Add(evt)
|
|
|
|
resp := <-resChan
|
|
assert.NotNil(t, resp)
|
|
assert.Nil(t, resp.Err)
|
|
assert.Equal(t, &key, resp.IndexableDocument.Key)
|
|
|
|
mockBuilder.AssertExpectations(t)
|
|
mockIndex.AssertExpectations(t)
|
|
}
|
|
|
|
func TestIndexQueueProcessor_BatchProcessing(t *testing.T) {
|
|
mockIndex := &MockResourceIndex{}
|
|
mockBuilder := &MockDocumentBuilder{}
|
|
nsr := NamespacedResource{Namespace: "default", Resource: "test"}
|
|
|
|
resChan := make(chan *IndexEvent)
|
|
|
|
processor := newIndexQueueProcessor(mockIndex, nsr, 2, mockBuilder, resChan)
|
|
|
|
// Test data for two events
|
|
events := []*WrittenEvent{
|
|
{
|
|
Key: &resourcepb.ResourceKey{Resource: "test", Name: "obj1", Namespace: "default"},
|
|
ResourceVersion: time.Now().UnixMicro(),
|
|
Type: resourcepb.WatchEvent_ADDED,
|
|
Value: []byte(`{"test": "data1"}`),
|
|
},
|
|
{
|
|
Key: &resourcepb.ResourceKey{Resource: "test", Name: "obj2", Namespace: "default"},
|
|
ResourceVersion: time.Now().UnixMicro(),
|
|
Type: resourcepb.WatchEvent_DELETED,
|
|
},
|
|
}
|
|
|
|
// Setup expectations
|
|
mockBuilder.On("BuildDocument", mock.Anything, events[0].Key, events[0].ResourceVersion, events[0].Value).
|
|
Return(&IndexableDocument{Key: events[0].Key}, nil)
|
|
mockIndex.On("BulkIndex", mock.MatchedBy(func(req *BulkIndexRequest) bool {
|
|
return len(req.Items) == 2 &&
|
|
req.Items[0].Action == ActionIndex &&
|
|
req.Items[1].Action == ActionDelete
|
|
})).Return(nil)
|
|
|
|
// Start processor and add events
|
|
processor.Add(events[0])
|
|
processor.Add(events[1])
|
|
|
|
r0 := <-resChan
|
|
assert.Nil(t, r0.Err)
|
|
assert.Equal(t, events[0].Key, r0.IndexableDocument.Key)
|
|
|
|
r1 := <-resChan
|
|
assert.Nil(t, r1.Err)
|
|
assert.Nil(t, r1.IndexableDocument) // deleted event
|
|
|
|
mockBuilder.AssertExpectations(t)
|
|
mockIndex.AssertExpectations(t)
|
|
}
|
|
|
|
func TestIndexQueueProcessor_BuildDocumentError(t *testing.T) {
|
|
mockIndex := &MockResourceIndex{}
|
|
mockBuilder := &MockDocumentBuilder{}
|
|
nsr := NamespacedResource{Resource: "test"}
|
|
|
|
resChan := make(chan *IndexEvent)
|
|
|
|
processor := newIndexQueueProcessor(mockIndex, nsr, 10, mockBuilder, resChan)
|
|
|
|
evt := &WrittenEvent{
|
|
Key: &resourcepb.ResourceKey{Resource: "test", Name: "obj1", Namespace: "default"},
|
|
ResourceVersion: time.Now().UnixMicro(),
|
|
Type: resourcepb.WatchEvent_ADDED,
|
|
Value: []byte(`invalid json`),
|
|
}
|
|
|
|
// Setup expectations for error case
|
|
mockBuilder.On("BuildDocument", mock.Anything, evt.Key, evt.ResourceVersion, evt.Value).
|
|
Return(nil, assert.AnError)
|
|
|
|
// The bulk index should not be called since document building failed
|
|
mockIndex.On("BulkIndex", mock.Anything).Return(nil).Maybe()
|
|
|
|
processor.Add(evt)
|
|
|
|
resp := <-resChan
|
|
assert.NotNil(t, resp)
|
|
assert.Error(t, resp.Err)
|
|
assert.Nil(t, resp.IndexableDocument)
|
|
|
|
mockBuilder.AssertExpectations(t)
|
|
mockIndex.AssertExpectations(t)
|
|
}
|
|
|
|
func TestIndexQueueProcessor_BulkIndexError(t *testing.T) {
|
|
mockIndex := &MockResourceIndex{}
|
|
mockBuilder := &MockDocumentBuilder{}
|
|
nsr := NamespacedResource{Resource: "test"}
|
|
|
|
resChan := make(chan *IndexEvent)
|
|
|
|
processor := newIndexQueueProcessor(mockIndex, nsr, 10, mockBuilder, resChan)
|
|
|
|
evt := &WrittenEvent{
|
|
Key: &resourcepb.ResourceKey{Resource: "test", Name: "obj1", Namespace: "default"},
|
|
ResourceVersion: time.Now().UnixMicro(),
|
|
Type: resourcepb.WatchEvent_ADDED,
|
|
Value: []byte(`{"test": "data"}`),
|
|
}
|
|
|
|
// Setup expectations
|
|
mockBuilder.On("BuildDocument", mock.Anything, evt.Key, evt.ResourceVersion, evt.Value).
|
|
Return(&IndexableDocument{Key: evt.Key}, nil)
|
|
mockIndex.On("BulkIndex", mock.Anything).Return(assert.AnError)
|
|
|
|
processor.Add(evt)
|
|
|
|
resp := <-resChan
|
|
assert.NotNil(t, resp)
|
|
assert.Error(t, resp.Err)
|
|
|
|
mockBuilder.AssertExpectations(t)
|
|
mockIndex.AssertExpectations(t)
|
|
}
|