forked from openshift/insights-operator
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcontroller.go
478 lines (420 loc) · 16.5 KB
/
controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
package status
import (
"context"
"encoding/json"
"fmt"
"os"
"reflect"
"sort"
"strings"
"sync"
"time"
"golang.org/x/time/rate"
configv1 "github.com/openshift/api/config/v1"
configv1client "github.com/openshift/client-go/config/clientset/versioned/typed/config/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
"github.com/openshift/insights-operator/pkg/config/configobserver"
"github.com/openshift/insights-operator/pkg/controllerstatus"
"github.com/openshift/insights-operator/pkg/ocm"
"github.com/openshift/insights-operator/pkg/ocm/clustertransfer"
"github.com/openshift/insights-operator/pkg/ocm/sca"
)
const (
// uploadFailuresCountThreshold is the number of upload failures (as reported in
// the source summary Count) from which an upload failure is treated as a
// degrading error by currentControllerStatus. Below this value the failure is
// still recorded as an upload status, but it is not added to the degrading errors.
uploadFailuresCountThreshold = 5
// insightsAvailableMessage is the message used for the Available=True condition
// and for the Degraded=False condition.
insightsAvailableMessage = "Insights works as expected"
)
// Reported is the data serialized as JSON into the ClusterOperator status
// extension. It carries the last report time so that it can be read back
// from an existing ClusterOperator on the initial status update.
type Reported struct {
// LastReportTime is the last reported time tracked by the controller.
LastReportTime metav1.Time `json:"lastReportTime"`
}
// Controller is the type responsible for managing the status of the operator according to the status of its sources.
// Sources come from different major parts of the codebase and are used to communicate their status to the controller.
type Controller struct {
// name is the name of the ClusterOperator resource managed by the controller.
name string
// namespace is used to build the related objects of the ClusterOperator; when empty they are not set.
namespace string
// client is used to get, create and update the ClusterOperator resource.
client configv1client.ConfigV1Interface
// statusCh signals the update loop that a status update was requested.
statusCh chan struct{}
// configurator provides the operator configuration (its Report flag is read to detect the disabled state).
configurator configobserver.Configurator
// sources holds the registered status sources keyed by their name.
sources map[string]controllerstatus.StatusController
// reported holds the last reported time.
reported Reported
// start is set lazily on the first call of controllerStartTime.
start time.Time
// ctrlStatus is reset and recalculated on every merge.
ctrlStatus *controllerStatus
// lock guards start, reported and sources in the accessor methods.
lock sync.Mutex
}
// NewController returns a status controller for the "insights" cluster operator.
// The controller monitors the status of the operator's sources and updates the
// cluster operator status accordingly. It starts with no sources registered,
// a fresh controller status and a status channel that can buffer one pending signal.
func NewController(client configv1client.ConfigV1Interface, configurator configobserver.Configurator, namespace string) *Controller {
	return &Controller{
		name:         "insights",
		namespace:    namespace,
		client:       client,
		configurator: configurator,
		statusCh:     make(chan struct{}, 1),
		sources:      map[string]controllerstatus.StatusController{},
		ctrlStatus:   newControllerStatus(),
	}
}
// triggerStatusUpdate signals the update loop started by Start that the status
// should be refreshed. The send is non-blocking: when statusCh cannot accept
// the signal right now, the signal is dropped.
func (c *Controller) triggerStatusUpdate() {
select {
case c.statusCh <- struct{}{}:
default:
// the channel is not ready to receive, nothing to do
}
}
// controllerStartTime returns the start time of the controller in a thread-safe way.
// The start time is recorded lazily: the first call stores the current time and
// every later call returns that stored value.
func (c *Controller) controllerStartTime() time.Time {
	c.lock.Lock()
	defer c.lock.Unlock()

	if !c.start.IsZero() {
		return c.start
	}
	c.start = time.Now()
	return c.start
}
// LastReportedTime returns the last reported time stored in the controller.
// The controller lock is held while reading, so it is safe to call concurrently
// with SetLastReportedTime.
func (c *Controller) LastReportedTime() time.Time {
c.lock.Lock()
defer c.lock.Unlock()
return c.reported.LastReportTime.Time
}
// SetLastReportedTime stores the last reported time in a thread-safe way and
// requests a status update. The very first assignment (when no time has been
// stored yet) is logged.
func (c *Controller) SetLastReportedTime(at time.Time) {
	c.lock.Lock()
	defer c.lock.Unlock()

	if notYetSet := c.reported.LastReportTime.IsZero(); notYetSet {
		klog.V(2).Infof("Initializing last reported time to %s", at.UTC().Format(time.RFC3339))
	}
	c.reported.LastReportTime = metav1.Time{Time: at}

	// non-blocking, so it is fine to call it while holding the lock
	c.triggerStatusUpdate()
}
// AddSources registers the given sources in a thread-safe way.
// A source is used to monitor a part of the operator. Sources are keyed by
// their name, so a source with an already registered name replaces the old one.
func (c *Controller) AddSources(sources ...controllerstatus.StatusController) {
	c.lock.Lock()
	defer c.lock.Unlock()

	for _, src := range sources {
		c.sources[src.Name()] = src
	}
}
// Sources provides the sources in a thread-safe way.
// A source is used to monitor parts of the operator.
//
// The returned map is a snapshot taken while holding the controller lock.
// Returning the internal map itself would let callers iterate over it after
// the lock has been released, which is a data race with a concurrent
// AddSources call. Changes made to the returned map do not affect the controller.
func (c *Controller) Sources() map[string]controllerstatus.StatusController {
	c.lock.Lock()
	defer c.lock.Unlock()

	snapshot := make(map[string]controllerstatus.StatusController, len(c.sources))
	for name, source := range c.sources {
		snapshot[name] = source
	}
	return snapshot
}
// Source returns the source registered under the given name in a thread-safe way.
// The result is nil when no source with that name is registered.
func (c *Controller) Source(name string) controllerstatus.StatusController {
c.lock.Lock()
defer c.lock.Unlock()
return c.sources[name]
}
// merge calculates the desired ClusterOperator based on the given one and on
// the current state of the sources. The given object is never modified: the
// result is a deep copy with updated related objects, conditions, versions and
// status extension. A nil clusterOperator is treated as a not yet existing one.
func (c *Controller) merge(clusterOperator *configv1.ClusterOperator) *configv1.ClusterOperator {
	// start from a fresh object when the cluster operator does not exist yet
	if clusterOperator == nil {
		clusterOperator = newClusterOperator(c.name, nil)
	}

	// the controller status is always recalculated from scratch
	c.ctrlStatus.reset()
	allReady, lastTransition := c.currentControllerStatus()

	merged := clusterOperator.DeepCopy()
	now := time.Now()

	if c.namespace != "" {
		merged.Status.RelatedObjects = relatedObjects(c.namespace)
	}

	// the start time is consulted only when some source is not ready yet
	isInitializing := !allReady && now.Sub(c.controllerStartTime()) < 3*time.Minute

	// calculate the cluster operator conditions and write them back
	cs := newConditions(&merged.Status, metav1.Time{Time: now})
	c.updateControllerConditions(cs, isInitializing, lastTransition)
	updateControllerConditionsByStatus(cs, c.ctrlStatus, isInitializing)
	merged.Status.Conditions = cs.entries()

	// the version is reported only when the release version is known
	if release := os.Getenv("RELEASE_VERSION"); release != "" {
		merged.Status.Versions = []configv1.OperandVersion{
			{Name: "operator", Version: release},
		}
	}

	// the last reported time is stored in the status extension
	extension := Reported{LastReportTime: metav1.Time{Time: c.LastReportedTime()}}
	data, err := json.Marshal(extension)
	if err == nil {
		merged.Status.Extension.Raw = data
	} else {
		klog.Errorf("Unable to marshal status extension: %v", err)
	}

	return merged
}
// currentControllerStatus calculates the current controller status based on the registered sources.
//
// Every source that is ready, unhealthy and has a non-empty message is inspected:
// upload and download failures are recorded in the controller status, and the
// failures considered degrading are collected into the error status. The
// disabled status is set when reporting is disabled in the configuration.
//
// It returns allReady=false when at least one source is not ready, and the
// latest transition time among the inspected unhealthy sources.
func (c *Controller) currentControllerStatus() (allReady bool, lastTransition time.Time) { //nolint: gocyclo
	var (
		errorReason string
		errs        []string
	)
	allReady = true

	for name, source := range c.Sources() {
		summary, ready := source.CurrentStatus()
		switch {
		case !ready:
			klog.V(4).Infof("Source %s %T is not ready", name, source)
			allReady = false
			continue
		case summary.Healthy:
			continue
		case len(summary.Message) == 0:
			klog.Errorf("Programmer error: status source %s %T reported an empty message: %#v", name, source, summary)
			continue
		}

		degrading := false
		switch summary.Operation.Name {
		case controllerstatus.Uploading.Name:
			// upload failures degrade the operator only from the threshold on
			degrading = summary.Count >= uploadFailuresCountThreshold
			if degrading {
				klog.V(4).Infof("Number of last upload failures %d exceeded the threshold %d. Marking as degraded.",
					summary.Count, uploadFailuresCountThreshold)
			} else {
				klog.V(4).Infof("Number of last upload failures %d lower than threshold %d. Not marking as degraded.",
					summary.Count, uploadFailuresCountThreshold)
			}
			c.ctrlStatus.setStatus(UploadStatus, summary.Reason, summary.Message)
		case controllerstatus.DownloadingReport.Name:
			klog.V(4).Info("Failed to download Insights report")
			c.ctrlStatus.setStatus(DownloadStatus, summary.Reason, summary.Message)
		case controllerstatus.PullingSCACerts.Name:
			// mark as degraded only in case of HTTP 500 and higher
			if summary.Operation.HTTPStatusCode >= 500 {
				klog.V(4).Infof("Failed to download the SCA certs within the threshold %d with exponential backoff. Marking as degraded.",
					ocm.OCMAPIFailureCountThreshold)
				degrading = true
			}
		case controllerstatus.PullingClusterTransfer.Name:
			// mark as degraded only in case of HTTP 500 and higher
			if summary.Operation.HTTPStatusCode >= 500 {
				klog.V(4).Infof("Failed to pull the cluster transfer object within the threshold %d with exponential backoff. Marking as degraded.",
					ocm.OCMAPIFailureCountThreshold)
				degrading = true
			}
		}

		if degrading {
			errorReason = summary.Reason
			errs = append(errs, summary.Message)
		}

		// keep the most recent transition time
		if summary.LastTransitionTime.After(lastTransition) {
			lastTransition = summary.LastTransitionTime
		}
	}

	// record the collected degrading failures as the error status
	reason, message := handleControllerStatusError(errs, errorReason)
	if reason != "" || message != "" {
		c.ctrlStatus.setStatus(ErrorStatus, reason, message)
	}

	// disabled state only when it's disabled by config. It means that gathering will not happen
	if !c.configurator.Config().Report {
		c.ctrlStatus.setStatus(DisabledStatus, "Disabled", "Health reporting is disabled")
	}

	return allReady, lastTransition
}
// Start writes the initial cluster operator status and then starts the
// periodic checking of sources in a background goroutine.
//
// The status is refreshed every 2 minutes and whenever a status update is
// requested through statusCh; the updates are rate limited (one per 30 seconds,
// burst of 2). The background goroutine ends when ctx is done.
//
// It returns an error only when the initial status update fails; in that case
// the background goroutine is not started.
func (c *Controller) Start(ctx context.Context) error {
	if err := c.updateStatus(ctx, true); err != nil {
		return err
	}
	limiter := rate.NewLimiter(rate.Every(30*time.Second), 2)
	go wait.Until(func() {
		timer := time.NewTicker(2 * time.Minute)
		defer timer.Stop()
		for {
			select {
			case <-ctx.Done():
				// Leave the loop once the context is done. Without the return the
				// loop would spin forever (a done channel is always ready) and
				// wait.Until could never observe its stop channel.
				return
			case <-timer.C:
				err := limiter.Wait(ctx)
				if err != nil {
					klog.Errorf("Limiter error by timer: %v", err)
				}
			case <-c.statusCh:
				err := limiter.Wait(ctx)
				if err != nil {
					klog.Errorf("Limiter error by status: %v", err)
				}
			}
			if err := c.updateStatus(ctx, false); err != nil {
				klog.Errorf("Unable to write cluster operator status: %v", err)
			}
		}
	}, time.Second, ctx.Done())
	return nil
}
// updateStatus reads the current ClusterOperator, merges the current controller
// state into it and writes the result back.
//
// When the ClusterOperator does not exist it is created first. When the merged
// status equals the existing one, nothing is written. On the initial update of
// an existing ClusterOperator the last reported time is restored from its
// status extension.
//
// It returns the error of the failed get, create or status update call.
func (c *Controller) updateStatus(ctx context.Context, initial bool) error {
	existing, err := c.client.ClusterOperators().Get(ctx, c.name, metav1.GetOptions{})
	switch {
	case err == nil:
		// the cluster operator exists
	case errors.IsNotFound(err):
		existing = nil
	default:
		return err
	}

	if initial && existing != nil {
		var reported Reported
		if raw := existing.Status.Extension.Raw; len(raw) > 0 {
			if unmarshalErr := json.Unmarshal(raw, &reported); unmarshalErr != nil {
				klog.Errorf("The initial operator extension status is invalid: %v", unmarshalErr)
			}
		}
		c.SetLastReportedTime(reported.LastReportTime.Time.UTC())

		cs := newConditions(&existing.Status, metav1.Now())
		degraded := cs.findCondition(configv1.OperatorDegraded)
		if degraded == nil || degraded.Status == configv1.ConditionFalse {
			klog.Info("The initial operator extension status is healthy")
		}
	}

	updated := c.merge(existing)
	if existing == nil {
		created, createErr := c.client.ClusterOperators().Create(ctx, updated, metav1.CreateOptions{})
		if createErr != nil {
			return createErr
		}
		// take over the metadata and spec of the created object for the status update
		updated.ObjectMeta = created.ObjectMeta
		updated.Spec = created.Spec
	} else if reflect.DeepEqual(updated.Status, existing.Status) {
		klog.V(4).Infof("No status update necessary, objects are identical")
		return nil
	}

	_, err = c.client.ClusterOperators().UpdateStatus(ctx, updated, metav1.UpdateOptions{})
	return err
}
// updateControllerConditions updates the cluster operator conditions in cs
// based on the current controller status (c.ctrlStatus).
//
// It sets the Disabled and Degraded conditions, sets or removes the
// upload/download degraded conditions, and finally updates the SCA and
// cluster transfer availability conditions from their sources.
// lastTransition is used as the transition time of the degraded conditions.
//
// NOTE(review): there is no return after the isInitializing block, so the
// Disabled and Degraded conditions below are set during initialization as
// well - confirm this is intended.
func (c *Controller) updateControllerConditions(cs *conditions,
isInitializing bool, lastTransition time.Time) {
if isInitializing {
// the disabled condition is optional, but set it now if we already know we're disabled
if ds := c.ctrlStatus.getStatus(DisabledStatus); ds != nil {
cs.setCondition(OperatorDisabled, configv1.ConditionTrue, ds.reason, ds.message, metav1.Now())
}
// make sure the Degraded condition exists, without touching an existing one
if !cs.hasCondition(configv1.OperatorDegraded) {
cs.setCondition(configv1.OperatorDegraded, configv1.ConditionFalse, "AsExpected", "", metav1.Now())
}
}
// once we've initialized set Failing and Disabled as best we know
// handle when disabled
if ds := c.ctrlStatus.getStatus(DisabledStatus); ds != nil {
cs.setCondition(OperatorDisabled, configv1.ConditionTrue, ds.reason, ds.message, metav1.Now())
} else {
cs.setCondition(OperatorDisabled, configv1.ConditionFalse, "AsExpected", "", metav1.Now())
}
// handle when has errors; errors are not reported as Degraded while disabled
if es := c.ctrlStatus.getStatus(ErrorStatus); es != nil && !c.ctrlStatus.isDisabled() {
cs.setCondition(configv1.OperatorDegraded, configv1.ConditionTrue, es.reason, es.message, metav1.Time{Time: lastTransition})
} else {
cs.setCondition(configv1.OperatorDegraded, configv1.ConditionFalse, "AsExpected", insightsAvailableMessage, metav1.Now())
}
// handle when upload fails; the condition is removed when there is no upload failure or when disabled
if ur := c.ctrlStatus.getStatus(UploadStatus); ur != nil && !c.ctrlStatus.isDisabled() {
cs.setCondition(InsightsUploadDegraded, configv1.ConditionTrue, ur.reason, ur.message, metav1.Time{Time: lastTransition})
} else {
cs.removeCondition(InsightsUploadDegraded)
}
// handle when download fails; the condition is removed when there is no download failure or when disabled
if ds := c.ctrlStatus.getStatus(DownloadStatus); ds != nil && !c.ctrlStatus.isDisabled() {
cs.setCondition(InsightsDownloadDegraded, configv1.ConditionTrue, ds.reason, ds.message, metav1.Time{Time: lastTransition})
} else {
cs.removeCondition(InsightsDownloadDegraded)
}
// availability conditions driven by the SCA and cluster transfer sources
c.updateControllerConditionByReason(cs, SCAAvailable, sca.ControllerName, sca.AvailableReason, isInitializing)
c.updateControllerConditionByReason(cs,
ClusterTransferAvailable,
clustertransfer.ControllerName,
clustertransfer.AvailableReason,
isInitializing)
}
// updateControllerConditionByReason sets the given condition based on the
// current status of the source registered as controllerName: the condition is
// True when the reason of the source summary equals reason, False otherwise.
// The reason, message and transition time are taken from the summary.
//
// Nothing is changed when the source is not registered, when the operator is
// still initializing, or when the source has no summary to read.
func (c *Controller) updateControllerConditionByReason(cs *conditions,
	condition configv1.ClusterStatusConditionType,
	controllerName, reason string,
	isInitializing bool) {
	source := c.Source(controllerName)
	if source == nil || isInitializing {
		return
	}

	summary, hasSummary := source.CurrentStatus()
	if !hasSummary {
		// no summary to read
		return
	}

	status := configv1.ConditionFalse
	if summary.Reason == reason {
		status = configv1.ConditionTrue
	}
	cs.setCondition(condition, status, summary.Reason, summary.Message, metav1.Time{Time: summary.LastTransitionTime})
}
// updateControllerConditionsByStatus updates the Progressing, Available and
// Upgradeable conditions in cs based on the given controller status.
//
// The checks are applied in order (initializing, error, disabled, healthy) and
// none of them returns early, so a later matching check sets the same
// conditions again after an earlier one.
func updateControllerConditionsByStatus(cs *conditions, ctrlStatus *controllerStatus,
isInitializing bool) {
if isInitializing {
klog.V(4).Infof("The operator is still being initialized")
// if we're still starting up and some sources are not ready, initialize the conditions
// but don't update
if !cs.hasCondition(configv1.OperatorProgressing) {
cs.setCondition(configv1.OperatorProgressing, configv1.ConditionTrue, "Initializing", "Initializing the operator", metav1.Now())
}
}
// errors are reported only when the operator is not disabled
if es := ctrlStatus.getStatus(ErrorStatus); es != nil && !ctrlStatus.isDisabled() {
klog.V(4).Infof("The operator has some internal errors: %s", es.message)
cs.setCondition(configv1.OperatorProgressing, configv1.ConditionFalse, "Degraded", "An error has occurred", metav1.Now())
cs.setCondition(configv1.OperatorAvailable, configv1.ConditionFalse, es.reason, es.message, metav1.Now())
cs.setCondition(configv1.OperatorUpgradeable, configv1.ConditionFalse, "InsightsNotUpgradeable", es.message, metav1.Now())
}
// a disabled operator is not available, but it can still be upgraded
if ds := ctrlStatus.getStatus(DisabledStatus); ds != nil {
klog.V(4).Infof("The operator is marked as disabled")
cs.setCondition(configv1.OperatorProgressing, configv1.ConditionFalse, ds.reason, ds.message, metav1.Now())
cs.setCondition(configv1.OperatorAvailable, configv1.ConditionFalse, ds.reason, ds.message, metav1.Now())
cs.setCondition(configv1.OperatorUpgradeable, configv1.ConditionTrue, "InsightsUpgradeable",
"Insights operator can be upgraded", metav1.Now())
}
if ctrlStatus.isHealthy() {
klog.V(4).Infof("The operator is healthy")
cs.setCondition(configv1.OperatorProgressing, configv1.ConditionFalse, "AsExpected", "Monitoring the cluster", metav1.Now())
cs.setCondition(configv1.OperatorAvailable, configv1.ConditionTrue, "AsExpected", insightsAvailableMessage, metav1.Now())
cs.setCondition(configv1.OperatorUpgradeable, configv1.ConditionTrue, "InsightsUpgradeable",
"Insights operator can be upgraded", metav1.Now())
}
}
// handleControllerStatusError turns the collected error messages into a single
// reason and message for the controller error status.
//
//   - no errors: both reason and message are empty;
//   - one error: the message is that error and the reason is errorReason,
//     or "UnknownError" when errorReason is empty;
//   - several errors: the reason is "MultipleFailures" and the message lists
//     all the errors in sorted order.
//
// Note that errs is sorted in place when it contains more than one error.
func handleControllerStatusError(errs []string, errorReason string) (reason, message string) {
	switch len(errs) {
	case 0:
		return "", ""
	case 1:
		// fall back to a generic reason only when the source did not provide one
		// (previously the fallback was always overwritten by errorReason)
		reason = errorReason
		if reason == "" {
			reason = "UnknownError"
		}
		return reason, errs[0]
	default:
		sort.Strings(errs)
		message = fmt.Sprintf("There are multiple errors blocking progress:\n* %s", strings.Join(errs, "\n* "))
		return "MultipleFailures", message
	}
}
// newClusterOperator creates a cluster operator with the given name.
// When status is not nil, it is copied into the new object; otherwise the
// status is left at its zero value.
func newClusterOperator(name string, status *configv1.ClusterOperatorStatus) *configv1.ClusterOperator {
	operator := new(configv1.ClusterOperator)
	operator.ObjectMeta = metav1.ObjectMeta{Name: name}
	if status == nil {
		return operator
	}
	operator.Status = *status
	return operator
}
// relatedObjects lists the objects related to the operator: the operator
// namespace itself, the objects living in that namespace and the secrets
// read from the "openshift-config" namespace.
func relatedObjects(namespace string) []configv1.ObjectReference {
	const configNamespace = "openshift-config"

	related := []configv1.ObjectReference{
		{Resource: "namespaces", Name: namespace},
		{Group: "apps", Resource: "deployments", Namespace: namespace, Name: "insights-operator"},
		{Resource: "secrets", Namespace: configNamespace, Name: "pull-secret"},
		{Resource: "secrets", Namespace: configNamespace, Name: "support"},
		{Resource: "serviceaccounts", Namespace: namespace, Name: "gather"},
		{Resource: "serviceaccounts", Namespace: namespace, Name: "operator"},
		{Resource: "services", Namespace: namespace, Name: "metrics"},
		{Resource: "configmaps", Namespace: namespace, Name: "service-ca-bundle"},
	}
	return related
}