7
7
8
8
"github.com/golang/glog"
9
9
kapi "k8s.io/kubernetes/pkg/api"
10
+ "k8s.io/kubernetes/pkg/apis/extensions"
10
11
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
11
12
"k8s.io/kubernetes/pkg/util/sets"
12
13
utilwait "k8s.io/kubernetes/pkg/util/wait"
@@ -30,25 +31,37 @@ type RouterController struct {
30
31
NextRoute func () (watch.EventType , * routeapi.Route , error )
31
32
NextNode func () (watch.EventType , * kapi.Node , error )
32
33
NextEndpoints func () (watch.EventType , * kapi.Endpoints , error )
34
+ NextIngress func () (watch.EventType , * extensions.Ingress , error )
35
+ NextSecret func () (watch.EventType , * kapi.Secret , error )
33
36
34
37
RoutesListConsumed func () bool
35
38
EndpointsListConsumed func () bool
39
+ IngressesListConsumed func () bool
40
+ SecretsListConsumed func () bool
36
41
routesListConsumed bool
37
42
endpointsListConsumed bool
43
+ ingressesListConsumed bool
44
+ secretsListConsumed bool
38
45
filteredByNamespace bool
39
46
syncing bool
40
47
41
48
RoutesListSuccessfulAtLeastOnce func () bool
42
49
EndpointsListSuccessfulAtLeastOnce func () bool
50
+ IngressesListSuccessfulAtLeastOnce func () bool
51
+ SecretsListSuccessfulAtLeastOnce func () bool
43
52
RoutesListCount func () int
44
53
EndpointsListCount func () int
54
+ IngressesListCount func () int
55
+ SecretsListCount func () int
45
56
46
57
WatchNodes bool
47
58
48
59
Namespaces NamespaceLister
49
60
NamespaceSyncInterval time.Duration
50
61
NamespaceWaitInterval time.Duration
51
62
NamespaceRetries int
63
+
64
+ IngressTranslator * IngressTranslator
52
65
}
53
66
54
67
// Run begins watching and syncing.
@@ -63,6 +76,8 @@ func (c *RouterController) Run() {
63
76
if c .WatchNodes {
64
77
go utilwait .Forever (c .HandleNode , 0 )
65
78
}
79
+ go utilwait .Forever (c .HandleIngress , 0 )
80
+ go utilwait .Forever (c .HandleSecret , 0 )
66
81
go c .watchForFirstSync ()
67
82
}
68
83
@@ -74,22 +89,29 @@ func (c *RouterController) handleFirstSync() bool {
74
89
75
90
synced := c .RoutesListSuccessfulAtLeastOnce () &&
76
91
c .EndpointsListSuccessfulAtLeastOnce () &&
77
- (c .Namespaces == nil || c .filteredByNamespace )
92
+ (c .Namespaces == nil || c .filteredByNamespace ) &&
93
+ c .IngressesListSuccessfulAtLeastOnce () &&
94
+ c .SecretsListSuccessfulAtLeastOnce ()
78
95
if ! synced {
79
96
return false
80
97
}
81
98
82
- // If either of the event queues were empty after the initial
83
- // List, the tracking listConsumed variable's default value of
84
- // 'false' may prevent the router from committing the readiness
85
- // status. Set the value to 'true' to ensure that state will be
86
- // committed if necessary.
99
+ // If any of the event queues were empty after the initial List,
100
+ // the tracking listConsumed variable's default value of 'false'
101
+ // may prevent the router from committing. Set the value to
102
+ // 'true' to ensure that state can be committed if necessary.
87
103
if c .RoutesListCount () == 0 {
88
104
c .routesListConsumed = true
89
105
}
90
106
if c .EndpointsListCount () == 0 {
91
107
c .endpointsListConsumed = true
92
108
}
109
+ if c .IngressesListCount () == 0 {
110
+ c .ingressesListConsumed = true
111
+ }
112
+ if c .SecretsListCount () == 0 {
113
+ c .secretsListConsumed = true
114
+ }
93
115
c .commit ()
94
116
95
117
return true
@@ -161,13 +183,7 @@ func (c *RouterController) HandleRoute() {
161
183
c .lock .Lock ()
162
184
defer c .lock .Unlock ()
163
185
164
- glog .V (4 ).Infof ("Processing Route: %s -> %s" , route .Name , route .Spec .To .Name )
165
- glog .V (4 ).Infof (" Alias: %s" , route .Spec .Host )
166
- glog .V (4 ).Infof (" Event: %s" , eventType )
167
-
168
- if err := c .Plugin .HandleRoute (eventType , route ); err != nil {
169
- utilruntime .HandleError (err )
170
- }
186
+ c .processRoute (eventType , route )
171
187
172
188
// Change the local sync state within the lock to ensure that all
173
189
// event handlers have the same view of sync state.
@@ -196,10 +212,60 @@ func (c *RouterController) HandleEndpoints() {
196
212
c .commit ()
197
213
}
198
214
215
+ // HandleIngress handles a single Ingress event and synchronizes the router backend.
216
+ func (c * RouterController ) HandleIngress () {
217
+ eventType , ingress , err := c .NextIngress ()
218
+ if err != nil {
219
+ utilruntime .HandleError (fmt .Errorf ("unable to read ingress: %v" , err ))
220
+ return
221
+ }
222
+
223
+ // The ingress translator synchronizes access to its cache with a
224
+ // lock, so calls to it are made outside of the controller lock to
225
+ // avoid unintended interaction.
226
+ events := c .IngressTranslator .TranslateIngressEvent (eventType , ingress )
227
+
228
+ c .lock .Lock ()
229
+ defer c .lock .Unlock ()
230
+
231
+ c .processIngressEvents (events )
232
+
233
+ // Change the local sync state within the lock to ensure that all
234
+ // event handlers have the same view of sync state.
235
+ c .ingressesListConsumed = c .IngressesListConsumed ()
236
+ c .commit ()
237
+ }
238
+
239
+ // HandleSecret handles a single Secret event and synchronizes the router backend.
240
+ func (c * RouterController ) HandleSecret () {
241
+ eventType , secret , err := c .NextSecret ()
242
+ if err != nil {
243
+ utilruntime .HandleError (fmt .Errorf ("unable to read secret: %v" , err ))
244
+ return
245
+
246
+ }
247
+
248
+ // The ingress translator synchronizes access to its cache with a
249
+ // lock, so calls to it are made outside of the controller lock to
250
+ // avoid unintended interaction.
251
+ events := c .IngressTranslator .TranslateSecretEvent (eventType , secret )
252
+
253
+ c .lock .Lock ()
254
+ defer c .lock .Unlock ()
255
+
256
+ c .processIngressEvents (events )
257
+
258
+ // Change the local sync state within the lock to ensure that all
259
+ // event handlers have the same view of sync state.
260
+ c .secretsListConsumed = c .SecretsListConsumed ()
261
+ c .commit ()
262
+ }
263
+
199
264
// commit notifies the plugin that it is safe to commit state.
200
265
func (c * RouterController ) commit () {
201
266
syncing := ! (c .endpointsListConsumed && c .routesListConsumed &&
202
- (c .Namespaces == nil || c .filteredByNamespace ))
267
+ (c .Namespaces == nil || c .filteredByNamespace ) &&
268
+ c .ingressesListConsumed && c .secretsListConsumed )
203
269
c .logSyncState (syncing )
204
270
if syncing {
205
271
return
@@ -219,3 +285,24 @@ func (c *RouterController) logSyncState(syncing bool) {
219
285
}
220
286
}
221
287
}
288
+
289
+ // processRoute logs and propagates a route event to the plugin
290
+ func (c * RouterController ) processRoute (eventType watch.EventType , route * routeapi.Route ) {
291
+ glog .V (4 ).Infof ("Processing Route: %s/%s -> %s" , route .Namespace , route .Name , route .Spec .To .Name )
292
+ glog .V (4 ).Infof (" Alias: %s" , route .Spec .Host )
293
+ glog .V (4 ).Infof (" Event: %s" , eventType )
294
+
295
+ if err := c .Plugin .HandleRoute (eventType , route ); err != nil {
296
+ utilruntime .HandleError (err )
297
+ }
298
+ }
299
+
300
+ // processIngressEvents logs and propagates the route events resulting from an ingress event
301
+ func (c * RouterController ) processIngressEvents (events []ingressRouteEvents ) {
302
+ for _ , ingressEvent := range events {
303
+ glog .V (4 ).Infof ("Processing Ingress %s" , ingressEvent .ingressKey )
304
+ for _ , routeEvent := range ingressEvent .routeEvents {
305
+ c .processRoute (routeEvent .eventType , routeEvent .route )
306
+ }
307
+ }
308
+ }
0 commit comments