-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathevent_store.go
179 lines (155 loc) · 4.99 KB
/
event_store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
package datastruct
import (
"fmt"
"math"
"sync"
"time"
)
const minOffset = int64(math.MinInt64)
type eventStoreImpl[T any] struct {
// events is a list of events to store
events []event[T]
// mutex to protect the events and consumers
mutex sync.RWMutex
// consumers is a map of consumers with their name as key
consumers map[string]*consumer
// lastOffset is the last offset used for the event store
lastOffset int64
// stopPeriodicCleaning is a channel to stop the periodic cleaning goroutine
stopPeriodicCleaning chan struct{}
// cleanQueueDuration is the duration between each cleaning
cleanQueueDuration time.Duration
}
func NewEventStore[T any](consumerNames []string, cleanQueueDuration time.Duration) EventStore[T] {
consumers := make(map[string]*consumer, len(consumerNames))
for _, name := range consumerNames {
consumers[name] = &consumer{Offset: minOffset}
}
store := &eventStoreImpl[T]{
events: make([]event[T], 0),
consumers: consumers,
mutex: sync.RWMutex{},
lastOffset: minOffset,
stopPeriodicCleaning: make(chan struct{}),
cleanQueueDuration: cleanQueueDuration,
}
go store.periodicCleanQueue()
return store
}
type EventList[T any] struct {
Events []event[T]
InitialOffset int64
NewOffset int64
}
type EventStore[T any] interface {
// Add is adding item of type T in the event store.
Add(data T)
// FetchPendingEvents is returning all the available item in the event store for this consumer.
FetchPendingEvents(consumerName string) (*EventList[T], error)
// GetPendingEventCount is returning the number items available in the event store for this consumer.
GetPendingEventCount(consumerName string) (int64, error)
// GetTotalEventCount returns the total number of events in the store.
GetTotalEventCount() int64
// UpdateConsumerOffset updates the offset of the consumer to the new offset.
UpdateConsumerOffset(consumerName string, offset int64) error
// Stop is closing the event store and stop the periodic cleaning.
Stop()
}
type event[T any] struct {
Offset int64
Data T
}
type consumer struct {
Offset int64
}
// GetTotalEventCount returns the total number of events in the store.
func (e *eventStoreImpl[T]) GetTotalEventCount() int64 {
e.mutex.RLock()
defer e.mutex.RUnlock()
return int64(len(e.events))
}
// GetPendingEventCount is returning the number items available in the event store for this consumer.
func (e *eventStoreImpl[T]) GetPendingEventCount(consumerName string) (int64, error) {
e.mutex.RLock()
defer e.mutex.RUnlock()
consumer, ok := e.consumers[consumerName]
if !ok {
return 0, fmt.Errorf("consumer with name %s not found", consumerName)
}
return e.lastOffset - consumer.Offset, nil
}
// Add is adding item of type T in the event store.
func (e *eventStoreImpl[T]) Add(data T) {
e.mutex.Lock()
defer e.mutex.Unlock()
e.lastOffset++
e.events = append(e.events, event[T]{Offset: e.lastOffset, Data: data})
}
// FetchPendingEvents is returning all the available item in the event store for this consumer.
func (e *eventStoreImpl[T]) FetchPendingEvents(consumerName string) (*EventList[T], error) {
e.mutex.RLock()
defer e.mutex.RUnlock()
currentConsumer, ok := e.consumers[consumerName]
if !ok {
return nil, fmt.Errorf("consumer with name %s not found", consumerName)
}
events := make([]event[T], 0)
for _, event := range e.events {
if event.Offset > currentConsumer.Offset {
events = append(events, event)
}
}
return &EventList[T]{Events: events, InitialOffset: currentConsumer.Offset, NewOffset: e.lastOffset}, nil
}
// UpdateConsumerOffset updates the offset of the consumer to the new offset.
func (e *eventStoreImpl[T]) UpdateConsumerOffset(consumerName string, offset int64) error {
e.mutex.Lock()
defer e.mutex.Unlock()
if offset > e.lastOffset {
return fmt.Errorf("invalid offset: offset %d is greater than the last offset %d", offset, e.lastOffset)
}
e.consumers[consumerName].Offset = e.lastOffset
return nil
}
// cleanQueue removes all events that have been consumed by all consumers
func (e *eventStoreImpl[T]) cleanQueue() {
e.mutex.Lock()
defer e.mutex.Unlock()
if e.events == nil || len(e.events) == 0 {
// nothing to remove
return
}
consumerMinOffset := minOffset
for _, currentConsumer := range e.consumers {
if consumerMinOffset == minOffset || currentConsumer.Offset < consumerMinOffset {
consumerMinOffset = currentConsumer.Offset
}
}
if consumerMinOffset <= minOffset {
// nothing to remove
return
}
for i, event := range e.events {
if event.Offset == consumerMinOffset {
e.events = e.events[i+1:]
break
}
}
}
// periodicCleanQueue periodically cleans the queue
func (e *eventStoreImpl[T]) periodicCleanQueue() {
ticker := time.NewTicker(e.cleanQueueDuration)
defer ticker.Stop()
for {
select {
case <-ticker.C:
e.cleanQueue()
case <-e.stopPeriodicCleaning:
return
}
}
}
// Stop is closing the event store and stop the periodic cleaning.
func (e *eventStoreImpl[T]) Stop() {
close(e.stopPeriodicCleaning)
}