Skip to content

Init code #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
220 changes: 220 additions & 0 deletions datastruct/even_store_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
package datastruct_test

import (
"context"
"fmt"
"github.com/stretchr/testify/assert"
"math/rand"
"sync"
"testing"
"time"
"untitled/datastruct"
)

const defaultCleanQueueDuration = 100 * time.Millisecond

func Test_ConsumerNameInvalid(t *testing.T) {
t.Run("GetPendingEventCount: should return an error if the consumer name is invalid", func(t *testing.T) {
consumerNames := []string{"consumer1"}
eventStore := datastruct.NewEventStore[string](consumerNames, defaultCleanQueueDuration)
defer eventStore.Stop()
_, err := eventStore.GetPendingEventCount("wrong name")
assert.NotNil(t, err)
})
t.Run("GetPendingEventCount: should return an error if the consumer name is invalid", func(t *testing.T) {
consumerNames := []string{"consumer1"}
eventStore2 := datastruct.NewEventStore[string](consumerNames, defaultCleanQueueDuration)
defer eventStore2.Stop()
_, err := eventStore2.FetchPendingEvents("wrong name")
assert.NotNil(t, err)
})
}

func Test_SingleConsumer(t *testing.T) {
consumerNames := []string{"consumer1"}
eventStore := datastruct.NewEventStore[string](consumerNames, defaultCleanQueueDuration)
defer eventStore.Stop()
got, _ := eventStore.GetPendingEventCount(consumerNames[0])
assert.Equal(t, int64(0), got)
assert.Equal(t, int64(0), eventStore.GetTotalEventCount())

// start producer
ctx, cancel := context.WithCancel(context.Background())
go startEventProducer(ctx, eventStore, 100, false)
time.Sleep(50 * time.Millisecond)
got, _ = eventStore.GetPendingEventCount(consumerNames[0])
assert.Equal(t, int64(100), got)
cancel() // stop producing

// Consume
events, _ := eventStore.FetchPendingEvents(consumerNames[0])
assert.Equal(t, 100, len(events.Events))
err := eventStore.UpdateConsumerOffset(consumerNames[0], events.NewOffset)
assert.Nil(t, err)
got, _ = eventStore.GetPendingEventCount(consumerNames[0])
assert.Equal(t, int64(0), got)

// restart producing
ctx2, cancel2 := context.WithCancel(context.Background())
defer cancel2()
go startEventProducer(ctx2, eventStore, 91, false)
time.Sleep(50 * time.Millisecond)
got, _ = eventStore.GetPendingEventCount(consumerNames[0])
assert.Equal(t, int64(91), got)
events, _ = eventStore.FetchPendingEvents(consumerNames[0])
err = eventStore.UpdateConsumerOffset(consumerNames[0], events.NewOffset)
assert.Nil(t, err)
assert.Equal(t, 91, len(events.Events))

time.Sleep(120 * time.Millisecond) // to wait until garbage collector remove the events
assert.Equal(t, int64(0), eventStore.GetTotalEventCount())
}

func Test_MultipleConsumersSingleThread(t *testing.T) {
consumerNames := []string{"consumer1", "consumer2"}
eventStore := datastruct.NewEventStore[string](consumerNames, defaultCleanQueueDuration)
defer eventStore.Stop()
// start producer
ctx, cancelProducer1 := context.WithCancel(context.Background())
defer cancelProducer1()
go startEventProducer(ctx, eventStore, 1000, false)
time.Sleep(50 * time.Millisecond)
cancelProducer1()
assert.Equal(t, int64(1000), eventStore.GetTotalEventCount())

// Consume with Consumer1 only
consumer1Size, err := eventStore.GetPendingEventCount(consumerNames[0])
assert.Nil(t, err)
assert.Equal(t, int64(1000), consumer1Size)
eventsConsumer1, err := eventStore.FetchPendingEvents(consumerNames[0])
assert.Nil(t, err)
assert.Equal(t, 1000, len(eventsConsumer1.Events))
err = eventStore.UpdateConsumerOffset(consumerNames[0], eventsConsumer1.NewOffset)
assert.Nil(t, err)

// Produce a second time
ctx, cancelProducer2 := context.WithCancel(context.Background())
defer cancelProducer2()
go startEventProducer(ctx, eventStore, 1000, false)
time.Sleep(50 * time.Millisecond)
cancelProducer2()

// Check queue size
assert.Equal(t, int64(2000), eventStore.GetTotalEventCount())
consumer1Size, err = eventStore.GetPendingEventCount(consumerNames[0])
assert.Nil(t, err)
assert.Equal(t, int64(1000), consumer1Size)
consumer2Size, err := eventStore.GetPendingEventCount(consumerNames[1])
assert.Nil(t, err)
assert.Equal(t, int64(2000), consumer2Size)

// Consumer with Consumer1 and Consumer2
eventsConsumer1, err = eventStore.FetchPendingEvents(consumerNames[0])
assert.Nil(t, err)
assert.Equal(t, 1000, len(eventsConsumer1.Events))
err = eventStore.UpdateConsumerOffset(consumerNames[0], eventsConsumer1.NewOffset)
assert.Nil(t, err)
eventsConsumer2, err := eventStore.FetchPendingEvents(consumerNames[1])
assert.Nil(t, err)
assert.Equal(t, 2000, len(eventsConsumer2.Events))
err = eventStore.UpdateConsumerOffset(consumerNames[1], eventsConsumer1.NewOffset)
assert.Nil(t, err)

// Check garbage collector
time.Sleep(120 * time.Millisecond)
assert.Equal(t, int64(0), eventStore.GetTotalEventCount())
}

