Skip to content

Commit 3cdb664

Browse files
Init code
Signed-off-by: Thomas Poignant <[email protected]>
1 parent 836a316 commit 3cdb664

File tree

5 files changed

+357
-0
lines changed

5 files changed

+357
-0
lines changed

datastruct/even_store_test.go

+158
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
package datastruct_test
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"github.com/stretchr/testify/assert"
7+
"math/rand"
8+
"sync"
9+
"testing"
10+
"time"
11+
"untitled/datastruct"
12+
)
13+
14+
func Test_ConsumerNameInvalid(t *testing.T) {
15+
t.Run("GetSizeQueue: should return an error if the consumer name is invalid", func(t *testing.T) {
16+
consumerNames := []string{"consumer1"}
17+
eventStore := datastruct.NewEventStore[string](consumerNames)
18+
_, err := eventStore.GetSizeQueue("wrong name")
19+
assert.NotNil(t, err)
20+
})
21+
t.Run("GetSizeQueue: should return an error if the consumer name is invalid", func(t *testing.T) {
22+
consumerNames := []string{"consumer1"}
23+
eventStore := datastruct.NewEventStore[string](consumerNames)
24+
_, err := eventStore.GetEvents("wrong name")
25+
assert.NotNil(t, err)
26+
})
27+
}
28+
29+
func Test_SingleConsumer(t *testing.T) {
30+
consumerNames := []string{"consumer1"}
31+
eventStore := datastruct.NewEventStore[string](consumerNames)
32+
33+
got, _ := eventStore.GetSizeQueue(consumerNames[0])
34+
assert.Equal(t, int64(0), got)
35+
assert.Equal(t, int64(0), eventStore.GetCacheSize())
36+
37+
// start producer
38+
ctx, cancel := context.WithCancel(context.Background())
39+
go startEventProducer(ctx, eventStore, 100, false)
40+
time.Sleep(50 * time.Millisecond)
41+
got, _ = eventStore.GetSizeQueue(consumerNames[0])
42+
assert.Equal(t, int64(100), got)
43+
cancel() // stop producing
44+
45+
// Consume
46+
events, _ := eventStore.GetEvents(consumerNames[0])
47+
assert.Equal(t, 100, len(events))
48+
got, _ = eventStore.GetSizeQueue(consumerNames[0])
49+
assert.Equal(t, int64(0), got)
50+
51+
// restart producing
52+
ctx2, cancel2 := context.WithCancel(context.Background())
53+
defer cancel2()
54+
go startEventProducer(ctx2, eventStore, 91, false)
55+
time.Sleep(50 * time.Millisecond)
56+
got, _ = eventStore.GetSizeQueue(consumerNames[0])
57+
assert.Equal(t, int64(91), got)
58+
events, _ = eventStore.GetEvents(consumerNames[0])
59+
assert.Equal(t, 91, len(events))
60+
61+
time.Sleep(50 * time.Millisecond) // to wait until garbage collector remove the events
62+
assert.Equal(t, int64(0), eventStore.GetCacheSize())
63+
}
64+
65+
func Test_MultipleConsumersSingleThread(t *testing.T) {
66+
consumerNames := []string{"consumer1", "consumer2"}
67+
eventStore := datastruct.NewEventStore[string](consumerNames)
68+
69+
// start producer
70+
ctx, cancelProducer1 := context.WithCancel(context.Background())
71+
defer cancelProducer1()
72+
go startEventProducer(ctx, eventStore, 1000, false)
73+
time.Sleep(50 * time.Millisecond)
74+
cancelProducer1()
75+
assert.Equal(t, int64(1000), eventStore.GetCacheSize())
76+
77+
// Consume with Consumer1 only
78+
consumer1Size, err := eventStore.GetSizeQueue(consumerNames[0])
79+
assert.Nil(t, err)
80+
assert.Equal(t, int64(1000), consumer1Size)
81+
eventsConsumer1, err := eventStore.GetEvents(consumerNames[0])
82+
assert.Nil(t, err)
83+
assert.Equal(t, 1000, len(eventsConsumer1))
84+
85+
// Produce a second time
86+
ctx, cancelProducer2 := context.WithCancel(context.Background())
87+
defer cancelProducer2()
88+
go startEventProducer(ctx, eventStore, 1000, false)
89+
time.Sleep(50 * time.Millisecond)
90+
cancelProducer2()
91+
92+
// Check queue size
93+
assert.Equal(t, int64(2000), eventStore.GetCacheSize())
94+
consumer1Size, err = eventStore.GetSizeQueue(consumerNames[0])
95+
assert.Nil(t, err)
96+
assert.Equal(t, int64(1000), consumer1Size)
97+
consumer2Size, err := eventStore.GetSizeQueue(consumerNames[1])
98+
assert.Nil(t, err)
99+
assert.Equal(t, int64(2000), consumer2Size)
100+
101+
// Consumer with Consumer1 and Consumer2
102+
eventsConsumer1, err = eventStore.GetEvents(consumerNames[0])
103+
assert.Nil(t, err)
104+
assert.Equal(t, 1000, len(eventsConsumer1))
105+
eventsConsumer2, err := eventStore.GetEvents(consumerNames[1])
106+
assert.Nil(t, err)
107+
assert.Equal(t, 2000, len(eventsConsumer2))
108+
109+
// Check garbage collector
110+
time.Sleep(50 * time.Millisecond)
111+
assert.Equal(t, int64(0), eventStore.GetCacheSize())
112+
}
113+
114+
func Test_MultipleConsumersMultipleGORoutines(t *testing.T) {
115+
consumerNames := []string{"consumer1", "consumer2"}
116+
eventStore := datastruct.NewEventStore[string](consumerNames)
117+
118+
// start producer
119+
ctx, cancelProducer1 := context.WithCancel(context.Background())
120+
defer cancelProducer1()
121+
go startEventProducer(ctx, eventStore, 100000, true)
122+
time.Sleep(50 * time.Millisecond)
123+
wg := &sync.WaitGroup{}
124+
125+
consumFunc := func(eventStore datastruct.EventStore[string], consumerName string) {
126+
wg.Add(1)
127+
defer wg.Done()
128+
events, err := eventStore.GetEvents(consumerName)
129+
assert.Nil(t, err)
130+
131+
assert.True(t, len(events) > 0)
132+
time.Sleep(50 * time.Millisecond) // we wait to be sure that the producer has produce new events
133+
events, err = eventStore.GetEvents(consumerName)
134+
assert.Nil(t, err)
135+
assert.True(t, len(events) > 0)
136+
}
137+
138+
go consumFunc(eventStore, consumerNames[0])
139+
go consumFunc(eventStore, consumerNames[1])
140+
wg.Wait()
141+
cancelProducer1()
142+
}
143+
144+
func startEventProducer(ctx context.Context, eventStore datastruct.EventStore[string], produceMax int, randomizeProducingTime bool) {
145+
for i := 0; i < produceMax; i++ {
146+
select {
147+
case <-ctx.Done():
148+
fmt.Println("Goroutine stopped")
149+
return
150+
default:
151+
if randomizeProducingTime {
152+
randomNumber := rand.Intn(10) + 1
153+
time.Sleep(time.Duration(randomNumber) * time.Millisecond)
154+
}
155+
eventStore.Push("Hello")
156+
}
157+
}
158+
}

