Skip to content

Commit 0da33d3

Browse files
committed
router: coordinate readiness check with reload
Previously, if either no routes or no endpoints were returned by the initial List operation of a router, the readiness status would not be reloaded even though state had been synchronized. This change ensures that empty initial Lists still result in a 'ready' router.
1 parent 2026b4d commit 0da33d3

File tree

6 files changed

+78
-51
lines changed

6 files changed

+78
-51
lines changed

pkg/client/cache/eventqueue.go

+12-3
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,10 @@ type EventQueue struct {
4949
// item it refers to is explicitly deleted from the store or the
5050
// event is read via Pop().
5151
lastReplaceKey string
52-
// Tracks whether the Replace() method has been called at least
53-
// once. This indicates that a List operation was successfully
54-
// completed against the api even if no items were queued.
52+
// Tracks whether the Replace() method has been called at least once.
5553
replaceCalled bool
54+
// Tracks the number of items queued by the last Replace() call.
55+
replaceCount int
5656
}
5757

5858
// EventQueue implements kcache.Store
@@ -327,6 +327,7 @@ func (eq *EventQueue) Replace(objects []interface{}, resourceVersion string) err
327327
defer eq.lock.Unlock()
328328

329329
eq.replaceCalled = true
330+
eq.replaceCount = len(objects)
330331

331332
eq.events = map[string]watch.EventType{}
332333
eq.queue = eq.queue[:0]
@@ -361,6 +362,14 @@ func (eq *EventQueue) ListSuccessfulAtLeastOnce() bool {
361362
return eq.replaceCalled
362363
}
363364

