Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

router: Add option to bind ports only when ready #11768

Merged
merged 1 commit into from
Dec 1, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions contrib/completions/bash/openshift
Original file line number Diff line number Diff line change
Expand Up @@ -20094,6 +20094,8 @@ _openshift_infra_router()
local_nonpersistent_flags+=("--allowed-domains=")
flags+=("--as=")
local_nonpersistent_flags+=("--as=")
flags+=("--bind-ports-after-sync")
local_nonpersistent_flags+=("--bind-ports-after-sync")
flags+=("--certificate-authority=")
flags_with_completion+=("--certificate-authority")
flags_completion+=("_filedir")
Expand Down
2 changes: 2 additions & 0 deletions contrib/completions/zsh/openshift
Original file line number Diff line number Diff line change
Expand Up @@ -20255,6 +20255,8 @@ _openshift_infra_router()
local_nonpersistent_flags+=("--allowed-domains=")
flags+=("--as=")
local_nonpersistent_flags+=("--as=")
flags+=("--bind-ports-after-sync")
local_nonpersistent_flags+=("--bind-ports-after-sync")
flags+=("--certificate-authority=")
flags_with_completion+=("--certificate-authority")
flags_completion+=("_filedir")
Expand Down
4 changes: 4 additions & 0 deletions docs/man/man1/openshift-infra-router.1
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ You may restrict the set of routes exposed to a single project (with \-\-namespa
\fB\-\-as\fP=""
Username to impersonate for the operation

.PP
\fB\-\-bind\-ports\-after\-sync\fP=false
Bind ports only after route state has been synchronized

.PP
\fB\-\-certificate\-authority\fP=""
Path to a cert. file for the certificate authority
Expand Down
4 changes: 4 additions & 0 deletions images/router/haproxy/conf/haproxy-config.template
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ listen stats :1936
stats auth {{.StatsUser}}:{{.StatsPassword}}
{{ end }}

{{ if .BindPorts }}
frontend public
bind :{{env "ROUTER_SERVICE_HTTP_PORT" "80"}}
mode http
Expand Down Expand Up @@ -523,6 +524,9 @@ backend be_secure_{{$cfgIdx}}
{{ end }}{{/* end range over serviceUnitNames */}}
{{ end }}{{/* end tls==reencrypt */}}
{{ end }}{{/* end loop over routes */}}
{{ else }}
# Avoiding binding ports until routing configuration has been synchronized.
{{ end }}{{/* end bind ports after sync */}}
{{ end }}{{/* end haproxy config template */}}

{{/*--------------------------------- END OF HAPROXY CONFIG, BELOW ARE MAPPING FILES ------------------------*/}}
Expand Down
24 changes: 24 additions & 0 deletions pkg/client/cache/eventqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ type EventQueue struct {
// item it refers to is explicitly deleted from the store or the
// event is read via Pop().
lastReplaceKey string
// Tracks whether the Replace() method has been called at least once.
replaceCalled bool
// Tracks the number of items queued by the last Replace() call.
replaceCount int
}

// EventQueue implements kcache.Store
Expand Down Expand Up @@ -322,6 +326,9 @@ func (eq *EventQueue) Replace(objects []interface{}, resourceVersion string) err
eq.lock.Lock()
defer eq.lock.Unlock()

eq.replaceCalled = true
eq.replaceCount = len(objects)

eq.events = map[string]watch.EventType{}
eq.queue = eq.queue[:0]

Expand All @@ -346,6 +353,23 @@ func (eq *EventQueue) Replace(objects []interface{}, resourceVersion string) err
return nil
}

// ListSuccessfulAtLeastOnce indicates whether a List operation was
// successfully completed regardless of whether any items were queued.
func (eq *EventQueue) ListSuccessfulAtLeastOnce() bool {
eq.lock.Lock()
defer eq.lock.Unlock()

return eq.replaceCalled
}

// ListCount returns how many objects were queued by the most recent List operation.
func (eq *EventQueue) ListCount() int {
eq.lock.Lock()
defer eq.lock.Unlock()

return eq.replaceCount
}

