Skip to content

Commit 4870124

Browse files
author
Serhii Zakharov
authored
Implement fingerprint for records (#603)
* implement fingerprint for records
* unit test gatherer producing a warning
* some fixes
* fixes after a review
* fixes after a review
* rebase
* calm down linters
* change the format of errors and improve the logic for gatherer's metadata
1 parent 14d32dc commit 4870124

34 files changed

+782
-313
lines changed

docs/insights-archive-sample/insights-operator/gathers.json

+309-105
Large diffs are not rendered by default.

pkg/config/mock_configurator.go

+13
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,19 @@ type MockConfigurator struct {
55
Conf *Controller
66
}
77

8+
// NewMockConfigurator constructs a new MockConfigurator with default config values
9+
func NewMockConfigurator(conf *Controller) *MockConfigurator {
10+
if conf == nil {
11+
conf = &Controller{}
12+
}
13+
if len(conf.Gather) == 0 {
14+
conf.Gather = []string{"ALL"}
15+
}
16+
return &MockConfigurator{
17+
Conf: conf,
18+
}
19+
}
20+
821
func (mc *MockConfigurator) Config() *Controller {
922
return mc.Conf
1023
}

pkg/controller/gather_job.go

-4
Original file line numberDiff line numberDiff line change
@@ -84,10 +84,6 @@ func (d *GatherJob) Gather(ctx context.Context, kubeConfig, protoKubeConfig *res
8484
functionReports, err := gather.CollectAndRecordGatherer(ctx, gatherer, rec, configObserver)
8585
if err != nil {
8686
klog.Errorf("unable to process gatherer %v, error: %v", gatherer.GetName(), err)
87-
functionReports = append(functionReports, gather.GathererFunctionReport{
88-
FuncName: gatherer.GetName(),
89-
Errors: []string{err.Error()},
90-
})
9187
}
9288

9389
for i := range functionReports {

pkg/controller/periodic/periodic.go

+5-6
Original file line numberDiff line numberDiff line change
@@ -135,12 +135,11 @@ func (c *Controller) Gather() {
135135
}
136136

137137
utilruntime.HandleError(fmt.Errorf("%v failed after %s with: %v", name, time.Since(start).Truncate(time.Millisecond), err))
138-
c.statuses[name].UpdateStatus(
139-
controllerstatus.Summary{
140-
Operation: controllerstatus.GatheringReport,
141-
Reason: "PeriodicGatherFailed",
142-
Message: fmt.Sprintf("Source %s could not be retrieved: %v", name, err),
143-
})
138+
c.statuses[name].UpdateStatus(controllerstatus.Summary{
139+
Operation: controllerstatus.GatheringReport,
140+
Reason: "PeriodicGatherFailed",
141+
Message: fmt.Sprintf("Source %s could not be retrieved: %v", name, err),
142+
})
144143
}()
145144
}
146145

pkg/controller/periodic/periodic_test.go

+7-8
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package periodic
22

33
import (
4-
"context"
54
"encoding/json"
65
"fmt"
76
"testing"
@@ -17,7 +16,7 @@ import (
1716

1817
func Test_Controller_CustomPeriodGatherer(t *testing.T) {
1918
c, mockRecorder := getMocksForPeriodicTest([]gatherers.Interface{
20-
&gather.MockGatherer{CanFail: true},
19+
&gather.MockGatherer{},
2120
&gather.MockCustomPeriodGatherer{Period: 999 * time.Hour},
2221
}, 1*time.Hour)
2322

@@ -34,7 +33,7 @@ func Test_Controller_CustomPeriodGatherer(t *testing.T) {
3433

3534
func Test_Controller_Run(t *testing.T) {
3635
c, mockRecorder := getMocksForPeriodicTest([]gatherers.Interface{
37-
&gather.MockGatherer{CanFail: true},
36+
&gather.MockGatherer{},
3837
}, 1*time.Hour)
3938

4039
// No delay, 5 gatherers + metadata
@@ -65,7 +64,7 @@ func Test_Controller_Run(t *testing.T) {
6564

6665
func Test_Controller_periodicTrigger(t *testing.T) {
6766
c, mockRecorder := getMocksForPeriodicTest([]gatherers.Interface{
68-
&gather.MockGatherer{CanFail: true},
67+
&gather.MockGatherer{},
6968
}, 1*time.Hour)
7069

7170
// 1 sec interval, 5 gatherers + metadata
@@ -90,7 +89,7 @@ func Test_Controller_periodicTrigger(t *testing.T) {
9089
}
9190

9291
func Test_Controller_Sources(t *testing.T) {
93-
mockGatherer := gather.MockGatherer{CanFail: true}
92+
mockGatherer := gather.MockGatherer{}
9493
mockCustomPeriodGatherer := gather.MockCustomPeriodGathererNoPeriod{ShouldBeProcessed: true}
9594
// 1 Gatherer ==> 1 source
9695
c, _ := getMocksForPeriodicTest([]gatherers.Interface{
@@ -109,7 +108,7 @@ func Test_Controller_Sources(t *testing.T) {
109108
func Test_Controller_CustomPeriodGathererNoPeriod(t *testing.T) {
110109
mockGatherer := gather.MockCustomPeriodGathererNoPeriod{ShouldBeProcessed: true}
111110
c, mockRecorder := getMocksForPeriodicTest([]gatherers.Interface{
112-
&gather.MockGatherer{CanFail: true},
111+
&gather.MockGatherer{},
113112
&mockGatherer,
114113
}, 1*time.Hour)
115114

@@ -154,12 +153,12 @@ func Test_Controller_FailingGatherer(t *testing.T) {
154153
continue
155154
}
156155
metadataFound = true
157-
b, err := mockRecorder.Records[i].Item.Marshal(context.Background())
156+
b, err := mockRecorder.Records[i].Item.Marshal()
158157
assert.NoError(t, err)
159158
metaData := make(map[string]interface{})
160159
err = json.Unmarshal(b, &metaData)
161160
assert.NoError(t, err)
162-
assert.Len(t, metaData["status_reports"].([]interface{}), 1,
161+
assert.Len(t, metaData["status_reports"].([]interface{}), 2,
163162
fmt.Sprintf("Only one function for %s expected ", c.gatherers[0].GetName()))
164163
}
165164
assert.Truef(t, metadataFound, fmt.Sprintf("%s not found in records", recorder.MetadataRecordName))

pkg/gather/gather.go

+102-41
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// gather package contains common gathering logic for all gatherers
1+
// Package gather contains common gathering logic for all gatherers
22
package gather
33

44
import (
@@ -20,11 +20,14 @@ import (
2020
"github.com/openshift/insights-operator/pkg/insights/insightsclient"
2121
"github.com/openshift/insights-operator/pkg/record"
2222
"github.com/openshift/insights-operator/pkg/recorder"
23+
"github.com/openshift/insights-operator/pkg/types"
2324
"github.com/openshift/insights-operator/pkg/utils"
2425
)
2526

2627
// norevive
2728
const (
29+
// AllGatherersConst is used to specify in the config that we want to enable
30+
// all gathering functions from all gatherers
2831
AllGatherersConst = "ALL"
2932
)
3033

@@ -36,6 +39,7 @@ type GathererFunctionReport struct {
3639
Duration int64 `json:"duration_in_ms"`
3740
RecordsCount int `json:"records_count"`
3841
Errors []string `json:"errors"`
42+
Warnings []string `json:"warnings"`
3943
Panic interface{} `json:"panic"`
4044
}
4145

@@ -78,57 +82,114 @@ func CollectAndRecordGatherer(
7882
rec recorder.Interface,
7983
configurator configobserver.Configurator,
8084
) ([]GathererFunctionReport, error) {
85+
startTime := time.Now()
86+
reports, totalNumberOfRecords, errs := collectAndRecordGatherer(ctx, gatherer, rec, configurator)
87+
reports = append(reports, GathererFunctionReport{
88+
FuncName: gatherer.GetName(),
89+
Duration: time.Since(startTime).Milliseconds(),
90+
RecordsCount: totalNumberOfRecords,
91+
Errors: utils.ErrorsToStrings(errs),
92+
})
93+
94+
return reports, utils.SumErrors(errs)
95+
}
96+
97+
func collectAndRecordGatherer(
98+
ctx context.Context,
99+
gatherer gatherers.Interface,
100+
rec recorder.Interface,
101+
configurator configobserver.Configurator,
102+
) (reports []GathererFunctionReport, totalNumberOfRecords int, allErrors []error) {
81103
resultsChan, err := startGatheringConcurrently(ctx, gatherer, configurator.Config().Gather)
82104
if err != nil {
83-
return nil, err
105+
allErrors = append(allErrors, err)
106+
return reports, totalNumberOfRecords, allErrors
84107
}
85108

86-
gathererName := gatherer.GetName()
109+
for result := range resultsChan {
110+
report, errs := recordGatheringFunctionResult(rec, &result, gatherer.GetName())
111+
allErrors = append(allErrors, errs...)
112+
reports = append(reports, report)
113+
totalNumberOfRecords += report.RecordsCount
114+
}
115+
116+
return reports, totalNumberOfRecords, allErrors
117+
}
87118

88-
var errs []error
89-
var functionReports []GathererFunctionReport
119+
func recordGatheringFunctionResult(
120+
rec recorder.Interface, result *GatheringFunctionResult, gathererName string,
121+
) (GathererFunctionReport, []error) {
122+
var allErrors []error
123+
var recordWarnings []error
124+
var recordErrs []error
90125

91-
for result := range resultsChan {
92-
if result.Panic != nil {
93-
klog.Error(fmt.Errorf(
94-
"gatherer %v's function %v panicked with error: %v",
95-
gathererName, result.FunctionName, result.Panic,
96-
))
97-
result.Errs = append(result.Errs, fmt.Errorf("%v", result.Panic))
98-
}
126+
if result.Panic != nil {
127+
recordErrs = append(recordErrs, fmt.Errorf("panic: %v", result.Panic))
128+
klog.Error(fmt.Errorf(
129+
`gatherer "%v" function "%v" panicked with the error: %v`,
130+
gathererName, result.FunctionName, result.Panic,
131+
))
132+
allErrors = append(allErrors, fmt.Errorf(`function "%v" panicked`, result.FunctionName))
133+
}
99134

100-
for _, err := range result.Errs {
101-
errs = append(errs, fmt.Errorf(
102-
"gatherer %v's function %v failed with error: %v",
135+
for _, err := range result.Errs {
136+
if w, isWarning := err.(*types.Warning); isWarning {
137+
recordWarnings = append(recordWarnings, w)
138+
klog.Warningf(
139+
`gatherer "%v" function "%v" produced the warning: %v`, gathererName, result.FunctionName, w,
140+
)
141+
} else {
142+
recordErrs = append(recordErrs, err)
143+
klog.Errorf(
144+
`gatherer "%v" function "%v" failed with the error: %v`,
103145
gathererName, result.FunctionName, err,
104-
))
146+
)
147+
allErrors = append(allErrors, fmt.Errorf(`function "%v" failed with an error`, result.FunctionName))
105148
}
106-
recordedRecs := 0
107-
for _, r := range result.Records {
108-
if err := rec.Record(r); err != nil {
109-
result.Errs = append(result.Errs, fmt.Errorf(
110-
"unable to record gatherer %v function %v' result %v because of error: %v",
111-
gathererName, result.FunctionName, r.Name, err,
112-
))
113-
continue
149+
}
150+
151+
recordedRecs := 0
152+
for _, r := range result.Records {
153+
wasRecorded := true
154+
if errs := rec.Record(r); len(errs) > 0 {
155+
for _, err := range errs {
156+
if w, isWarning := err.(*types.Warning); isWarning {
157+
recordWarnings = append(recordWarnings, w)
158+
klog.Warningf(
159+
`issue recording gatherer "%v" function "%v" result "%v" because of the warning: %v`,
160+
gathererName, result.FunctionName, r.GetFilename(), w,
161+
)
162+
} else {
163+
recordErrs = append(recordErrs, err)
164+
klog.Errorf(
165+
`error recording gatherer "%v" function "%v" result "%v" because of the error: %v`,
166+
gathererName, result.FunctionName, r.GetFilename(), err,
167+
)
168+
allErrors = append(allErrors, fmt.Errorf(
169+
`unable to record function "%v" record "%v"`, result.FunctionName, r.GetFilename(),
170+
))
171+
wasRecorded = false
172+
}
114173
}
174+
}
175+
if wasRecorded {
115176
recordedRecs++
116177
}
117-
118-
klog.Infof(
119-
"Gather %v's function %v took %v to process %v records",
120-
gathererName, result.FunctionName, result.TimeElapsed, len(result.Records),
121-
)
122-
123-
functionReports = append(functionReports, GathererFunctionReport{
124-
FuncName: fmt.Sprintf("%v/%v", gathererName, result.FunctionName),
125-
Duration: result.TimeElapsed.Milliseconds(),
126-
RecordsCount: recordedRecs,
127-
Errors: utils.ErrorsToStrings(result.Errs),
128-
Panic: result.Panic,
129-
})
130178
}
131-
return functionReports, utils.SumErrors(errs)
179+
180+
klog.Infof(
181+
`gatherer "%v" function "%v" took %v to process %v records`,
182+
gathererName, result.FunctionName, result.TimeElapsed, len(result.Records),
183+
)
184+
185+
return GathererFunctionReport{
186+
FuncName: fmt.Sprintf("%v/%v", gathererName, result.FunctionName),
187+
Duration: result.TimeElapsed.Milliseconds(),
188+
RecordsCount: recordedRecs,
189+
Errors: utils.ErrorsToStrings(recordErrs),
190+
Warnings: utils.ErrorsToStrings(recordWarnings),
191+
Panic: result.Panic,
192+
}, allErrors
132193
}
133194

134195
// RecordArchiveMetadata records info about archive and gatherers' reports
@@ -149,8 +210,8 @@ func RecordArchiveMetadata(
149210
IsGlobalObfuscationEnabled: anonymizer != nil,
150211
}},
151212
}
152-
if err := rec.Record(archiveMetadata); err != nil {
153-
return fmt.Errorf("unable to record archive metadata because of error: %v", err)
213+
if errs := rec.Record(archiveMetadata); len(errs) > 0 {
214+
return fmt.Errorf("unable to record archive metadata because of the errors: %v", errs)
154215
}
155216

156217
return nil

0 commit comments

Comments
 (0)