Skip to content

Commit 363c3b1

Browse files
authored
update & refactor "checkSummaryAndSend" function in the insightsuploader (#919)
1 parent 2bb7e46 commit 363c3b1

File tree

2 files changed

+58
-70
lines changed

2 files changed

+58
-70
lines changed

pkg/controller/operator.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,7 @@ func (s *Operator) Run(ctx context.Context, controller *controllercmd.Controller
256256

257257
// start uploading status, so that we
258258
// know any previous last reported time
259-
go uploader.Run(ctx)
259+
go uploader.Run(ctx, initialDelay)
260260

261261
reportGatherer := insightsreport.New(insightsClient, configAggregator, uploader, operatorClient.OperatorV1().InsightsOperators())
262262
statusReporter.AddSources(reportGatherer)

pkg/insights/insightsuploader/insightsuploader.go

+57-69
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ type Controller struct {
4040
apiConfigurator configobserver.InsightsDataGatherObserver
4141
reporter StatusReporter
4242
archiveUploaded chan struct{}
43-
initialDelay time.Duration
43+
uploadDelay time.Duration
4444
backoff wait.Backoff
4545
}
4646

@@ -59,7 +59,7 @@ func New(summarizer Summarizer,
5959
client: client,
6060
reporter: statusReporter,
6161
archiveUploaded: make(chan struct{}),
62-
initialDelay: initialDelay,
62+
uploadDelay: initialDelay,
6363
}
6464
ctrl.backoff = wait.Backoff{
6565
Duration: ctrl.configurator.Config().DataReporting.Interval / 4, // 30 min as first wait by default
@@ -69,83 +69,70 @@ func New(summarizer Summarizer,
6969
return ctrl
7070
}
7171

72-
func (c *Controller) Run(ctx context.Context) {
72+
func (c *Controller) Run(ctx context.Context, initialDelay time.Duration) {
7373
c.StatusController.UpdateStatus(controllerstatus.Summary{Healthy: true})
7474

7575
if c.client == nil {
7676
klog.Infof("No reporting possible without a configured client")
7777
return
7878
}
7979

80-
// the controller periodically uploads results to the remote insights endpoint
8180
cfg := c.configurator.Config()
82-
8381
interval := cfg.DataReporting.Interval
84-
lastReported := c.reporter.LastReportedTime()
85-
if !lastReported.IsZero() {
86-
next := lastReported.Add(interval)
87-
if now := time.Now(); next.After(now) {
88-
c.initialDelay = wait.Jitter(next.Sub(now), 1.2)
89-
}
90-
}
91-
klog.Infof("Reporting status periodically to %s every %s, starting in %s", cfg.DataReporting.UploadEndpoint, interval, c.initialDelay.Truncate(time.Second))
82+
// set the initial upload delay as initial delay + 2 minutes
83+
ud := 90 * time.Second
84+
c.uploadDelay = time.Duration(initialDelay.Nanoseconds() + ud.Nanoseconds())
85+
86+
klog.Infof("Reporting status periodically to %s every %s, starting in %s", cfg.DataReporting.UploadEndpoint, interval, c.uploadDelay.Truncate(time.Second))
9287
go wait.Until(func() { c.periodicTrigger(ctx.Done()) }, 5*time.Second, ctx.Done())
9388
}
9489

9590
func (c *Controller) periodicTrigger(stopCh <-chan struct{}) {
96-
klog.Infof("Checking archives to upload periodically every %s", c.initialDelay)
97-
lastReported := c.reporter.LastReportedTime()
9891
cfg := c.configurator.Config()
9992
interval := cfg.DataReporting.Interval
100-
endpoint := cfg.DataReporting.UploadEndpoint
10193
var disabledInAPI bool
10294
if c.apiConfigurator != nil {
10395
disabledInAPI = c.apiConfigurator.GatherDisabled()
10496
}
10597
reportingEnabled := cfg.DataReporting.Enabled && !disabledInAPI
106-
10798
configCh, cancelFn := c.configurator.ConfigChanged()
10899
defer cancelFn()
109100

110-
if c.initialDelay <= 0 {
111-
c.checkSummaryAndSend(interval, lastReported, endpoint, reportingEnabled)
112-
return
113-
}
114-
ticker := time.NewTicker(c.initialDelay)
101+
ticker := time.NewTicker(c.uploadDelay)
115102
for {
116103
select {
117104
case <-stopCh:
118105
ticker.Stop()
119106
case <-ticker.C:
120-
c.checkSummaryAndSend(interval, lastReported, endpoint, reportingEnabled)
121-
ticker.Reset(c.initialDelay)
107+
c.checkSummaryAndSend(reportingEnabled)
108+
ticker.Reset(c.uploadDelay)
122109
return
123110
case <-configCh:
124111
newCfg := c.configurator.Config()
125-
endpoint = newCfg.DataReporting.UploadEndpoint
126112
reportingEnabled = newCfg.DataReporting.Enabled
127113
var disabledInAPI bool
128114
if c.apiConfigurator != nil {
129115
disabledInAPI = c.apiConfigurator.GatherDisabled()
130116
}
131117
if !reportingEnabled || disabledInAPI {
132118
klog.Infof("Reporting was disabled")
133-
c.initialDelay = newCfg.DataReporting.Interval
134-
return
135119
}
136120
newInterval := newCfg.DataReporting.Interval
137-
if newInterval == interval {
138-
continue
121+
if newInterval != interval {
122+
c.uploadDelay = wait.Jitter(interval/8, 0.1)
123+
ticker.Reset(c.uploadDelay)
124+
return
139125
}
140-
interval = newInterval
141-
// there's no return in this case so set the initial delay again
142-
c.initialDelay = wait.Jitter(interval/8, 0.1)
143-
ticker.Reset(c.initialDelay)
144126
}
145127
}
146128
}
147129

148-
func (c *Controller) checkSummaryAndSend(interval time.Duration, lastReported time.Time, endpoint string, reportingEnabled bool) {
130+
func (c *Controller) checkSummaryAndSend(reportingEnabled bool) {
131+
lastReported := c.reporter.LastReportedTime()
132+
endpoint := c.configurator.Config().DataReporting.UploadEndpoint
133+
interval := c.configurator.Config().DataReporting.Interval
134+
c.uploadDelay = wait.Jitter(interval/8, 0.1)
135+
149136
// attempt to get a summary to send to the server
150137
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
151138
defer cancel()
@@ -159,51 +146,52 @@ func (c *Controller) checkSummaryAndSend(interval time.Duration, lastReported ti
159146
klog.Infof("Nothing to report since %s", lastReported.Format(time.RFC3339))
160147
return
161148
}
162-
defer source.Contents.Close()
163-
if reportingEnabled && len(endpoint) > 0 {
164-
// send the results
165-
start := time.Now()
166-
id := start.Format(time.RFC3339)
167-
klog.Infof("Uploading latest report since %s", lastReported.Format(time.RFC3339))
168-
source.ID = id
169-
source.Type = "application/vnd.redhat.openshift.periodic"
170-
if err := c.client.Send(ctx, endpoint, *source); err != nil {
171-
klog.Infof("Unable to upload report after %s: %v", time.Since(start).Truncate(time.Second/100), err)
172-
if errors.Is(err, insightsclient.ErrWaitingForVersion) {
173-
c.initialDelay = wait.Jitter(time.Second*15, 1)
174-
return
175-
}
176-
if authorizer.IsAuthorizationError(err) {
177-
c.StatusController.UpdateStatus(controllerstatus.Summary{Operation: controllerstatus.Uploading,
178-
Reason: "NotAuthorized", Message: fmt.Sprintf("Reporting was not allowed: %v", err)})
179-
c.initialDelay = wait.Jitter(interval/2, 2)
180149

181-
return
182-
}
150+
klog.Infof("Checking archives to upload periodically every %s", c.uploadDelay)
151+
defer source.Contents.Close()
183152

184-
c.initialDelay = wait.Jitter(interval/8, 1.2)
185-
c.StatusController.UpdateStatus(controllerstatus.Summary{Operation: controllerstatus.Uploading,
186-
Reason: "UploadFailed", Message: fmt.Sprintf("Unable to report: %v", err)})
187-
return
188-
}
189-
klog.Infof("Uploaded report successfully in %s", time.Since(start))
190-
select {
191-
case c.archiveUploaded <- struct{}{}:
192-
default:
193-
}
194-
lastReported = start.UTC()
195-
c.StatusController.UpdateStatus(controllerstatus.Summary{Healthy: true})
196-
} else {
153+
if !reportingEnabled || len(endpoint) == 0 {
197154
klog.Info("Display report that would be sent")
198155
// display what would have been sent (to ensure we always exercise source processing)
199156
if err := reportToLogs(source.Contents); err != nil {
200157
klog.Errorf("Unable to log upload: %v", err)
201158
}
202-
// we didn't actually report logs, so don't advance the report date
159+
return
203160
}
204161

162+
// send the results
163+
start := time.Now()
164+
id := start.Format(time.RFC3339)
165+
klog.Infof("Uploading latest report since %s", lastReported.Format(time.RFC3339))
166+
source.ID = id
167+
source.Type = "application/vnd.redhat.openshift.periodic"
168+
if err := c.client.Send(ctx, endpoint, *source); err != nil {
169+
klog.Infof("Unable to upload report after %s: %v", time.Since(start).Truncate(time.Second/100), err)
170+
if errors.Is(err, insightsclient.ErrWaitingForVersion) {
171+
c.uploadDelay = wait.Jitter(time.Second*15, 1)
172+
return
173+
}
174+
if authorizer.IsAuthorizationError(err) {
175+
c.StatusController.UpdateStatus(controllerstatus.Summary{Operation: controllerstatus.Uploading,
176+
Reason: "NotAuthorized", Message: fmt.Sprintf("Reporting was not allowed: %v", err)})
177+
c.uploadDelay = wait.Jitter(interval/2, 2)
178+
179+
return
180+
}
181+
182+
c.uploadDelay = wait.Jitter(interval/8, 1.2)
183+
c.StatusController.UpdateStatus(controllerstatus.Summary{Operation: controllerstatus.Uploading,
184+
Reason: "UploadFailed", Message: fmt.Sprintf("Unable to report: %v", err)})
185+
return
186+
}
187+
klog.Infof("Uploaded report successfully in %s", time.Since(start))
188+
select {
189+
case c.archiveUploaded <- struct{}{}:
190+
default:
191+
}
192+
lastReported = start.UTC()
193+
c.StatusController.UpdateStatus(controllerstatus.Summary{Healthy: true})
205194
c.reporter.SetLastReportedTime(lastReported)
206-
c.initialDelay = wait.Jitter(interval/8, 0.1)
207195
}
208196

209197
// ArchiveUploaded returns a channel that indicates when an archive is uploaded

0 commit comments

Comments
 (0)