func Test_MultipleConsumersMultipleGORoutines(t *testing.T) {
consumerNames := []string{"consumer1", "consumer2"}
eventStore := datastruct.NewEventStore[string](consumerNames, defaultCleanQueueDuration)
defer eventStore.Stop()
// start producer
ctx, cancelProducer1 := context.WithCancel(context.Background())
defer cancelProducer1()
go startEventProducer(ctx, eventStore, 100000, true)
time.Sleep(50 * time.Millisecond)
wg := &sync.WaitGroup{}

consumFunc := func(eventStore datastruct.EventStore[string], consumerName string) {
wg.Add(1)
defer wg.Done()
events, err := eventStore.FetchPendingEvents(consumerName)
assert.Nil(t, err)
err = eventStore.UpdateConsumerOffset(consumerName, events.NewOffset)
assert.Nil(t, err)

assert.True(t, len(events.Events) > 0)
time.Sleep(50 * time.Millisecond) // we wait to be sure that the producer has produce new events
events, err = eventStore.FetchPendingEvents(consumerName)
assert.Nil(t, err)
err = eventStore.UpdateConsumerOffset(consumerName, events.NewOffset)
assert.Nil(t, err)
assert.True(t, len(events.Events) > 0)
}

go consumFunc(eventStore, consumerNames[0])
go consumFunc(eventStore, consumerNames[1])
wg.Wait()
}

func Test_MultipleGetEventsWithoutSettingOffset(t *testing.T) {
consumerNames := []string{"consumer1"}
eventStore := datastruct.NewEventStore[string](consumerNames, defaultCleanQueueDuration)
defer eventStore.Stop()

// start producer
ctx := context.Background()
startEventProducer(ctx, eventStore, 100, false)

firstCall, err := eventStore.FetchPendingEvents(consumerNames[0])
assert.Nil(t, err)
secondCall, err := eventStore.FetchPendingEvents(consumerNames[0])
assert.Nil(t, err)
assert.Equal(t, firstCall, secondCall)
}

func Test_UpdateWithInvalidOffset(t *testing.T) {
consumerNames := []string{"consumer1"}
eventStore := datastruct.NewEventStore[string](consumerNames, defaultCleanQueueDuration)
defer eventStore.Stop()

// start producer
ctx := context.Background()
startEventProducer(ctx, eventStore, 100, false)

eventList, err := eventStore.FetchPendingEvents(consumerNames[0])
assert.Nil(t, err)
errUpdate := eventStore.UpdateConsumerOffset(consumerNames[0], eventList.NewOffset+100)
assert.NotNil(t, errUpdate)
}

func Test_WaitForEmptyClean(t *testing.T) {
consumerNames := []string{"consumer1"}
eventStore := datastruct.NewEventStore[string](consumerNames, defaultCleanQueueDuration)
defer eventStore.Stop()

// start producer
ctx := context.Background()
startEventProducer(ctx, eventStore, 100, false)
list, _ := eventStore.FetchPendingEvents(consumerNames[0])
_ = eventStore.UpdateConsumerOffset(consumerNames[0], list.NewOffset)
time.Sleep(3 * defaultCleanQueueDuration)
assert.Equal(t, int64(0), eventStore.GetTotalEventCount())
}

func startEventProducer(ctx context.Context, eventStore datastruct.EventStore[string], produceMax int, randomizeProducingTime bool) {
for i := 0; i < produceMax; i++ {
select {
case <-ctx.Done():
fmt.Println("Goroutine stopped")
return
default:
if randomizeProducingTime {
randomNumber := rand.Intn(10) + 1
time.Sleep(time.Duration(randomNumber) * time.Millisecond)
}
eventStore.Add("Hello")
}
}
}
179 changes: 179 additions & 0 deletions datastruct/event_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
package datastruct

import (
"fmt"
"math"
"sync"
"time"
)

const minOffset = int64(math.MinInt64)

type eventStoreImpl[T any] struct {
// events is a list of events to store
events []event[T]
// mutex to protect the events and consumers
mutex sync.RWMutex
// consumers is a map of consumers with their name as key
consumers map[string]*consumer
// lastOffset is the last offset used for the event store
lastOffset int64
// stopPeriodicCleaning is a channel to stop the periodic cleaning goroutine
stopPeriodicCleaning chan struct{}
// cleanQueueDuration is the duration between each cleaning
cleanQueueDuration time.Duration
}

