Skip to content

Commit 6ad5c1d

Browse files
authored
Merge pull request #3190 from godwinpang/extract-start-watches
✨ [Warm Replicas] Extract startWatches into helper method.
2 parents 7606727 + 5f5daf3 commit 6ad5c1d

File tree

2 files changed

+177
-54
lines changed

2 files changed

+177
-54
lines changed

pkg/internal/controller/controller.go

+60-54
Original file line numberDiff line numberDiff line change
@@ -179,60 +179,7 @@ func (c *Controller[request]) Start(ctx context.Context) error {
179179
// NB(directxman12): launch the sources *before* trying to wait for the
180180
// caches to sync so that they have a chance to register their intended
181181
// caches.
182-
errGroup := &errgroup.Group{}
183-
for _, watch := range c.startWatches {
184-
log := c.LogConstructor(nil)
185-
_, ok := watch.(interface {
186-
String() string
187-
})
188-
189-
if !ok {
190-
log = log.WithValues("source", fmt.Sprintf("%T", watch))
191-
} else {
192-
log = log.WithValues("source", fmt.Sprintf("%s", watch))
193-
}
194-
didStartSyncingSource := &atomic.Bool{}
195-
errGroup.Go(func() error {
196-
// Use a timeout for starting and syncing the source to avoid silently
197-
// blocking startup indefinitely if it doesn't come up.
198-
sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
199-
defer cancel()
200-
201-
sourceStartErrChan := make(chan error, 1) // Buffer chan to not leak goroutine if we time out
202-
go func() {
203-
defer close(sourceStartErrChan)
204-
log.Info("Starting EventSource")
205-
if err := watch.Start(ctx, c.Queue); err != nil {
206-
sourceStartErrChan <- err
207-
return
208-
}
209-
syncingSource, ok := watch.(source.TypedSyncingSource[request])
210-
if !ok {
211-
return
212-
}
213-
didStartSyncingSource.Store(true)
214-
if err := syncingSource.WaitForSync(sourceStartCtx); err != nil {
215-
err := fmt.Errorf("failed to wait for %s caches to sync %v: %w", c.Name, syncingSource, err)
216-
log.Error(err, "Could not wait for Cache to sync")
217-
sourceStartErrChan <- err
218-
}
219-
}()
220-
221-
select {
222-
case err := <-sourceStartErrChan:
223-
return err
224-
case <-sourceStartCtx.Done():
225-
if didStartSyncingSource.Load() { // We are racing with WaitForSync, wait for it to let it tell us what happened
226-
return <-sourceStartErrChan
227-
}
228-
if ctx.Err() != nil { // Don't return an error if the root context got cancelled
229-
return nil
230-
}
231-
return fmt.Errorf("timed out waiting for source %s to Start. Please ensure that its Start() method is non-blocking", watch)
232-
}
233-
})
234-
}
235-
if err := errGroup.Wait(); err != nil {
182+
if err := c.startEventSources(ctx); err != nil {
236183
return err
237184
}
238185

@@ -271,6 +218,65 @@ func (c *Controller[request]) Start(ctx context.Context) error {
271218
return nil
272219
}
273220

221+
// startEventSources launches all the sources registered with this controller and waits
222+
// for them to sync. It returns an error if any of the sources fail to start or sync.
223+
func (c *Controller[request]) startEventSources(ctx context.Context) error {
224+
errGroup := &errgroup.Group{}
225+
for _, watch := range c.startWatches {
226+
log := c.LogConstructor(nil)
227+
_, ok := watch.(interface {
228+
String() string
229+
})
230+
231+
if !ok {
232+
log = log.WithValues("source", fmt.Sprintf("%T", watch))
233+
} else {
234+
log = log.WithValues("source", fmt.Sprintf("%s", watch))
235+
}
236+
didStartSyncingSource := &atomic.Bool{}
237+
errGroup.Go(func() error {
238+
// Use a timeout for starting and syncing the source to avoid silently
239+
// blocking startup indefinitely if it doesn't come up.
240+
sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
241+
defer cancel()
242+
243+
sourceStartErrChan := make(chan error, 1) // Buffer chan to not leak goroutine if we time out
244+
go func() {
245+
defer close(sourceStartErrChan)
246+
log.Info("Starting EventSource")
247+
if err := watch.Start(ctx, c.Queue); err != nil {
248+
sourceStartErrChan <- err
249+
return
250+
}
251+
syncingSource, ok := watch.(source.TypedSyncingSource[request])
252+
if !ok {
253+
return
254+
}
255+
didStartSyncingSource.Store(true)
256+
if err := syncingSource.WaitForSync(sourceStartCtx); err != nil {
257+
err := fmt.Errorf("failed to wait for %s caches to sync %v: %w", c.Name, syncingSource, err)
258+
log.Error(err, "Could not wait for Cache to sync")
259+
sourceStartErrChan <- err
260+
}
261+
}()
262+
263+
select {
264+
case err := <-sourceStartErrChan:
265+
return err
266+
case <-sourceStartCtx.Done():
267+
if didStartSyncingSource.Load() { // We are racing with WaitForSync, wait for it to let it tell us what happened
268+
return <-sourceStartErrChan
269+
}
270+
if ctx.Err() != nil { // Don't return an error if the root context got cancelled
271+
return nil
272+
}
273+
return fmt.Errorf("timed out waiting for source %s to Start. Please ensure that its Start() method is non-blocking", watch)
274+
}
275+
})
276+
}
277+
return errGroup.Wait()
278+
}
279+
274280
// processNextWorkItem will read a single work item off the workqueue and
275281
// attempt to process it, by calling the reconcileHandler.
276282
func (c *Controller[request]) processNextWorkItem(ctx context.Context) bool {

pkg/internal/controller/controller_test.go

+117
Original file line numberDiff line numberDiff line change
@@ -383,6 +383,123 @@ var _ = Describe("controller", func() {
383383
})
384384
})
385385

386+
Describe("startEventSources", func() {
387+
It("should return nil when no sources are provided", func() {
388+
ctx, cancel := context.WithCancel(context.Background())
389+
defer cancel()
390+
391+
ctrl.startWatches = []source.TypedSource[reconcile.Request]{}
392+
err := ctrl.startEventSources(ctx)
393+
Expect(err).NotTo(HaveOccurred())
394+
})
395+
396+
It("should return an error if a source fails to start", func() {
397+
ctx, cancel := context.WithCancel(context.Background())
398+
defer cancel()
399+
400+
expectedErr := fmt.Errorf("failed to start source")
401+
src := source.Func(func(ctx context.Context, _ workqueue.TypedRateLimitingInterface[reconcile.Request]) error {
402+
// Return the error immediately so we don't get a timeout
403+
return expectedErr
404+
})
405+
406+
// Set a sufficiently long timeout to avoid timeouts interfering with the error being returned
407+
ctrl.CacheSyncTimeout = 5 * time.Second
408+
ctrl.startWatches = []source.TypedSource[reconcile.Request]{src}
409+
err := ctrl.startEventSources(ctx)
410+
Expect(err).To(Equal(expectedErr))
411+
})
412+
413+
It("should return an error if a source fails to sync", func() {
414+
ctx, cancel := context.WithCancel(context.Background())
415+
defer cancel()
416+
417+
ctrl.startWatches = []source.TypedSource[reconcile.Request]{
418+
source.Kind(&informertest.FakeInformers{Synced: ptr.To(false)}, &corev1.Pod{}, &handler.TypedEnqueueRequestForObject[*corev1.Pod]{}),
419+
}
420+
ctrl.Name = "test-controller"
421+
ctrl.CacheSyncTimeout = 5 * time.Second
422+
423+
err := ctrl.startEventSources(ctx)
424+
Expect(err).To(HaveOccurred())
425+
Expect(err.Error()).To(ContainSubstring("failed to wait for test-controller caches to sync"))
426+
})
427+
428+
It("should not return an error when sources start and sync successfully", func() {
429+
ctx, cancel := context.WithCancel(context.Background())
430+
defer cancel()
431+
432+
// Create a source that starts and syncs successfully
433+
ctrl.startWatches = []source.TypedSource[reconcile.Request]{
434+
source.Kind(&informertest.FakeInformers{Synced: ptr.To(true)}, &corev1.Pod{}, &handler.TypedEnqueueRequestForObject[*corev1.Pod]{}),
435+
}
436+
ctrl.Name = "test-controller"
437+
ctrl.CacheSyncTimeout = 5 * time.Second
438+
439+
err := ctrl.startEventSources(ctx)
440+
Expect(err).NotTo(HaveOccurred())
441+
})
442+
443+
It("should not return an error when context is cancelled during source sync", func() {
444+
sourceCtx, sourceCancel := context.WithCancel(context.Background())
445+
defer sourceCancel()
446+
447+
ctrl.CacheSyncTimeout = 5 * time.Second
448+
449+
// Create a bisignallingSource to control the test flow
450+
src := &bisignallingSource[reconcile.Request]{
451+
startCall: make(chan workqueue.TypedRateLimitingInterface[reconcile.Request]),
452+
startDone: make(chan error, 1),
453+
waitCall: make(chan struct{}),
454+
waitDone: make(chan error, 1),
455+
}
456+
457+
ctrl.startWatches = []source.TypedSource[reconcile.Request]{src}
458+
459+
// Start the sources in a goroutine
460+
startErrCh := make(chan error)
461+
go func() {
462+
startErrCh <- ctrl.startEventSources(sourceCtx)
463+
}()
464+
465+
// Allow source to start successfully
466+
Eventually(src.startCall).Should(Receive())
467+
src.startDone <- nil
468+
469+
// Wait for WaitForSync to be called
470+
Eventually(src.waitCall).Should(BeClosed())
471+
472+
// Return context.Canceled from WaitForSync
473+
src.waitDone <- context.Canceled
474+
475+
// Also cancel the context
476+
sourceCancel()
477+
478+
// We expect to receive the context.Canceled error
479+
err := <-startErrCh
480+
Expect(err).To(MatchError(context.Canceled))
481+
})
482+
483+
It("should timeout if source Start blocks for too long", func() {
484+
ctx, cancel := context.WithCancel(context.Background())
485+
defer cancel()
486+
487+
ctrl.CacheSyncTimeout = 1 * time.Millisecond
488+
489+
// Create a source that blocks forever in Start
490+
blockingSrc := source.Func(func(ctx context.Context, _ workqueue.TypedRateLimitingInterface[reconcile.Request]) error {
491+
<-ctx.Done()
492+
return ctx.Err()
493+
})
494+
495+
ctrl.startWatches = []source.TypedSource[reconcile.Request]{blockingSrc}
496+
497+
err := ctrl.startEventSources(ctx)
498+
Expect(err).To(HaveOccurred())
499+
Expect(err.Error()).To(ContainSubstring("timed out waiting for source"))
500+
})
501+
})
502+
386503
Describe("Processing queue items from a Controller", func() {
387504
It("should call Reconciler if an item is enqueued", func() {
388505
ctx, cancel := context.WithCancel(context.Background())

0 commit comments

Comments
 (0)