feat: decouple dataobj consumers from the reader service (#20315)

This commit is contained in:
George Robinson
2026-01-08 21:44:06 +00:00
committed by GitHub
parent 4469f826d2
commit c3e909d538
11 changed files with 671 additions and 156 deletions

View File

@@ -7,7 +7,7 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/loki/v3/pkg/kafka/partition"
"github.com/grafana/loki/v3/pkg/kafkav2"
)
type downscalePermittedFunc func(context.Context) (bool, error)
@@ -27,23 +27,23 @@ func newChainedDownscalePermittedFunc(funcs ...downscalePermittedFunc) downscale
// newOffsetCommittedDownscaleFunc returns a downscalePermittedFunc that checks
// if the consumer has committed all records up to the end offset.
func newOffsetCommittedDownscaleFunc(offsetManager *partition.KafkaOffsetManager, partitionID int32, logger log.Logger) downscalePermittedFunc {
func newOffsetCommittedDownscaleFunc(offsetReader *kafkav2.OffsetReader, partitionID int32, logger log.Logger) downscalePermittedFunc {
return func(ctx context.Context) (bool, error) {
endOffset, err := offsetManager.PartitionOffset(ctx, partitionID, partition.KafkaEndOffset)
endOffset, err := offsetReader.EndOffset(ctx, partitionID)
if err != nil {
return false, fmt.Errorf("failed to get end offset: %w", err)
}
// The end offset is the offset of the next record to be produced.
// That means if the end offset is zero no records have been produced
// for this partition, which in turn means we can downscale.
// The end offset is the offset of the next record to be produced. If the
// end offset is zero this means no records have been produced for this
// partition, which in turn means we can downscale.
if endOffset == 0 {
level.Debug(logger).Log("msg", "no records produced for partition")
return true, nil
}
// If some records have been produced for this partition we need to
// make sure the consumer has processed and committed all of them
// otherwise we risk data loss.
lastCommittedOffset, err := offsetManager.LastCommittedOffset(ctx, partitionID)
// If some records have been produced for this partition we need to make sure
// the consumer has processed and committed all of them otherwise we risk data
// loss. If no offsets have been committed, the last committed offset is -1.
lastCommittedOffset, err := offsetReader.LastCommittedOffset(ctx, partitionID)
if err != nil {
return false, fmt.Errorf("failed to get last committed offset: %w", err)
}

View File

@@ -140,7 +140,7 @@ type mockCommitter struct {
offsets []int64
}
func (m *mockCommitter) Commit(_ context.Context, offset int64) error {
func (m *mockCommitter) Commit(_ context.Context, _ int32, offset int64) error {
m.offsets = append(m.offsets, offset)
return nil
}

View File

@@ -23,7 +23,6 @@ import (
"github.com/grafana/loki/v3/pkg/dataobj/metastore/multitenancy"
"github.com/grafana/loki/v3/pkg/dataobj/uploader"
"github.com/grafana/loki/v3/pkg/kafka"
"github.com/grafana/loki/v3/pkg/kafka/partition"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/scratch"
)
@@ -40,7 +39,7 @@ type builder interface {
// committer allows mocking of certain [kgo.Client] methods in tests.
type committer interface {
Commit(ctx context.Context, offset int64) error
Commit(ctx context.Context, partition int32, offset int64) error
}
type producer interface {
@@ -54,7 +53,7 @@ type partitionProcessor struct {
partition int32
// lastRecord contains the last record appended to the builder. It is used
// to commit the correct offset after a flush.
lastRecord *partition.Record
lastRecord *kgo.Record
builder builder
decoder *kafka.Decoder
uploader *uploader.Uploader
@@ -88,6 +87,8 @@ type partitionProcessor struct {
eventsProducerClient producer
metastorePartitionRatio int32
recordsChan chan *kgo.Record
// Used for tests.
clock quartz.Clock
}
@@ -105,6 +106,7 @@ func newPartitionProcessor(
eventsProducerClient *kgo.Client,
topic string,
partition int32,
recordsChan chan *kgo.Record,
) *partitionProcessor {
decoder, err := kafka.NewDecoder()
if err != nil {
@@ -142,10 +144,11 @@ func newPartitionProcessor(
eventsProducerClient: eventsProducerClient,
clock: quartz.NewReal(),
metastorePartitionRatio: int32(metastoreCfg.PartitionRatio),
recordsChan: recordsChan,
}
}
func (p *partitionProcessor) Start(ctx context.Context, recordsChan <-chan []partition.Record) func() {
func (p *partitionProcessor) Start(ctx context.Context) func() {
// This is a hack to avoid duplicate metrics registration panics. The
// problem occurs because [kafka.ReaderService] creates a consumer to
// process lag on startup, tears it down, and then creates another one
@@ -164,7 +167,7 @@ func (p *partitionProcessor) Start(ctx context.Context, recordsChan <-chan []par
case <-ctx.Done():
level.Info(p.logger).Log("msg", "stopping partition processor, context canceled")
return
case records, ok := <-recordsChan:
case record, ok := <-p.recordsChan:
if !ok {
level.Info(p.logger).Log("msg", "stopping partition processor, channel closed")
// Channel was closed. This means no more records will be
@@ -176,10 +179,7 @@ func (p *partitionProcessor) Start(ctx context.Context, recordsChan <-chan []par
}
return
}
// Process the records received.
for _, record := range records {
p.processRecord(ctx, record)
}
p.processRecord(ctx, record)
// This partition is idle, flush it.
case <-time.After(p.idleFlushTimeout):
if _, err := p.idleFlush(ctx); err != nil {
@@ -240,7 +240,7 @@ func (p *partitionProcessor) emitObjectWrittenEvent(ctx context.Context, objectP
return results.FirstErr()
}
func (p *partitionProcessor) processRecord(ctx context.Context, record partition.Record) {
func (p *partitionProcessor) processRecord(ctx context.Context, record *kgo.Record) {
p.metrics.processedRecords.Inc()
// Update offset metric at the end of processing
@@ -259,8 +259,8 @@ func (p *partitionProcessor) processRecord(ctx context.Context, record partition
return
}
tenant := record.TenantID
stream, err := p.decoder.DecodeWithoutLabels(record.Content)
tenant := string(record.Key)
stream, err := p.decoder.DecodeWithoutLabels(record.Value)
if err != nil {
level.Error(p.logger).Log("msg", "failed to decode record", "err", err)
return
@@ -286,7 +286,7 @@ func (p *partitionProcessor) processRecord(ctx context.Context, record partition
}
}
p.lastRecord = &record
p.lastRecord = record
p.lastModified = p.clock.Now()
}
@@ -369,7 +369,8 @@ func (p *partitionProcessor) commit(ctx context.Context) error {
backoff.Reset()
for backoff.Ongoing() {
p.metrics.incCommitsTotal()
err := p.committer.Commit(ctx, p.lastRecord.Offset)
level.Debug(p.logger).Log("msg", "committed offset", "partition", p.partition, "offset", p.lastRecord.Offset)
err := p.committer.Commit(ctx, p.partition, p.lastRecord.Offset)
if err == nil {
return nil
}

View File

@@ -1,70 +0,0 @@
package consumer
import (
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/objstore"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/grafana/loki/v3/pkg/dataobj/metastore"
"github.com/grafana/loki/v3/pkg/kafka/partition"
"github.com/grafana/loki/v3/pkg/scratch"
)
// partitionProcessorFactory is a factory for partition processors. It stores
// the configuration and dependencies that New forwards to
// newPartitionProcessor each time a processor is created.
type partitionProcessorFactory struct {
// cfg supplies the builder config, uploader config and idle flush timeout.
cfg Config
// TODO(grobinson): We should see if we can move metastore.Config inside
// Config instead of having a separate field just for the metastore.
metastoreCfg metastore.Config
// metastoreEvents is the Kafka client passed to each processor.
metastoreEvents *kgo.Client
bucket objstore.Bucket
scratchStore scratch.Store
logger log.Logger
reg prometheus.Registerer
// topic and partition identify the partition the processors consume.
topic string
partition int32
}
// newPartitionProcessorFactory builds a partitionProcessorFactory that
// remembers every argument so it can later be handed to a new processor.
func newPartitionProcessorFactory(
	cfg Config,
	metastoreCfg metastore.Config,
	metastoreEvents *kgo.Client,
	bucket objstore.Bucket,
	scratchStore scratch.Store,
	logger log.Logger,
	reg prometheus.Registerer,
	topic string,
	partition int32,
) *partitionProcessorFactory {
	f := new(partitionProcessorFactory)

	// Configuration.
	f.cfg = cfg
	f.metastoreCfg = metastoreCfg

	// Dependencies.
	f.metastoreEvents = metastoreEvents
	f.bucket = bucket
	f.scratchStore = scratchStore
	f.logger = logger
	f.reg = reg

	// The partition this factory creates processors for.
	f.topic = topic
	f.partition = partition

	return f
}
// New creates a processor for the factory's partition that commits offsets
// through the given committer. The logger argument is not used; the processor
// is given the factory's own logger. The returned error is always nil.
func (f *partitionProcessorFactory) New(committer partition.Committer, logger log.Logger) (partition.Consumer, error) {
	processor := newPartitionProcessor(
		committer,
		f.cfg.BuilderConfig,
		f.cfg.UploaderConfig,
		f.metastoreCfg,
		f.bucket,
		f.scratchStore,
		f.logger,
		f.reg,
		f.cfg.IdleFlushTimeout,
		f.metastoreEvents,
		f.topic,
		f.partition,
	)
	return processor, nil
}

View File

@@ -9,11 +9,11 @@ import (
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/grafana/loki/v3/pkg/dataobj/consumer/logsobj"
"github.com/grafana/loki/v3/pkg/dataobj/metastore"
"github.com/grafana/loki/v3/pkg/dataobj/uploader"
"github.com/grafana/loki/v3/pkg/kafka/partition"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/scratch"
@@ -52,9 +52,9 @@ func TestPartitionProcessor_Flush(t *testing.T) {
}
b, err := s.Marshal()
require.NoError(t, err)
p.processRecord(ctx, partition.Record{
TenantID: "test-tenant",
Content: b,
p.processRecord(ctx, &kgo.Record{
Key: []byte("test-tenant"),
Value: b,
Timestamp: now,
})
@@ -94,9 +94,9 @@ func TestPartitionProcessor_Flush(t *testing.T) {
}
b, err := s.Marshal()
require.NoError(t, err)
p.processRecord(ctx, partition.Record{
TenantID: "test-tenant",
Content: b,
p.processRecord(ctx, &kgo.Record{
Key: []byte("test-tenant"),
Value: b,
Timestamp: now,
})
@@ -149,9 +149,9 @@ func TestPartitionProcessor_IdleFlush(t *testing.T) {
}
b, err := s.Marshal()
require.NoError(t, err)
p.processRecord(ctx, partition.Record{
TenantID: "test-tenant",
Content: b,
p.processRecord(ctx, &kgo.Record{
Key: []byte("test-tenant"),
Value: b,
Timestamp: clock.Now(),
})
// A modification should have happened.
@@ -195,9 +195,9 @@ func TestPartitionProcessor_OffsetsCommitted(t *testing.T) {
}
b, err := s.Marshal()
require.NoError(t, err)
p.processRecord(ctx, partition.Record{
TenantID: "test-tenant",
Content: b,
p.processRecord(ctx, &kgo.Record{
Key: []byte("test-tenant"),
Value: b,
Timestamp: now1,
Offset: 1,
})
@@ -212,9 +212,9 @@ func TestPartitionProcessor_OffsetsCommitted(t *testing.T) {
// Append another record.
clock.Advance(time.Minute)
now2 := clock.Now()
p.processRecord(ctx, partition.Record{
TenantID: "test-tenant",
Content: b,
p.processRecord(ctx, &kgo.Record{
Key: []byte("test-tenant"),
Value: b,
Timestamp: now2,
Offset: 2,
})
@@ -247,9 +247,9 @@ func TestPartitionProcessor_OffsetsCommitted(t *testing.T) {
}
b, err := s.Marshal()
require.NoError(t, err)
p.processRecord(ctx, partition.Record{
TenantID: "test-tenant",
Content: b,
p.processRecord(ctx, &kgo.Record{
Key: []byte("test-tenant"),
Value: b,
Timestamp: now1,
Offset: 1,
})
@@ -294,9 +294,9 @@ func TestPartitionProcessor_ProcessRecord(t *testing.T) {
}
b, err := s.Marshal()
require.NoError(t, err)
p.processRecord(ctx, partition.Record{
TenantID: "test-tenant",
Content: b,
p.processRecord(ctx, &kgo.Record{
Key: []byte("test-tenant"),
Value: b,
Timestamp: clock.Now(),
})
@@ -322,6 +322,7 @@ func newTestPartitionProcessor(t *testing.T, clock quartz.Clock) *partitionProce
nil,
"test-topic",
1,
nil,
)
p.clock = clock
p.eventsProducerClient = &mockKafka{}

View File

@@ -3,6 +3,7 @@ package consumer
import (
"context"
"fmt"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
@@ -11,13 +12,14 @@ import (
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/objstore"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/grafana/loki/v3/pkg/dataobj/metastore"
"github.com/grafana/loki/v3/pkg/kafka"
"github.com/grafana/loki/v3/pkg/kafka/client"
"github.com/grafana/loki/v3/pkg/kafka/partition"
"github.com/grafana/loki/v3/pkg/kafka/partitionring"
"github.com/grafana/loki/v3/pkg/kafkav2"
"github.com/grafana/loki/v3/pkg/scratch"
)
@@ -34,7 +36,8 @@ type Service struct {
metastoreEvents *kgo.Client
lifecycler *ring.Lifecycler
partitionInstanceLifecycler *ring.PartitionInstanceLifecycler
partitionReader *partition.ReaderService
consumer *kafkav2.SinglePartitionConsumer
processor *partitionProcessor
downscalePermitted downscalePermittedFunc
watcher *services.FailureWatcher
logger log.Logger
@@ -63,6 +66,7 @@ func New(kafkaCfg kafka.Config, cfg Config, mCfg metastore.Config, bucket objsto
}
s.metastoreEvents = metastoreEvents
// Set up the ring.
lifecycler, err := ring.NewLifecycler(
cfg.LifecyclerConfig,
s,
@@ -77,18 +81,20 @@ func New(kafkaCfg kafka.Config, cfg Config, mCfg metastore.Config, bucket objsto
}
s.lifecycler = lifecycler
// An instance must register itself in the partition ring. Each instance
// is responsible for consuming exactly one partition determined by
// its partition ID. Once ready, the instance will declare its partition
// as active in the partition ring. This is how distributors know which
// partitions have a ready consumer.
partitionID, err := partitionring.ExtractPartitionID(cfg.LifecyclerConfig.ID)
// Set up the partition ring. Each instance of a dataobj consumer is responsible
// for consuming exactly one partition, determined by its partition ID.
// Once ready, the instance will declare its partition as active in the partition
// ring. This is how distributors know which partitions can receive records and
// which partitions cannot (for example, we don't want to send new records to
// a dataobj consumer that is about to scale down).
instanceID := cfg.LifecyclerConfig.ID
partitionID, err := partitionring.ExtractPartitionID(instanceID)
if err != nil {
return nil, fmt.Errorf("failed to extract partition ID from lifecycler configuration: %w", err)
}
// The mock KV is used in tests. If this is not a test then we must initialize
// a real kv.
partitionRingKV := cfg.PartitionRingConfig.KVStore.Mock
// The mock KV is used in tests. If this is not a test then we must
// initialize a real kv.
if partitionRingKV == nil {
partitionRingKV, err = kv.NewClient(
cfg.PartitionRingConfig.KVStore,
@@ -101,7 +107,7 @@ func New(kafkaCfg kafka.Config, cfg Config, mCfg metastore.Config, bucket objsto
}
}
partitionInstanceLifecycler := ring.NewPartitionInstanceLifecycler(
cfg.PartitionRingConfig.ToLifecyclerConfig(partitionID, cfg.LifecyclerConfig.ID),
cfg.PartitionRingConfig.ToLifecyclerConfig(partitionID, instanceID),
PartitionRingName,
PartitionRingKey,
partitionRingKV,
@@ -109,39 +115,53 @@ func New(kafkaCfg kafka.Config, cfg Config, mCfg metastore.Config, bucket objsto
prometheus.WrapRegistererWithPrefix("loki_", reg))
s.partitionInstanceLifecycler = partitionInstanceLifecycler
processorFactory := newPartitionProcessorFactory(
cfg,
// Set up the Kafka client that receives log entries. These entries are used to build
// data objects.
readerCfg := kafkaCfg
readerCfg.Topic = cfg.Topic
readerClient, err := client.NewReaderClient("loki.dataobj_consumer", readerCfg, logger, reg)
if err != nil {
return nil, fmt.Errorf("failed to create client for data topic: %w", err)
}
offsetReader := kafkav2.NewOffsetReader(readerClient, cfg.Topic, instanceID, logger)
// Since dataobj consumers do not group consume, we need to fetch the initial
// offset ourselves.
resumeOffsetCtx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
initialOffset, err := offsetReader.ResumeOffset(resumeOffsetCtx, partitionID)
if err != nil {
// TODO(grobinson): We need to use a backoff retry mechanism in case we cannot
// fetch offsets on the first attempt.
return nil, fmt.Errorf("failed to fetch resume offset: %w", err)
}
committer := kafkav2.NewGroupCommitter(kadm.NewClient(readerClient), cfg.Topic, instanceID)
records := make(chan *kgo.Record)
s.consumer = kafkav2.NewSinglePartitionConsumer(
readerClient,
cfg.Topic,
partitionID,
initialOffset,
records,
logger,
prometheus.WrapRegistererWithPrefix("loki_dataobj_consumer_", reg),
)
s.processor = newPartitionProcessor(
committer,
cfg.BuilderConfig,
cfg.UploaderConfig,
mCfg,
metastoreEvents,
bucket,
scratchStore,
logger,
reg,
cfg.IdleFlushTimeout,
metastoreEvents,
cfg.Topic,
partitionID,
records,
)
kafkaCfg.Topic = cfg.Topic
partitionReader, err := partition.NewReaderService(
kafkaCfg,
partitionID,
cfg.LifecyclerConfig.ID,
processorFactory.New,
logger,
prometheus.WrapRegistererWithPrefix("loki_dataobj_consumer_", reg),
partitionInstanceLifecycler,
)
if err != nil {
return nil, err
}
s.partitionReader = partitionReader
// TODO: We have to pass prometheus.NewRegistry() to avoid duplicate
// metric registration with partition.NewReaderService.
offsetManager, err := partition.NewKafkaOffsetManager(kafkaCfg, cfg.LifecyclerConfig.ID, logger, prometheus.NewRegistry())
if err != nil {
return nil, err
}
s.downscalePermitted = newOffsetCommittedDownscaleFunc(offsetManager, partitionID, logger)
s.downscalePermitted = newOffsetCommittedDownscaleFunc(offsetReader, partitionID, logger)
watcher := services.NewFailureWatcher()
watcher.WatchService(lifecycler)
@@ -161,14 +181,16 @@ func (s *Service) starting(ctx context.Context) error {
if err := services.StartAndAwaitRunning(ctx, s.partitionInstanceLifecycler); err != nil {
return fmt.Errorf("failed to start partition instance lifecycler: %w", err)
}
if err := services.StartAndAwaitRunning(ctx, s.partitionReader); err != nil {
return fmt.Errorf("failed to start partition reader: %w", err)
if err := services.StartAndAwaitRunning(ctx, s.consumer); err != nil {
return fmt.Errorf("failed to start consumer: %w", err)
}
return nil
}
// running implements the Service interface's running method.
func (s *Service) running(ctx context.Context) error {
// TODO(grobinson): Turn this into a [services.Service] instead.
s.processor.Start(ctx)
<-ctx.Done()
return nil
}
@@ -177,7 +199,7 @@ func (s *Service) running(ctx context.Context) error {
func (s *Service) stopping(failureCase error) error {
level.Info(s.logger).Log("msg", "stopping")
ctx := context.TODO()
if err := services.StopAndAwaitTerminated(ctx, s.partitionReader); err != nil {
if err := services.StopAndAwaitTerminated(ctx, s.consumer); err != nil {
level.Warn(s.logger).Log("msg", "failed to stop partition reader", "err", err)
}
if err := services.StopAndAwaitTerminated(ctx, s.partitionInstanceLifecycler); err != nil {

131
pkg/kafkav2/consumer.go Normal file
View File

@@ -0,0 +1,131 @@
package kafkav2
import (
"context"
"errors"
"strconv"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/twmb/franz-go/pkg/kgo"
)
// A SinglePartitionConsumer consumes records from a single partition. Records
// polled from the client are sent, one at a time, to the records channel.
type SinglePartitionConsumer struct {
*services.BasicService
// client is the Kafka client that is polled for records.
client *kgo.Client
// topic and partition identify the partition being consumed.
topic string
partition int32
// initialOffset is the offset consumption was configured to start from.
initialOffset int64
// records is the send-only channel that polled records are written to.
records chan<- *kgo.Record
logger log.Logger
// fetchErrors counts the errors reported by polled fetches.
fetchErrors prometheus.Counter
// polls counts the number of times the client has been polled.
polls prometheus.Counter
}
// NewSinglePartitionConsumer returns a new SinglePartitionConsumer. It
// consumes records from the specified offset. It accepts the two special
// offsets of -2 to consume from the start and -1 to consume from the end.
//
// As a side effect of construction the topic and partition are added to the
// partitions consumed by client. Two counters are registered with r, both
// labelled with the topic and partition. Polled records are sent to records,
// so the caller must keep receiving from it for consumption to make progress.
func NewSinglePartitionConsumer(
	client *kgo.Client,
	topic string,
	partition int32,
	initialOffset int64,
	records chan<- *kgo.Record,
	logger log.Logger,
	r prometheus.Registerer,
) *SinglePartitionConsumer {
	// Wrap the registerer with labels for the topic and partition so we don't
	// need to add it to each metric.
	r = prometheus.WrapRegistererWith(prometheus.Labels{
		"topic":     topic,
		"partition": strconv.Itoa(int(partition)),
	}, r)
	// Consume the topic and partition from the specified offset.
	client.AddConsumePartitions(map[string]map[int32]kgo.Offset{
		topic: {
			partition: kgo.NewOffset().At(initialOffset),
		},
	})
	c := SinglePartitionConsumer{
		client:        client,
		topic:         topic,
		partition:     partition,
		initialOffset: initialOffset,
		records:       records,
		logger:        log.With(logger, "topic", topic, "partition", partition),
		fetchErrors: promauto.With(r).NewCounter(prometheus.CounterOpts{
			// The metric name previously misspelled "errors" as "erros".
			Name: "single_partition_consumer_fetch_errors_total",
			Help: "The number of fetch errors.",
		}),
		polls: promauto.With(r).NewCounter(prometheus.CounterOpts{
			Name: "single_partition_consumer_polls_total",
			Help: "Total number of polls.",
		}),
	}
	c.BasicService = services.NewBasicService(c.starting, c.running, c.stopping)
	return &c
}
// starting implements [services.StartingFn]. It performs no work and always
// returns nil.
func (c *SinglePartitionConsumer) starting(_ context.Context) error {
return nil
}
// running implements [services.RunningFn]. It delegates to Run and returns
// whatever Run returns.
func (c *SinglePartitionConsumer) running(ctx context.Context) error {
return c.Run(ctx)
}
// stopping implements [services.StoppingFn]. It performs no work and always
// returns nil.
func (c *SinglePartitionConsumer) stopping(_ error) error {
return nil
}
// Run polls the client in a loop and sends every polled record to the records
// channel, in the order the client returned them. It blocks until ctx is done
// or the client is closed.
//
// Fetch errors are logged and counted, and cause Run to back off before the
// next poll; a poll without errors resets the backoff.
//
// Run returns the context's error if ctx is done before a poll or while a
// record is waiting to be delivered, the poll error if the client was closed
// or the poll was canceled, and nil if the backoff stops because ctx ended
// while waiting to retry. Records that were polled but not delivered when
// ctx ends are dropped.
func (c *SinglePartitionConsumer) Run(ctx context.Context) error {
	b := backoff.New(ctx, backoff.Config{
		MinBackoff: time.Millisecond * 100,
		MaxBackoff: time.Second * 10,
		MaxRetries: 0, // Infinite retries.
	})
	for b.Ongoing() {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		c.polls.Inc()
		fetches := c.client.PollRecords(ctx, -1)
		// If the client is closed, or the context was canceled, return the error
		// as no fetches were polled. We use this instead of [kgo.IsClientClosed]
		// so we can also check if the context was canceled.
		if err := fetches.Err0(); errors.Is(err, kgo.ErrClientClosed) || errors.Is(err, context.Canceled) {
			return err
		}

		// The client can fetch from multiple brokers in a single poll. This means
		// we must handle both records and errors at the same time, as some brokers
		// might be polled successfully while others return errors.
		//
		// The send must also watch the context. The channel may be unbuffered,
		// so if the receiver has stopped reading an unconditional send would
		// block forever and Run could never be stopped.
		var sendErr error
		fetches.EachRecord(func(record *kgo.Record) {
			if sendErr != nil {
				// The context ended on an earlier record, skip the rest.
				return
			}
			select {
			case c.records <- record:
			case <-ctx.Done():
				sendErr = ctx.Err()
			}
		})
		if sendErr != nil {
			return sendErr
		}

		var numErrs int
		fetches.EachError(func(_ string, _ int32, err error) {
			level.Error(c.logger).Log("msg", "failed to poll fetches", "err", err)
			c.fetchErrors.Inc()
			numErrs++
		})
		if numErrs == 0 {
			b.Reset()
		} else {
			b.Wait()
		}
	}
	return nil
}

View File

@@ -0,0 +1,51 @@
package kafkav2
import (
"context"
"testing"
"time"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kfake"
"github.com/twmb/franz-go/pkg/kgo"
)
// TestSinglePartitionConsumer produces two records to a fake cluster and
// checks that the consumer delivers both, in order, when reading the
// partition from the start offset.
func TestSinglePartitionConsumer(t *testing.T) {
	const testTopic = "test-topic"
	ctx := t.Context()

	cluster, err := kfake.NewCluster(kfake.NumBrokers(1), kfake.SeedTopics(1, testTopic))
	require.NoError(t, err)
	t.Cleanup(cluster.Close)

	// Seed the partition with the records the consumer is expected to read.
	client := mustKafkaClient(t, cluster.ListenAddrs()[0])
	for _, kv := range [][2]string{{"key1", "value1"}, {"key2", "value2"}} {
		res := client.ProduceSync(ctx, &kgo.Record{Topic: testTopic, Key: []byte(kv[0]), Value: []byte(kv[1])})
		require.NoError(t, res.FirstErr())
	}

	// Run the consumer in the background, reading from the start (-2).
	out := make(chan *kgo.Record)
	c := NewSinglePartitionConsumer(client, testTopic, 0, -2, out, log.NewNopLogger(), prometheus.NewRegistry())
	runCtx, stop := context.WithTimeout(ctx, 5*time.Second)
	defer stop()
	go c.Run(runCtx) //nolint:errcheck

	// Collect records until both have arrived or the timeout expires.
	var received []*kgo.Record
	for len(received) < 2 {
		select {
		case rec := <-out:
			received = append(received, rec)
		case <-runCtx.Done():
			t.Fatal("context canceled before all records received")
		}
	}

	// The records must arrive in the order they were produced.
	require.Len(t, received, 2)
	require.Equal(t, []byte("value1"), received[0].Value)
	require.Equal(t, []byte("value2"), received[1].Value)
}

189
pkg/kafkav2/offset.go Normal file
View File

@@ -0,0 +1,189 @@
package kafkav2
import (
"context"
"errors"
"fmt"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kmsg"
)
const (
// Special offsets in Kafka that refer to the start or end offset for
// a partition. They are untyped constants, so callers convert them to the
// integer type they need, for example int64(OffsetStart).
//
// OffsetStart refers to the start offset of a partition and OffsetEnd
// refers to the end offset of a partition.
OffsetStart = -2
OffsetEnd = -1
)
// Committer commits offsets for consumer groups. The topic, partition and
// consumer group are supplied on each call to Commit.
type Committer struct {
// client is the admin client used to commit offsets.
client *kadm.Client
}
// NewCommitter creates a Committer that commits offsets through client.
func NewCommitter(client *kadm.Client) *Committer {
	c := new(Committer)
	c.client = client
	return c
}
// Commit records offset as the committed offset of the topic and partition
// for consumerGroup. The offset is committed without a leader epoch (-1).
//
// It returns the request error if the commit request itself failed, or the
// error reported in the response if the commit was rejected.
func (c *Committer) Commit(ctx context.Context, topic string, partition int32, consumerGroup string, offset int64) error {
	toCommit := make(kadm.Offsets)
	toCommit.AddOffset(topic, partition, offset, -1)

	resps, err := c.client.CommitOffsets(ctx, consumerGroup, toCommit)
	switch {
	case err != nil:
		return err
	case !resps.Ok():
		return resps.Error()
	default:
		return nil
	}
}
// GroupCommitter is a Committer bound to a single topic and consumer group,
// so callers of its Commit method only supply the partition and offset.
type GroupCommitter struct {
topic string
consumerGroup string
*Committer
}
// NewGroupCommitter creates a GroupCommitter that commits offsets for topic
// on behalf of consumerGroup, using client to issue the commits.
func NewGroupCommitter(client *kadm.Client, topic string, consumerGroup string) *GroupCommitter {
	gc := new(GroupCommitter)
	gc.topic = topic
	gc.consumerGroup = consumerGroup
	gc.Committer = NewCommitter(client)
	return gc
}
// Commit commits the offset for the partition, using the topic and consumer
// group the GroupCommitter was created with. It returns an error if the
// offset could not be committed.
func (c *GroupCommitter) Commit(ctx context.Context, partition int32, offset int64) error {
return c.Committer.Commit(ctx, c.topic, partition, c.consumerGroup, offset)
}
// OffsetReader reads offsets for the partitions of a single topic: the offset
// last committed by a consumer group, the offset to resume consuming from,
// and the end offset.
type OffsetReader struct {
// adm is an admin client that wraps client.
adm *kadm.Client
// client is used to issue the offset requests.
client *kgo.Client
topic string
consumerGroup string
// logger is annotated with the topic and consumer group.
logger log.Logger
}
// NewOffsetReader creates an OffsetReader for topic and consumerGroup. All
// requests are issued through client, and log lines are annotated with the
// topic and consumer group.
func NewOffsetReader(
	client *kgo.Client,
	topic string,
	consumerGroup string,
	logger log.Logger,
) *OffsetReader {
	r := new(OffsetReader)
	r.adm = kadm.NewClient(client)
	r.client = client
	r.topic = topic
	r.consumerGroup = consumerGroup
	r.logger = log.With(logger, "topic", topic, "consumer_group", consumerGroup)
	return r
}
// LastCommittedOffset returns the offset last committed by the consumer group
// for the partition.
//
// If the response does not contain exactly one group, topic and partition the
// special start offset (OffsetStart) is returned with a nil error. An error
// is returned if the request fails, the response has an unexpected type, or
// the partition in the response carries an error code.
func (r *OffsetReader) LastCommittedOffset(ctx context.Context, partition int32) (int64, error) {
	// Request the committed offset for just this group, topic and partition.
	req := kmsg.NewPtrOffsetFetchRequest()
	req.Group = r.consumerGroup
	req.Topics = []kmsg.OffsetFetchRequestTopic{{
		Topic:      r.topic,
		Partitions: []int32{partition},
	}}
	shards := r.client.RequestSharded(ctx, req)

	// A request for a single partition must produce a single response.
	if n := len(shards); n != 1 {
		return 0, fmt.Errorf("unexpected number of responses: %d", n)
	}
	shard := shards[0]
	if shard.Err != nil {
		return 0, shard.Err
	}
	fetchRes, isFetchRes := shard.Resp.(*kmsg.OffsetFetchResponse)
	if !isFetchRes {
		return 0, errors.New("unexpected response type")
	}

	// The response must hold one group, with one topic, with one partition.
	// The checks short-circuit so no empty slice is indexed.
	wellFormed := len(fetchRes.Groups) == 1 &&
		len(fetchRes.Groups[0].Topics) == 1 &&
		len(fetchRes.Groups[0].Topics[0].Partitions) == 1
	if !wellFormed {
		level.Debug(r.logger).Log(
			"msg", "malformed response, setting to start offset",
		)
		return int64(OffsetStart), nil
	}

	committed := fetchRes.Groups[0].Topics[0].Partitions[0]
	if err := kerr.ErrorForCode(committed.ErrorCode); err != nil {
		return 0, err
	}
	return committed.Offset, nil
}
// ResumeOffset returns the next offset to consume for the partition: one
// past the last committed offset, or the special start offset (OffsetStart)
// when the last committed offset is negative.
func (r *OffsetReader) ResumeOffset(ctx context.Context, partition int32) (int64, error) {
	committed, err := r.LastCommittedOffset(ctx, partition)
	if err != nil {
		return 0, fmt.Errorf("failed to fetch last committed offset: %w", err)
	}
	// A negative offset is not a real position in the partition, so there is
	// nothing to resume from and consumption begins at the start.
	if committed < 0 {
		return int64(OffsetStart), nil
	}
	return committed + 1, nil
}
// EndOffset returns the end offset of the partition, requested with the
// READ_UNCOMMITTED isolation level.
//
// An error is returned if the request fails, the response has an unexpected
// type or shape, or the partition in the response carries an error code.
func (r *OffsetReader) EndOffset(ctx context.Context, partition int32) (int64, error) {
	// Build the request bottom-up: partition, then topic, then the request.
	p := kmsg.NewListOffsetsRequestTopicPartition()
	p.Partition = partition
	p.Timestamp = int64(OffsetEnd)

	t := kmsg.NewListOffsetsRequestTopic()
	t.Topic = r.topic
	t.Partitions = []kmsg.ListOffsetsRequestTopicPartition{p}

	req := kmsg.NewPtrListOffsetsRequest()
	req.IsolationLevel = 0 // 0 means READ_UNCOMMITTED.
	req.Topics = []kmsg.ListOffsetsRequestTopic{t}

	// Even if we share the same client, other in-flight requests are not canceled once this context is canceled
	// (or its deadline is exceeded). We've verified it with a unit test.
	shards := r.client.RequestSharded(ctx, req)

	// A request for a single partition must produce a single response.
	if n := len(shards); n != 1 {
		return 0, fmt.Errorf("unexpected number of responses: %d", n)
	}
	shard := shards[0]
	if shard.Err != nil {
		return 0, shard.Err
	}
	listRes, isListRes := shard.Resp.(*kmsg.ListOffsetsResponse)
	if !isListRes {
		return 0, errors.New("unexpected response type")
	}

	// The response must hold one topic with one partition. The check
	// short-circuits so an empty slice is never indexed.
	wellFormed := len(listRes.Topics) == 1 && len(listRes.Topics[0].Partitions) == 1
	if !wellFormed {
		return 0, errors.New("malformed response")
	}

	end := listRes.Topics[0].Partitions[0]
	if err := kerr.ErrorForCode(end.ErrorCode); err != nil {
		return 0, err
	}
	return end.Offset, nil
}

166
pkg/kafkav2/offset_test.go Normal file
View File

@@ -0,0 +1,166 @@
package kafkav2
import (
"testing"
"time"
"github.com/go-kit/log"
"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kfake"
"github.com/twmb/franz-go/pkg/kgo"
)
// TestCommitter_Commit checks that Commit stores the given offset for the
// intended topic, partition and consumer group, and that nothing is committed
// for any other topic, partition or consumer group.
func TestCommitter_Commit(t *testing.T) {
	const (
		testTopic         = "test-topic"
		testConsumerGroup = "test-consumer-group"
	)
	ctx := t.Context()

	// Start a fake single-broker cluster with one partition for the topic.
	cluster, err := kfake.NewCluster(kfake.NumBrokers(1), kfake.SeedTopics(1, testTopic))
	require.NoError(t, err)
	t.Cleanup(cluster.Close)
	adm := kadm.NewClient(mustKafkaClient(t, cluster.ListenAddrs()[0]))

	// Before committing, the consumer group must not exist.
	before, err := adm.FetchOffsets(ctx, testConsumerGroup)
	require.Equal(t, kerr.GroupIDNotFound, err)
	require.Nil(t, before)

	// Committing an offset must succeed.
	committer := NewCommitter(adm)
	require.NoError(t, committer.Commit(ctx, testTopic, 0, testConsumerGroup, 100))

	// The group must now hold exactly the committed offset for partition 0.
	after, err := adm.FetchOffsets(ctx, testConsumerGroup)
	require.NoError(t, err)
	partitions, found := after[testTopic]
	require.True(t, found)
	require.Len(t, partitions, 1)
	committed := partitions[0]
	require.Equal(t, testTopic, committed.Topic)
	require.Equal(t, int32(0), committed.Partition)
	require.Equal(t, int64(100), committed.At)

	// Any additional consumer group would mean an offset was committed for
	// the wrong group.
	groups, err := adm.ListGroups(ctx)
	require.NoError(t, err)
	require.Len(t, groups, 1)
	require.Contains(t, groups, testConsumerGroup)
}
// TestGroupCommitter_Commit checks that a GroupCommitter commits the given
// offset for the topic and consumer group it was created with, and for no
// other consumer group.
func TestGroupCommitter_Commit(t *testing.T) {
	const (
		testTopic         = "test-topic"
		testConsumerGroup = "test-consumer-group"
	)
	ctx := t.Context()

	// Start a fake single-broker cluster with one partition for the topic.
	cluster, err := kfake.NewCluster(kfake.NumBrokers(1), kfake.SeedTopics(1, testTopic))
	require.NoError(t, err)
	t.Cleanup(cluster.Close)
	adm := kadm.NewClient(mustKafkaClient(t, cluster.ListenAddrs()[0]))

	// Committing an offset must succeed.
	committer := NewGroupCommitter(adm, testTopic, testConsumerGroup)
	require.NoError(t, committer.Commit(ctx, 0, 100))

	// The group must now hold exactly the committed offset for partition 0.
	fetched, err := adm.FetchOffsets(ctx, testConsumerGroup)
	require.NoError(t, err)
	partitions, found := fetched[testTopic]
	require.True(t, found)
	require.Len(t, partitions, 1)
	committed := partitions[0]
	require.Equal(t, testTopic, committed.Topic)
	require.Equal(t, int32(0), committed.Partition)
	require.Equal(t, int64(100), committed.At)

	// Any additional consumer group would mean an offset was committed for
	// the wrong group.
	groups, err := adm.ListGroups(ctx)
	require.NoError(t, err)
	require.Len(t, groups, 1)
	require.Contains(t, groups, testConsumerGroup)
}
// TestOffsetReader_LastCommittedOffset checks that the special start offset
// is returned while nothing has been committed, and that a committed offset
// is returned afterwards.
func TestOffsetReader_LastCommittedOffset(t *testing.T) {
	const (
		testTopic         = "test-topic"
		testConsumerGroup = "test-consumer-group"
	)
	ctx := t.Context()

	// Start a fake single-broker cluster with one partition for the topic.
	cluster, err := kfake.NewCluster(kfake.NumBrokers(1), kfake.SeedTopics(1, testTopic))
	require.NoError(t, err)
	t.Cleanup(cluster.Close)
	client := mustKafkaClient(t, cluster.ListenAddrs()[0])
	adm := kadm.NewClient(client)
	reader := NewOffsetReader(client, testTopic, testConsumerGroup, log.NewNopLogger())

	// Nothing has been committed yet, so the special start offset (-2) is
	// expected.
	got, err := reader.LastCommittedOffset(ctx, 0)
	require.NoError(t, err)
	require.Equal(t, int64(-2), got)

	// Commit an offset directly through the admin client.
	pending := kadm.Offsets{}
	pending.AddOffset(testTopic, 0, 100, -1)
	_, err = adm.CommitOffsets(ctx, testConsumerGroup, pending)
	require.NoError(t, err)

	// The reader must now return the committed offset.
	got, err = reader.LastCommittedOffset(ctx, 0)
	require.NoError(t, err)
	require.Equal(t, int64(100), got)
}
// TestOffsetReader_EndOffset checks that the end offset is zero for an empty
// partition and grows by one with every record produced.
func TestOffsetReader_EndOffset(t *testing.T) {
	const (
		testTopic         = "test-topic"
		testConsumerGroup = "test-consumer-group"
	)
	ctx := t.Context()

	// Start a fake single-broker cluster with one partition for the topic.
	cluster, err := kfake.NewCluster(kfake.NumBrokers(1), kfake.SeedTopics(1, testTopic))
	require.NoError(t, err)
	t.Cleanup(cluster.Close)
	client := mustKafkaClient(t, cluster.ListenAddrs()[0])
	reader := NewOffsetReader(client, testTopic, testConsumerGroup, log.NewNopLogger())

	// No records have been produced, so the end offset is 0.
	got, err := reader.EndOffset(ctx, 0)
	require.NoError(t, err)
	require.Equal(t, int64(0), got)

	// Each produced record must advance the end offset by one.
	steps := []struct {
		key, value string
		wantEnd    int64
	}{
		{key: "foo", value: "bar", wantEnd: 1},
		{key: "baz", value: "qux", wantEnd: 2},
	}
	for _, step := range steps {
		res := client.ProduceSync(ctx, &kgo.Record{
			Topic:     testTopic,
			Key:       []byte(step.key),
			Value:     []byte(step.value),
			Timestamp: time.Now(),
		})
		require.NoError(t, res.FirstErr())

		got, err = reader.EndOffset(ctx, 0)
		require.NoError(t, err)
		require.Equal(t, step.wantEnd, got)
	}
}

24
pkg/kafkav2/util_test.go Normal file
View File

@@ -0,0 +1,24 @@
package kafkav2
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kgo"
)
// mustKafkaClient returns a new Kafka client for tests, connected to the seed
// broker. It fails the test if an error occurs, and closes the client when
// the test finishes.
//
// The client allows automatic topic creation and uses a manual partitioner.
// Any opts are appended after these defaults.
func mustKafkaClient(t *testing.T, seed string, opts ...kgo.Opt) *kgo.Client {
	// Mark this function as a helper so a failure is reported at the calling
	// test's line instead of inside this function.
	t.Helper()
	clientOpts := []kgo.Opt{
		kgo.SeedBrokers(seed),
		kgo.AllowAutoTopicCreation(),
		// We will choose the partition of each record.
		kgo.RecordPartitioner(kgo.ManualPartitioner()),
	}
	clientOpts = append(clientOpts, opts...)
	client, err := kgo.NewClient(clientOpts...)
	require.NoError(t, err)
	t.Cleanup(client.Close)
	return client
}