func NewEventStore[T any](consumerNames []string, cleanQueueDuration time.Duration) EventStore[T] {
consumers := make(map[string]*consumer, len(consumerNames))
for _, name := range consumerNames {
consumers[name] = &consumer{Offset: minOffset}
}
store := &eventStoreImpl[T]{
events: make([]event[T], 0),
consumers: consumers,
mutex: sync.RWMutex{},
lastOffset: minOffset,
stopPeriodicCleaning: make(chan struct{}),
cleanQueueDuration: cleanQueueDuration,
}
go store.periodicCleanQueue()
return store
}

type EventList[T any] struct {
Events []event[T]
InitialOffset int64
NewOffset int64
}

type EventStore[T any] interface {
// Add is adding item of type T in the event store.
Add(data T)

// FetchPendingEvents is returning all the available item in the event store for this consumer.
FetchPendingEvents(consumerName string) (*EventList[T], error)

// GetPendingEventCount is returning the number items available in the event store for this consumer.
GetPendingEventCount(consumerName string) (int64, error)

// GetTotalEventCount returns the total number of events in the store.
GetTotalEventCount() int64

// UpdateConsumerOffset updates the offset of the consumer to the new offset.
UpdateConsumerOffset(consumerName string, offset int64) error

// Stop is closing the event store and stop the periodic cleaning.
Stop()
}

type event[T any] struct {
Offset int64
Data T
}

type consumer struct {
Offset int64
}

// GetTotalEventCount returns the total number of events in the store.
func (e *eventStoreImpl[T]) GetTotalEventCount() int64 {
e.mutex.RLock()
defer e.mutex.RUnlock()
return int64(len(e.events))
}

// GetPendingEventCount is returning the number items available in the event store for this consumer.
func (e *eventStoreImpl[T]) GetPendingEventCount(consumerName string) (int64, error) {
e.mutex.RLock()
defer e.mutex.RUnlock()

consumer, ok := e.consumers[consumerName]
if !ok {
return 0, fmt.Errorf("consumer with name %s not found", consumerName)
}
return e.lastOffset - consumer.Offset, nil
}

// Add is adding item of type T in the event store.
func (e *eventStoreImpl[T]) Add(data T) {
e.mutex.Lock()
defer e.mutex.Unlock()
e.lastOffset++
e.events = append(e.events, event[T]{Offset: e.lastOffset, Data: data})
}

// FetchPendingEvents is returning all the available item in the event store for this consumer.
func (e *eventStoreImpl[T]) FetchPendingEvents(consumerName string) (*EventList[T], error) {
e.mutex.RLock()
defer e.mutex.RUnlock()
currentConsumer, ok := e.consumers[consumerName]
if !ok {
return nil, fmt.Errorf("consumer with name %s not found", consumerName)
}
events := make([]event[T], 0)
for _, event := range e.events {
if event.Offset > currentConsumer.Offset {
events = append(events, event)
}
}
return &EventList[T]{Events: events, InitialOffset: currentConsumer.Offset, NewOffset: e.lastOffset}, nil
}

// UpdateConsumerOffset updates the offset of the consumer to the new offset.
func (e *eventStoreImpl[T]) UpdateConsumerOffset(consumerName string, offset int64) error {
e.mutex.Lock()
defer e.mutex.Unlock()
if offset > e.lastOffset {
return fmt.Errorf("invalid offset: offset %d is greater than the last offset %d", offset, e.lastOffset)
}
e.consumers[consumerName].Offset = e.lastOffset
return nil
}

// cleanQueue removes all events that have been consumed by all consumers
func (e *eventStoreImpl[T]) cleanQueue() {
e.mutex.Lock()
defer e.mutex.Unlock()
if e.events == nil || len(e.events) == 0 {
// nothing to remove
return
}
consumerMinOffset := minOffset
for _, currentConsumer := range e.consumers {
if consumerMinOffset == minOffset || currentConsumer.Offset < consumerMinOffset {
consumerMinOffset = currentConsumer.Offset
}
}
if consumerMinOffset <= minOffset {
// nothing to remove
return
}

for i, event := range e.events {
if event.Offset == consumerMinOffset {
e.events = e.events[i+1:]
break
}
}

}

// periodicCleanQueue periodically cleans the queue
func (e *eventStoreImpl[T]) periodicCleanQueue() {
ticker := time.NewTicker(e.cleanQueueDuration)
defer ticker.Stop()
for {
select {
case <-ticker.C:
e.cleanQueue()
case <-e.stopPeriodicCleaning:
return
}
}
}

// Stop is closing the event store and stop the periodic cleaning.
func (e *eventStoreImpl[T]) Stop() {
close(e.stopPeriodicCleaning)
}
10 changes: 10 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
module untitled

go 1.23

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/testify v1.10.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
9 changes: 9 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=