@@ -57,6 +57,8 @@ const (
57
57
// Limit is required to avoid memory spikes during cache initialization.
58
58
// The default limit of 50 is chosen based on experiments.
59
59
defaultListSemaphoreWeight = 50
60
+ // defaultEventProcessingInterval is the default interval for processing events
61
+ defaultEventProcessingInterval = 100 * time .Millisecond
60
62
)
61
63
62
64
const (
@@ -75,6 +77,11 @@ type apiMeta struct {
75
77
watchCancel context.CancelFunc
76
78
}
77
79
80
+ type eventMeta struct {
81
+ event watch.EventType
82
+ un * unstructured.Unstructured
83
+ }
84
+
78
85
// ClusterInfo holds cluster cache stats
79
86
type ClusterInfo struct {
80
87
// Server holds cluster API server URL
@@ -96,6 +103,9 @@ type ClusterInfo struct {
96
103
// OnEventHandler is a function that handles Kubernetes event
97
104
type OnEventHandler func (event watch.EventType , un * unstructured.Unstructured )
98
105
106
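+ // OnProcessEventsHandler is a function that is called after a batch of events has been processed; it receives the time spent on the batch and the number of events in it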
107
+ type OnProcessEventsHandler func (duration time.Duration , processedEventsNumber int )
108
+
99
109
// OnPopulateResourceInfoHandler returns additional resource metadata that should be stored in cache
100
110
type OnPopulateResourceInfoHandler func (un * unstructured.Unstructured , isRoot bool ) (info interface {}, cacheManifest bool )
101
111
@@ -137,6 +147,8 @@ type ClusterCache interface {
137
147
OnResourceUpdated (handler OnResourceUpdatedHandler ) Unsubscribe
138
148
// OnEvent register event handler that is executed every time when new K8S event received
139
149
OnEvent (handler OnEventHandler ) Unsubscribe
150
+ // OnProcessEventsHandler registers a handler that is executed every time a batch of events has been processed
151
+ OnProcessEventsHandler (handler OnProcessEventsHandler ) Unsubscribe
140
152
}
141
153
142
154
type WeightedSemaphore interface {
@@ -153,6 +165,7 @@ func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCa
153
165
cache := & clusterCache {
154
166
settings : Settings {ResourceHealthOverride : & noopSettings {}, ResourcesFilter : & noopSettings {}},
155
167
apisMeta : make (map [schema.GroupKind ]* apiMeta ),
168
+ eventMetaCh : nil ,
156
169
listPageSize : defaultListPageSize ,
157
170
listPageBufferSize : defaultListPageBufferSize ,
158
171
listSemaphore : semaphore .NewWeighted (defaultListSemaphoreWeight ),
@@ -169,8 +182,10 @@ func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCa
169
182
},
170
183
watchResyncTimeout : defaultWatchResyncTimeout ,
171
184
clusterSyncRetryTimeout : ClusterRetryTimeout ,
185
+ eventProcessingInterval : defaultEventProcessingInterval ,
172
186
resourceUpdatedHandlers : map [uint64 ]OnResourceUpdatedHandler {},
173
187
eventHandlers : map [uint64 ]OnEventHandler {},
188
+ processEventsHandlers : map [uint64 ]OnProcessEventsHandler {},
174
189
log : log ,
175
190
listRetryLimit : 1 ,
176
191
listRetryUseBackoff : false ,
@@ -185,16 +200,20 @@ func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCa
185
200
type clusterCache struct {
186
201
syncStatus clusterCacheSync
187
202
188
- apisMeta map [schema.GroupKind ]* apiMeta
189
- serverVersion string
190
- apiResources []kube.APIResourceInfo
203
+ apisMeta map [schema.GroupKind ]* apiMeta
204
+ batchEventsProcessing bool
205
+ eventMetaCh chan eventMeta
206
+ serverVersion string
207
+ apiResources []kube.APIResourceInfo
191
208
// namespacedResources is a simple map which indicates a groupKind is namespaced
192
209
namespacedResources map [schema.GroupKind ]bool
193
210
194
211
// maximum time we allow watches to run before relisting the group/kind and restarting the watch
195
212
watchResyncTimeout time.Duration
196
213
// sync retry timeout for cluster when sync error happens
197
214
clusterSyncRetryTimeout time.Duration
215
+ // ticker interval for events processing
216
+ eventProcessingInterval time.Duration
198
217
199
218
// size of a page for list operations pager.
200
219
listPageSize int64
@@ -224,6 +243,7 @@ type clusterCache struct {
224
243
populateResourceInfoHandler OnPopulateResourceInfoHandler
225
244
resourceUpdatedHandlers map [uint64 ]OnResourceUpdatedHandler
226
245
eventHandlers map [uint64 ]OnEventHandler
246
+ processEventsHandlers map [uint64 ]OnProcessEventsHandler
227
247
openAPISchema openapi.Resources
228
248
gvkParser * managedfields.GvkParser
229
249
@@ -299,6 +319,29 @@ func (c *clusterCache) getEventHandlers() []OnEventHandler {
299
319
return handlers
300
320
}
301
321
322
+ // OnProcessEventsHandler register event handler that is executed every time when events were processed
323
+ func (c * clusterCache ) OnProcessEventsHandler (handler OnProcessEventsHandler ) Unsubscribe {
324
+ c .handlersLock .Lock ()
325
+ defer c .handlersLock .Unlock ()
326
+ key := c .handlerKey
327
+ c .handlerKey ++
328
+ c .processEventsHandlers [key ] = handler
329
+ return func () {
330
+ c .handlersLock .Lock ()
331
+ defer c .handlersLock .Unlock ()
332
+ delete (c .processEventsHandlers , key )
333
+ }
334
+ }
335
+ func (c * clusterCache ) getProcessEventsHandlers () []OnProcessEventsHandler {
336
+ c .handlersLock .Lock ()
337
+ defer c .handlersLock .Unlock ()
338
+ handlers := make ([]OnProcessEventsHandler , 0 , len (c .processEventsHandlers ))
339
+ for _ , h := range c .processEventsHandlers {
340
+ handlers = append (handlers , h )
341
+ }
342
+ return handlers
343
+ }
344
+
302
345
// GetServerVersion returns observed cluster version
303
346
func (c * clusterCache ) GetServerVersion () string {
304
347
return c .serverVersion
@@ -440,6 +483,10 @@ func (c *clusterCache) Invalidate(opts ...UpdateSettingsFunc) {
440
483
for i := range opts {
441
484
opts [i ](c )
442
485
}
486
+
487
+ if c .batchEventsProcessing {
488
+ c .invalidateEventMeta ()
489
+ }
443
490
c .apisMeta = nil
444
491
c .namespacedResources = nil
445
492
c .log .Info ("Invalidated cluster" )
@@ -669,7 +716,7 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo
669
716
return fmt .Errorf ("Failed to convert to *unstructured.Unstructured: %v" , event .Object )
670
717
}
671
718
672
- c .processEvent (event .Type , obj )
719
+ c .recordEvent (event .Type , obj )
673
720
if kube .IsCRD (obj ) {
674
721
var resources []kube.APIResourceInfo
675
722
crd := v1.CustomResourceDefinition {}
@@ -823,6 +870,12 @@ func (c *clusterCache) sync() error {
823
870
for i := range c .apisMeta {
824
871
c .apisMeta [i ].watchCancel ()
825
872
}
873
+
874
+ if c .batchEventsProcessing {
875
+ c .invalidateEventMeta ()
876
+ c .eventMetaCh = make (chan eventMeta )
877
+ }
878
+
826
879
c .apisMeta = make (map [schema.GroupKind ]* apiMeta )
827
880
c .resources = make (map [kube.ResourceKey ]* Resource )
828
881
c .namespacedResources = make (map [schema.GroupKind ]bool )
@@ -864,6 +917,10 @@ func (c *clusterCache) sync() error {
864
917
return err
865
918
}
866
919
920
+ if c .batchEventsProcessing {
921
+ go c .processEvents ()
922
+ }
923
+
867
924
// Each API is processed in parallel, so we need to take out a lock when we update clusterCache fields.
868
925
lock := sync.Mutex {}
869
926
err = kube .RunAllAsync (len (apis ), func (i int ) error {
@@ -926,6 +983,14 @@ func (c *clusterCache) sync() error {
926
983
return nil
927
984
}
928
985
986
+ // invalidateEventMeta closes the eventMeta channel if it is open
987
+ func (c * clusterCache ) invalidateEventMeta () {
988
+ if c .eventMetaCh != nil {
989
+ close (c .eventMetaCh )
990
+ c .eventMetaCh = nil
991
+ }
992
+ }
993
+
929
994
// EnsureSynced checks cache state and synchronizes it if necessary
930
995
func (c * clusterCache ) EnsureSynced () error {
931
996
syncStatus := & c .syncStatus
@@ -1231,7 +1296,7 @@ func (c *clusterCache) GetManagedLiveObjs(targetObjs []*unstructured.Unstructure
1231
1296
return managedObjs , nil
1232
1297
}
1233
1298
1234
- func (c * clusterCache ) processEvent (event watch.EventType , un * unstructured.Unstructured ) {
1299
+ func (c * clusterCache ) recordEvent (event watch.EventType , un * unstructured.Unstructured ) {
1235
1300
for _ , h := range c .getEventHandlers () {
1236
1301
h (event , un )
1237
1302
}
@@ -1240,15 +1305,74 @@ func (c *clusterCache) processEvent(event watch.EventType, un *unstructured.Unst
1240
1305
return
1241
1306
}
1242
1307
1308
+ if c .batchEventsProcessing {
1309
+ c .eventMetaCh <- eventMeta {event , un }
1310
+ } else {
1311
+ c .lock .Lock ()
1312
+ defer c .lock .Unlock ()
1313
+ c .processEvent (key , eventMeta {event , un })
1314
+ }
1315
+ }
1316
+
1317
+ func (c * clusterCache ) processEvents () {
1318
+ log := c .log .WithValues ("functionName" , "processItems" )
1319
+ log .V (1 ).Info ("Start processing events" )
1320
+
1243
1321
c .lock .Lock ()
1244
- defer c .lock .Unlock ()
1322
+ ch := c .eventMetaCh
1323
+ c .lock .Unlock ()
1324
+
1325
+ eventMetas := make ([]eventMeta , 0 )
1326
+ ticker := time .NewTicker (c .eventProcessingInterval )
1327
+ defer ticker .Stop ()
1328
+
1329
+ for {
1330
+ select {
1331
+ case evMeta , ok := <- ch :
1332
+ if ! ok {
1333
+ log .V (2 ).Info ("Event processing channel closed, finish processing" )
1334
+ return
1335
+ }
1336
+ eventMetas = append (eventMetas , evMeta )
1337
+ case <- ticker .C :
1338
+ if len (eventMetas ) > 0 {
1339
+ c .processEventsBatch (eventMetas )
1340
+ eventMetas = eventMetas [:0 ]
1341
+ }
1342
+ }
1343
+ }
1344
+ }
1345
+
1346
+ func (c * clusterCache ) processEventsBatch (eventMetas []eventMeta ) {
1347
+ log := c .log .WithValues ("functionName" , "processEventsBatch" )
1348
+ start := time .Now ()
1349
+ c .lock .Lock ()
1350
+ log .V (1 ).Info ("Lock acquired (ms)" , "duration" , time .Since (start ).Milliseconds ())
1351
+ defer func () {
1352
+ c .lock .Unlock ()
1353
+ duration := time .Since (start )
1354
+ // Update the metric with the duration of the events processing
1355
+ for _ , handler := range c .getProcessEventsHandlers () {
1356
+ handler (duration , len (eventMetas ))
1357
+ }
1358
+ }()
1359
+
1360
+ for _ , evMeta := range eventMetas {
1361
+ key := kube .GetResourceKey (evMeta .un )
1362
+ c .processEvent (key , evMeta )
1363
+ }
1364
+
1365
+ log .V (1 ).Info ("Processed events (ms)" , "count" , len (eventMetas ), "duration" , time .Since (start ).Milliseconds ())
1366
+ }
1367
+
1368
+ func (c * clusterCache ) processEvent (key kube.ResourceKey , evMeta eventMeta ) {
1245
1369
existingNode , exists := c .resources [key ]
1246
- if event == watch .Deleted {
1370
+ if evMeta . event == watch .Deleted {
1247
1371
if exists {
1248
1372
c .onNodeRemoved (key )
1249
1373
}
1250
- } else if event != watch . Deleted {
1251
- c .onNodeUpdated (existingNode , c .newResource (un ))
1374
+ } else {
1375
+ c .onNodeUpdated (existingNode , c .newResource (evMeta . un ))
1252
1376
}
1253
1377
}
1254
1378
0 commit comments