Skip to content

Commit 37673b1

Browse files
authored
Implement DataGather status conditions and status propagation (#805)
* Implement DataGather status conditions * propagate status to clusteroperator status * propagate the failed job to the original status controller * Improve as suggested in review
1 parent 0f38b40 commit 37673b1

File tree

7 files changed

+283
-74
lines changed

7 files changed

+283
-74
lines changed

pkg/controller/gather_commands.go

+84-47
Original file line numberDiff line numberDiff line change
@@ -107,19 +107,16 @@ func (d *GatherJob) Gather(ctx context.Context, kubeConfig, protoKubeConfig *res
107107
return gather.RecordArchiveMetadata(mapToArray(allFunctionReports), rec, anonymizer)
108108
}
109109

110-
// GatherAndUpload runs a single gather and stores the generated archive, uploads it
111-
// and waits for the corresponding Insights analysis report.
112-
// 1. Creates the necessary configs/clients
113-
// 2. Creates the configobserver
114-
// 3. Initiates the recorder
115-
// 4. Executes a Gather
116-
// 5. Flushes the results
117-
// 6. Get the latest archive
118-
// 7. Uploads the archive
119-
// 8. Waits for the corresponding Insights analysis download
110+
// GatherAndUpload runs a single gather and stores the generated archive, uploads it.
111+
// 1. Prepare the necessary kube configs
112+
// 2. Get the corresponding "datagathers.insights.openshift.io" resource
113+
// 3. Create all the gatherers
114+
// 4. Run data gathering
115+
// 5. Recodrd the data into the Insights archive
116+
// 6. Get the latest archive and upload it
117+
// 7. Updates the status of the corresponding "datagathers.insights.openshift.io" resource continuously
120118
func (d *GatherJob) GatherAndUpload(kubeConfig, protoKubeConfig *rest.Config) error { // nolint: funlen, gocyclo
121-
klog.Infof("Starting insights-operator %s", version.Get().String())
122-
// these are operator clients
119+
klog.Info("Starting data gathering")
123120
kubeClient, err := kubernetes.NewForConfig(protoKubeConfig)
124121
if err != nil {
125122
return err
@@ -143,11 +140,8 @@ func (d *GatherJob) GatherAndUpload(kubeConfig, protoKubeConfig *rest.Config) er
143140
klog.Error("failed to get coresponding DataGather custom resource: %v", err)
144141
return err
145142
}
146-
updatedCR := dataGatherCR.DeepCopy()
147-
updatedCR.Status.State = insightsv1alpha1.Running
148-
updatedCR.Status.StartTime = metav1.Now()
149143

150-
dataGatherCR, err = insightClient.DataGathers().UpdateStatus(ctx, updatedCR, metav1.UpdateOptions{})
144+
dataGatherCR, err = updateDataGatherStatus(ctx, *insightClient, dataGatherCR.DeepCopy(), insightsv1alpha1.Pending, nil)
151145
if err != nil {
152146
klog.Error("failed to update coresponding DataGather custom resource: %v", err)
153147
return err
@@ -186,6 +180,11 @@ func (d *GatherJob) GatherAndUpload(kubeConfig, protoKubeConfig *rest.Config) er
186180
)
187181
uploader := insightsuploader.New(nil, insightsClient, configObserver, nil, nil, 0)
188182

183+
dataGatherCR, err = updateDataGatherStatus(ctx, *insightClient, dataGatherCR, insightsv1alpha1.Running, nil)
184+
if err != nil {
185+
klog.Error("failed to update coresponding DataGather custom resource: %v", err)
186+
return err
187+
}
189188
allFunctionReports := make(map[string]gather.GathererFunctionReport)
190189
for _, gatherer := range gatherers {
191190
functionReports, err := gather.CollectAndRecordGatherer(ctx, gatherer, rec, dataGatherCR.Spec.Gatherers) // nolint: govet
@@ -197,50 +196,50 @@ func (d *GatherJob) GatherAndUpload(kubeConfig, protoKubeConfig *rest.Config) er
197196
allFunctionReports[functionReports[i].FuncName] = functionReports[i]
198197
}
199198
}
200-
err = gather.RecordArchiveMetadata(mapToArray(allFunctionReports), rec, anonymizer)
201-
if err != nil {
202-
klog.Error(err)
203-
return err
204-
}
205-
err = rec.Flush()
206-
if err != nil {
207-
klog.Error(err)
208-
return err
199+
200+
for k := range allFunctionReports {
201+
fr := allFunctionReports[k]
202+
// duration = 0 means the gatherer didn't run
203+
if fr.Duration == 0 {
204+
continue
205+
}
206+
207+
gs := status.CreateDataGatherGathererStatus(&fr)
208+
dataGatherCR.Status.Gatherers = append(dataGatherCR.Status.Gatherers, gs)
209209
}
210-
lastArchive, err := recdriver.LastArchive()
210+
211+
// record data
212+
conditions := []metav1.Condition{}
213+
lastArchive, err := record(mapToArray(allFunctionReports), rec, recdriver, anonymizer)
211214
if err != nil {
212-
klog.Error(err)
215+
conditions = append(conditions, status.DataRecordedCondition(metav1.ConditionFalse, "RecordingFailed",
216+
fmt.Sprintf("Failed to record data: %v", err)))
217+
_, recErr := updateDataGatherStatus(ctx, *insightClient, dataGatherCR, insightsv1alpha1.Failed, conditions)
218+
if recErr != nil {
219+
klog.Error("data recording failed and the update of DataGaher resource status failed as well: %v", recErr)
220+
}
213221
return err
214222
}
223+
conditions = append(conditions, status.DataRecordedCondition(metav1.ConditionTrue, "AsExpected", ""))
224+
225+
// upload data
215226
insightsRequestID, err := uploader.Upload(ctx, lastArchive)
216227
if err != nil {
217228
klog.Error(err)
229+
conditions = append(conditions, status.DataUploadedCondition(metav1.ConditionFalse, "UploadFailed",
230+
fmt.Sprintf("Failed to upload data: %v", err)))
231+
_, updateErr := updateDataGatherStatus(ctx, *insightClient, dataGatherCR, insightsv1alpha1.Failed, conditions)
232+
if updateErr != nil {
233+
klog.Error("data upload failed and the update of DataGaher resource status failed as well: %v", updateErr)
234+
}
218235
return err
219236
}
220237
klog.Infof("Insights archive successfully uploaded with InsightsRequestID: %s", insightsRequestID)
221238

222-
dataGatherCR.Status.FinishTime = metav1.Now()
223-
dataGatherCR.Status.State = insightsv1alpha1.Completed
224239
dataGatherCR.Status.InsightsRequestID = insightsRequestID
225-
dataGatherCR.Status.Conditions = []metav1.Condition{
226-
{
227-
Type: "DataUploaded",
228-
Status: metav1.ConditionTrue,
229-
Reason: "AsExpected",
230-
LastTransitionTime: metav1.Now(),
231-
},
232-
}
233-
for k := range allFunctionReports {
234-
fr := allFunctionReports[k]
235-
// duration = 0 means the gatherer didn't run
236-
if fr.Duration == 0 {
237-
continue
238-
}
240+
conditions = append(conditions, status.DataUploadedCondition(metav1.ConditionTrue, "AsExpected", ""))
239241

240-
gs := status.CreateDataGatherGathererStatus(&fr)
241-
dataGatherCR.Status.Gatherers = append(dataGatherCR.Status.Gatherers, gs)
242-
}
243-
_, err = insightClient.DataGathers().UpdateStatus(ctx, dataGatherCR, metav1.UpdateOptions{})
242+
_, err = updateDataGatherStatus(ctx, *insightClient, dataGatherCR, insightsv1alpha1.Completed, conditions)
244243
if err != nil {
245244
klog.Error(err)
246245
return err
@@ -256,3 +255,41 @@ func mapToArray(m map[string]gather.GathererFunctionReport) []gather.GathererFun
256255
}
257256
return a
258257
}
258+
259+
// record is a helper function recording the archive metadata as well as data.
260+
// Returns last known Insights archive and an error when recording failed.
261+
func record(functionReports []gather.GathererFunctionReport,
262+
rec *recorder.Recorder, recdriver *diskrecorder.DiskRecorder, anonymizer *anonymization.Anonymizer) (*insightsclient.Source, error) {
263+
err := gather.RecordArchiveMetadata(functionReports, rec, anonymizer)
264+
if err != nil {
265+
return nil, err
266+
}
267+
err = rec.Flush()
268+
if err != nil {
269+
return nil, err
270+
}
271+
return recdriver.LastArchive()
272+
}
273+
274+
// updateDataGatherStatus updates status' time attributes, state and conditions
275+
// of the provided DataGather resource
276+
func updateDataGatherStatus(ctx context.Context,
277+
insightsClient insightsv1alpha1cli.InsightsV1alpha1Client,
278+
dataGatherCR *insightsv1alpha1.DataGather,
279+
newState insightsv1alpha1.DataGatherState, conditions []metav1.Condition) (*insightsv1alpha1.DataGather, error) {
280+
switch newState {
281+
case insightsv1alpha1.Completed:
282+
dataGatherCR.Status.FinishTime = metav1.Now()
283+
case insightsv1alpha1.Failed:
284+
dataGatherCR.Status.FinishTime = metav1.Now()
285+
case insightsv1alpha1.Running:
286+
dataGatherCR.Status.StartTime = metav1.Now()
287+
case insightsv1alpha1.Pending:
288+
// no op
289+
}
290+
dataGatherCR.Status.State = newState
291+
if conditions != nil {
292+
dataGatherCR.Status.Conditions = append(dataGatherCR.Status.Conditions, conditions...)
293+
}
294+
return insightsClient.DataGathers().UpdateStatus(ctx, dataGatherCR, metav1.UpdateOptions{})
295+
}

pkg/controller/operator.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,7 @@ func (s *Operator) Run(ctx context.Context, controller *controllercmd.Controller
197197
reportRetriever := insightsreport.NewWithTechPreview(insightsClient, secretConfigObserver, operatorClient.InsightsOperators())
198198
periodicGather = periodic.NewWithTechPreview(reportRetriever, secretConfigObserver,
199199
insightsDataGatherObserver, gatherers, kubeClient, insightClient, operatorClient.InsightsOperators())
200+
statusReporter.AddSources(periodicGather.Sources()...)
200201
go periodicGather.PeriodicPrune(ctx)
201202
}
202203

@@ -209,7 +210,7 @@ func (s *Operator) Run(ctx context.Context, controller *controllercmd.Controller
209210
initialDelay = wait.Jitter(baseInitialDelay, 0.5)
210211
klog.Infof("Unable to check insights-operator pod status. Setting initial delay to %s", initialDelay)
211212
}
212-
go periodicGather.Run(ctx.Done(), initialDelay, insightsConfigAPIEnabled)
213+
go periodicGather.Run(ctx.Done(), initialDelay)
213214

214215
if !insightsConfigAPIEnabled {
215216
// upload results to the provided client - if no client is configured reporting

pkg/controller/periodic/periodic.go

+56-15
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ type Controller struct {
5252
image string
5353
jobController *JobController
5454
pruneInterval time.Duration
55+
techPreview bool
5556
}
5657

5758
func NewWithTechPreview(
@@ -64,6 +65,8 @@ func NewWithTechPreview(
6465
insightsOperatorCLI operatorv1client.InsightsOperatorInterface,
6566
) *Controller {
6667
statuses := make(map[string]controllerstatus.StatusController)
68+
69+
statuses["insightsuploader"] = controllerstatus.New("insightsuploader")
6770
jobController := NewJobController(kubeClient)
6871
return &Controller{
6972
reportRetriever: reportRetriever,
@@ -76,6 +79,7 @@ func NewWithTechPreview(
7679
jobController: jobController,
7780
insightsOperatorCLI: insightsOperatorCLI,
7881
pruneInterval: 1 * time.Hour,
82+
techPreview: true,
7983
}
8084
}
8185

@@ -120,7 +124,7 @@ func (c *Controller) Sources() []controllerstatus.StatusController {
120124
return sources
121125
}
122126

123-
func (c *Controller) Run(stopCh <-chan struct{}, initialDelay time.Duration, techPreview bool) {
127+
func (c *Controller) Run(stopCh <-chan struct{}, initialDelay time.Duration) {
124128
defer utilruntime.HandleCrash()
125129
defer klog.Info("Shutting down")
126130

@@ -130,21 +134,21 @@ func (c *Controller) Run(stopCh <-chan struct{}, initialDelay time.Duration, tec
130134
case <-stopCh:
131135
return
132136
case <-time.After(initialDelay):
133-
if techPreview {
137+
if c.techPreview {
134138
c.GatherJob()
135139
} else {
136140
c.Gather()
137141
}
138142
}
139143
} else {
140-
if techPreview {
144+
if c.techPreview {
141145
c.GatherJob()
142146
} else {
143147
c.Gather()
144148
}
145149
}
146150

147-
go wait.Until(func() { c.periodicTrigger(stopCh, techPreview) }, time.Second, stopCh)
151+
go wait.Until(func() { c.periodicTrigger(stopCh) }, time.Second, stopCh)
148152

149153
<-stopCh
150154
}
@@ -217,7 +221,7 @@ func (c *Controller) Gather() {
217221

218222
// Periodically starts the gathering.
219223
// If there is an initialDelay set then it waits that much for the first gather to happen.
220-
func (c *Controller) periodicTrigger(stopCh <-chan struct{}, techPreview bool) {
224+
func (c *Controller) periodicTrigger(stopCh <-chan struct{}) {
221225
configCh, closeFn := c.secretConfigurator.ConfigChanged()
222226
defer closeFn()
223227

@@ -237,7 +241,7 @@ func (c *Controller) periodicTrigger(stopCh <-chan struct{}, techPreview bool) {
237241
klog.Infof("Gathering cluster info every %s", interval)
238242

239243
case <-time.After(interval):
240-
if techPreview {
244+
if c.techPreview {
241245
c.GatherJob()
242246
} else {
243247
c.Gather()
@@ -263,8 +267,8 @@ func (c *Controller) GatherJob() {
263267
c.image = image
264268
}
265269

266-
// create a new datagather.insights.openshift.io custom resource
267270
disabledGatherers, dp := c.createDataGatherAttributeValues()
271+
// create a new datagather.insights.openshift.io custom resource
268272
dataGatherCR, err := c.createNewDataGatherCR(ctx, disabledGatherers, dp)
269273
if err != nil {
270274
klog.Errorf("Failed to create a new DataGather resource: %v", err)
@@ -288,8 +292,19 @@ func (c *Controller) GatherJob() {
288292
klog.Error(err)
289293
}
290294
klog.Infof("Job completed %s", gj.Name)
295+
dataGatherFinished, err := c.dataGatherClient.DataGathers().Get(ctx, dataGatherCR.Name, metav1.GetOptions{})
296+
if err != nil {
297+
klog.Errorf("Failed to get DataGather resource %s: %v", dataGatherCR.Name, err)
298+
return
299+
}
300+
dataGatheredOK := c.wasDataGatherSuccessful(dataGatherFinished)
301+
if !dataGatheredOK {
302+
klog.Errorf("Last data gathering %v was not successful", dataGatherFinished.Name)
303+
return
304+
}
305+
291306
c.reportRetriever.RetrieveReport()
292-
_, err = c.copyDataGatherStatusToOperatorStatus(ctx, dataGatherCR.Name)
307+
_, err = c.copyDataGatherStatusToOperatorStatus(ctx, dataGatherFinished)
293308
if err != nil {
294309
klog.Errorf("Failed to copy the last DataGather status to \"cluster\" operator status: %v", err)
295310
return
@@ -298,18 +313,14 @@ func (c *Controller) GatherJob() {
298313
}
299314

300315
// copyDataGatherStatusToOperatorStatus gets the "cluster" "insightsoperator.operator.openshift.io" resource
301-
// and updates its status with values from the provided "dgName" "datagather.insights.openshift.io" resource.
302-
func (c *Controller) copyDataGatherStatusToOperatorStatus(ctx context.Context, dgName string) (*v1.InsightsOperator, error) {
316+
// and updates its status with values from the provided "datagather.insights.openshift.io" resource.
317+
func (c *Controller) copyDataGatherStatusToOperatorStatus(ctx context.Context,
318+
dataGather *insightsv1alpha1.DataGather) (*v1.InsightsOperator, error) {
303319
operator, err := c.insightsOperatorCLI.Get(ctx, "cluster", metav1.GetOptions{})
304320
if err != nil {
305321
return nil, err
306322
}
307323
statusToUpdate := operator.Status.DeepCopy()
308-
309-
dataGather, err := c.dataGatherClient.DataGathers().Get(ctx, dgName, metav1.GetOptions{})
310-
if err != nil {
311-
return nil, err
312-
}
313324
statusToUpdate.GatherStatus = status.DataGatherStatusToOperatorGatherStatus(&dataGather.Status)
314325
operator.Status = *statusToUpdate
315326

@@ -493,6 +504,36 @@ func (c *Controller) createDataGatherAttributeValues() ([]string, insightsv1alph
493504
return disabledGatherers, dp
494505
}
495506

507+
// wasDataGatherSuccessful reads status conditions of the provided "dataGather" "datagather.insights.openshift.io"
508+
// custom resource and checks whether the data was successfully uploaded or not and updates status accordingly
509+
func (c *Controller) wasDataGatherSuccessful(dataGather *insightsv1alpha1.DataGather) bool {
510+
var dataUploadedCon *metav1.Condition
511+
for i := range dataGather.Status.Conditions {
512+
con := dataGather.Status.Conditions[i]
513+
if con.Type == status.DataUploaded {
514+
dataUploadedCon = &con
515+
}
516+
}
517+
statusSummary := controllerstatus.Summary{
518+
Operation: controllerstatus.Uploading,
519+
Healthy: true,
520+
}
521+
if dataUploadedCon == nil {
522+
statusSummary.Healthy = false
523+
statusSummary.Count = 5
524+
statusSummary.Reason = "DataUploadedConditionNotAvailable"
525+
statusSummary.Message = fmt.Sprintf("did not find any %q condition in the %s dataGather resource",
526+
status.DataUploaded, dataGather.Name)
527+
} else if dataUploadedCon.Status == metav1.ConditionFalse {
528+
statusSummary.Healthy = false
529+
statusSummary.Count = 5
530+
statusSummary.Reason = dataUploadedCon.Reason
531+
statusSummary.Message = dataUploadedCon.Message
532+
}
533+
c.statuses["insightsuploader"].UpdateStatus(statusSummary)
534+
return statusSummary.Healthy
535+
}
536+
496537
func mapToArray(m map[string]gather.GathererFunctionReport) []gather.GathererFunctionReport {
497538
a := make([]gather.GathererFunctionReport, 0, len(m))
498539
for _, v := range m {

0 commit comments

Comments
 (0)