7
7
8
8
"github.com/golang/glog"
9
9
kapi "k8s.io/kubernetes/pkg/api"
10
+ "k8s.io/kubernetes/pkg/apis/extensions"
10
11
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
11
12
"k8s.io/kubernetes/pkg/util/sets"
12
13
utilwait "k8s.io/kubernetes/pkg/util/wait"
@@ -30,25 +31,38 @@ type RouterController struct {
30
31
NextRoute func () (watch.EventType , * routeapi.Route , error )
31
32
NextNode func () (watch.EventType , * kapi.Node , error )
32
33
NextEndpoints func () (watch.EventType , * kapi.Endpoints , error )
34
+ NextIngress func () (watch.EventType , * extensions.Ingress , error )
35
+ NextSecret func () (watch.EventType , * kapi.Secret , error )
33
36
34
37
RoutesListConsumed func () bool
35
38
EndpointsListConsumed func () bool
39
+ IngressesListConsumed func () bool
40
+ SecretsListConsumed func () bool
36
41
routesListConsumed bool
37
42
endpointsListConsumed bool
43
+ ingressesListConsumed bool
44
+ secretsListConsumed bool
38
45
filteredByNamespace bool
39
46
syncing bool
40
47
41
48
RoutesListSuccessfulAtLeastOnce func () bool
42
49
EndpointsListSuccessfulAtLeastOnce func () bool
50
+ IngressesListSuccessfulAtLeastOnce func () bool
51
+ SecretsListSuccessfulAtLeastOnce func () bool
43
52
RoutesListCount func () int
44
53
EndpointsListCount func () int
54
+ IngressesListCount func () int
55
+ SecretsListCount func () int
45
56
46
57
WatchNodes bool
47
58
48
59
Namespaces NamespaceLister
49
60
NamespaceSyncInterval time.Duration
50
61
NamespaceWaitInterval time.Duration
51
62
NamespaceRetries int
63
+
64
+ EnableIngress bool
65
+ IngressTranslator * IngressTranslator
52
66
}
53
67
54
68
// Run begins watching and syncing.
@@ -63,6 +77,10 @@ func (c *RouterController) Run() {
63
77
if c .WatchNodes {
64
78
go utilwait .Forever (c .HandleNode , 0 )
65
79
}
80
+ if c .EnableIngress {
81
+ go utilwait .Forever (c .HandleIngress , 0 )
82
+ go utilwait .Forever (c .HandleSecret , 0 )
83
+ }
66
84
go c .watchForFirstSync ()
67
85
}
68
86
@@ -74,22 +92,31 @@ func (c *RouterController) handleFirstSync() bool {
74
92
75
93
synced := c .RoutesListSuccessfulAtLeastOnce () &&
76
94
c .EndpointsListSuccessfulAtLeastOnce () &&
77
- (c .Namespaces == nil || c .filteredByNamespace )
95
+ (c .Namespaces == nil || c .filteredByNamespace ) &&
96
+ (! c .EnableIngress ||
97
+ (c .IngressesListSuccessfulAtLeastOnce () && c .SecretsListSuccessfulAtLeastOnce ()))
78
98
if ! synced {
79
99
return false
80
100
}
81
101
82
- // If either of the event queues were empty after the initial
83
- // List, the tracking listConsumed variable's default value of
84
- // 'false' may prevent the router from committing the readiness
85
- // status. Set the value to 'true' to ensure that state will be
86
- // committed if necessary.
102
+ // If any of the event queues were empty after the initial List,
103
+ // the tracking listConsumed variable's default value of 'false'
104
+ // may prevent the router from committing. Set the value to
105
+ // 'true' to ensure that state can be committed if necessary.
87
106
if c .RoutesListCount () == 0 {
88
107
c .routesListConsumed = true
89
108
}
90
109
if c .EndpointsListCount () == 0 {
91
110
c .endpointsListConsumed = true
92
111
}
112
+ if c .EnableIngress {
113
+ if c .IngressesListCount () == 0 {
114
+ c .ingressesListConsumed = true
115
+ }
116
+ if c .SecretsListCount () == 0 {
117
+ c .secretsListConsumed = true
118
+ }
119
+ }
93
120
c .commit ()
94
121
95
122
return true
@@ -109,6 +136,14 @@ func (c *RouterController) HandleNamespaces() {
109
136
for i := 0 ; i < c .NamespaceRetries ; i ++ {
110
137
namespaces , err := c .Namespaces .NamespaceNames ()
111
138
if err == nil {
139
+
140
+ // The ingress translator synchronizes access to its cache with a
141
+ // lock, so calls to it are made outside of the controller lock to
142
+ // avoid unintended interaction.
143
+ if c .EnableIngress {
144
+ c .IngressTranslator .UpdateNamespaces (namespaces )
145
+ }
146
+
112
147
c .lock .Lock ()
113
148
defer c .lock .Unlock ()
114
149
@@ -161,13 +196,7 @@ func (c *RouterController) HandleRoute() {
161
196
c .lock .Lock ()
162
197
defer c .lock .Unlock ()
163
198
164
- glog .V (4 ).Infof ("Processing Route: %s -> %s" , route .Name , route .Spec .To .Name )
165
- glog .V (4 ).Infof (" Alias: %s" , route .Spec .Host )
166
- glog .V (4 ).Infof (" Event: %s" , eventType )
167
-
168
- if err := c .Plugin .HandleRoute (eventType , route ); err != nil {
169
- utilruntime .HandleError (err )
170
- }
199
+ c .processRoute (eventType , route )
171
200
172
201
// Change the local sync state within the lock to ensure that all
173
202
// event handlers have the same view of sync state.
@@ -196,10 +225,61 @@ func (c *RouterController) HandleEndpoints() {
196
225
c .commit ()
197
226
}
198
227
228
+ // HandleIngress handles a single Ingress event and synchronizes the router backend.
229
+ func (c * RouterController ) HandleIngress () {
230
+ eventType , ingress , err := c .NextIngress ()
231
+ if err != nil {
232
+ utilruntime .HandleError (fmt .Errorf ("unable to read ingress: %v" , err ))
233
+ return
234
+ }
235
+
236
+ // The ingress translator synchronizes access to its cache with a
237
+ // lock, so calls to it are made outside of the controller lock to
238
+ // avoid unintended interaction.
239
+ events := c .IngressTranslator .TranslateIngressEvent (eventType , ingress )
240
+
241
+ c .lock .Lock ()
242
+ defer c .lock .Unlock ()
243
+
244
+ c .processIngressEvents (events )
245
+
246
+ // Change the local sync state within the lock to ensure that all
247
+ // event handlers have the same view of sync state.
248
+ c .ingressesListConsumed = c .IngressesListConsumed ()
249
+ c .commit ()
250
+ }
251
+
252
+ // HandleSecret handles a single Secret event and synchronizes the router backend.
253
+ func (c * RouterController ) HandleSecret () {
254
+ eventType , secret , err := c .NextSecret ()
255
+ if err != nil {
256
+ utilruntime .HandleError (fmt .Errorf ("unable to read secret: %v" , err ))
257
+ return
258
+
259
+ }
260
+
261
+ // The ingress translator synchronizes access to its cache with a
262
+ // lock, so calls to it are made outside of the controller lock to
263
+ // avoid unintended interaction.
264
+ events := c .IngressTranslator .TranslateSecretEvent (eventType , secret )
265
+
266
+ c .lock .Lock ()
267
+ defer c .lock .Unlock ()
268
+
269
+ c .processIngressEvents (events )
270
+
271
+ // Change the local sync state within the lock to ensure that all
272
+ // event handlers have the same view of sync state.
273
+ c .secretsListConsumed = c .SecretsListConsumed ()
274
+ c .commit ()
275
+ }
276
+
199
277
// commit notifies the plugin that it is safe to commit state.
200
278
func (c * RouterController ) commit () {
201
279
syncing := ! (c .endpointsListConsumed && c .routesListConsumed &&
202
- (c .Namespaces == nil || c .filteredByNamespace ))
280
+ (c .Namespaces == nil || c .filteredByNamespace ) &&
281
+ (! c .EnableIngress ||
282
+ (c .ingressesListConsumed && c .secretsListConsumed )))
203
283
c .logSyncState (syncing )
204
284
if syncing {
205
285
return
@@ -219,3 +299,24 @@ func (c *RouterController) logSyncState(syncing bool) {
219
299
}
220
300
}
221
301
}
302
+
303
+ // processRoute logs and propagates a route event to the plugin
304
+ func (c * RouterController ) processRoute (eventType watch.EventType , route * routeapi.Route ) {
305
+ glog .V (4 ).Infof ("Processing Route: %s/%s -> %s" , route .Namespace , route .Name , route .Spec .To .Name )
306
+ glog .V (4 ).Infof (" Alias: %s" , route .Spec .Host )
307
+ glog .V (4 ).Infof (" Event: %s" , eventType )
308
+
309
+ if err := c .Plugin .HandleRoute (eventType , route ); err != nil {
310
+ utilruntime .HandleError (err )
311
+ }
312
+ }
313
+
314
+ // processIngressEvents logs and propagates the route events resulting from an ingress or secret event
315
+ func (c * RouterController ) processIngressEvents (events []ingressRouteEvents ) {
316
+ for _ , ingressEvent := range events {
317
+ glog .V (4 ).Infof ("Processing Ingress %s" , ingressEvent .ingressKey )
318
+ for _ , routeEvent := range ingressEvent .routeEvents {
319
+ c .processRoute (routeEvent .eventType , routeEvent .route )
320
+ }
321
+ }
322
+ }
0 commit comments