// ListConsumed indicates whether the items queued by a List/Relist
// operation have been consumed.
func (eq *EventQueue) ListConsumed() bool {
Expand Down
3 changes: 3 additions & 0 deletions pkg/cmd/infra/router/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type TemplateRouter struct {
DefaultCertificateDir string
ExtendedValidation bool
RouterService *ktypes.NamespacedName
BindPortsAfterSync bool
}

// reloadInterval returns how often to run the router reloads. The interval
Expand All @@ -86,6 +87,7 @@ func (o *TemplateRouter) Bind(flag *pflag.FlagSet) {
flag.StringVar(&o.ReloadScript, "reload", util.Env("RELOAD_SCRIPT", ""), "The path to the reload script to use")
flag.DurationVar(&o.ReloadInterval, "interval", reloadInterval(), "Controls how often router reloads are invoked. Mutiple router reload requests are coalesced for the duration of this interval since the last reload time.")
flag.BoolVar(&o.ExtendedValidation, "extended-validation", util.Env("EXTENDED_VALIDATION", "true") == "true", "If set, then an additional extended validation step is performed on all routes admitted in by this router. Defaults to true and enables the extended validation checks.")
flag.BoolVar(&o.BindPortsAfterSync, "bind-ports-after-sync", util.Env("ROUTER_BIND_PORTS_AFTER_SYNC", "") == "true", "Bind ports only after route state has been synchronized")
}

type RouterStats struct {
Expand Down Expand Up @@ -188,6 +190,7 @@ func (o *TemplateRouterOptions) Run() error {
StatsUsername: o.StatsUsername,
StatsPassword: o.StatsPassword,
PeerService: o.RouterService,
BindPortsAfterSync: o.BindPortsAfterSync,
IncludeUDP: o.RouterSelection.IncludeUDP,
AllowWildcardRoutes: o.RouterSelection.AllowWildcardRoutes,
}
Expand Down
50 changes: 50 additions & 0 deletions pkg/router/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ type RouterController struct {
endpointsListConsumed bool
filteredByNamespace bool

RoutesListSuccessfulAtLeastOnce func() bool
EndpointsListSuccessfulAtLeastOnce func() bool
RoutesListCount func() int
EndpointsListCount func() int

WatchNodes bool

Namespaces NamespaceLister
Expand All @@ -57,6 +62,51 @@ func (c *RouterController) Run() {
if c.WatchNodes {
go utilwait.Forever(c.HandleNode, 0)
}
go c.watchForFirstSync()
}

// handleFirstSync signals the router when it sees that the various
// watchers have successfully listed data from the api.
func (c *RouterController) handleFirstSync() bool {
c.lock.Lock()
defer c.lock.Unlock()

synced := c.RoutesListSuccessfulAtLeastOnce() &&
c.EndpointsListSuccessfulAtLeastOnce() &&
(c.Namespaces == nil || c.filteredByNamespace)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the c.Namespaces == nil check needed? Wouldn't c.filteredByNamespace suffice? I see c.filteredByNamespace set in handle namespaces on no errors.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is needed. If c.Namespaces == nil then c.filteredByNamespace will never be set to true.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If c.Namespaces == nil, then c.HandleNamespaces() will never be called:

https://github.com/openshift/origin/blob/master/pkg/router/controller/controller.go#L51

if !synced {
return false
}

// If either of the event queues were empty after the initial
// List, the tracking listConsumed variable's default value of
// 'false' may prevent the router from reloading to indicate the
// readiness status. Set the value to 'true' to ensure that a
// reload will be performed if necessary.
if c.RoutesListCount() == 0 {
c.routesListConsumed = true
}
if c.EndpointsListCount() == 0 {
c.endpointsListConsumed = true
}
c.updateLastSyncProcessed()

err := c.Plugin.SetSyncedAtLeastOnce()
if err == nil {
return true
}
utilruntime.HandleError(err)
return false
}

// watchForFirstSync loops until the first sync has been handled.
func (c *RouterController) watchForFirstSync() {
for {
if c.handleFirstSync() {
return
}
time.Sleep(50 * time.Millisecond)
}
}

func (c *RouterController) HandleNamespaces() {
Expand Down
7 changes: 7 additions & 0 deletions pkg/router/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

type fakeRouterPlugin struct {
lastSyncProcessed bool
syncedAtLeastOnce bool
}

func (p *fakeRouterPlugin) HandleRoute(t watch.EventType, route *routeapi.Route) error {
Expand All @@ -26,11 +27,17 @@ func (p *fakeRouterPlugin) HandleEndpoints(watch.EventType, *kapi.Endpoints) err
func (p *fakeRouterPlugin) HandleNamespaces(namespaces sets.String) error {
return nil
}

func (p *fakeRouterPlugin) SetLastSyncProcessed(processed bool) error {
p.lastSyncProcessed = processed
return nil
}

func (p *fakeRouterPlugin) SetSyncedAtLeastOnce() error {
p.syncedAtLeastOnce = true
return nil
}

type fakeNamespaceLister struct {
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/router/controller/extended_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,7 @@ func (p *ExtendedValidator) HandleNamespaces(namespaces sets.String) error {
func (p *ExtendedValidator) SetLastSyncProcessed(processed bool) error {
return p.plugin.SetLastSyncProcessed(processed)
}

func (p *ExtendedValidator) SetSyncedAtLeastOnce() error {
return p.plugin.SetSyncedAtLeastOnce()
}
12 changes: 12 additions & 0 deletions pkg/router/controller/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,18 @@ func (factory *RouterControllerFactory) Create(plugin router.Plugin, watchNodes
}
return eventType, obj.(*kapi.Node), nil
},
EndpointsListCount: func() int {
return endpointsEventQueue.ListCount()
},
RoutesListCount: func() int {
return routeEventQueue.ListCount()
},
EndpointsListSuccessfulAtLeastOnce: func() bool {
return endpointsEventQueue.ListSuccessfulAtLeastOnce()
},
RoutesListSuccessfulAtLeastOnce: func() bool {
return routeEventQueue.ListSuccessfulAtLeastOnce()
},
EndpointsListConsumed: func() bool {
return endpointsEventQueue.ListConsumed()
},
Expand Down
4 changes: 4 additions & 0 deletions pkg/router/controller/host_admitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ func (p *HostAdmitter) SetLastSyncProcessed(processed bool) error {
return p.plugin.SetLastSyncProcessed(processed)
}

func (p *HostAdmitter) SetSyncedAtLeastOnce() error {
return p.plugin.SetSyncedAtLeastOnce()
}

// addRoute admits routes based on subdomain ownership - returns errors if the route is not admitted.
func (p *HostAdmitter) addRoute(route *routeapi.Route) error {
// Find displaced routes (or error if an existing route displaces us)
Expand Down
4 changes: 4 additions & 0 deletions pkg/router/controller/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,3 +315,7 @@ func (a *StatusAdmitter) HandleNamespaces(namespaces sets.String) error {
func (a *StatusAdmitter) SetLastSyncProcessed(processed bool) error {
return a.plugin.SetLastSyncProcessed(processed)
}

func (a *StatusAdmitter) SetSyncedAtLeastOnce() error {
return a.plugin.SetSyncedAtLeastOnce()
}
4 changes: 4 additions & 0 deletions pkg/router/controller/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ func (p *fakePlugin) SetLastSyncProcessed(processed bool) error {
return fmt.Errorf("not expected")
}

func (p *fakePlugin) SetSyncedAtLeastOnce() error {
return fmt.Errorf("not expected")
}

func TestStatusNoOp(t *testing.T) {
now := nowFn()
touched := unversioned.Time{Time: now.Add(-time.Minute)}
Expand Down
4 changes: 4 additions & 0 deletions pkg/router/controller/unique_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,10 @@ func (p *UniqueHost) SetLastSyncProcessed(processed bool) error {
return p.plugin.SetLastSyncProcessed(processed)
}

func (p *UniqueHost) SetSyncedAtLeastOnce() error {
return p.plugin.SetSyncedAtLeastOnce()
}

// routeKeys returns the internal router key to use for the given Route.
func routeKeys(route *routeapi.Route) []string {
keys := make([]string, 1+len(route.Spec.AlternateBackends))
Expand Down
5 changes: 5 additions & 0 deletions pkg/router/f5/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -618,3 +618,8 @@ func (p *F5Plugin) HandleRoute(eventType watch.EventType,
func (p *F5Plugin) SetLastSyncProcessed(processed bool) error {
return nil
}

// No-op since f5 has its own concept of what 'ready' means
func (p *F5Plugin) SetSyncedAtLeastOnce() error {
return nil
}
1 change: 1 addition & 0 deletions pkg/router/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ type Plugin interface {
HandleNamespaces(namespaces sets.String) error
HandleNode(watch.EventType, *kapi.Node) error
SetLastSyncProcessed(processed bool) error
SetSyncedAtLeastOnce() error
}
11 changes: 11 additions & 0 deletions pkg/router/template/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type TemplatePluginConfig struct {
IncludeUDP bool
AllowWildcardRoutes bool
PeerService *ktypes.NamespacedName
BindPortsAfterSync bool
}

// routerInterface controls the interaction of the plugin with the underlying router implementation
Expand Down Expand Up @@ -89,6 +90,9 @@ type routerInterface interface {

// SetSkipCommit indicates to the router whether commits should be skipped
SetSkipCommit(skipCommit bool)

// SetSyncedAtLeastOnce indicates to the router that state has been read from the api at least once
SetSyncedAtLeastOnce()
}

func env(name, defaultValue string) string {
Expand Down Expand Up @@ -143,6 +147,7 @@ func NewTemplatePlugin(cfg TemplatePluginConfig, lookupSvc ServiceLookup) (*Temp
statsPort: cfg.StatsPort,
allowWildcardRoutes: cfg.AllowWildcardRoutes,
peerEndpointsKey: peerKey,
bindPortsAfterSync: cfg.BindPortsAfterSync,
}
router, err := newTemplateRouter(templateRouterCfg)
return newDefaultTemplatePlugin(router, cfg.IncludeUDP, lookupSvc), err
Expand Down Expand Up @@ -239,6 +244,12 @@ func (p *TemplatePlugin) SetLastSyncProcessed(processed bool) error {
return nil
}

func (p *TemplatePlugin) SetSyncedAtLeastOnce() error {
p.Router.SetSyncedAtLeastOnce()
p.Router.Commit()
return nil
}

// routeKeys returns the internal router keys to use for the given Route.
// A route can have several services that it can point to, now
func routeKeys(route *routeapi.Route) ([]string, []int32) {
Expand Down
3 changes: 3 additions & 0 deletions pkg/router/template/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,9 @@ func (r *TestRouter) Commit() {
func (r *TestRouter) SetSkipCommit(skipCommit bool) {
}

func (r *TestRouter) SetSyncedAtLeastOnce() {
}

func (r *TestRouter) HasServiceUnit(key string) bool {
return false
}
Expand Down
16 changes: 16 additions & 0 deletions pkg/router/template/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ type templateRouter struct {
lock sync.Mutex
// the router should only reload when the value is false
skipCommit bool
// If true, haproxy should only bind ports when it has route and endpoint state
bindPortsAfterSync bool
// whether the router state has been read from the api at least once
syncedAtLeastOnce bool
}

// templateRouterCfg holds all configuration items required to initialize the template router
Expand All @@ -103,6 +107,7 @@ type templateRouterCfg struct {
allowWildcardRoutes bool
peerEndpointsKey string
includeUDP bool
bindPortsAfterSync bool
}

// templateConfig is a subset of the templateRouter information that should be passed to the template for generating
Expand All @@ -124,6 +129,8 @@ type templateData struct {
StatsPassword string
//port to expose stats with (if the template supports it)
StatsPort int
// whether the router should bind the default ports
BindPorts bool
}

func newTemplateRouter(cfg templateRouterCfg) (*templateRouter, error) {
Expand Down Expand Up @@ -162,6 +169,7 @@ func newTemplateRouter(cfg templateRouterCfg) (*templateRouter, error) {
allowWildcardRoutes: cfg.allowWildcardRoutes,
peerEndpointsKey: cfg.peerEndpointsKey,
peerEndpoints: []Endpoint{},
bindPortsAfterSync: cfg.bindPortsAfterSync,

rateLimitedCommitFunction: nil,
rateLimitedCommitStopChannel: make(chan struct{}),
Expand Down Expand Up @@ -394,6 +402,7 @@ func (r *templateRouter) writeConfig() error {
StatsUser: r.statsUser,
StatsPassword: r.statsPassword,
StatsPort: r.statsPort,
BindPorts: !r.bindPortsAfterSync || r.syncedAtLeastOnce,
}
if err := template.Execute(file, data); err != nil {
file.Close()
Expand Down Expand Up @@ -729,6 +738,13 @@ func (r *templateRouter) SetSkipCommit(skipCommit bool) {
}
}

// SetSyncedAtLeastOnce indicates to the router that state has been
// read from the api.
func (r *templateRouter) SetSyncedAtLeastOnce() {
r.syncedAtLeastOnce = true
glog.V(4).Infof("Router state synchronized for the first time")
}

// HasServiceUnit attempts to retrieve a service unit for the given
// key, returning a boolean indication of whether the key is known.
func (r *templateRouter) HasServiceUnit(key string) bool {
Expand Down
Loading