Files
grafana/pkg/util/debouncer/debouncer_test.go
2025-05-16 13:47:09 +02:00

219 lines
5.5 KiB
Go

package debouncer
import (
"context"
"errors"
"sync"
"testing"
"time"
"github.com/benbjohnson/clock"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
)
func TestDebouncer(t *testing.T) {
t.Run("should process values after min wait", func(t *testing.T) {
var processedMu sync.Mutex
processedValues := make(map[string]int)
group, err := NewGroup(DebouncerOpts[string]{
BufferSize: 10,
ProcessHandler: func(ctx context.Context, value string) error {
processedMu.Lock()
processedValues[value]++
processedMu.Unlock()
return nil
},
MinWait: 10 * time.Millisecond,
MaxWait: 500 * time.Millisecond,
})
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
group.Start(ctx)
require.NoError(t, group.Add("key1"))
require.NoError(t, group.Add("key2"))
// Should be deduplicated.
require.NoError(t, group.Add("key1"))
require.Eventually(t, func() bool {
// We should have processed key1 and key2 exactly once.
processedMu.Lock()
if processedValues["key1"] == 1 && processedValues["key2"] == 1 {
return true
}
processedMu.Unlock()
return false
}, time.Millisecond*200, time.Millisecond*20)
})
t.Run("should process values after max wait", func(t *testing.T) {
processed := make(map[string]int, 1)
clockMock := clock.NewMock()
group, err := NewGroup(DebouncerOpts[string]{
BufferSize: 10,
ProcessHandler: func(ctx context.Context, value string) error {
processed[value]++
return nil
},
MinWait: 50 * time.Millisecond,
MaxWait: 500 * time.Millisecond,
clock: clockMock,
})
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
group.Start(ctx)
start := clockMock.Now()
for counter := 0; counter < 25; counter++ {
_ = group.Add("key1")
clockMock.Add(time.Millisecond * 40)
if processed["key1"] == 1 {
break
}
}
// Make sure that the execution happened after the maxTimeout of 500ms, but before the next MaxTimeout.
require.WithinDuration(t, start.Add(time.Millisecond*500), clockMock.Now(), time.Millisecond*499)
})
t.Run("should handle buffer full", func(t *testing.T) {
group, err := NewGroup(DebouncerOpts[string]{
BufferSize: 1,
ProcessHandler: func(ctx context.Context, value string) error { return nil },
MinWait: 10 * time.Millisecond,
MaxWait: 100 * time.Millisecond,
})
require.NoError(t, err)
require.NoError(t, group.Add("key1"))
// Buffer should be full by now as we are not reading from it yet.
require.ErrorIs(t, group.Add("key2"), ErrBufferFull)
})
t.Run("should track metrics", func(t *testing.T) {
var wg sync.WaitGroup
group, err := NewGroup(DebouncerOpts[string]{
BufferSize: 10,
ProcessHandler: func(ctx context.Context, value string) error {
wg.Done()
return nil
},
MinWait: 10 * time.Millisecond,
MaxWait: 100 * time.Millisecond,
Name: "test",
})
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
group.Start(ctx)
wg.Add(1)
require.NoError(t, group.Add("key1"))
require.NoError(t, group.Add("key1"))
wg.Wait()
require.Equal(t, float64(2), testutil.ToFloat64(group.metrics.itemsAddedCounter))
require.Equal(t, float64(1), testutil.ToFloat64(group.metrics.itemsProcessedCounter))
})
t.Run("should handle errors", func(t *testing.T) {
var (
wg sync.WaitGroup
errs = make(chan error, 10)
expectedErr = errors.New("test error")
)
group, err := NewGroup(DebouncerOpts[string]{
BufferSize: 10,
ProcessHandler: func(ctx context.Context, value string) error {
wg.Done()
return expectedErr
},
MinWait: 10 * time.Millisecond,
MaxWait: 100 * time.Millisecond,
Reg: prometheus.NewPedanticRegistry(),
Name: "test_errors",
ErrorHandler: func(_ string, err error) { errs <- err },
})
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
group.Start(ctx)
wg.Add(1)
require.NoError(t, group.Add("key1"))
wg.Wait()
select {
case err := <-errs:
require.Equal(t, expectedErr, err)
default:
t.Fatal("expected error")
}
require.Equal(t, float64(1), testutil.ToFloat64(group.metrics.processingErrorsCounter))
})
t.Run("should gracefully handle stops", func(t *testing.T) {
// Create a channel to signal when processing is done.
done := make(chan struct{})
group, err := NewGroup(DebouncerOpts[string]{
BufferSize: 10,
ProcessHandler: func(ctx context.Context, item string) error {
// Start a goroutine to wait for context cancellation.
go func() {
<-ctx.Done()
close(done)
}()
return nil
},
MinWait: 50 * time.Millisecond,
MaxWait: 500 * time.Millisecond,
})
require.NoError(t, err)
// Start the group with a context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
group.Start(ctx)
// Send an item to trigger processing.
require.NoError(t, group.Add("key-1"))
// Give the group a moment to process the item.
time.Sleep(100 * time.Millisecond)
// Stop the group, which should cancel the context.
group.Stop()
// Wait for the done signal or timeout.
select {
case <-done:
// Success - the group was stopped and the context was canceled
case <-time.After(time.Second):
t.Fatal("Timed out waiting for group to stop")
}
})
}