@@ -28,16 +28,19 @@ import (
28
28
dto "github.com/prometheus/client_model/go"
29
29
appsv1 "k8s.io/api/apps/v1"
30
30
corev1 "k8s.io/api/core/v1"
31
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
31
32
"k8s.io/apimachinery/pkg/types"
32
33
"k8s.io/client-go/util/workqueue"
33
34
"sigs.k8s.io/controller-runtime/pkg/cache"
34
35
"sigs.k8s.io/controller-runtime/pkg/cache/informertest"
35
36
"sigs.k8s.io/controller-runtime/pkg/controller/controllertest"
37
+ "sigs.k8s.io/controller-runtime/pkg/event"
36
38
"sigs.k8s.io/controller-runtime/pkg/handler"
37
39
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
38
40
"sigs.k8s.io/controller-runtime/pkg/internal/log"
39
41
"sigs.k8s.io/controller-runtime/pkg/predicate"
40
42
"sigs.k8s.io/controller-runtime/pkg/reconcile"
43
+ "sigs.k8s.io/controller-runtime/pkg/runtime/inject"
41
44
"sigs.k8s.io/controller-runtime/pkg/source"
42
45
)
43
46
@@ -169,6 +172,83 @@ var _ = Describe("controller", func() {
169
172
close (done )
170
173
}, 10.0 )
171
174
175
+ It ("should process events from source.Channel" , func (done Done ) {
176
+ // channel to be closed when event is processed
177
+ processed := make (chan struct {})
178
+ // source channel to be injected
179
+ ch := make (chan event.GenericEvent , 1 )
180
+
181
+ ctx , cancel := context .WithCancel (context .TODO ())
182
+ defer cancel ()
183
+
184
+ // event to be sent to the channel
185
+ p := & corev1.Pod {
186
+ ObjectMeta : metav1.ObjectMeta {Name : "foo" , Namespace : "bar" },
187
+ }
188
+ evt := event.GenericEvent {
189
+ Object : p ,
190
+ }
191
+
192
+ ins := & source.Channel {Source : ch }
193
+ ins .DestBufferSize = 1
194
+ Expect (inject .StopChannelInto (ctx .Done (), ins )).To (BeTrue ())
195
+
196
+ // send the event to the channel
197
+ ch <- evt
198
+
199
+ ctrl .startWatches = []watchDescription {{
200
+ src : ins ,
201
+ handler : handler.Funcs {
202
+ GenericFunc : func (evt event.GenericEvent , q workqueue.RateLimitingInterface ) {
203
+ defer GinkgoRecover ()
204
+ close (processed )
205
+ },
206
+ },
207
+ }}
208
+
209
+ go func () {
210
+ defer GinkgoRecover ()
211
+ Expect (ctrl .Start (ctx )).To (Succeed ())
212
+ }()
213
+ <- processed
214
+ close (done )
215
+ })
216
+
217
+ It ("should error when channel is passed as a source but stop channel is not injected" , func (done Done ) {
218
+ ch := make (chan event.GenericEvent )
219
+ ctx , cancel := context .WithCancel (context .TODO ())
220
+ defer cancel ()
221
+
222
+ ins := & source.Channel {Source : ch }
223
+ ctrl .startWatches = []watchDescription {{
224
+ src : ins ,
225
+ }}
226
+
227
+ e := ctrl .Start (ctx )
228
+
229
+ Expect (e ).NotTo (BeNil ())
230
+ Expect (e .Error ()).To (ContainSubstring ("must call InjectStop on Channel before calling Start" ))
231
+ close (done )
232
+ })
233
+
234
+ It ("should error when channel source is not specified" , func (done Done ) {
235
+ ctx , cancel := context .WithCancel (context .Background ())
236
+ defer cancel ()
237
+
238
+ ins := & source.Channel {}
239
+ Expect (inject .StopChannelInto (make (<- chan struct {}), ins )).To (BeTrue ())
240
+
241
+ ctrl .startWatches = []watchDescription {{
242
+ src : & source.Channel {},
243
+ }}
244
+
245
+ e := ctrl .Start (ctx )
246
+ Expect (e ).NotTo (BeNil ())
247
+ Expect (e .Error ()).To (ContainSubstring ("must specify Channel.Source" ))
248
+
249
+ close (done )
250
+ })
251
+
172
252
It ("should call Start on sources with the appropriate EventHandler, Queue, and Predicates" , func () {
173
253
pr1 := & predicate.Funcs {}
174
254
pr2 := & predicate.Funcs {}
0 commit comments