-
Notifications
You must be signed in to change notification settings - Fork 236
/
Copy pathbuilder.go
360 lines (309 loc) · 13.8 KB
/
builder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
package controllercmd
import (
"context"
"fmt"
"io/ioutil"
"os"
"strings"
"sync"
"time"
configv1 "github.com/openshift/api/config/v1"
operatorv1alpha1 "github.com/openshift/api/operator/v1alpha1"
"github.com/openshift/library-go/pkg/authorization/hardcodedauthorizer"
"github.com/openshift/library-go/pkg/config/client"
"github.com/openshift/library-go/pkg/config/configdefaults"
leaderelectionconverter "github.com/openshift/library-go/pkg/config/leaderelection"
"github.com/openshift/library-go/pkg/config/serving"
"github.com/openshift/library-go/pkg/controller/fileobserver"
"github.com/openshift/library-go/pkg/operator/events"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/version"
"k8s.io/apiserver/pkg/authorization/union"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/healthz"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/record"
"k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/klog/v2"
)
// StartFunc is the function that ControllerBuilder.Run invokes to start the controllers. When no leader election is
// configured it is called directly and its error is returned from Run; otherwise it is called from the
// OnStartedLeading callback once leadership has been acquired.
type StartFunc func(context.Context, *ControllerContext) error
// ControllerContext bundles the dependencies that ControllerBuilder.Run assembles and passes to the StartFunc.
type ControllerContext struct {
// ComponentConfig is the unstructured configuration object that was passed to ControllerBuilder.Run.
ComponentConfig *unstructured.Unstructured
// KubeConfig provides the REST config with no content type (it will default to JSON).
// Use this config for CR resources.
KubeConfig *rest.Config
// ProtoKubeConfig provides the REST config with "application/vnd.kubernetes.protobuf,application/json" content type.
// Note that this config might not be safe for CR resources, instead it should be used for other resources.
ProtoKubeConfig *rest.Config
// EventRecorder is used to record events in controllers.
EventRecorder events.Recorder
// Server is the GenericAPIServer serving healthz checks and debug info.
// It is nil when no server was configured through WithServer.
Server *genericapiserver.GenericAPIServer
// OperatorNamespace is the namespace where the operator runs. It is either set explicitly through
// WithComponentNamespace or autodetected by Run.
OperatorNamespace string
}
// defaultObserverInterval is the default interval at which the file observer rehashes the files it watches and
// reacts to any changes in those files. NewController copies it into every new builder.
var defaultObserverInterval = 5 * time.Second
// ControllerBuilder allows the construction of a controller in optional pieces.
// Create one with NewController, configure it with the With* methods and start it with Run.
type ControllerBuilder struct {
// kubeAPIServerConfigFile is the optional kubeconfig path set by WithKubeConfigFile.
// When nil, an empty path is handed to the client config loader.
kubeAPIServerConfigFile *string
// clientOverrides are the client connection overrides passed to the client config loader.
clientOverrides *client.ClientConnectionOverrides
// leaderElection holds the defaulted leader election settings. When nil, Run calls startFunc directly.
leaderElection *configv1.LeaderElection
// fileObserver watches the files registered through WithRestartOnChange. It is nil until that method is used.
fileObserver fileobserver.Observer
// fileObserverReactorFn is the reaction installed by WithRestartOnChange for observed file changes.
fileObserverReactorFn func(file string, action fileobserver.ActionType) error
// eventRecorderOptions are the correlator options used when Run creates the event recorder.
eventRecorderOptions record.CorrelatorOptions
// startFunc is the function that starts the controllers.
startFunc StartFunc
// componentName names the component; Run uses it for the event recorder, the server and leader election.
componentName string
// componentNamespace is the explicitly configured namespace. When empty, the namespace is autodetected.
componentNamespace string
// instanceIdentity is the identity handed to leader election.
instanceIdentity string
// observerInterval is the interval passed to the file observer when it is created.
observerInterval time.Duration
// servingInfo is the serving configuration set by WithServer. When nil, Run does not start a server.
servingInfo *configv1.HTTPServingInfo
// authenticationConfig is the delegated authentication configuration set by WithServer.
authenticationConfig *operatorv1alpha1.DelegatedAuthentication
// authorizationConfig is the delegated authorization configuration set by WithServer.
authorizationConfig *operatorv1alpha1.DelegatedAuthorization
// healthChecks are the additional health checks appended to the server configuration.
healthChecks []healthz.HealthChecker
// versionInfo is the binary version information. When set, Run registers and populates a build_info metric.
versionInfo *version.Info
// nonZeroExitFn is a function that exits the process with a non-zero code.
// This stub exists for unit tests, where we can check whether graceful termination works properly.
// The default function will klog.Warning(args) and os.Exit(1).
nonZeroExitFn func(args ...interface{})
}
// NewController creates a ControllerBuilder for the component called componentName whose controllers are started
// by startFunc. The builder begins with the default file observer interval and an exit hook that logs its
// arguments as a warning before terminating the process with exit code 1.
func NewController(componentName string, startFunc StartFunc) *ControllerBuilder {
	builder := &ControllerBuilder{}
	builder.componentName = componentName
	builder.startFunc = startFunc
	builder.observerInterval = defaultObserverInterval
	// Default exit hook; unit tests replace it to observe termination without killing the test process.
	builder.nonZeroExitFn = func(args ...interface{}) {
		klog.Warning(args...)
		os.Exit(1)
	}
	return builder
}
// WithRestartOnChange enables a file observer loop that watches the given files for changes. The first time a
// change is detected a warning is logged and stopCh is closed, which lets the caller shut down gracefully.
// Later changes are ignored, so stopCh is closed at most once by this reactor.
//
// startingFileContent is handed to the observer together with the file list. When no files are given the builder
// is returned unchanged. The method panics if the file observer cannot be created.
func (b *ControllerBuilder) WithRestartOnChange(stopCh chan<- struct{}, startingFileContent map[string][]byte, files ...string) *ControllerBuilder {
	// Nothing to watch: leave the builder untouched.
	if len(files) == 0 {
		return b
	}

	// Create the observer lazily so that repeated calls share a single instance.
	if b.fileObserver == nil {
		newObserver, err := fileobserver.NewObserver(b.observerInterval)
		if err != nil {
			panic(err)
		}
		b.fileObserver = newObserver
	}

	// closeOnce guards stopCh against a double close when several changes are reported.
	var closeOnce sync.Once
	triggerRestart := func(changedFile string, action fileobserver.ActionType) error {
		closeOnce.Do(func() {
			klog.Warningf("Restart triggered because of %s", action.String(changedFile))
			close(stopCh)
		})
		return nil
	}

	b.fileObserverReactorFn = triggerRestart
	b.fileObserver.AddReactor(triggerRestart, startingFileContent, files...)
	return b
}
// WithComponentNamespace sets the namespace the component runs in. When a non-empty namespace is set, it is used
// instead of autodetecting the namespace. The builder is returned to allow call chaining.
func (b *ControllerBuilder) WithComponentNamespace(ns string) *ControllerBuilder {
	b.componentNamespace = ns

	return b
}
// WithLeaderElection configures leader election for the controller. The given settings are passed through
// LeaderElectionDefaulting together with defaultNamespace and defaultName before being stored. When
// leaderElection.Disable is set the builder is returned unchanged.
func (b *ControllerBuilder) WithLeaderElection(leaderElection configv1.LeaderElection, defaultNamespace, defaultName string) *ControllerBuilder {
	if !leaderElection.Disable {
		withDefaults := leaderelectionconverter.LeaderElectionDefaulting(leaderElection, defaultNamespace, defaultName)
		b.leaderElection = &withDefaults
	}
	return b
}
// WithVersion stores the binary version information. When it is set, Run registers a build_info metric populated
// from these values and logs the version.
func (b *ControllerBuilder) WithVersion(info version.Info) *ControllerBuilder {
	// Keep a private copy so the builder does not depend on the caller's value.
	versionCopy := info
	b.versionInfo = &versionCopy
	return b
}
// WithServer configures the server that Run starts for this controller. The serving info is deep-copied and
// completed with the recommended HTTP serving defaults before it is stored, so the caller's value is not modified.
func (b *ControllerBuilder) WithServer(servingInfo configv1.HTTPServingInfo, authenticationConfig operatorv1alpha1.DelegatedAuthentication, authorizationConfig operatorv1alpha1.DelegatedAuthorization) *ControllerBuilder {
	defaultedServingInfo := servingInfo.DeepCopy()
	configdefaults.SetRecommendedHTTPServingInfoDefaults(defaultedServingInfo)

	b.servingInfo = defaultedServingInfo
	b.authenticationConfig, b.authorizationConfig = &authenticationConfig, &authorizationConfig
	return b
}
// WithHealthChecks registers additional health checks for the server. The checks are appended to any that were
// added by earlier calls, so the method may be called more than once.
func (b *ControllerBuilder) WithHealthChecks(healthChecks ...healthz.HealthChecker) *ControllerBuilder {
	combined := append(b.healthChecks, healthChecks...)
	b.healthChecks = combined
	return b
}
// WithKubeConfigFile sets an optional kubeconfig file together with the client connection overrides to apply.
// The in-cluster config will be used if the filename is empty.
func (b *ControllerBuilder) WithKubeConfigFile(kubeConfigFilename string, defaults *client.ClientConnectionOverrides) *ControllerBuilder {
	kubeConfigPath := kubeConfigFilename
	b.kubeAPIServerConfigFile, b.clientOverrides = &kubeConfigPath, defaults
	return b
}
// WithInstanceIdentity overrides the instance identity that is handed to leader election. Use it only when you
// need something special; the default is just a UID, which is usually fine for a pod.
func (b *ControllerBuilder) WithInstanceIdentity(identity string) *ControllerBuilder {
	b.instanceIdentity = identity

	return b
}
// WithEventRecorderOptions overrides the default Kubernetes event recorder correlator options.
// This is needed if the binary is sending a lot of events.
// Passing events.DefaultOperatorEventRecorderOptions is a good default for a normal operator binary.
func (b *ControllerBuilder) WithEventRecorderOptions(options record.CorrelatorOptions) *ControllerBuilder {
	b.eventRecorderOptions = options

	return b
}
// Run starts your controller for you. It uses leader election if you asked for it, otherwise it calls the start
// function directly.
//
// Run builds the client config and kube client, starts the file observer (if any), creates the event recorder,
// optionally registers the build_info metric and starts the healthz/metrics server, and then hands a
// ControllerContext carrying config to the start function.
//
// An error is returned when the client config or kube client cannot be created, when the server cannot be
// configured or created, when the leader election configuration is invalid, or (without leader election) when the
// start function fails. With leader election enabled Run blocks in leaderelection.RunOrDie and returns nil once it
// returns.
func (b *ControllerBuilder) Run(ctx context.Context, config *unstructured.Unstructured) error {
	clientConfig, err := b.getClientConfig()
	if err != nil {
		return err
	}

	if b.fileObserver != nil {
		go b.fileObserver.Run(ctx.Done())
	}

	// Use the error-returning constructor: Run already reports failures through its return value, so an invalid
	// client config should not panic the process.
	kubeClient, err := kubernetes.NewForConfig(clientConfig)
	if err != nil {
		return err
	}

	// Failing to detect the namespace or the owner reference is not fatal; the returned fallbacks are used.
	namespace, err := b.getComponentNamespace()
	if err != nil {
		klog.Warningf("unable to identify the current namespace for events: %v", err)
	}
	controllerRef, err := events.GetControllerReferenceForCurrentPod(kubeClient, namespace, nil)
	if err != nil {
		klog.Warningf("unable to get owner reference (falling back to namespace): %v", err)
	}
	eventRecorder := events.NewKubeRecorderWithOptions(kubeClient.CoreV1().Events(namespace), b.eventRecorderOptions, b.componentName, controllerRef)

	// Report panics handled by utilruntime as warning events as well.
	utilruntime.PanicHandlers = append(utilruntime.PanicHandlers, func(r interface{}) {
		eventRecorder.Warningf(fmt.Sprintf("%sPanic", strings.Title(b.componentName)), "Panic observed: %v", r)
	})

	// if there is file observer defined for this command, add event into default reaction function.
	// NOTE(review): WithRestartOnChange has already handed the original function value to the observer, so
	// replacing the field here does not appear to change the reactor the observer invokes — confirm whether the
	// "OperatorRestart" event is ever emitted.
	if b.fileObserverReactorFn != nil {
		originalFileObserverReactorFn := b.fileObserverReactorFn
		b.fileObserverReactorFn = func(file string, action fileobserver.ActionType) error {
			eventRecorder.Warningf("OperatorRestart", "Restarted because of %s", action.String(file))
			return originalFileObserverReactorFn(file, action)
		}
	}

	// report the binary version metrics to prometheus
	if b.versionInfo != nil {
		buildInfo := metrics.NewGaugeVec(
			&metrics.GaugeOpts{
				// The metric name is derived from the namespace with dashes replaced by underscores.
				Name: strings.ReplaceAll(namespace, "-", "_") + "_build_info",
				Help: "A metric with a constant '1' value labeled by major, minor, git version, git commit, git tree state, build date, Go version, " +
					"and compiler from which " + b.componentName + " was built, and platform on which it is running.",
				StabilityLevel: metrics.ALPHA,
			},
			[]string{"major", "minor", "gitVersion", "gitCommit", "gitTreeState", "buildDate", "goVersion", "compiler", "platform"},
		)
		legacyregistry.MustRegister(buildInfo)
		buildInfo.WithLabelValues(b.versionInfo.Major, b.versionInfo.Minor, b.versionInfo.GitVersion, b.versionInfo.GitCommit, b.versionInfo.GitTreeState, b.versionInfo.BuildDate, b.versionInfo.GoVersion,
			b.versionInfo.Compiler, b.versionInfo.Platform).Set(1)
		klog.Infof("%s version %s-%s", b.componentName, b.versionInfo.GitVersion, b.versionInfo.GitCommit)
	}

	kubeConfig := ""
	if b.kubeAPIServerConfigFile != nil {
		kubeConfig = *b.kubeAPIServerConfigFile
	}

	// The server is optional; it stays nil unless WithServer was used.
	var server *genericapiserver.GenericAPIServer
	if b.servingInfo != nil {
		serverConfig, err := serving.ToServerConfig(ctx, *b.servingInfo, *b.authenticationConfig, *b.authorizationConfig, kubeConfig)
		if err != nil {
			return err
		}
		serverConfig.Authorization.Authorizer = union.New(
			// prefix the authorizer with the permissions for metrics scraping which are well known.
			// openshift RBAC policy will always allow this user to read metrics.
			hardcodedauthorizer.NewHardCodedMetricsAuthorizer(),
			serverConfig.Authorization.Authorizer,
		)
		serverConfig.HealthzChecks = append(serverConfig.HealthzChecks, b.healthChecks...)

		server, err = serverConfig.Complete(nil).New(b.componentName, genericapiserver.NewEmptyDelegate())
		if err != nil {
			return err
		}

		// Serve in the background until the context is cancelled; a serving error is fatal for the process.
		go func() {
			if err := server.PrepareRun().Run(ctx.Done()); err != nil {
				klog.Fatal(err)
			}
			klog.Info("server exited")
		}()
	}

	protoConfig := rest.CopyConfig(clientConfig)
	protoConfig.AcceptContentTypes = "application/vnd.kubernetes.protobuf,application/json"
	protoConfig.ContentType = "application/vnd.kubernetes.protobuf"

	controllerContext := &ControllerContext{
		ComponentConfig:   config,
		KubeConfig:        clientConfig,
		ProtoKubeConfig:   protoConfig,
		EventRecorder:     eventRecorder,
		Server:            server,
		OperatorNamespace: namespace,
	}

	// Without leader election the start function runs directly and its result is the result of Run.
	if b.leaderElection == nil {
		return b.startFunc(ctx, controllerContext)
	}

	// ensure blocking TCP connections don't block the leader election
	leaderConfig := rest.CopyConfig(protoConfig)
	leaderConfig.Timeout = b.leaderElection.RenewDeadline.Duration

	leaderElection, err := leaderelectionconverter.ToConfigMapLeaderElection(leaderConfig, *b.leaderElection, b.componentName, b.instanceIdentity)
	if err != nil {
		return err
	}

	// 10s is the graceful termination time we give the controllers to finish their workers.
	// when this time passes, we exit with a non-zero code, killing all controller workers.
	// NOTE: The pod must set the termination graceful time.
	leaderElection.Callbacks.OnStartedLeading = b.getOnStartedLeadingFunc(controllerContext, 10*time.Second)

	leaderelection.RunOrDie(ctx, leaderElection)
	return nil
}
// getOnStartedLeadingFunc builds the leader election OnStartedLeading callback.
//
// The returned function runs the start function in a goroutine and then waits for either the context to be
// cancelled or the start function to return:
//   - when the start function returns an error, nonZeroExitFn is called with that error;
//   - when the start function returns while the context is still active, nonZeroExitFn is called because the
//     controllers terminated prematurely;
//   - when the context is cancelled, the event recorder is shut down and the controllers get
//     gracefulTerminationDuration to finish; if they do not, nonZeroExitFn is called.
//
// The method uses a pointer receiver, like every other ControllerBuilder method, so the builder is not copied on
// each call.
func (b *ControllerBuilder) getOnStartedLeadingFunc(controllerContext *ControllerContext, gracefulTerminationDuration time.Duration) func(ctx context.Context) {
	return func(ctx context.Context) {
		// stoppedCh is closed once the start function has returned, with or without an error.
		stoppedCh := make(chan struct{})
		go func() {
			defer close(stoppedCh)
			if err := b.startFunc(ctx, controllerContext); err != nil {
				b.nonZeroExitFn(fmt.Sprintf("graceful termination failed, controllers failed with error: %v", err))
			}
		}()

		select {
		case <-ctx.Done(): // context closed means the process likely received signal to terminate
			controllerContext.EventRecorder.Shutdown()
		case <-stoppedCh:
			// if the context was not cancelled (it is not "done") but the start function returned, the controllers
			// terminated prematurely.
			if ctx.Err() == nil {
				b.nonZeroExitFn("graceful termination failed, controllers terminated prematurely")
			}
		}

		select {
		case <-time.After(gracefulTerminationDuration): // when context was closed above, give controllers extra time to terminate gracefully
			b.nonZeroExitFn(fmt.Sprintf("graceful termination failed, some controllers failed to shutdown in %s", gracefulTerminationDuration))
		case <-stoppedCh: // stoppedCh here means the controllers finished termination and we exit 0
		}
	}
}
// getComponentNamespace returns the namespace the component runs in.
//
// An explicitly configured namespace (see WithComponentNamespace) is returned as is. Otherwise the namespace is
// read from the service account namespace file, with surrounding whitespace removed. When that file cannot be
// read or is empty, the fallback namespace "openshift-config-managed" is returned together with a non-nil error,
// so callers can log the problem and continue with the fallback.
func (b *ControllerBuilder) getComponentNamespace() (string, error) {
	if len(b.componentNamespace) > 0 {
		return b.componentNamespace, nil
	}

	const (
		namespaceFile     = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
		fallbackNamespace = "openshift-config-managed"
	)

	nsBytes, err := ioutil.ReadFile(namespaceFile)
	if err != nil {
		return fallbackNamespace, err
	}

	// Trim whitespace (for example a trailing newline), which would otherwise produce an invalid namespace name.
	ns := strings.TrimSpace(string(nsBytes))
	if len(ns) == 0 {
		return fallbackNamespace, fmt.Errorf("namespace file %s is empty", namespaceFile)
	}
	return ns, nil
}
// getClientConfig builds the REST client config from the configured kubeconfig file and client overrides.
// When no kubeconfig file was configured an empty path is passed to the loader.
func (b *ControllerBuilder) getClientConfig() (*rest.Config, error) {
	var kubeconfigPath string
	if configured := b.kubeAPIServerConfigFile; configured != nil {
		kubeconfigPath = *configured
	}
	return client.GetKubeConfigOrInClusterConfig(kubeconfigPath, b.clientOverrides)
}