mirror of
https://github.com/grafana/loki.git
synced 2026-03-13 09:33:58 +08:00
feat(limits): Add emptiness check to prepare for multiAZ (#20927)
This commit is contained in:
@@ -419,7 +419,13 @@ func (s *usageStore) updateWithBuckets(i int, tenant string, partition int32, po
|
||||
stream.totalSize = 0
|
||||
stream.policy = policyBucket
|
||||
stream.rateBuckets = make([]rateBucket, s.numBuckets)
|
||||
} else if len(stream.rateBuckets) == 0 {
|
||||
// If the stream exists but rateBuckets is not initialized (e.g., created via Update()),
|
||||
// initialize it now. This can happen when ExceedsLimits creates a stream, then
|
||||
// UpdateRates is called for the same stream.
|
||||
stream.rateBuckets = make([]rateBucket, s.numBuckets)
|
||||
}
|
||||
|
||||
seenAtUnixNano := seenAt.UnixNano()
|
||||
if stream.lastSeenAt <= seenAtUnixNano {
|
||||
stream.lastSeenAt = seenAtUnixNano
|
||||
|
||||
@@ -219,6 +219,43 @@ func TestUsageStore_RateBucketsAreNotUsed(t *testing.T) {
|
||||
require.Nil(t, stream.rateBuckets)
|
||||
}
|
||||
|
||||
// TestUsageStore_UpdateRates_AfterUpdate asserts that UpdateRates works correctly
|
||||
// when called on a stream that was previously created via Update() (which doesn't
|
||||
// initialize rateBuckets). This prevents panics when a stream is created by
|
||||
// ExceedsLimits and then UpdateRates is called on it (e.g., from cross-zone replication).
|
||||
func TestUsageStore_UpdateRates_AfterUpdate(t *testing.T) {
|
||||
s, err := newUsageStore(15*time.Minute, 5*time.Minute, time.Minute, 1, &mockLimits{}, prometheus.NewRegistry())
|
||||
require.NoError(t, err)
|
||||
clock := quartz.NewMock(t)
|
||||
s.clock = clock
|
||||
|
||||
// First, create a stream via Update() (simulates ExceedsLimits creating the stream)
|
||||
err = s.Update("tenant", &proto.StreamMetadata{
|
||||
StreamHash: 0x1,
|
||||
TotalSize: 50,
|
||||
}, clock.Now())
|
||||
require.NoError(t, err)
|
||||
|
||||
// Verify the stream exists but has no rate buckets
|
||||
s.withRLock("tenant", func(i int) {
|
||||
partition := s.getPartitionForHash(0x1)
|
||||
stream, ok := s.stripes[i]["tenant"][partition][noPolicy][0x1]
|
||||
require.True(t, ok, "stream should exist")
|
||||
require.Nil(t, stream.rateBuckets, "rateBuckets should be nil after Update()")
|
||||
})
|
||||
|
||||
// Now call UpdateRates on the same stream (simulates cross-zone replication)
|
||||
// This should NOT panic and should initialize rateBuckets
|
||||
rates, err := s.UpdateRates("tenant", []*proto.StreamMetadata{{
|
||||
StreamHash: 0x1,
|
||||
TotalSize: 100,
|
||||
}}, clock.Now())
|
||||
require.NoError(t, err)
|
||||
require.Len(t, rates, 1)
|
||||
require.NotNil(t, rates[0].rateBuckets, "rateBuckets should be initialized")
|
||||
require.Greater(t, len(rates[0].rateBuckets), 0, "rateBuckets should have entries")
|
||||
}
|
||||
|
||||
func TestUsageStore_UpdateCond(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
|
||||
Reference in New Issue
Block a user