From 4b07d66045e75a534c495a432b7d2a51064b04a0 Mon Sep 17 00:00:00 2001 From: Thomas Poignant Date: Fri, 21 Feb 2025 16:35:09 +0100 Subject: [PATCH 1/8] Init code Signed-off-by: Thomas Poignant --- datastruct/even_store_test.go | 158 ++++++++++++++++++++++++++++++++++ datastruct/event_store.go | 148 +++++++++++++++++++++++++++++++ go.mod | 10 +++ go.sum | 9 ++ 4 files changed, 325 insertions(+) create mode 100644 datastruct/even_store_test.go create mode 100644 datastruct/event_store.go create mode 100644 go.mod create mode 100644 go.sum diff --git a/datastruct/even_store_test.go b/datastruct/even_store_test.go new file mode 100644 index 0000000..1a4a683 --- /dev/null +++ b/datastruct/even_store_test.go @@ -0,0 +1,158 @@ +package datastruct_test + +import ( + "context" + "fmt" + "github.com/stretchr/testify/assert" + "math/rand" + "sync" + "testing" + "time" + "untitled/datastruct" +) + +func Test_ConsumerNameInvalid(t *testing.T) { + t.Run("GetSizeQueue: should return an error if the consumer name is invalid", func(t *testing.T) { + consumerNames := []string{"consumer1"} + eventStore := datastruct.NewEventStore[string](consumerNames) + _, err := eventStore.GetSizeQueue("wrong name") + assert.NotNil(t, err) + }) + t.Run("GetSizeQueue: should return an error if the consumer name is invalid", func(t *testing.T) { + consumerNames := []string{"consumer1"} + eventStore := datastruct.NewEventStore[string](consumerNames) + _, err := eventStore.GetEvents("wrong name") + assert.NotNil(t, err) + }) +} + +func Test_SingleConsumer(t *testing.T) { + consumerNames := []string{"consumer1"} + eventStore := datastruct.NewEventStore[string](consumerNames) + + got, _ := eventStore.GetSizeQueue(consumerNames[0]) + assert.Equal(t, int64(0), got) + assert.Equal(t, int64(0), eventStore.GetCacheSize()) + + // start producer + ctx, cancel := context.WithCancel(context.Background()) + go startEventProducer(ctx, eventStore, 100, false) + time.Sleep(50 * time.Millisecond) + got, _ = eventStore.GetSizeQueue(consumerNames[0]) + assert.Equal(t, int64(100), got) + cancel() // stop producing + + // Consume + events, _ := eventStore.GetEvents(consumerNames[0]) + assert.Equal(t, 100, len(events)) + got, _ = eventStore.GetSizeQueue(consumerNames[0]) + assert.Equal(t, int64(0), got) + + // restart producing + ctx2, cancel2 := context.WithCancel(context.Background()) + defer cancel2() + go startEventProducer(ctx2, eventStore, 91, false) + time.Sleep(50 * time.Millisecond) + got, _ = eventStore.GetSizeQueue(consumerNames[0]) + assert.Equal(t, int64(91), got) + events, _ = eventStore.GetEvents(consumerNames[0]) + assert.Equal(t, 91, len(events)) + + time.Sleep(50 * time.Millisecond) // to wait until garbage collector remove the events + assert.Equal(t, int64(0), eventStore.GetCacheSize()) +} + +func Test_MultipleConsumersSingleThread(t *testing.T) { + consumerNames := []string{"consumer1", "consumer2"} + eventStore := datastruct.NewEventStore[string](consumerNames) + + // start producer + ctx, cancelProducer1 := context.WithCancel(context.Background()) + defer cancelProducer1() + go startEventProducer(ctx, eventStore, 1000, false) + time.Sleep(50 * time.Millisecond) + cancelProducer1() + assert.Equal(t, int64(1000), eventStore.GetCacheSize()) + + // Consume with Consumer1 only + consumer1Size, err := eventStore.GetSizeQueue(consumerNames[0]) + assert.Nil(t, err) + assert.Equal(t, int64(1000), consumer1Size) + eventsConsumer1, err := eventStore.GetEvents(consumerNames[0]) + assert.Nil(t, err) + assert.Equal(t, 1000, len(eventsConsumer1)) + + // Produce a second time + ctx, cancelProducer2 := context.WithCancel(context.Background()) + defer cancelProducer2() + go startEventProducer(ctx, eventStore, 1000, false) + time.Sleep(50 * time.Millisecond) + cancelProducer2() + + // Check queue size + assert.Equal(t, int64(2000), eventStore.GetCacheSize()) + consumer1Size, err = eventStore.GetSizeQueue(consumerNames[0]) + assert.Nil(t, err) + assert.Equal(t, int64(1000), consumer1Size) + consumer2Size, err := eventStore.GetSizeQueue(consumerNames[1]) + assert.Nil(t, err) + assert.Equal(t, int64(2000), consumer2Size) + + // Consumer with Consumer1 and Consumer2 + eventsConsumer1, err = eventStore.GetEvents(consumerNames[0]) + assert.Nil(t, err) + assert.Equal(t, 1000, len(eventsConsumer1)) + eventsConsumer2, err := eventStore.GetEvents(consumerNames[1]) + assert.Nil(t, err) + assert.Equal(t, 2000, len(eventsConsumer2)) + + // Check garbage collector + time.Sleep(50 * time.Millisecond) + assert.Equal(t, int64(0), eventStore.GetCacheSize()) +} + +func Test_MultipleConsumersMultipleGORoutines(t *testing.T) { + consumerNames := []string{"consumer1", "consumer2"} + eventStore := datastruct.NewEventStore[string](consumerNames) + + // start producer + ctx, cancelProducer1 := context.WithCancel(context.Background()) + defer cancelProducer1() + go startEventProducer(ctx, eventStore, 100000, true) + time.Sleep(50 * time.Millisecond) + wg := &sync.WaitGroup{} + + consumFunc := func(eventStore datastruct.EventStore[string], consumerName string) { + wg.Add(1) + defer wg.Done() + events, err := eventStore.GetEvents(consumerName) + assert.Nil(t, err) + + assert.True(t, len(events) > 0) + time.Sleep(50 * time.Millisecond) // we wait to be sure that the producer has produce new events + events, err = eventStore.GetEvents(consumerName) + assert.Nil(t, err) + assert.True(t, len(events) > 0) + } + + go consumFunc(eventStore, consumerNames[0]) + go consumFunc(eventStore, consumerNames[1]) + wg.Wait() + cancelProducer1() +} + +func startEventProducer(ctx context.Context, eventStore datastruct.EventStore[string], produceMax int, randomizeProducingTime bool) { + for i := 0; i < produceMax; i++ { + select { + case <-ctx.Done(): + fmt.Println("Goroutine stopped") + return + default: + if randomizeProducingTime { + randomNumber := rand.Intn(10) + 1 + time.Sleep(time.Duration(randomNumber) * time.Millisecond) + } + eventStore.Push("Hello") + } + } +} diff --git a/datastruct/event_store.go b/datastruct/event_store.go new file mode 100644 index 0000000..b1114e5 --- /dev/null +++ b/datastruct/event_store.go @@ -0,0 +1,148 @@ +package datastruct + +import ( + "fmt" + "math" + "sync" +) + +const minOffset = int64(math.MinInt64) + +type eventStoreImpl[T any] struct { + Events []event[T] + Mutex sync.RWMutex + Consumers *sync.Map + consumerNames []string + lastOffset int64 +} + +func NewEventStore[T any](consumerNames []string) EventStore[T] { + consumers := sync.Map{} + for _, name := range consumerNames { + consumers.Store(name, consumer{Offset: minOffset}) + } + return &eventStoreImpl[T]{ + Events: make([]event[T], 0), + Consumers: &consumers, + Mutex: sync.RWMutex{}, + consumerNames: consumerNames, + lastOffset: minOffset, + } +} + +type EventStore[T any] interface { + Push(data T) + GetEvents(consumerName string) ([]event[T], error) + GetSizeQueue(consumerName string) (int64, error) + GetCacheSize() int64 +} + +type event[T any] struct { + Offset int64 + Data T +} + +type consumer struct { + Offset int64 +} + +// GetCacheSize return the number of events in the cache +func (e *eventStoreImpl[T]) GetCacheSize() int64 { + e.Mutex.RLock() + defer e.Mutex.RUnlock() + return int64(len(e.Events)) +} + +// GetSizeQueue return the number of events that have not been consumed by the consumer +func (e *eventStoreImpl[T]) GetSizeQueue(consumerName string) (int64, error) { + e.Mutex.RLock() + defer e.Mutex.RUnlock() + + consumer, err := e.getConsumer(consumerName) + if err != nil { + return 0, err + } + + if e.Events == nil || len(e.Events) == 0 { + return 0, nil + } + return e.lastOffset - consumer.Offset, nil +} + +// GetConsumer return the consumer with the given name +func (e *eventStoreImpl[T]) getConsumer(name string) (consumer, error) { + if e.Consumers == nil { + return consumer{}, fmt.Errorf("consumer not found") + } + if value, ok := e.Consumers.Load(name); ok { + typedValue, ok := value.(consumer) + if ok { + return typedValue, nil + } + } + return consumer{}, fmt.Errorf("invalid consumer found") +} + +// Push add a new event to the store +func (e *eventStoreImpl[T]) Push(data T) { + e.Mutex.Lock() + defer e.Mutex.Unlock() + e.lastOffset++ + e.Events = append(e.Events, event[T]{Offset: e.lastOffset, Data: data}) +} + +// GetEvents return all events that have not been consumed by the consumer +func (e *eventStoreImpl[T]) GetEvents(consumerName string) ([]event[T], error) { + e.Mutex.RLock() + defer e.Mutex.RUnlock() + events := make([]event[T], 0) + currentConsumer, err := e.getConsumer(consumerName) + if err != nil { + return nil, fmt.Errorf("consumer with name %s not found: %s", consumerName, err) + } + for _, event := range e.Events { + if event.Offset > currentConsumer.Offset { + events = append(events, event) + } + } + if len(events) > 0 { + e.Consumers.Store(consumerName, consumer{Offset: e.lastOffset}) + } + + // trigger queue cleaning to remove all events that have been consumed by all consumers, we can optimize + // that part by calling this goroutine only on certain conditions and not every time we call GetEvents + go e.cleanQueue() + + return events, nil +} + +// cleanQueue remove all events that have been consumed by all consumers +func (e *eventStoreImpl[T]) cleanQueue() { + e.Mutex.Lock() + defer e.Mutex.Unlock() + if e.Events == nil || len(e.Events) == 0 { + // nothing to remove + return + } + consumerMinOffset := minOffset + for _, consumer := range e.consumerNames { + currentConsumer, _ := e.getConsumer(consumer) + if consumerMinOffset == minOffset || currentConsumer.Offset < consumerMinOffset { + consumerMinOffset = currentConsumer.Offset + } + } + if consumerMinOffset <= minOffset { + // nothing to remove + return + } + + index := 0 + for i, event := range e.Events { + if event.Offset == consumerMinOffset { + index = i + break + } + } + + e.Events = e.Events[index+1:] +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..4f34ced --- /dev/null +++ b/go.mod @@ -0,0 +1,10 @@ +module untitled + +go 1.23 + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/stretchr/testify v1.10.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..fe99d71 --- /dev/null +++ b/go.sum @@ -0,0 +1,9 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= From 9da08dce46aed0d6e39a0c1ec44ac0d1ed6c4014 Mon Sep 17 00:00:00 2001 From: Thomas Poignant Date: Fri, 21 Feb 2025 17:11:08 +0100 Subject: [PATCH 2/8] Add time based queue cleaning Signed-off-by: Thomas Poignant --- datastruct/even_store_test.go | 18 ++++++++++-------- datastruct/event_store.go | 35 +++++++++++++++++++++-------------- 2 files changed, 31 insertions(+), 22 deletions(-) diff --git a/datastruct/even_store_test.go b/datastruct/even_store_test.go index 1a4a683..bde82ef 100644 --- a/datastruct/even_store_test.go +++ b/datastruct/even_store_test.go @@ -11,16 +11,18 @@ import ( "untitled/datastruct" ) +const defaultCleanQueueDuration = 100 * time.Millisecond + func Test_ConsumerNameInvalid(t *testing.T) { t.Run("GetSizeQueue: should return an error if the consumer name is invalid", func(t *testing.T) { consumerNames := []string{"consumer1"} - eventStore := datastruct.NewEventStore[string](consumerNames) + eventStore := datastruct.NewEventStore[string](consumerNames, defaultCleanQueueDuration) _, err := eventStore.GetSizeQueue("wrong name") assert.NotNil(t, err) }) t.Run("GetSizeQueue: should return an error if the consumer name is invalid", func(t *testing.T) { consumerNames := []string{"consumer1"} - eventStore := datastruct.NewEventStore[string](consumerNames) + eventStore := datastruct.NewEventStore[string](consumerNames, defaultCleanQueueDuration) _, err := eventStore.GetEvents("wrong name") assert.NotNil(t, err) }) @@ -28,7 +30,7 @@ func Test_ConsumerNameInvalid(t *testing.T) { func Test_SingleConsumer(t *testing.T) { consumerNames := []string{"consumer1"} - eventStore := datastruct.NewEventStore[string](consumerNames) + eventStore := datastruct.NewEventStore[string](consumerNames, defaultCleanQueueDuration) got, _ := eventStore.GetSizeQueue(consumerNames[0]) assert.Equal(t, int64(0), got) @@ -58,13 +60,13 @@ func Test_SingleConsumer(t *testing.T) { events, _ = eventStore.GetEvents(consumerNames[0]) assert.Equal(t, 91, len(events)) - time.Sleep(50 * time.Millisecond) // to wait until garbage collector remove the events + time.Sleep(120 * time.Millisecond) // to wait until garbage collector remove the events assert.Equal(t, int64(0), eventStore.GetCacheSize()) } func Test_MultipleConsumersSingleThread(t *testing.T) { consumerNames := []string{"consumer1", "consumer2"} - eventStore := datastruct.NewEventStore[string](consumerNames) + eventStore := datastruct.NewEventStore[string](consumerNames, defaultCleanQueueDuration) // start producer ctx, cancelProducer1 := context.WithCancel(context.Background()) @@ -107,13 +109,13 @@ func Test_MultipleConsumersSingleThread(t *testing.T) { assert.Equal(t, 2000, len(eventsConsumer2)) // Check garbage collector - time.Sleep(50 * time.Millisecond) + time.Sleep(120 * time.Millisecond) assert.Equal(t, int64(0), eventStore.GetCacheSize()) } func Test_MultipleConsumersMultipleGORoutines(t *testing.T) { consumerNames := []string{"consumer1", "consumer2"} - eventStore := datastruct.NewEventStore[string](consumerNames) + eventStore := datastruct.NewEventStore[string](consumerNames, defaultCleanQueueDuration) // start producer ctx, cancelProducer1 := context.WithCancel(context.Background()) @@ -152,7 +154,7 @@ func startEventProducer(ctx context.Context, eventStore datastruct.EventStore[st randomNumber := rand.Intn(10) + 1 time.Sleep(time.Duration(randomNumber) * time.Millisecond) } - eventStore.Push("Hello") + eventStore.Add("Hello") } } } diff --git a/datastruct/event_store.go b/datastruct/event_store.go index b1114e5..65893ee 100644 --- a/datastruct/event_store.go +++ b/datastruct/event_store.go @@ -4,6 +4,7 @@ import ( "fmt" "math" "sync" + "time" ) const minOffset = int64(math.MinInt64) @@ -16,22 +17,24 @@ type eventStoreImpl[T any] struct { lastOffset int64 } -func NewEventStore[T any](consumerNames []string) EventStore[T] { +func NewEventStore[T any](consumerNames []string, cleanQueueDuration time.Duration) EventStore[T] { consumers := sync.Map{} for _, name := range consumerNames { consumers.Store(name, consumer{Offset: minOffset}) } - return &eventStoreImpl[T]{ + store := &eventStoreImpl[T]{ Events: make([]event[T], 0), Consumers: &consumers, Mutex: sync.RWMutex{}, consumerNames: consumerNames, lastOffset: minOffset, } + go store.periodicCleanQueue(cleanQueueDuration) + return store } type EventStore[T any] interface { - Push(data T) + Add(data T) GetEvents(consumerName string) ([]event[T], error) GetSizeQueue(consumerName string) (int64, error) GetCacheSize() int64 @@ -46,14 +49,14 @@ type consumer struct { Offset int64 } -// GetCacheSize return the number of events in the cache +// GetCacheSize returns the number of events in the cache func (e *eventStoreImpl[T]) GetCacheSize() int64 { e.Mutex.RLock() defer e.Mutex.RUnlock() return int64(len(e.Events)) } -// GetSizeQueue return the number of events that have not been consumed by the consumer +// GetSizeQueue returns the number of events that have not been consumed by the consumer func (e *eventStoreImpl[T]) GetSizeQueue(consumerName string) (int64, error) { e.Mutex.RLock() defer e.Mutex.RUnlock() @@ -69,7 +72,7 @@ func (e *eventStoreImpl[T]) GetSizeQueue(consumerName string) (int64, error) { return e.lastOffset - consumer.Offset, nil } -// GetConsumer return the consumer with the given name +// GetConsumer returns the consumer with the given name func (e *eventStoreImpl[T]) getConsumer(name string) (consumer, error) { if e.Consumers == nil { return consumer{}, fmt.Errorf("consumer not found") @@ -83,15 +86,15 @@ func (e *eventStoreImpl[T]) getConsumer(name string) (consumer, error) { return consumer{}, fmt.Errorf("invalid consumer found") } -// Push add a new event to the store -func (e *eventStoreImpl[T]) Push(data T) { +// Add adds a new event to the store +func (e *eventStoreImpl[T]) Add(data T) { e.Mutex.Lock() defer e.Mutex.Unlock() e.lastOffset++ e.Events = append(e.Events, event[T]{Offset: e.lastOffset, Data: data}) } -// GetEvents return all events that have not been consumed by the consumer +// GetEvents returns all events that have not been consumed by the consumer func (e *eventStoreImpl[T]) GetEvents(consumerName string) ([]event[T], error) { e.Mutex.RLock() defer e.Mutex.RUnlock() @@ -109,14 +112,10 @@ func (e *eventStoreImpl[T]) GetEvents(consumerName string) ([]event[T], error) { e.Consumers.Store(consumerName, consumer{Offset: e.lastOffset}) } - // trigger queue cleaning to remove all events that have been consumed by all consumers, we can optimize - // that part by calling this goroutine only on certain conditions and not every time we call GetEvents - go e.cleanQueue() - return events, nil } -// cleanQueue remove all events that have been consumed by all consumers +// cleanQueue removes all events that have been consumed by all consumers func (e *eventStoreImpl[T]) cleanQueue() { e.Mutex.Lock() defer e.Mutex.Unlock() @@ -146,3 +145,11 @@ func (e *eventStoreImpl[T]) cleanQueue() { e.Events = e.Events[index+1:] } + +// periodicCleanQueue periodically cleans the queue +func (e *eventStoreImpl[T]) periodicCleanQueue(cleanQueueDuration time.Duration) { + for { + time.Sleep(cleanQueueDuration) + e.cleanQueue() + } +} From 2aaa1e1fe5203bcbb3f88bd65aa2a2a1a1128920 Mon Sep 17 00:00:00 2001 From: Thomas Poignant Date: Fri, 21 Feb 2025 17:26:44 +0100 Subject: [PATCH 3/8] change lock mechanism Signed-off-by: Thomas Poignant --- datastruct/event_store.go | 76 +++++++++++++++++++-------------------- 1 file changed, 36 insertions(+), 40 deletions(-) diff --git a/datastruct/event_store.go b/datastruct/event_store.go index 65893ee..98619e3 100644 --- a/datastruct/event_store.go +++ b/datastruct/event_store.go @@ -10,24 +10,22 @@ import ( const minOffset = int64(math.MinInt64) type eventStoreImpl[T any] struct { - Events []event[T] - Mutex sync.RWMutex - Consumers *sync.Map - consumerNames []string - lastOffset int64 + events []event[T] + mutex sync.RWMutex + consumers map[string]consumer + lastOffset int64 } func NewEventStore[T any](consumerNames []string, cleanQueueDuration time.Duration) EventStore[T] { - consumers := sync.Map{} + consumers := map[string]consumer{} for _, name := range consumerNames { - consumers.Store(name, consumer{Offset: minOffset}) + consumers[name] = consumer{Offset: minOffset} } store := &eventStoreImpl[T]{ - Events: make([]event[T], 0), - Consumers: &consumers, - Mutex: sync.RWMutex{}, - consumerNames: consumerNames, - lastOffset: minOffset, + events: make([]event[T], 0), + consumers: consumers, + mutex: sync.RWMutex{}, + lastOffset: minOffset, } go store.periodicCleanQueue(cleanQueueDuration) return store @@ -51,22 +49,22 @@ type consumer struct { // GetCacheSize returns the number of events in the cache func (e *eventStoreImpl[T]) GetCacheSize() int64 { - e.Mutex.RLock() - defer e.Mutex.RUnlock() - return int64(len(e.Events)) + e.mutex.RLock() + defer e.mutex.RUnlock() + return int64(len(e.events)) } // GetSizeQueue returns the number of events that have not been consumed by the consumer func (e *eventStoreImpl[T]) GetSizeQueue(consumerName string) (int64, error) { - e.Mutex.RLock() - defer e.Mutex.RUnlock() + e.mutex.RLock() + defer e.mutex.RUnlock() consumer, err := e.getConsumer(consumerName) if err != nil { return 0, err } - if e.Events == nil || len(e.Events) == 0 { + if e.events == nil || len(e.events) == 0 { return 0, nil } return e.lastOffset - consumer.Offset, nil @@ -74,58 +72,56 @@ func (e *eventStoreImpl[T]) GetSizeQueue(consumerName string) (int64, error) { // GetConsumer returns the consumer with the given name func (e *eventStoreImpl[T]) getConsumer(name string) (consumer, error) { - if e.Consumers == nil { + if e.consumers == nil { return consumer{}, fmt.Errorf("consumer not found") } - if value, ok := e.Consumers.Load(name); ok { - typedValue, ok := value.(consumer) - if ok { - return typedValue, nil - } + if value, ok := e.consumers[name]; ok { + return value, nil } return consumer{}, fmt.Errorf("invalid consumer found") } // Add adds a new event to the store func (e *eventStoreImpl[T]) Add(data T) { - e.Mutex.Lock() - defer e.Mutex.Unlock() + e.mutex.Lock() + defer e.mutex.Unlock() e.lastOffset++ - e.Events = append(e.Events, event[T]{Offset: e.lastOffset, Data: data}) + e.events = append(e.events, event[T]{Offset: e.lastOffset, Data: data}) } // GetEvents returns all events that have not been consumed by the consumer func (e *eventStoreImpl[T]) GetEvents(consumerName string) ([]event[T], error) { - e.Mutex.RLock() - defer e.Mutex.RUnlock() - events := make([]event[T], 0) + e.mutex.RLock() currentConsumer, err := e.getConsumer(consumerName) if err != nil { + e.mutex.RUnlock() return nil, fmt.Errorf("consumer with name %s not found: %s", consumerName, err) } - for _, event := range e.Events { + events := make([]event[T], 0) + for _, event := range e.events { if event.Offset > currentConsumer.Offset { events = append(events, event) } } + e.mutex.RUnlock() if len(events) > 0 { - e.Consumers.Store(consumerName, consumer{Offset: e.lastOffset}) + e.mutex.Lock() + e.consumers[consumerName] = consumer{Offset: e.lastOffset} + e.mutex.Unlock() } - return events, nil } // cleanQueue removes all events that have been consumed by all consumers func (e *eventStoreImpl[T]) cleanQueue() { - e.Mutex.Lock() - defer e.Mutex.Unlock() - if e.Events == nil || len(e.Events) == 0 { + e.mutex.Lock() + defer e.mutex.Unlock() + if e.events == nil || len(e.events) == 0 { // nothing to remove return } consumerMinOffset := minOffset - for _, consumer := range e.consumerNames { - currentConsumer, _ := e.getConsumer(consumer) + for _, currentConsumer := range e.consumers { if consumerMinOffset == minOffset || currentConsumer.Offset < consumerMinOffset { consumerMinOffset = currentConsumer.Offset } @@ -136,14 +132,14 @@ func (e *eventStoreImpl[T]) cleanQueue() { } index := 0 - for i, event := range e.Events { + for i, event := range e.events { if event.Offset == consumerMinOffset { index = i break } } - e.Events = e.Events[index+1:] + e.events = e.events[index+1:] } // periodicCleanQueue periodically cleans the queue From a0ba69fc369848279d0fbbdda3c84f8b32579ba4 Mon Sep 17 00:00:00 2001 From: Thomas Poignant Date: Fri, 21 Feb 2025 17:36:10 +0100 Subject: [PATCH 4/8] add function to stop the cleaning Signed-off-by: Thomas Poignant --- datastruct/even_store_test.go | 13 ++++++------ datastruct/event_store.go | 37 ++++++++++++++++++++++++++--------- 2 files changed, 35 insertions(+), 15 deletions(-) diff --git a/datastruct/even_store_test.go b/datastruct/even_store_test.go index bde82ef..9f73338 100644 --- a/datastruct/even_store_test.go +++ b/datastruct/even_store_test.go @@ -17,13 +17,15 @@ func Test_ConsumerNameInvalid(t *testing.T) { t.Run("GetSizeQueue: should return an error if the consumer name is invalid", func(t *testing.T) { consumerNames := []string{"consumer1"} eventStore := datastruct.NewEventStore[string](consumerNames, defaultCleanQueueDuration) + defer eventStore.Stop() _, err := eventStore.GetSizeQueue("wrong name") assert.NotNil(t, err) }) t.Run("GetSizeQueue: should return an error if the consumer name is invalid", func(t *testing.T) { consumerNames := []string{"consumer1"} - eventStore := datastruct.NewEventStore[string](consumerNames, defaultCleanQueueDuration) - _, err := eventStore.GetEvents("wrong name") + eventStore2 := datastruct.NewEventStore[string](consumerNames, defaultCleanQueueDuration) + defer eventStore2.Stop() + _, err := eventStore2.GetEvents("wrong name") assert.NotNil(t, err) }) } @@ -31,7 +33,7 @@ func Test_ConsumerNameInvalid(t *testing.T) { func Test_SingleConsumer(t *testing.T) { consumerNames := []string{"consumer1"} eventStore := datastruct.NewEventStore[string](consumerNames, defaultCleanQueueDuration) - + defer eventStore.Stop() got, _ := eventStore.GetSizeQueue(consumerNames[0]) assert.Equal(t, int64(0), got) assert.Equal(t, int64(0), eventStore.GetCacheSize()) @@ -67,7 +69,7 @@ func Test_SingleConsumer(t *testing.T) { func Test_MultipleConsumersSingleThread(t *testing.T) { consumerNames := []string{"consumer1", "consumer2"} eventStore := datastruct.NewEventStore[string](consumerNames, defaultCleanQueueDuration) - + defer eventStore.Stop() // start producer ctx, cancelProducer1 := context.WithCancel(context.Background()) defer cancelProducer1() @@ -116,7 +118,7 @@ func Test_MultipleConsumersSingleThread(t *testing.T) { func Test_MultipleConsumersMultipleGORoutines(t *testing.T) { consumerNames := []string{"consumer1", "consumer2"} eventStore := datastruct.NewEventStore[string](consumerNames, defaultCleanQueueDuration) - + defer eventStore.Stop() // start producer ctx, cancelProducer1 := context.WithCancel(context.Background()) defer cancelProducer1() @@ -140,7 +142,6 @@ func Test_MultipleConsumersMultipleGORoutines(t *testing.T) { go consumFunc(eventStore, consumerNames[0]) go consumFunc(eventStore, consumerNames[1]) wg.Wait() - cancelProducer1() } func startEventProducer(ctx context.Context, eventStore datastruct.EventStore[string], produceMax int, randomizeProducingTime bool) { diff --git a/datastruct/event_store.go b/datastruct/event_store.go index 98619e3..4598850 100644 --- a/datastruct/event_store.go +++ b/datastruct/event_store.go @@ -10,10 +10,16 @@ import ( const minOffset = int64(math.MinInt64) type eventStoreImpl[T any] struct { - events []event[T] - mutex sync.RWMutex - consumers map[string]consumer + // events is a list of events to store + events []event[T] + // mutex to protect the events and consumers + mutex sync.RWMutex + // consumers is a map of consumers with their name as key + consumers map[string]consumer + // lastOffset is the last offset used for the event store lastOffset int64 + // stopPeriodicCleaning is a channel to stop the periodic cleaning goroutine + stopPeriodicCleaning chan struct{} } func NewEventStore[T any](consumerNames []string, cleanQueueDuration time.Duration) EventStore[T] { @@ -22,10 +28,11 @@ func NewEventStore[T any](consumerNames []string, cleanQueueDuration time.Durati consumers[name] = consumer{Offset: minOffset} } store := &eventStoreImpl[T]{ - events: make([]event[T], 0), - consumers: consumers, - mutex: sync.RWMutex{}, - lastOffset: minOffset, + events: make([]event[T], 0), + consumers: consumers, + mutex: sync.RWMutex{}, + lastOffset: minOffset, + stopPeriodicCleaning: make(chan struct{}), } go store.periodicCleanQueue(cleanQueueDuration) return store @@ -36,6 +43,7 @@ type EventStore[T any] interface { GetEvents(consumerName string) ([]event[T], error) GetSizeQueue(consumerName string) (int64, error) GetCacheSize() int64 + Stop() } type event[T any] struct { @@ -144,8 +152,19 @@ func (e *eventStoreImpl[T]) cleanQueue() { // periodicCleanQueue periodically cleans the queue func (e *eventStoreImpl[T]) periodicCleanQueue(cleanQueueDuration time.Duration) { + ticker := time.NewTicker(cleanQueueDuration) + defer ticker.Stop() for { - time.Sleep(cleanQueueDuration) - e.cleanQueue() + select { + case <-ticker.C: + e.cleanQueue() + case <-e.stopPeriodicCleaning: + return + } } } + +// Stop stops the periodic clean queue goroutine +func (e *eventStoreImpl[T]) Stop() { + close(e.stopPeriodicCleaning) +} From 538e8e820337f3e7dc6e94841c89fbd6522bffa2 Mon Sep 17 00:00:00 2001 From: Thomas Poignant Date: Fri, 21 Feb 2025 19:40:23 +0100 Subject: [PATCH 5/8] Use memory pointer Signed-off-by: Thomas Poignant --- datastruct/event_store.go | 37 ++++++++++++++----------------------- 1 file changed, 14 insertions(+), 23 deletions(-) diff --git a/datastruct/event_store.go b/datastruct/event_store.go index 4598850..6b95406 100644 --- a/datastruct/event_store.go +++ b/datastruct/event_store.go @@ -15,7 +15,7 @@ type eventStoreImpl[T any] struct { // mutex to protect the events and consumers mutex sync.RWMutex // consumers is a map of consumers with their name as key - consumers map[string]consumer + consumers map[string]*consumer // lastOffset is the last offset used for the event store lastOffset int64 // stopPeriodicCleaning is a channel to stop the periodic cleaning goroutine @@ -23,9 +23,9 @@ type eventStoreImpl[T any] struct { } func NewEventStore[T any](consumerNames []string, cleanQueueDuration time.Duration) EventStore[T] { - consumers := map[string]consumer{} + consumers := make(map[string]*consumer, len(consumerNames)) for _, name := range consumerNames { - consumers[name] = consumer{Offset: minOffset} + consumers[name] = &consumer{Offset: minOffset} } store := &eventStoreImpl[T]{ events: make([]event[T], 0), @@ -67,9 +67,9 @@ func (e *eventStoreImpl[T]) GetSizeQueue(consumerName string) (int64, error) { e.mutex.RLock() defer e.mutex.RUnlock() - consumer, err := e.getConsumer(consumerName) - if err != nil { - return 0, err + consumer, ok := e.consumers[consumerName] + if !ok { + return 0, fmt.Errorf("consumer with name %s not found", consumerName) } if e.events == nil || len(e.events) == 0 { @@ -78,17 +78,6 @@ func (e *eventStoreImpl[T]) GetSizeQueue(consumerName string) (int64, error) { return e.lastOffset - consumer.Offset, nil } -// GetConsumer returns the consumer with the given name -func (e *eventStoreImpl[T]) getConsumer(name string) (consumer, error) { - if e.consumers == nil { - return consumer{}, fmt.Errorf("consumer not found") - } - if value, ok := e.consumers[name]; ok { - return value, nil - } - return consumer{}, fmt.Errorf("invalid consumer found") -} - // Add adds a new event to the store func (e *eventStoreImpl[T]) Add(data T) { e.mutex.Lock() @@ -100,10 +89,10 @@ func (e *eventStoreImpl[T]) Add(data T) { // GetEvents returns all events that have not been consumed by the consumer func (e *eventStoreImpl[T]) GetEvents(consumerName string) ([]event[T], error) { e.mutex.RLock() - currentConsumer, err := e.getConsumer(consumerName) - if err != nil { - e.mutex.RUnlock() - return nil, fmt.Errorf("consumer with name %s not found: %s", consumerName, err) + defer e.mutex.RUnlock() + currentConsumer, ok := e.consumers[consumerName] + if !ok { + return nil, fmt.Errorf("consumer with name %s not found", consumerName) } events := make([]event[T], 0) for _, event := range e.events { @@ -111,11 +100,13 @@ func (e *eventStoreImpl[T]) GetEvents(consumerName string) ([]event[T], error) { events = append(events, event) } } - e.mutex.RUnlock() + if len(events) > 0 { + e.mutex.RUnlock() e.mutex.Lock() - e.consumers[consumerName] = consumer{Offset: e.lastOffset} + e.consumers[consumerName].Offset = e.lastOffset e.mutex.Unlock() + e.mutex.RLock() } return events, nil } From 262958294eccd05a4d12f90b9b4d3d1b887d9505 Mon Sep 17 00:00:00 2001 From: Thomas Poignant Date: Sat, 22 Feb 2025 10:26:44 +0100 Subject: [PATCH 6/8] move duration in the struct Signed-off-by: Thomas Poignant --- datastruct/event_store.go | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/datastruct/event_store.go b/datastruct/event_store.go index 6b95406..505ee0d 100644 --- a/datastruct/event_store.go +++ b/datastruct/event_store.go @@ -20,6 +20,8 @@ type eventStoreImpl[T any] struct { lastOffset int64 // stopPeriodicCleaning is a channel to stop the periodic cleaning goroutine stopPeriodicCleaning chan struct{} + // cleanQueueDuration is the duration between each cleaning + cleanQueueDuration time.Duration } func NewEventStore[T any](consumerNames []string, cleanQueueDuration time.Duration) EventStore[T] { @@ -33,8 +35,9 @@ func NewEventStore[T any](consumerNames []string, cleanQueueDuration time.Durati mutex: sync.RWMutex{}, lastOffset: minOffset, stopPeriodicCleaning: make(chan struct{}), + cleanQueueDuration: cleanQueueDuration, } - go store.periodicCleanQueue(cleanQueueDuration) + go store.periodicCleanQueue() return store } @@ -71,10 +74,6 @@ func (e *eventStoreImpl[T]) GetSizeQueue(consumerName string) (int64, error) { if !ok { return 0, fmt.Errorf("consumer with name %s not found", consumerName) } - - if e.events == nil || len(e.events) == 0 { - return 0, nil - } return e.lastOffset - consumer.Offset, nil } @@ -130,20 +129,18 @@ func (e *eventStoreImpl[T]) cleanQueue() { return } - index := 0 for i, event := range e.events { if event.Offset == consumerMinOffset { - index = i + e.events = e.events[i+1:] break } } - e.events = e.events[index+1:] } // periodicCleanQueue periodically cleans the queue -func (e *eventStoreImpl[T]) periodicCleanQueue(cleanQueueDuration time.Duration) { - ticker := time.NewTicker(cleanQueueDuration) +func (e *eventStoreImpl[T]) periodicCleanQueue() { + ticker := time.NewTicker(e.cleanQueueDuration) defer ticker.Stop() for { select { From 3fa1c57935922280d85d67639094b86be92e06ea Mon Sep 17 00:00:00 2001 From: Thomas Poignant Date: Fri, 28 Feb 2025 20:20:13 +0100 Subject: [PATCH 7/8] Add setOffset function Signed-off-by: Thomas Poignant --- datastruct/even_store_test.go | 37 ++++++++++++++++++++++++++++------- datastruct/event_store.go | 26 +++++++++++++++--------- 2 files changed, 47 insertions(+), 16 deletions(-) diff --git a/datastruct/even_store_test.go b/datastruct/even_store_test.go index 9f73338..02bdca2 100644 --- a/datastruct/even_store_test.go +++ b/datastruct/even_store_test.go @@ -48,7 +48,8 @@ func Test_SingleConsumer(t *testing.T) { // Consume events, _ := eventStore.GetEvents(consumerNames[0]) - assert.Equal(t, 100, len(events)) + assert.Equal(t, 100, len(events.Events)) + eventStore.SetOffset(consumerNames[0], events.NewOffset) got, _ = eventStore.GetSizeQueue(consumerNames[0]) assert.Equal(t, int64(0), got) @@ -60,7 +61,8 @@ func Test_SingleConsumer(t *testing.T) { got, _ = eventStore.GetSizeQueue(consumerNames[0]) assert.Equal(t, int64(91), got) events, _ = eventStore.GetEvents(consumerNames[0]) - assert.Equal(t, 91, len(events)) + eventStore.SetOffset(consumerNames[0], events.NewOffset) + assert.Equal(t, 91, len(events.Events)) time.Sleep(120 * time.Millisecond) // to wait until garbage collector remove the events assert.Equal(t, int64(0), eventStore.GetCacheSize()) @@ -84,7 +86,8 @@ func Test_MultipleConsumersSingleThread(t *testing.T) { assert.Equal(t, int64(1000), consumer1Size) eventsConsumer1, err := eventStore.GetEvents(consumerNames[0]) assert.Nil(t, err) - assert.Equal(t, 1000, len(eventsConsumer1)) + assert.Equal(t, 1000, len(eventsConsumer1.Events)) + eventStore.SetOffset(consumerNames[0], eventsConsumer1.NewOffset) // Produce a second time ctx, cancelProducer2 := context.WithCancel(context.Background()) @@ -105,10 +108,12 @@ func Test_MultipleConsumersSingleThread(t *testing.T) { // Consumer with Consumer1 and Consumer2 eventsConsumer1, err = eventStore.GetEvents(consumerNames[0]) assert.Nil(t, err) - assert.Equal(t, 1000, len(eventsConsumer1)) + assert.Equal(t, 1000, len(eventsConsumer1.Events)) + eventStore.SetOffset(consumerNames[0], eventsConsumer1.NewOffset) eventsConsumer2, err := eventStore.GetEvents(consumerNames[1]) assert.Nil(t, err) - assert.Equal(t, 2000, len(eventsConsumer2)) + assert.Equal(t, 2000, len(eventsConsumer2.Events)) + eventStore.SetOffset(consumerNames[1], eventsConsumer1.NewOffset) // Check garbage collector time.Sleep(120 * time.Millisecond) @@ -131,12 +136,14 @@ func Test_MultipleConsumersMultipleGORoutines(t *testing.T) { defer wg.Done() events, err := eventStore.GetEvents(consumerName) assert.Nil(t, err) + eventStore.SetOffset(consumerName, events.NewOffset) - assert.True(t, len(events) > 0) + assert.True(t, len(events.Events) > 0) time.Sleep(50 * time.Millisecond) // we wait to be sure that the producer has produce new events events, err = eventStore.GetEvents(consumerName) assert.Nil(t, err) - assert.True(t, len(events) > 0) + eventStore.SetOffset(consumerName, events.NewOffset) + assert.True(t, len(events.Events) > 0) } go consumFunc(eventStore, consumerNames[0]) @@ -144,6 +151,22 @@ func Test_MultipleConsumersMultipleGORoutines(t *testing.T) { wg.Wait() } +func Test_MultipleGetEventsWithoutSettingOffset(t *testing.T) { + consumerNames := []string{"consumer1"} + eventStore := datastruct.NewEventStore[string](consumerNames, defaultCleanQueueDuration) + defer eventStore.Stop() + + // start producer + ctx := context.Background() + startEventProducer(ctx, eventStore, 100, false) + + firstCall, err := eventStore.GetEvents(consumerNames[0]) + assert.Nil(t, err) + secondCall, err := eventStore.GetEvents(consumerNames[0]) + assert.Nil(t, err) + assert.Equal(t, firstCall, secondCall) +} + func startEventProducer(ctx context.Context, eventStore datastruct.EventStore[string], produceMax int, randomizeProducingTime bool) { for i := 0; i < produceMax; i++ { select { diff --git a/datastruct/event_store.go b/datastruct/event_store.go index 505ee0d..96f09e7 100644 --- a/datastruct/event_store.go +++ b/datastruct/event_store.go @@ -41,11 +41,18 @@ func NewEventStore[T any](consumerNames []string, cleanQueueDuration time.Durati return store } +type EventList[T any] struct { + Events []event[T] + InitialOffset int64 + NewOffset int64 +} + type EventStore[T any] interface { Add(data T) - GetEvents(consumerName string) ([]event[T], error) + GetEvents(consumerName string) (*EventList[T], error) GetSizeQueue(consumerName string) (int64, error) GetCacheSize() int64 + SetOffset(consumerName string, offset int64) Stop() } @@ -86,7 +93,7 @@ func (e *eventStoreImpl[T]) Add(data T) { } // GetEvents returns all events that have not been consumed by the consumer -func (e *eventStoreImpl[T]) GetEvents(consumerName string) ([]event[T], error) { +func (e *eventStoreImpl[T]) GetEvents(consumerName string) (*EventList[T], error) { e.mutex.RLock() defer e.mutex.RUnlock() currentConsumer, ok := e.consumers[consumerName] @@ -99,15 +106,16 @@ func (e *eventStoreImpl[T]) GetEvents(consumerName string) ([]event[T], error) { events = append(events, event) } } + return &EventList[T]{Events: events, InitialOffset: currentConsumer.Offset, NewOffset: e.lastOffset}, nil +} - if len(events) > 0 { - e.mutex.RUnlock() - e.mutex.Lock() - e.consumers[consumerName].Offset = e.lastOffset - e.mutex.Unlock() - e.mutex.RLock() +func (e *eventStoreImpl[T]) SetOffset(consumerName string, offset int64) { + e.mutex.Lock() + defer e.mutex.Unlock() + if offset > e.lastOffset { + return } - return events, nil + e.consumers[consumerName].Offset = e.lastOffset } // cleanQueue removes all events that have been consumed by all consumers From e0d9eb31cd53565d82f19729d461d8add20bd26c Mon Sep 17 00:00:00 2001 From: Thomas Poignant Date: Mon, 3 Mar 2025 15:55:08 +0100 Subject: [PATCH 8/8] wip Signed-off-by: Thomas Poignant --- datastruct/even_store_test.go | 100 +++++++++++++++++++++++----------- datastruct/event_store.go | 41 +++++++++----- 2 files changed, 95 insertions(+), 46 deletions(-) diff --git a/datastruct/even_store_test.go b/datastruct/even_store_test.go index 02bdca2..c4898e0 100644 --- a/datastruct/even_store_test.go +++ b/datastruct/even_store_test.go @@ -14,18 +14,18 @@ import ( const defaultCleanQueueDuration = 100 * time.Millisecond func Test_ConsumerNameInvalid(t *testing.T) { - t.Run("GetSizeQueue: should return an error if the consumer name is invalid", func(t *testing.T) { + t.Run("GetPendingEventCount: should return an error if the consumer name is invalid", func(t *testing.T) { consumerNames := []string{"consumer1"} eventStore := datastruct.NewEventStore[string](consumerNames, defaultCleanQueueDuration) defer eventStore.Stop() - _, err := eventStore.GetSizeQueue("wrong name") + _, err := eventStore.GetPendingEventCount("wrong name") assert.NotNil(t, err) }) - t.Run("GetSizeQueue: should return an error if the consumer name is invalid", func(t *testing.T) { + t.Run("GetPendingEventCount: should return an error if the consumer name is invalid", func(t *testing.T) { consumerNames := []string{"consumer1"} eventStore2 := datastruct.NewEventStore[string](consumerNames, defaultCleanQueueDuration) defer eventStore2.Stop() - _, err := eventStore2.GetEvents("wrong name") + _, err := eventStore2.FetchPendingEvents("wrong name") assert.NotNil(t, err) }) } @@ -34,23 +34,24 @@ func Test_SingleConsumer(t *testing.T) { consumerNames := []string{"consumer1"} eventStore := datastruct.NewEventStore[string](consumerNames, defaultCleanQueueDuration) defer eventStore.Stop() - got, _ := eventStore.GetSizeQueue(consumerNames[0]) + got, _ := eventStore.GetPendingEventCount(consumerNames[0]) assert.Equal(t, int64(0), got) - assert.Equal(t, int64(0), eventStore.GetCacheSize()) + assert.Equal(t, int64(0), eventStore.GetTotalEventCount()) // start producer ctx, cancel := context.WithCancel(context.Background()) go startEventProducer(ctx, eventStore, 100, false) time.Sleep(50 * time.Millisecond) - got, _ = eventStore.GetSizeQueue(consumerNames[0]) + got, _ = eventStore.GetPendingEventCount(consumerNames[0]) assert.Equal(t, int64(100), got) cancel() // stop producing // Consume - events, _ := eventStore.GetEvents(consumerNames[0]) + events, _ := eventStore.FetchPendingEvents(consumerNames[0]) assert.Equal(t, 100, len(events.Events)) - eventStore.SetOffset(consumerNames[0], events.NewOffset) - got, _ = eventStore.GetSizeQueue(consumerNames[0]) + err := eventStore.UpdateConsumerOffset(consumerNames[0], events.NewOffset) + assert.Nil(t, err) + got, _ = eventStore.GetPendingEventCount(consumerNames[0]) assert.Equal(t, int64(0), got) // restart producing @@ -58,14 +59,15 @@ func Test_SingleConsumer(t *testing.T) { defer cancel2() go startEventProducer(ctx2, eventStore, 91, false) time.Sleep(50 * time.Millisecond) - got, _ = eventStore.GetSizeQueue(consumerNames[0]) + got, _ = eventStore.GetPendingEventCount(consumerNames[0]) assert.Equal(t, int64(91), got) - events, _ = eventStore.GetEvents(consumerNames[0]) - eventStore.SetOffset(consumerNames[0], events.NewOffset) + events, _ = eventStore.FetchPendingEvents(consumerNames[0]) + err = eventStore.UpdateConsumerOffset(consumerNames[0], events.NewOffset) + assert.Nil(t, err) assert.Equal(t, 91, len(events.Events)) time.Sleep(120 * time.Millisecond) // to wait until garbage collector remove the events - assert.Equal(t, int64(0), eventStore.GetCacheSize()) + assert.Equal(t, int64(0), eventStore.GetTotalEventCount()) } func Test_MultipleConsumersSingleThread(t *testing.T) { @@ -78,16 +80,17 @@ func Test_MultipleConsumersSingleThread(t *testing.T) { go startEventProducer(ctx, eventStore, 1000, false) time.Sleep(50 * time.Millisecond) cancelProducer1() - assert.Equal(t, int64(1000), eventStore.GetCacheSize()) + assert.Equal(t, int64(1000), eventStore.GetTotalEventCount()) // Consume with Consumer1 only - consumer1Size, err := eventStore.GetSizeQueue(consumerNames[0]) + consumer1Size, err := eventStore.GetPendingEventCount(consumerNames[0]) assert.Nil(t, err) assert.Equal(t, int64(1000), consumer1Size) - eventsConsumer1, err := eventStore.GetEvents(consumerNames[0]) + eventsConsumer1, err := eventStore.FetchPendingEvents(consumerNames[0]) assert.Nil(t, err) assert.Equal(t, 1000, len(eventsConsumer1.Events)) - eventStore.SetOffset(consumerNames[0], eventsConsumer1.NewOffset) + err = eventStore.UpdateConsumerOffset(consumerNames[0], eventsConsumer1.NewOffset) + assert.Nil(t, err) // Produce a second time ctx, cancelProducer2 := context.WithCancel(context.Background()) @@ -97,27 +100,29 @@ func Test_MultipleConsumersSingleThread(t *testing.T) { cancelProducer2() // Check queue size - assert.Equal(t, int64(2000), eventStore.GetCacheSize()) - consumer1Size, err = eventStore.GetSizeQueue(consumerNames[0]) + assert.Equal(t, int64(2000), eventStore.GetTotalEventCount()) + consumer1Size, err = eventStore.GetPendingEventCount(consumerNames[0]) assert.Nil(t, err) assert.Equal(t, int64(1000), consumer1Size) - consumer2Size, err := eventStore.GetSizeQueue(consumerNames[1]) + consumer2Size, err := eventStore.GetPendingEventCount(consumerNames[1]) assert.Nil(t, err) assert.Equal(t, int64(2000), consumer2Size) // Consumer with Consumer1 and Consumer2 - eventsConsumer1, err = eventStore.GetEvents(consumerNames[0]) + eventsConsumer1, err = eventStore.FetchPendingEvents(consumerNames[0]) assert.Nil(t, err) assert.Equal(t, 1000, len(eventsConsumer1.Events)) - eventStore.SetOffset(consumerNames[0], eventsConsumer1.NewOffset) - eventsConsumer2, err := eventStore.GetEvents(consumerNames[1]) + err = eventStore.UpdateConsumerOffset(consumerNames[0], eventsConsumer1.NewOffset) + assert.Nil(t, err) + eventsConsumer2, err := eventStore.FetchPendingEvents(consumerNames[1]) assert.Nil(t, err) assert.Equal(t, 2000, len(eventsConsumer2.Events)) - eventStore.SetOffset(consumerNames[1], eventsConsumer1.NewOffset) + err = eventStore.UpdateConsumerOffset(consumerNames[1], eventsConsumer1.NewOffset) + assert.Nil(t, err) // Check garbage collector time.Sleep(120 * time.Millisecond) - assert.Equal(t, int64(0), eventStore.GetCacheSize()) + assert.Equal(t, int64(0), eventStore.GetTotalEventCount()) } func Test_MultipleConsumersMultipleGORoutines(t *testing.T) { @@ -134,15 +139,17 @@ func Test_MultipleConsumersMultipleGORoutines(t *testing.T) { consumFunc := func(eventStore datastruct.EventStore[string], consumerName string) { wg.Add(1) defer wg.Done() - events, err := eventStore.GetEvents(consumerName) + events, err := eventStore.FetchPendingEvents(consumerName) + assert.Nil(t, err) + err = eventStore.UpdateConsumerOffset(consumerName, events.NewOffset) assert.Nil(t, err) - eventStore.SetOffset(consumerName, events.NewOffset) assert.True(t, len(events.Events) > 0) time.Sleep(50 * time.Millisecond) // we wait to be sure that the producer has produce new events - events, err = eventStore.GetEvents(consumerName) + events, err = eventStore.FetchPendingEvents(consumerName) + assert.Nil(t, err) + err = eventStore.UpdateConsumerOffset(consumerName, events.NewOffset) assert.Nil(t, err) - eventStore.SetOffset(consumerName, events.NewOffset) assert.True(t, len(events.Events) > 0) } @@ -160,13 +167,42 @@ func Test_MultipleGetEventsWithoutSettingOffset(t *testing.T) { ctx := context.Background() startEventProducer(ctx, eventStore, 100, false) - firstCall, err := eventStore.GetEvents(consumerNames[0]) + firstCall, err := eventStore.FetchPendingEvents(consumerNames[0]) assert.Nil(t, err) - secondCall, err := eventStore.GetEvents(consumerNames[0]) + secondCall, err := eventStore.FetchPendingEvents(consumerNames[0]) assert.Nil(t, err) assert.Equal(t, firstCall, secondCall) } +func Test_UpdateWithInvalidOffset(t *testing.T) { + consumerNames := []string{"consumer1"} + eventStore := datastruct.NewEventStore[string](consumerNames, defaultCleanQueueDuration) + defer eventStore.Stop() + + // start producer + ctx := context.Background() + startEventProducer(ctx, eventStore, 100, false) + + eventList, err := eventStore.FetchPendingEvents(consumerNames[0]) + assert.Nil(t, err) + errUpdate := eventStore.UpdateConsumerOffset(consumerNames[0], eventList.NewOffset+100) + assert.NotNil(t, errUpdate) +} + +func Test_WaitForEmptyClean(t *testing.T) { + consumerNames := []string{"consumer1"} + eventStore := datastruct.NewEventStore[string](consumerNames, defaultCleanQueueDuration) + defer eventStore.Stop() + + // start producer + ctx := context.Background() + startEventProducer(ctx, eventStore, 100, false) + list, _ := eventStore.FetchPendingEvents(consumerNames[0]) + _ = eventStore.UpdateConsumerOffset(consumerNames[0], list.NewOffset) + time.Sleep(3 * defaultCleanQueueDuration) + assert.Equal(t, int64(0), eventStore.GetTotalEventCount()) +} + func startEventProducer(ctx context.Context, eventStore datastruct.EventStore[string], produceMax int, randomizeProducingTime bool) { for i := 0; i < produceMax; i++ { select { diff --git a/datastruct/event_store.go b/datastruct/event_store.go index 96f09e7..9e3041a 100644 --- a/datastruct/event_store.go +++ b/datastruct/event_store.go @@ -48,11 +48,22 @@ type EventList[T any] struct { } type EventStore[T any] interface { + // Add is adding item of type T in the event store. Add(data T) - GetEvents(consumerName string) (*EventList[T], error) - GetSizeQueue(consumerName string) (int64, error) - GetCacheSize() int64 - SetOffset(consumerName string, offset int64) + + // FetchPendingEvents is returning all the available item in the event store for this consumer. + FetchPendingEvents(consumerName string) (*EventList[T], error) + + // GetPendingEventCount is returning the number items available in the event store for this consumer. + GetPendingEventCount(consumerName string) (int64, error) + + // GetTotalEventCount returns the total number of events in the store. + GetTotalEventCount() int64 + + // UpdateConsumerOffset updates the offset of the consumer to the new offset. + UpdateConsumerOffset(consumerName string, offset int64) error + + // Stop is closing the event store and stop the periodic cleaning. Stop() } @@ -65,15 +76,15 @@ type consumer struct { Offset int64 } -// GetCacheSize returns the number of events in the cache -func (e *eventStoreImpl[T]) GetCacheSize() int64 { +// GetTotalEventCount returns the total number of events in the store. +func (e *eventStoreImpl[T]) GetTotalEventCount() int64 { e.mutex.RLock() defer e.mutex.RUnlock() return int64(len(e.events)) } -// GetSizeQueue returns the number of events that have not been consumed by the consumer -func (e *eventStoreImpl[T]) GetSizeQueue(consumerName string) (int64, error) { +// GetPendingEventCount is returning the number items available in the event store for this consumer. +func (e *eventStoreImpl[T]) GetPendingEventCount(consumerName string) (int64, error) { e.mutex.RLock() defer e.mutex.RUnlock() @@ -84,7 +95,7 @@ func (e *eventStoreImpl[T]) GetSizeQueue(consumerName string) (int64, error) { return e.lastOffset - consumer.Offset, nil } -// Add adds a new event to the store +// Add is adding item of type T in the event store. func (e *eventStoreImpl[T]) Add(data T) { e.mutex.Lock() defer e.mutex.Unlock() @@ -92,8 +103,8 @@ func (e *eventStoreImpl[T]) Add(data T) { e.events = append(e.events, event[T]{Offset: e.lastOffset, Data: data}) } -// GetEvents returns all events that have not been consumed by the consumer -func (e *eventStoreImpl[T]) GetEvents(consumerName string) (*EventList[T], error) { +// FetchPendingEvents is returning all the available item in the event store for this consumer. +func (e *eventStoreImpl[T]) FetchPendingEvents(consumerName string) (*EventList[T], error) { e.mutex.RLock() defer e.mutex.RUnlock() currentConsumer, ok := e.consumers[consumerName] @@ -109,13 +120,15 @@ func (e *eventStoreImpl[T]) GetEvents(consumerName string) (*EventList[T], error return &EventList[T]{Events: events, InitialOffset: currentConsumer.Offset, NewOffset: e.lastOffset}, nil } -func (e *eventStoreImpl[T]) SetOffset(consumerName string, offset int64) { +// UpdateConsumerOffset updates the offset of the consumer to the new offset. +func (e *eventStoreImpl[T]) UpdateConsumerOffset(consumerName string, offset int64) error { e.mutex.Lock() defer e.mutex.Unlock() if offset > e.lastOffset { - return + return fmt.Errorf("invalid offset: offset %d is greater than the last offset %d", offset, e.lastOffset) } e.consumers[consumerName].Offset = e.lastOffset + return nil } // cleanQueue removes all events that have been consumed by all consumers @@ -160,7 +173,7 @@ func (e *eventStoreImpl[T]) periodicCleanQueue() { } } -// Stop stops the periodic clean queue goroutine +// Stop is closing the event store and stop the periodic cleaning. func (e *eventStoreImpl[T]) Stop() { close(e.stopPeriodicCleaning) }