365+
// ListCount returns how many objects were queued by the most recent List operation.
366+
func (eq *EventQueue) ListCount() int {
367+
eq.lock.Lock()
368+
defer eq.lock.Unlock()
369+
370+
return eq.replaceCount
371+
}
372+
364373
// ListConsumed indicates whether the items queued by a List/Relist
365374
// operation have been consumed.
366375
func (eq *EventQueue) ListConsumed() bool {

pkg/router/controller/controller.go

+39-10
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ type RouterController struct {
3939

4040
RoutesListSuccessfulAtLeastOnce func() bool
4141
EndpointsListSuccessfulAtLeastOnce func() bool
42+
RoutesListCount func() int
43+
EndpointsListCount func() int
4244

4345
WatchNodes bool
4446

@@ -63,20 +65,47 @@ func (c *RouterController) Run() {
6365
go c.watchForFirstSync()
6466
}
6567

66-
// watchForFirstSync signals the router when it sees that the various
68+
// handleFirstSync signals the router when it sees that the various
6769
// watchers have successfully listed data from the api.
70+
func (c *RouterController) handleFirstSync() bool {
71+
c.lock.Lock()
72+
defer c.lock.Unlock()
73+
74+
synced := c.EndpointsListSuccessfulAtLeastOnce() &&
75+
c.RoutesListSuccessfulAtLeastOnce() &&
76+
(c.Namespaces == nil || c.filteredByNamespace)
77+
if !synced {
78+
return false
79+
}
80+
81+
// If either of the event queues were empty after the initial
82+
// List, the tracking listConsumed variable's default value of
83+
// 'false' will prevent the router from reloading to indicate the
84+
// readiness status. Set the value to 'true' to ensure that a
85+
// reload will be performed if necessary.
86+
if c.EndpointsListCount() == 0 {
87+
c.endpointsListConsumed = true
88+
}
89+
if c.RoutesListCount() == 0 {
90+
c.routesListConsumed = true
91+
}
92+
c.updateLastSyncProcessed()
93+
94+
err := c.Plugin.SetSyncedAtLeastOnce()
95+
if err == nil {
96+
return true
97+
}
98+
utilruntime.HandleError(err)
99+
return false
100+
}
101+
102+
// watchForFirstSync loops until the first sync has been handled.
68103
func (c *RouterController) watchForFirstSync() {
69104
for {
70-
syncedAtLeastOnce := c.RoutesListSuccessfulAtLeastOnce() && c.EndpointsListSuccessfulAtLeastOnce() &&
71-
(c.Namespaces == nil || c.filteredByNamespace)
72-
if syncedAtLeastOnce {
73-
if err := c.Plugin.SetSyncedAtLeastOnce(); err != nil {
74-
utilruntime.HandleError(err)
75-
} else {
76-
return
77-
}
105+
if c.handleFirstSync() {
106+
return
78107
}
79-
time.Sleep(1)
108+
time.Sleep(50 * time.Millisecond)
80109
}
81110
}
82111

pkg/router/controller/controller_test.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212

1313
type fakeRouterPlugin struct {
1414
lastSyncProcessed bool
15+
syncedAtLeastOnce bool
1516
}
1617

1718
func (p *fakeRouterPlugin) HandleRoute(t watch.EventType, route *routeapi.Route) error {
@@ -32,7 +33,8 @@ func (p *fakeRouterPlugin) SetLastSyncProcessed(processed bool) error {
3233
return nil
3334
}
3435

35-
func (p *fakeRouterPlugin) SetSyncedAtLeastOnce() {
36+
func (p *fakeRouterPlugin) SetSyncedAtLeastOnce() error {
37+
p.syncedAtLeastOnce = true
3638
return nil
3739
}
3840

@@ -94,3 +96,5 @@ func TestRouterController_updateLastSyncProcessed(t *testing.T) {
9496
}
9597

9698
}
99+
100+
// TODO add test for handleFirstSync

pkg/router/controller/factory/factory.go

+6
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,12 @@ func (factory *RouterControllerFactory) Create(plugin router.Plugin, watchNodes
9999
}
100100
return eventType, obj.(*kapi.Node), nil
101101
},
102+
EndpointsListCount: func() int {
103+
return endpointsEventQueue.ListCount()
104+
},
105+
RoutesListCount: func() int {
106+
return routeEventQueue.ListCount()
107+
},
102108
EndpointsListSuccessfulAtLeastOnce: func() bool {
103109
return endpointsEventQueue.ListSuccessfulAtLeastOnce()
104110
},

pkg/router/template/plugin.go

+1
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,7 @@ func (p *TemplatePlugin) SetLastSyncProcessed(processed bool) error {
246246

247247
func (p *TemplatePlugin) SetSyncedAtLeastOnce() error {
248248
p.Router.SetSyncedAtLeastOnce()
249+
p.Router.Commit()
249250
return nil
250251
}
251252

test/integration/router_test.go

+15-37
Original file line numberDiff line numberDiff line change
@@ -1599,7 +1599,18 @@ func waitForRouterToBecomeAvailable(host string, port int) error {
15991599

16001600
// waitForRouterToBecomeReady checks that the router is ready.
16011601
func waitForRouterToBecomeReady(host string, port int) error {
1602-
return waitForRouter(host, port, "ready")
1602+
// Need to poll because before the router becomes ready the
1603+
// readiness uri will return 401 instead of 503.
1604+
return wait.Poll(time.Millisecond*100, 30*time.Second, func() (bool, error) {
1605+
err := waitForRouter(host, port, "ready")
1606+
if err == nil {
1607+
return true, nil
1608+
} else if err == ErrUnauthenticated {
1609+
return false, nil
1610+
} else {
1611+
return false, err
1612+
}
1613+
})
16031614
}
16041615

16051616
func makeRouterRequest(t *testing.T, scheme string) (*http.Response, error) {
@@ -1648,7 +1659,7 @@ func TestRouterBindsPortsAfterSync(t *testing.T) {
16481659
t.Fatalf("Unexpected error while waiting for the router to become available: %v", err)
16491660
}
16501661

1651-
err = waitForRouterToBecomeReady(routerIP, statsPort)
1662+
err = waitForRouter(routerIP, statsPort, "ready")
16521663
if err == nil {
16531664
t.Fatalf("Router is unexpectedly ready")
16541665
} else if err != ErrUnauthenticated {
@@ -1674,41 +1685,8 @@ func TestRouterBindsPortsAfterSync(t *testing.T) {
16741685
t.Fatalf("Unable to start http server: %v", err)
16751686
}
16761687

1677-
// Create events that will allow the router to have enough state to consider itself 'ready'
1678-
httpEndpoint, err := getEndpoint(fakeMasterAndPod.PodHttpAddr)
1679-
if err != nil {
1680-
t.Fatalf("Couldn't get http endpoint: %v", err)
1681-
}
1682-
endpointEvent := &watch.Event{
1683-
Type: watch.Added,
1684-
Object: &kapi.Endpoints{
1685-
ObjectMeta: kapi.ObjectMeta{
1686-
Name: "myService",
1687-
Namespace: "default",
1688-
},
1689-
Subsets: []kapi.EndpointSubset{httpEndpoint},
1690-
},
1691-
}
1692-
routeEvent := &watch.Event{
1693-
Type: watch.Added,
1694-
Object: &routeapi.Route{
1695-
ObjectMeta: kapi.ObjectMeta{
1696-
Name: "path",
1697-
Namespace: "default",
1698-
},
1699-
Spec: routeapi.RouteSpec{
1700-
Host: "www.example.com",
1701-
Path: "/test",
1702-
To: routeapi.RouteTargetReference{
1703-
Name: "myService",
1704-
},
1705-
TLS: &routeapi.TLSConfig{},
1706-
},
1707-
},
1708-
}
1709-
sendTimeout(t, fakeMasterAndPod.EndpointChannel, eventString(endpointEvent), 30*time.Second)
1710-
sendTimeout(t, fakeMasterAndPod.RouteChannel, eventString(routeEvent), 30*time.Second)
1711-
1688+
// The router should now be able to sync with the api and become
1689+
// ready even without any routes or endopints.
17121690
if err = waitForRouterToBecomeReady(routerIP, statsPort); err != nil {
17131691
t.Fatalf("Unexpected error while waiting for router to become ready: %v", err)
17141692
}

0 commit comments

Comments
 (0)