@@ -131,8 +131,9 @@ func (m *Monitor) run() {
131
131
ticker := time .NewTicker (tickInterval )
132
132
defer ticker .Stop ()
133
133
134
- log .Infof ("Starting push channel monitor with %d checks per %s interval (check interval %s)" ,
135
- m .cfg .ChecksPerInterval , m .cfg .Interval , tickInterval )
134
+ log .Infof ("Starting push channel monitor with " +
135
+ "%d checks per %s interval (check interval %s); min bytes per interval: %d, restart backoff: %s; max consecutive restarts: %d" ,
136
+ m .cfg .ChecksPerInterval , m .cfg .Interval , tickInterval , m .cfg .MinBytesSent , m .cfg .RestartBackoff , m .cfg .MaxConsecutiveRestarts )
136
137
137
138
for {
138
139
select {
@@ -171,8 +172,8 @@ type monitoredChannel struct {
171
172
dataRatePoints chan * dataRatePoint
172
173
consecutiveRestarts int
173
174
174
- restartLk sync.RWMutex
175
- restarting bool
175
+ restartLk sync.RWMutex
176
+ restartedAt time. Time
176
177
}
177
178
178
179
func newMonitoredChannel (
@@ -248,8 +249,6 @@ func (mc *monitoredChannel) checkDataRate() {
248
249
mc .statsLk .Lock ()
249
250
defer mc .statsLk .Unlock ()
250
251
251
- log .Debugf ("%s: check data rate" , mc .chid )
252
-
253
252
// Before returning, add the current data rate stats to the queue
254
253
defer func () {
255
254
var pending uint64
@@ -264,6 +263,9 @@ func (mc *monitoredChannel) checkDataRate() {
264
263
265
264
// Check that there are enough data points that an interval has elapsed
266
265
if len (mc .dataRatePoints ) < int (mc .cfg .ChecksPerInterval ) {
266
+ log .Debugf ("%s: not enough data points to check data rate yet (%d / %d)" ,
267
+ mc .chid , len (mc .dataRatePoints ), mc .cfg .ChecksPerInterval )
268
+
267
269
return
268
270
}
269
271
@@ -274,25 +276,25 @@ func (mc *monitoredChannel) checkDataRate() {
274
276
// and the amount sent was lower than the minimum required, restart the
275
277
// channel
276
278
sentInInterval := mc .sent - atIntervalStart .sent
277
- log .Debugf ("%s: %d bytes sent since last check" , mc .chid , sentInInterval )
279
+ log .Debugf ("%s: since last check: sent: %d - %d = %d, pending: %d, required %d" ,
280
+ mc .chid , mc .sent , atIntervalStart .sent , sentInInterval , atIntervalStart .pending , mc .cfg .MinBytesSent )
278
281
if atIntervalStart .pending > sentInInterval && sentInInterval < mc .cfg .MinBytesSent {
279
282
go mc .restartChannel ()
280
283
}
281
284
}
282
285
283
286
func (mc * monitoredChannel ) restartChannel () {
284
- log .Debugf ("%s: restart channel" , mc .chid )
285
-
286
287
// Check if the channel is already being restarted
287
288
mc .restartLk .Lock ()
288
- alreadyRestarting := mc .restarting
289
- if ! alreadyRestarting {
290
- mc .restarting = true
289
+ restartedAt := mc .restartedAt
290
+ if restartedAt . IsZero () {
291
+ mc .restartedAt = time . Now ()
291
292
}
292
293
mc .restartLk .Unlock ()
293
294
294
- if alreadyRestarting {
295
- log .Debugf ("%s: already restarting, bailing out" , mc .chid )
295
+ if ! restartedAt .IsZero () {
296
+ log .Debugf ("%s: restart called but already restarting channel (for %s so far; restart backoff is %s)" ,
297
+ mc .chid , time .Since (mc .restartedAt ), mc .cfg .RestartBackoff )
296
298
return
297
299
}
298
300
@@ -310,20 +312,6 @@ func (mc *monitoredChannel) restartChannel() {
310
312
return
311
313
}
312
314
313
- defer func () {
314
- if mc .cfg .RestartBackoff > 0 {
315
- // Backoff a little time after a restart before attempting another
316
- select {
317
- case <- time .After (mc .cfg .RestartBackoff ):
318
- case <- mc .ctx .Done ():
319
- }
320
- }
321
-
322
- mc .restartLk .Lock ()
323
- mc .restarting = false
324
- mc .restartLk .Unlock ()
325
- }()
326
-
327
315
// Send a restart message for the channel.
328
316
// Note that at the networking layer there is logic to retry if a network
329
317
// connection cannot be established, so this may take some time.
@@ -334,7 +322,22 @@ func (mc *monitoredChannel) restartChannel() {
334
322
// and shut down the monitor
335
323
log .Errorf ("%s: closing push data transfer channel after failing to send restart message: %s" , mc .chid , err )
336
324
mc .closeChannelAndShutdown ()
325
+ } else if mc .cfg .RestartBackoff > 0 {
326
+ log .Infof ("%s: restart message sent successfully, backing off %s before allowing any other restarts" ,
327
+ mc .chid , mc .cfg .RestartBackoff )
328
+ // Backoff a little time after a restart before attempting another
329
+ select {
330
+ case <- time .After (mc .cfg .RestartBackoff ):
331
+ case <- mc .ctx .Done ():
332
+ }
333
+
334
+ log .Debugf ("%s: restart back-off %s complete" ,
335
+ mc .chid , mc .cfg .RestartBackoff )
337
336
}
337
+
338
+ mc .restartLk .Lock ()
339
+ mc .restartedAt = time.Time {}
340
+ mc .restartLk .Unlock ()
338
341
}
339
342
340
343
func (mc * monitoredChannel ) closeChannelAndShutdown () {
0 commit comments