datastruct/event_store.go

+148
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
package datastruct
2+
3+
import (
4+
"fmt"
5+
"math"
6+
"sync"
7+
)
8+
9+
const minOffset = int64(math.MinInt64)
10+
11+
type eventStoreImpl[T any] struct {
12+
Events []event[T]
13+
Mutex sync.RWMutex
14+
Consumers *sync.Map
15+
consumerNames []string
16+
lastOffset int64
17+
}
18+
19+
func NewEventStore[T any](consumerNames []string) EventStore[T] {
20+
consumers := sync.Map{}
21+
for _, name := range consumerNames {
22+
consumers.Store(name, consumer{Offset: minOffset})
23+
}
24+
return &eventStoreImpl[T]{
25+
Events: make([]event[T], 0),
26+
Consumers: &consumers,
27+
Mutex: sync.RWMutex{},
28+
consumerNames: consumerNames,
29+
lastOffset: minOffset,
30+
}
31+
}
32+
33+
type EventStore[T any] interface {
34+
Push(data T)
35+
GetEvents(consumerName string) ([]event[T], error)
36+
GetSizeQueue(consumerName string) (int64, error)
37+
GetCacheSize() int64
38+
}
39+
40+
type event[T any] struct {
41+
Offset int64
42+
Data T
43+
}
44+
45+
type consumer struct {
46+
Offset int64
47+
}
48+
49+
// GetCacheSize return the number of events in the cache
50+
func (e *eventStoreImpl[T]) GetCacheSize() int64 {
51+
e.Mutex.RLock()
52+
defer e.Mutex.RUnlock()
53+
return int64(len(e.Events))
54+
}
55+
56+
// GetSizeQueue return the number of events that have not been consumed by the consumer
57+
func (e *eventStoreImpl[T]) GetSizeQueue(consumerName string) (int64, error) {
58+
e.Mutex.RLock()
59+
defer e.Mutex.RUnlock()
60+
61+
consumer, err := e.getConsumer(consumerName)
62+
if err != nil {
63+
return 0, err
64+
}
65+
66+
if e.Events == nil || len(e.Events) == 0 {
67+
return 0, nil
68+
}
69+
return e.lastOffset - consumer.Offset, nil
70+
}
71+
72+
// GetConsumer return the consumer with the given name
73+
func (e *eventStoreImpl[T]) getConsumer(name string) (consumer, error) {
74+
if e.Consumers == nil {
75+
return consumer{}, fmt.Errorf("consumer not found")
76+
}
77+
if value, ok := e.Consumers.Load(name); ok {
78+
typedValue, ok := value.(consumer)
79+
if ok {
80+
return typedValue, nil
81+
}
82+
}
83+
return consumer{}, fmt.Errorf("invalid consumer found")
84+
}
85+
86+
// Push add a new event to the store
87+
func (e *eventStoreImpl[T]) Push(data T) {
88+
e.Mutex.Lock()
89+
defer e.Mutex.Unlock()
90+
e.lastOffset++
91+
e.Events = append(e.Events, event[T]{Offset: e.lastOffset, Data: data})
92+
}
93+
94+
// GetEvents return all events that have not been consumed by the consumer
95+
func (e *eventStoreImpl[T]) GetEvents(consumerName string) ([]event[T], error) {
96+
e.Mutex.RLock()
97+
defer e.Mutex.RUnlock()
98+
events := make([]event[T], 0)
99+
currentConsumer, err := e.getConsumer(consumerName)
100+
if err != nil {
101+
return nil, fmt.Errorf("consumer with name %s not found: %s", consumerName, err)
102+
}
103+
for _, event := range e.Events {
104+
if event.Offset > currentConsumer.Offset {
105+
events = append(events, event)
106+
}
107+
}
108+
if len(events) > 0 {
109+
e.Consumers.Store(consumerName, consumer{Offset: e.lastOffset})
110+
}
111+
112+
// trigger queue cleaning to remove all events that have been consumed by all consumers, we can optimize
113+
// that part by calling this goroutine only on certain conditions and not every time we call GetEvents
114+
go e.cleanQueue()
115+
116+
return events, nil
117+
}
118+
119+
// cleanQueue remove all events that have been consumed by all consumers
120+
func (e *eventStoreImpl[T]) cleanQueue() {
121+
e.Mutex.Lock()
122+
defer e.Mutex.Unlock()
123+
if e.Events == nil || len(e.Events) == 0 {
124+
// nothing to remove
125+
return
126+
}
127+
consumerMinOffset := minOffset
128+
for _, consumer := range e.consumerNames {
129+
currentConsumer, _ := e.getConsumer(consumer)
130+
if consumerMinOffset == minOffset || currentConsumer.Offset < consumerMinOffset {
131+
consumerMinOffset = currentConsumer.Offset
132+
}
133+
}
134+
if consumerMinOffset <= minOffset {
135+
// nothing to remove
136+
return
137+
}
138+
139+
index := 0
140+
for i, event := range e.Events {
141+
if event.Offset == consumerMinOffset {
142+
index = i
143+
break
144+
}
145+
}
146+
147+
e.Events = e.Events[index+1:]
148+
}

