Skip to content

✨ Add a priority queue #3014

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,14 @@ issues:
- linters:
- dupl
path: _test\.go
- linters:
- revive
path: .*/internal/.*
- linters:
- unused
# Seems to incorrectly trigger on the two implementations that are only
# used through an interface and not directly..?
path: pkg/controller/priorityqueue/metrics\.go

run:
go: "1.23"
Expand Down
3 changes: 3 additions & 0 deletions .gomodcheck.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,6 @@ excludedModules:
# --- test dependencies:
- github.com/onsi/ginkgo/v2
- github.com/onsi/gomega

# --- We want a newer version with generics support for this
- github.com/google/btree
73 changes: 73 additions & 0 deletions examples/priorityqueue/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
Copyright 2024 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"context"
"fmt"
"os"
"time"

corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/builder"
kubeconfig "sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/config"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/manager/signals"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

func init() {
}

func main() {
if err := run(); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
}

func run() error {
log.SetLogger(zap.New())

// Setup a Manager
mgr, err := manager.New(kubeconfig.GetConfigOrDie(), manager.Options{
Controller: config.Controller{UsePriorityQueue: true},
})
if err != nil {
return fmt.Errorf("failed to set up controller-manager: %w", err)
}

if err := builder.ControllerManagedBy(mgr).
For(&corev1.ConfigMap{}).
Complete(reconcile.Func(func(ctx context.Context, r reconcile.Request) (reconcile.Result, error) {
log.FromContext(ctx).Info("Reconciling")
time.Sleep(10 * time.Second)

return reconcile.Result{}, nil
})); err != nil {
return fmt.Errorf("failed to set up controller: %w", err)
}

if err := mgr.Start(signals.SetupSignalHandler()); err != nil {
return fmt.Errorf("failed to start manager: %w", err)
}

return nil
}
1 change: 1 addition & 0 deletions examples/scratch-env/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ require (
github.com/go-openapi/swag v0.23.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/btree v1.1.3 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions examples/scratch-env/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/btree v1.1.3 h1:CVpQJjYgC4VbzxeGVHfvZrv1ctoYCAI8vbl07Fcxlyg=
github.com/google/btree v1.1.3/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4=
github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I=
github.com/google/gnostic-models v0.6.8/go.mod h1:5n7qKqH0f5wFt+aWF8CW6pZLLNOfYuF5OpfBSENuI8U=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/evanphx/json-patch/v5 v5.9.0
github.com/go-logr/logr v1.4.2
github.com/go-logr/zapr v1.3.0
github.com/google/btree v1.1.3
github.com/google/go-cmp v0.6.0
github.com/google/gofuzz v1.2.0
github.com/onsi/ginkgo/v2 v2.21.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/btree v1.1.3 h1:CVpQJjYgC4VbzxeGVHfvZrv1ctoYCAI8vbl07Fcxlyg=
github.com/google/btree v1.1.3/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4=
github.com/google/cel-go v0.22.0 h1:b3FJZxpiv1vTMo2/5RDUqAHPxkT8mmMfJIrq1llbf7g=
github.com/google/cel-go v0.22.0/go.mod h1:BuznPXXfQDpXKWQ9sPW3TzlAJN5zzFe+i9tIs0yC4s8=
github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I=
Expand Down
8 changes: 4 additions & 4 deletions pkg/builder/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (blder *TypedBuilder[request]) Watches(
) *TypedBuilder[request] {
input := WatchesInput[request]{
obj: object,
handler: eventHandler,
handler: handler.WithLowPriorityWhenUnchanged(eventHandler),
}
for _, opt := range opts {
opt.ApplyToWatches(&input)
Expand Down Expand Up @@ -317,7 +317,7 @@ func (blder *TypedBuilder[request]) doWatch() error {
}

var hdler handler.TypedEventHandler[client.Object, request]
reflect.ValueOf(&hdler).Elem().Set(reflect.ValueOf(&handler.EnqueueRequestForObject{}))
reflect.ValueOf(&hdler).Elem().Set(reflect.ValueOf(handler.WithLowPriorityWhenUnchanged(&handler.EnqueueRequestForObject{})))
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, blder.forInput.predicates...)
src := source.TypedKind(blder.mgr.GetCache(), obj, hdler, allPredicates...)
Expand All @@ -341,11 +341,11 @@ func (blder *TypedBuilder[request]) doWatch() error {
}

var hdler handler.TypedEventHandler[client.Object, request]
reflect.ValueOf(&hdler).Elem().Set(reflect.ValueOf(handler.EnqueueRequestForOwner(
reflect.ValueOf(&hdler).Elem().Set(reflect.ValueOf(handler.WithLowPriorityWhenUnchanged(handler.EnqueueRequestForOwner(
blder.mgr.GetScheme(), blder.mgr.GetRESTMapper(),
blder.forInput.object,
opts...,
)))
))))
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, own.predicates...)
src := source.TypedKind(blder.mgr.GetCache(), obj, hdler, allPredicates...)
Expand Down
6 changes: 6 additions & 0 deletions pkg/config/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,10 @@ type Controller struct {
// NeedLeaderElection indicates whether the controller needs to use leader election.
// Defaults to true, which means the controller will use leader election.
NeedLeaderElection *bool

// UsePriorityQueue configures the controllers queue to use the controller-runtime provided
// priority queue.
//
// Note: This flag is disabled by default until a future version. It's currently in beta.
UsePriorityQueue bool
}
12 changes: 11 additions & 1 deletion pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
"sigs.k8s.io/controller-runtime/pkg/internal/controller"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand Down Expand Up @@ -189,11 +190,20 @@ func NewTypedUnmanaged[request comparable](name string, mgr manager.Manager, opt
}

if options.RateLimiter == nil {
options.RateLimiter = workqueue.DefaultTypedControllerRateLimiter[request]()
if mgr.GetControllerOptions().UsePriorityQueue {
options.RateLimiter = workqueue.NewTypedItemExponentialFailureRateLimiter[request](5*time.Millisecond, 1000*time.Second)
} else {
options.RateLimiter = workqueue.DefaultTypedControllerRateLimiter[request]()
}
}

if options.NewQueue == nil {
options.NewQueue = func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) workqueue.TypedRateLimitingInterface[request] {
if mgr.GetControllerOptions().UsePriorityQueue {
return priorityqueue.New(controllerName, func(o *priorityqueue.Opts[request]) {
o.RateLimiter = rateLimiter
})
}
return workqueue.NewTypedRateLimitingQueueWithConfig(rateLimiter, workqueue.TypedRateLimitingQueueConfig[request]{
Name: controllerName,
})
Expand Down
37 changes: 37 additions & 0 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"sigs.k8s.io/controller-runtime/pkg/config"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
internalcontroller "sigs.k8s.io/controller-runtime/pkg/internal/controller"
Expand Down Expand Up @@ -437,5 +438,41 @@ var _ = Describe("controller.Controller", func() {
_, ok := c.(manager.LeaderElectionRunnable)
Expect(ok).To(BeTrue())
})

It("should configure a priority queue if UsePriorityQueue is set", func() {
m, err := manager.New(cfg, manager.Options{
Controller: config.Controller{UsePriorityQueue: true},
})
Expect(err).NotTo(HaveOccurred())

c, err := controller.New("new-controller-16", m, controller.Options{
Reconciler: rec,
})
Expect(err).NotTo(HaveOccurred())

ctrl, ok := c.(*internalcontroller.Controller[reconcile.Request])
Expect(ok).To(BeTrue())

q := ctrl.NewQueue("foo", nil)
_, ok = q.(priorityqueue.PriorityQueue[reconcile.Request])
Expect(ok).To(BeTrue())
})

It("should not configure a priority queue if UsePriorityQueue is not set", func() {
m, err := manager.New(cfg, manager.Options{})
Expect(err).NotTo(HaveOccurred())

c, err := controller.New("new-controller-17", m, controller.Options{
Reconciler: rec,
})
Expect(err).NotTo(HaveOccurred())

ctrl, ok := c.(*internalcontroller.Controller[reconcile.Request])
Expect(ok).To(BeTrue())

q := ctrl.NewQueue("foo", nil)
_, ok = q.(priorityqueue.PriorityQueue[reconcile.Request])
Expect(ok).To(BeFalse())
})
})
})
Loading
Loading