Skip to content

Commit 955d8b6

Browse files
committed
Ensure router reload on initial sync
Previously, the router wouldn't reload HAProxy after the initial sync if the last item of the initial list of any of the watched resources didn't reach the router to trigger a commit. One possible trigger for this condition was a route specifying a host already claimed by another namespace. The router could be left in its initial state until another commit-triggering event occurred (e.g. a watch event). This change refactors the commit handling so that the plugin no longer triggers a commit directly. Instead: - the plugin tracks whether state has changed - a commit is only attempted if state has changed - assuming a sync is not in progress, the controller will trigger a commit attempt after calling one of the relevant plugin Handle methods
1 parent 4e5c579 commit 955d8b6

13 files changed

+137
-116
lines changed

pkg/router/controller/controller.go

+44-31
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ type RouterController struct {
3636
routesListConsumed bool
3737
endpointsListConsumed bool
3838
filteredByNamespace bool
39+
syncing bool
3940

4041
RoutesListSuccessfulAtLeastOnce func() bool
4142
EndpointsListSuccessfulAtLeastOnce func() bool
@@ -78,25 +79,26 @@ func (c *RouterController) handleFirstSync() bool {
7879
return false
7980
}
8081

82+
err := c.Plugin.SetSyncedAtLeastOnce()
83+
if err != nil {
84+
utilruntime.HandleError(err)
85+
return false
86+
}
87+
8188
// If either of the event queues were empty after the initial
8289
// List, the tracking listConsumed variable's default value of
83-
// 'false' may prevent the router from reloading to indicate the
84-
// readiness status. Set the value to 'true' to ensure that a
85-
// reload will be performed if necessary.
90+
// 'false' may prevent the router from committing the readiness
91+
// status. Set the value to 'true' to ensure that state will be
92+
// committed if necessary.
8693
if c.RoutesListCount() == 0 {
8794
c.routesListConsumed = true
8895
}
8996
if c.EndpointsListCount() == 0 {
9097
c.endpointsListConsumed = true
9198
}
92-
c.updateLastSyncProcessed()
99+
c.commit()
93100

94-
err := c.Plugin.SetSyncedAtLeastOnce()
95-
if err == nil {
96-
return true
97-
}
98-
utilruntime.HandleError(err)
99-
return false
101+
return true
100102
}
101103

102104
// watchForFirstSync loops until the first sync has been handled.
@@ -116,16 +118,17 @@ func (c *RouterController) HandleNamespaces() {
116118
c.lock.Lock()
117119
defer c.lock.Unlock()
118120

121+
glog.V(4).Infof("Updating watched namespaces: %v", namespaces)
122+
if err := c.Plugin.HandleNamespaces(namespaces); err != nil {
123+
utilruntime.HandleError(err)
124+
}
125+
119126
// Namespace filtering is assumed to be have been
120127
// performed so long as the plugin event handler is called
121128
// at least once.
122129
c.filteredByNamespace = true
123-
c.updateLastSyncProcessed()
130+
c.commit()
124131

125-
glog.V(4).Infof("Updating watched namespaces: %v", namespaces)
126-
if err := c.Plugin.HandleNamespaces(namespaces); err != nil {
127-
utilruntime.HandleError(err)
128-
}
129132
return
130133
}
131134
utilruntime.HandleError(fmt.Errorf("unable to find namespaces for router: %v", err))
@@ -164,18 +167,18 @@ func (c *RouterController) HandleRoute() {
164167
c.lock.Lock()
165168
defer c.lock.Unlock()
166169

167-
// Change the local sync state within the lock to ensure that all
168-
// event handlers have the same view of sync state.
169-
c.routesListConsumed = c.RoutesListConsumed()
170-
c.updateLastSyncProcessed()
171-
172170
glog.V(4).Infof("Processing Route: %s -> %s", route.Name, route.Spec.To.Name)
173171
glog.V(4).Infof(" Alias: %s", route.Spec.Host)
174172
glog.V(4).Infof(" Event: %s", eventType)
175173

176174
if err := c.Plugin.HandleRoute(eventType, route); err != nil {
177175
utilruntime.HandleError(err)
178176
}
177+
178+
// Change the local sync state within the lock to ensure that all
179+
// event handlers have the same view of sync state.
180+
c.routesListConsumed = c.RoutesListConsumed()
181+
c.commit()
179182
}
180183