go.mod

+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
module untitled
2+
3+
go 1.23
4+
5+
require (
6+
github.com/davecgh/go-spew v1.1.1 // indirect
7+
github.com/pmezard/go-difflib v1.0.0 // indirect
8+
github.com/stretchr/testify v1.10.0 // indirect
9+
gopkg.in/yaml.v3 v3.0.1 // indirect
10+
)

go.sum

+9
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
2+
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
3+
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
4+
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
5+
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
6+
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
7+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
8+
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
9+
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

main.go

+32
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"untitled/datastruct"
6+
)
7+
8+
func main() {
9+
eventStore := datastruct.NewEventStore[string]([]string{"consumer1", "consumer2"})
10+
11+
for i := 0; i < 1000; i++ {
12+
eventStore.Push("Hello")
13+
}
14+
15+
fmt.Println(eventStore.GetEvents("consumer1"))
16+
fmt.Println(eventStore.GetConsumer("consumer1"))
17+
fmt.Println(eventStore.GetEvents("consumer2"))
18+
fmt.Println(eventStore.GetConsumer("consumer2"))
19+
20+
for i := 0; i < 100; i++ {
21+
eventStore.Push("Hello")
22+
}
23+
fmt.Println(eventStore.GetEvents("consumer1"))
24+
fmt.Println(eventStore.GetEvents("consumer2"))
25+
26+
for i := 0; i < 100; i++ {
27+
eventStore.Push("Hello")
28+
}
29+
fmt.Println(eventStore.GetEvents("consumer1"))
30+
fmt.Println("toto")
31+
32+
}

0 commit comments

Comments
 (0)