Skip to content

Commit f33705e

Browse files
[release-0.20] 🐛fix(controller): support WaitForSync in custom TypedSyncingSource (#3086)
* 🐛fix(controller): use generic WaitForSync There is already support for defining `TypedSyncingSource` but the original code still checks for the original `SyncingSource` before callign `WaitForSync(ctx)` which does not work for custom typed controller. this fix should be backported to v0.19 * test --------- Co-authored-by: Tarek Sharafi <[email protected]>
1 parent 571c31a commit f33705e

File tree

2 files changed

+77
-1
lines changed

2 files changed

+77
-1
lines changed

Diff for: pkg/internal/controller/controller.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ func (c *Controller[request]) Start(ctx context.Context) error {
200200
sourceStartErrChan <- err
201201
return
202202
}
203-
syncingSource, ok := watch.(source.SyncingSource)
203+
syncingSource, ok := watch.(source.TypedSyncingSource[request])
204204
if !ok {
205205
return
206206
}

Diff for: pkg/internal/controller/controller_test.go

+76
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ import (
4646
"sigs.k8s.io/controller-runtime/pkg/source"
4747
)
4848

49+
type TestRequest struct {
50+
Key string
51+
}
52+
4953
var _ = Describe("controller", func() {
5054
var fakeReconcile *fakeReconciler
5155
var ctrl *Controller[reconcile.Request]
@@ -340,6 +344,41 @@ var _ = Describe("controller", func() {
340344
Expect(err.Error()).To(Equal("controller was started more than once. This is likely to be caused by being added to a manager multiple times"))
341345
})
342346

347+
It("should check for correct TypedSyncingSource if custom types are used", func() {
348+
queue := &controllertest.TypedQueue[TestRequest]{
349+
TypedInterface: workqueue.NewTyped[TestRequest](),
350+
}
351+
ctrl := &Controller[TestRequest]{
352+
NewQueue: func(string, workqueue.TypedRateLimiter[TestRequest]) workqueue.TypedRateLimitingInterface[TestRequest] {
353+
return queue
354+
},
355+
LogConstructor: func(*TestRequest) logr.Logger {
356+
return log.RuntimeLog.WithName("controller").WithName("test")
357+
},
358+
}
359+
ctrl.CacheSyncTimeout = time.Second
360+
src := &bisignallingSource[TestRequest]{
361+
startCall: make(chan workqueue.TypedRateLimitingInterface[TestRequest]),
362+
startDone: make(chan error, 1),
363+
waitCall: make(chan struct{}),
364+
waitDone: make(chan error, 1),
365+
}
366+
ctrl.startWatches = []source.TypedSource[TestRequest]{src}
367+
ctrl.Name = "foo"
368+
ctx, cancel := context.WithCancel(context.Background())
369+
defer cancel()
370+
startCh := make(chan error)
371+
go func() {
372+
defer GinkgoRecover()
373+
startCh <- ctrl.Start(ctx)
374+
}()
375+
Eventually(src.startCall).Should(Receive(Equal(queue)))
376+
src.startDone <- nil
377+
Eventually(src.waitCall).Should(BeClosed())
378+
src.waitDone <- nil
379+
cancel()
380+
Eventually(startCh).Should(Receive(Succeed()))
381+
})
343382
})
344383

345384
Describe("Processing queue items from a Controller", func() {
@@ -901,3 +940,40 @@ func (c *cacheWithIndefinitelyBlockingGetInformer) GetInformer(ctx context.Conte
901940
<-ctx.Done()
902941
return nil, errors.New("GetInformer timed out")
903942
}
943+
944+
type bisignallingSource[T comparable] struct {
945+
// receives the queue that is passed to Start
946+
startCall chan workqueue.TypedRateLimitingInterface[T]
947+
// passes an error to return from Start
948+
startDone chan error
949+
// closed when WaitForSync is called
950+
waitCall chan struct{}
951+
// passes an error to return from WaitForSync
952+
waitDone chan error
953+
}
954+
955+
var _ source.TypedSyncingSource[int] = (*bisignallingSource[int])(nil)
956+
957+
func (t *bisignallingSource[T]) Start(ctx context.Context, q workqueue.TypedRateLimitingInterface[T]) error {
958+
select {
959+
case t.startCall <- q:
960+
case <-ctx.Done():
961+
return ctx.Err()
962+
}
963+
select {
964+
case err := <-t.startDone:
965+
return err
966+
case <-ctx.Done():
967+
return ctx.Err()
968+
}
969+
}
970+
971+
func (t *bisignallingSource[T]) WaitForSync(ctx context.Context) error {
972+
close(t.waitCall)
973+
select {
974+
case err := <-t.waitDone:
975+
return err
976+
case <-ctx.Done():
977+
return ctx.Err()
978+
}
979+
}

0 commit comments

Comments
 (0)