181184
// HandleEndpoints handles a single Endpoints event and refreshes the router backend.
@@ -189,22 +192,32 @@ func (c *RouterController) HandleEndpoints() {
189192
c.lock.Lock()
190193
defer c.lock.Unlock()
191194

192-
// Change the local sync state within the lock to ensure that all
193-
// event handlers have the same view of sync state.
194-
c.endpointsListConsumed = c.EndpointsListConsumed()
195-
c.updateLastSyncProcessed()
196-
197195
if err := c.Plugin.HandleEndpoints(eventType, endpoints); err != nil {
198196
utilruntime.HandleError(err)
199197
}
198+
199+
// Change the local sync state within the lock to ensure that all
200+
// event handlers have the same view of sync state.
201+
c.endpointsListConsumed = c.EndpointsListConsumed()
202+
c.commit()
200203
}
201204

202-
// updateLastSyncProcessed notifies the plugin if the most recent sync
203-
// of router resources has been completed.
204-
func (c *RouterController) updateLastSyncProcessed() {
205-
lastSyncProcessed := c.endpointsListConsumed && c.routesListConsumed &&
206-
(c.Namespaces == nil || c.filteredByNamespace)
207-
if err := c.Plugin.SetLastSyncProcessed(lastSyncProcessed); err != nil {
205+
// commit notifies the plugin that it is safe to commit state.
206+
func (c *RouterController) commit() {
207+
syncing := !(c.endpointsListConsumed && c.routesListConsumed &&
208+
(c.Namespaces == nil || c.filteredByNamespace))
209+
if syncing != c.syncing {
210+
c.syncing = syncing
211+
if c.syncing {
212+
glog.V(4).Infof("Router sync in progress")
213+
} else {
214+
glog.V(4).Infof("Router sync complete")
215+
}
216+
}
217+
if c.syncing {
218+
return
219+
}
220+
if err := c.Plugin.Commit(); err != nil {
208221
utilruntime.HandleError(err)
209222
}
210223
}

pkg/router/controller/controller_test.go

+18-16
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,7 @@ import (
1111
)
1212

1313
type fakeRouterPlugin struct {
14-
lastSyncProcessed bool
15-
syncedAtLeastOnce bool
14+
commitRequested bool
1615
}
1716

1817
func (p *fakeRouterPlugin) HandleRoute(t watch.EventType, route *routeapi.Route) error {
@@ -28,13 +27,12 @@ func (p *fakeRouterPlugin) HandleNamespaces(namespaces sets.String) error {
2827
return nil
2928
}
3029

31-
func (p *fakeRouterPlugin) SetLastSyncProcessed(processed bool) error {
32-
p.lastSyncProcessed = processed
30+
func (p *fakeRouterPlugin) Commit() error {
31+
p.commitRequested = true
3332
return nil
3433
}
3534

3635
func (p *fakeRouterPlugin) SetSyncedAtLeastOnce() error {
37-
p.syncedAtLeastOnce = true
3836
return nil
3937
}
4038

@@ -45,7 +43,7 @@ func (n fakeNamespaceLister) NamespaceNames() (sets.String, error) {
4543
return sets.NewString("foo"), nil
4644
}
4745

48-
func TestRouterController_updateLastSyncProcessed(t *testing.T) {
46+
func TestRouterController_commit(t *testing.T) {
4947
p := fakeRouterPlugin{}
5048
routesListConsumed := true
5149
c := RouterController{
@@ -69,30 +67,34 @@ func TestRouterController_updateLastSyncProcessed(t *testing.T) {
6967
NamespaceRetries: 1,
7068
}
7169

70+
expectedMsg := "commit not expected to have been requested"
71+
notExpectedMsg := "commit expected to have been requested"
72+
7273
// Simulate the initial sync
7374
c.HandleNamespaces()
74-
if p.lastSyncProcessed {
75-
t.Fatalf("last sync not expected to have been processed")
75+
if p.commitRequested {
76+
t.Fatalf(notExpectedMsg)
7677
}
7778
c.HandleEndpoints()
78-
if p.lastSyncProcessed {
79-
t.Fatalf("last sync not expected to have been processed")
79+
if p.commitRequested {
80+
t.Fatalf(notExpectedMsg)
8081
}
8182
c.HandleRoute()
82-
if !p.lastSyncProcessed {
83-
t.Fatalf("last sync expected to have been processed")
83+
if !p.commitRequested {
84+
t.Fatalf(expectedMsg)
8485
}
8586

8687
// Simulate a relist
88+
p.commitRequested = false
8789
routesListConsumed = false
8890
c.HandleRoute()
89-
if p.lastSyncProcessed {
90-
t.Fatalf("last sync not expected to have been processed")
91+
if p.commitRequested {
92+
t.Fatalf(notExpectedMsg)
9193
}
9294
routesListConsumed = true
9395
c.HandleRoute()
94-
if !p.lastSyncProcessed {
95-
t.Fatalf("last sync expected to have been processed")
96+
if !p.commitRequested {
97+
t.Fatalf(expectedMsg)
9698
}
9799

98100
}

pkg/router/controller/extended_validator.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,8 @@ func (p *ExtendedValidator) HandleNamespaces(namespaces sets.String) error {
7979
return p.plugin.HandleNamespaces(namespaces)
8080
}
8181

82-
func (p *ExtendedValidator) SetLastSyncProcessed(processed bool) error {
83-
return p.plugin.SetLastSyncProcessed(processed)
82+
func (p *ExtendedValidator) Commit() error {
83+
return p.plugin.Commit()
8484
}
8585

8686
func (p *ExtendedValidator) SetSyncedAtLeastOnce() error {

pkg/router/controller/host_admitter.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -148,8 +148,8 @@ func (p *HostAdmitter) HandleNamespaces(namespaces sets.String) error {
148148
return p.plugin.HandleNamespaces(namespaces)
149149
}
150150

151-
func (p *HostAdmitter) SetLastSyncProcessed(processed bool) error {
152-
return p.plugin.SetLastSyncProcessed(processed)
151+
func (p *HostAdmitter) Commit() error {
152+
return p.plugin.Commit()
153153
}
154154

155155
func (p *HostAdmitter) SetSyncedAtLeastOnce() error {

pkg/router/controller/status.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -312,8 +312,8 @@ func (a *StatusAdmitter) HandleNamespaces(namespaces sets.String) error {
312312
return a.plugin.HandleNamespaces(namespaces)
313313
}
314314

315-
func (a *StatusAdmitter) SetLastSyncProcessed(processed bool) error {
316-
return a.plugin.SetLastSyncProcessed(processed)
315+
func (a *StatusAdmitter) Commit() error {
316+
return a.plugin.Commit()
317317
}
318318

319319
func (a *StatusAdmitter) SetSyncedAtLeastOnce() error {

pkg/router/controller/status_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func (p *fakePlugin) HandleEndpoints(watch.EventType, *kapi.Endpoints) error {
3939
func (p *fakePlugin) HandleNamespaces(namespaces sets.String) error {
4040
return fmt.Errorf("not expected")
4141
}
42-
func (p *fakePlugin) SetLastSyncProcessed(processed bool) error {
42+
func (p *fakePlugin) Commit() error {
4343
return fmt.Errorf("not expected")
4444
}
4545

pkg/router/controller/unique_host.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -255,8 +255,8 @@ func (p *UniqueHost) HandleNamespaces(namespaces sets.String) error {
255255
return p.plugin.HandleNamespaces(namespaces)
256256
}
257257

258-
func (p *UniqueHost) SetLastSyncProcessed(processed bool) error {
259-
return p.plugin.SetLastSyncProcessed(processed)
258+
func (p *UniqueHost) Commit() error {
259+
return p.plugin.Commit()
260260
}
261261

262262
func (p *UniqueHost) SetSyncedAtLeastOnce() error {

pkg/router/f5/plugin.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -630,7 +630,7 @@ func (p *F5Plugin) HandleRoute(eventType watch.EventType,
630630
}
631631

632632
// No-op since f5 configuration can be updated piecemeal
633-
func (p *F5Plugin) SetLastSyncProcessed(processed bool) error {
633+
func (p *F5Plugin) Commit() error {
634634
return nil
635635
}
636636

pkg/router/interfaces.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,6 @@ type Plugin interface {
1616
// If sent, filter the list of accepted routes and endpoints to this set
1717
HandleNamespaces(namespaces sets.String) error
1818
HandleNode(watch.EventType, *kapi.Node) error
19-
SetLastSyncProcessed(processed bool) error
19+
Commit() error
2020
SetSyncedAtLeastOnce() error
2121
}

pkg/router/template/plugin.go

+8-25
Original file line numberDiff line numberDiff line change
@@ -65,17 +65,15 @@ type routerInterface interface {
6565
// FindServiceUnit finds the service with the given id.
6666
FindServiceUnit(id string) (v ServiceUnit, ok bool)
6767

68-
// AddEndpoints adds new Endpoints for the given id. Returns true if a change was made
69-
// and the state should be stored with Commit().
70-
AddEndpoints(id string, endpoints []Endpoint) bool
68+
// AddEndpoints adds new Endpoints for the given id.
69+
AddEndpoints(id string, endpoints []Endpoint)
7170
// DeleteEndpoints deletes the endpoints for the frontend with the given id.
7271
DeleteEndpoints(id string)
7372

7473
// AddRoute adds a route for the given id and the calculated host. Weight
7574
// suggests the weightage attached to it with respect to other services
76-
// pointed to by the route. Returns true if a
77-
// change was made and the state should be stored with Commit().
78-
AddRoute(id string, weight int32, route *routeapi.Route, host string) bool
75+
// pointed to by the route.
76+
AddRoute(id string, weight int32, route *routeapi.Route, host string)
7977
// RemoveRoute removes the given route
8078
RemoveRoute(route *routeapi.Route)
8179
// HasRoute indicates whether the router is configured with the given route
@@ -86,9 +84,6 @@ type routerInterface interface {
8684
// commit (persist router state + refresh the backend) that coalesces multiple changes.
8785
Commit()
8886

89-
// SetSkipCommit indicates to the router whether commits should be skipped
90-
SetSkipCommit(skipCommit bool)
91-
9287
// SetSyncedAtLeastOnce indicates to the router that state has been read from the api at least once
9388
SetSyncedAtLeastOnce()
9489
}
@@ -171,14 +166,10 @@ func (p *TemplatePlugin) HandleEndpoints(eventType watch.EventType, endpoints *k
171166
glog.V(4).Infof("Modifying endpoints for %s", key)
172167
routerEndpoints := createRouterEndpoints(endpoints, !p.IncludeUDP, p.ServiceFetcher)
173168
key := endpointsKey(endpoints)
174-
commit := p.Router.AddEndpoints(key, routerEndpoints)
175-
if commit {
176-
p.Router.Commit()
177-
}
169+
p.Router.AddEndpoints(key, routerEndpoints)
178170
case watch.Deleted:
179171
glog.V(4).Infof("Deleting endpoints for %s", key)
180172
p.Router.DeleteEndpoints(key)
181-
p.Router.Commit()
182173
}
183174

184175
return nil
@@ -206,7 +197,6 @@ func (p *TemplatePlugin) HandleRoute(eventType watch.EventType, route *routeapi.
206197
p.Router.RemoveRoute(route)
207198

208199
// Now add the route back again
209-
commit := false
210200
for i := range serviceKeys {
211201
key := serviceKeys[i]
212202
weight := weights[i]
@@ -216,16 +206,11 @@ func (p *TemplatePlugin) HandleRoute(eventType watch.EventType, route *routeapi.
216206
}
217207

218208
glog.V(4).Infof("Modifying routes for %s", key)
219-
commitRoute := p.Router.AddRoute(key, weight, route, host)
220-
commit = (map[bool]bool{true: true, false: commit})[commitRoute]
221-
}
222-
if commit {
223-
p.Router.Commit()
209+
p.Router.AddRoute(key, weight, route, host)
224210
}
225211
case watch.Deleted:
226212
glog.V(4).Infof("Deleting route %v", route)
227213
p.Router.RemoveRoute(route)
228-
p.Router.Commit()
229214
}
230215
return nil
231216
}
@@ -234,18 +219,16 @@ func (p *TemplatePlugin) HandleRoute(eventType watch.EventType, route *routeapi.
234219
// the provided namespace list.
235220
func (p *TemplatePlugin) HandleNamespaces(namespaces sets.String) error {
236221
p.Router.FilterNamespaces(namespaces)
237-
p.Router.Commit()
238222
return nil
239223
}
240224

241-
func (p *TemplatePlugin) SetLastSyncProcessed(processed bool) error {
242-
p.Router.SetSkipCommit(!processed)
225+
func (p *TemplatePlugin) Commit() error {
226+
p.Router.Commit()
243227
return nil
244228
}
245229

246230
func (p *TemplatePlugin) SetSyncedAtLeastOnce() error {
247231
p.Router.SetSyncedAtLeastOnce()
248-
p.Router.Commit()
249232
return nil
250233
}
251234

pkg/router/template/plugin_test.go

-3
Original file line numberDiff line numberDiff line change
@@ -245,9 +245,6 @@ func (r *TestRouter) Commit() {
245245
r.Committed = true
246246
}
247247

248-
func (r *TestRouter) SetSkipCommit(skipCommit bool) {
249-
}
250-
251248
func (r *TestRouter) SetSyncedAtLeastOnce() {
252249
}
253250

0 commit comments

Comments
 (0)