Skip to content

Commit 47840db

Browse files
Set a route expectation to delay processing until routes are created
When a controller creates multiple routes from a single ingress it may observe arbitrary delays between the time of creation and when it observes that route in its cache. Like other controllers that use random name generation (secret controller, replica set controller) we therefore need to wait until our expectation is met. To simplify the code, we bypass server side name generation and use our own name generation to avoid racing against the server (create route, server sends event, set expectation = expectation is never cleared).
1 parent e224347 commit 47840db

File tree

6 files changed

+454
-38
lines changed

6 files changed

+454
-38
lines changed

pkg/cmd/server/bootstrappolicy/controller_policy.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,7 @@ func init() {
297297
},
298298
})
299299

300-
// ingress-secretref-controller
300+
// ingress-to-route-controller
301301
addControllerRole(rbac.ClusterRole{
302302
ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + InfraIngressToRouteControllerServiceAccountName},
303303
Rules: []rbac.PolicyRule{

pkg/route/controller/ingress/ingress.go

+149-4
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package ingress
22

33
import (
44
"fmt"
5+
"sync"
56
"time"
67

78
"github.com/golang/glog"
@@ -15,7 +16,9 @@ import (
1516
utilerrors "k8s.io/apimachinery/pkg/util/errors"
1617
"k8s.io/apimachinery/pkg/util/intstr"
1718
"k8s.io/apimachinery/pkg/util/json"
19+
utilrand "k8s.io/apimachinery/pkg/util/rand"
1820
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
21+
"k8s.io/apimachinery/pkg/util/sets"
1922
"k8s.io/apimachinery/pkg/util/wait"
2023
coreinformers "k8s.io/client-go/informers/core/v1"
2124
extensionsinformers "k8s.io/client-go/informers/extensions/v1beta1"
@@ -46,9 +49,16 @@ import (
4649
// 2. For every TLS hostname that has a corresponding path rule and points to a secret
4750
// that exists, a route should exist with a valid TLS config from that secret.
4851
// 3. For every service referenced by the ingress path rule, the route should have
49-
// an update to date target port based on the service.
50-
// 4. A route owned by an ingress that no longer satisfies the first three invariants
51-
// should be deleted.
52+
// a target port based on the service.
53+
// 4. A route owned by an ingress that is not described by any of the three invariants
54+
// above should be deleted.
55+
//
56+
// The controller also relies on the use of expectations to remind itself whether there
57+
// are route creations it has not yet observed, which prevents the controller from
58+
// creating more objects than it needs. The expectations are reset when the ingress
59+
// object is modified. It is possible that expectations could leak if an ingress is
60+
// deleted and its deletion is not observed by the cache, but such leaks are only expected
61+
// if there is a bug in the informer cache which must be fixed anyway.
5262
//
5363
// Unsupported attributes:
5464
//
@@ -73,6 +83,73 @@ type Controller struct {
7383

7484
// queue is the list of namespace keys that must be synced.
7585
queue workqueue.RateLimitingInterface
86+
87+
// expectations track upcoming route creations that we have not yet observed
88+
expectations *expectations
89+
// expectationDelay controls how long the controller waits to observe its
90+
// own creates. Exposed only for testing.
91+
expectationDelay time.Duration
92+
}
93+
94+
// expectations track an upcoming change to a named resource related
95+
// to an ingress. This is a thread safe object but callers assume
96+
// responsibility for ensuring expectations do not leak.
97+
type expectations struct {
98+
lock sync.Mutex
99+
expect map[queueKey]sets.String
100+
}
101+
102+
// newExpectations returns a tracking object for upcoming events
103+
// that the controller may expect to happen.
104+
func newExpectations() *expectations {
105+
return &expectations{
106+
expect: make(map[queueKey]sets.String),
107+
}
108+
}
109+
110+
// Expect that an event will happen in the future for the given ingress
111+
// and a named resource related to that ingress.
112+
func (e *expectations) Expect(namespace, ingressName, name string) {
113+
e.lock.Lock()
114+
defer e.lock.Unlock()
115+
key := queueKey{namespace: namespace, name: ingressName}
116+
set, ok := e.expect[key]
117+
if !ok {
118+
set = sets.NewString()
119+
e.expect[key] = set
120+
}
121+
set.Insert(name)
122+
}
123+
124+
// Satisfied clears the expectation for the given resource name on an
125+
// ingress.
126+
func (e *expectations) Satisfied(namespace, ingressName, name string) {
127+
e.lock.Lock()
128+
defer e.lock.Unlock()
129+
key := queueKey{namespace: namespace, name: ingressName}
130+
set := e.expect[key]
131+
set.Delete(name)
132+
if set.Len() == 0 {
133+
delete(e.expect, key)
134+
}
135+
}
136+
137+
// Expecting returns true if the provided ingress is still waiting to
138+
// see changes.
139+
func (e *expectations) Expecting(namespace, ingressName string) bool {
140+
e.lock.Lock()
141+
defer e.lock.Unlock()
142+
key := queueKey{namespace: namespace, name: ingressName}
143+
return e.expect[key].Len() > 0
144+
}
145+
146+
// Clear indicates that all expectations for the given ingress should
147+
// be cleared.
148+
func (e *expectations) Clear(namespace, ingressName string) {
149+
e.lock.Lock()
150+
defer e.lock.Unlock()
151+
key := queueKey{namespace: namespace, name: ingressName}
152+
delete(e.expect, key)
76153
}
77154

78155
type queueKey struct {
@@ -92,6 +169,9 @@ func NewController(eventsClient kv1core.EventsGetter, client routeclient.RoutesG
92169
eventRecorder: recorder,
93170
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ingress-to-route"),
94171

172+
expectations: newExpectations(),
173+
expectationDelay: 2 * time.Second,
174+
95175
client: client,
96176

97177
ingressLister: ingresses.Lister(),
@@ -186,6 +266,7 @@ func (c *Controller) processRoute(obj interface{}) {
186266
if !ok {
187267
return
188268
}
269+
c.expectations.Satisfied(t.Namespace, ingressName, t.Name)
189270
c.queue.Add(queueKey{namespace: t.Namespace, name: ingressName})
190271
default:
191272
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %T", obj))
@@ -195,6 +276,10 @@ func (c *Controller) processRoute(obj interface{}) {
195276
func (c *Controller) processIngress(obj interface{}) {
196277
switch t := obj.(type) {
197278
case *extensionsv1beta1.Ingress:
279+
// when we see a change to an ingress, reset our expectations
280+
// this also allows periodic purging of the expectation list in the event
281+
// we miss one or more events.
282+
c.expectations.Clear(t.Namespace, t.Name)
198283
c.queue.Add(queueKey{namespace: t.Namespace, name: t.Name})
199284
default:
200285
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %T", obj))
@@ -234,8 +319,10 @@ func (c *Controller) processNext() bool {
234319
}
235320
defer c.queue.Done(key)
236321

322+
glog.V(5).Infof("processing %v begin", key)
237323
err := c.sync(key.(queueKey))
238324
c.handleNamespaceErr(err, key)
325+
glog.V(5).Infof("processing %v end", key)
239326

240327
return true
241328
}
@@ -262,6 +349,12 @@ func (c *Controller) sync(key queueKey) error {
262349
}
263350
return nil
264351
}
352+
// if we are waiting to observe the result of route creations, simply delay
353+
if c.expectations.Expecting(key.namespace, key.name) {
354+
c.queue.AddAfter(key, c.expectationDelay)
355+
glog.V(5).Infof("Ingress %s/%s has unsatisfied expectations", key.namespace, key.name)
356+
return nil
357+
}
265358

266359
ingress, err := c.ingressLister.Ingresses(key.namespace).Get(key.name)
267360
if errors.IsNotFound(err) {
@@ -328,7 +421,7 @@ func (c *Controller) sync(key queueKey) error {
328421

329422
// add the new routes
330423
for _, route := range creates {
331-
if _, err := c.client.Routes(route.Namespace).Create(route); err != nil {
424+
if err := createRouteWithName(c.client, ingress, route, c.expectations); err != nil {
332425
errs = append(errs, err)
333426
}
334427
}
@@ -546,3 +639,55 @@ func referencesSecret(ingress *extensionsv1beta1.Ingress, host string) (string,
546639
}
547640
return "", false
548641
}
642+
643+
// createRouteWithName performs client side name generation so we can set a predictable expectation.
644+
// If we fail multiple times in a row we will return an error.
645+
// TODO: future optimization, check the local cache for the name first
646+
func createRouteWithName(client routeclient.RoutesGetter, ingress *extensionsv1beta1.Ingress, route *routev1.Route, expect *expectations) error {
647+
base := route.GenerateName
648+
var lastErr error
649+
// only retry a limited number of times
650+
for i := 0; i < 3; i++ {
651+
if len(base) > 0 {
652+
route.GenerateName = ""
653+
route.Name = generateRouteName(base)
654+
}
655+
656+
// Set the expectation before we talk to the server in order to
657+
// prevent racing with the route cache.
658+
expect.Expect(ingress.Namespace, ingress.Name, route.Name)
659+
660+
_, err := client.Routes(route.Namespace).Create(route)
661+
if err == nil {
662+
return nil
663+
}
664+
665+
// We either collided with another randomly generated name, or another
666+
// error between us and the server prevented observing the success
667+
// of the result. In either case we are not expecting a new route. This
668+
// is safe because expectations are an optimization to avoid churn rather
669+
// than to prevent true duplicate creation.
670+
expect.Satisfied(ingress.Namespace, ingress.Name, route.Name)
671+
672+
// if we aren't generating names (or if we got any other type of error)
673+
// return right away
674+
if len(base) == 0 || !errors.IsAlreadyExists(err) {
675+
return err
676+
}
677+
lastErr = err
678+
}
679+
return lastErr
680+
}
681+
682+
const (
683+
maxNameLength = 63
684+
randomLength = 5
685+
maxGeneratedNameLength = maxNameLength - randomLength
686+
)
687+
688+
func generateRouteName(base string) string {
689+
if len(base) > maxGeneratedNameLength {
690+
base = base[:maxGeneratedNameLength]
691+
}
692+
return fmt.Sprintf("%s%s", base, utilrand.String(randomLength))
693+
}

0 commit comments

Comments
 (0)