Skip to content

Commit c1cc6e0

Browse files
authored
feat: rework low level message stream retries, add debugging (#1713)
* feat: rework how subscriber streams are retried and generally managed * feat: add debug logging from the low level message stream for reconnects * chore: rename retryBackoff * chore: update retry max backoff * feat: clarify 'debug' channel a bit, add explicit debug message class * chore: push stream retries to full individual retry
1 parent eee377a commit c1cc6e0

13 files changed

+363
-214
lines changed

Diff for: src/debug.ts

+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
// Copyright 2023 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
/**
16+
* Represents a debug message the user might want to print out for logging
17+
* while debugging or whatnot. These will always come by way of the 'error'
18+
* channel on streams or other event emitters. It's completely fine to
19+
* ignore them, as some will just be verbose logging info, but they may
20+
* help figure out what's going wrong. Support may also ask you to catch
21+
* these channels, which you can do like so:
22+
*
23+
* ```
24+
* subscription.on('debug', msg => console.log(msg.message));
25+
* ```
26+
*
27+
* These values are _not_ guaranteed to remain stable, even within a major
28+
* version, so don't depend on them for your program logic. Debug outputs
29+
* may be added or removed at any time, without warning.
30+
*/
31+
export class DebugMessage {
32+
constructor(public message: string, public error?: Error) {}
33+
}

Diff for: src/exponential-retry.ts

+11
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,17 @@ export class ExponentialRetry<T> {
138138
this.scheduleRetry();
139139
}
140140

141+
/**
142+
* Resets an item that was previously retried. This is useful if you have
143+
* persistent items that just need to be retried occasionally.
144+
*
145+
* @private
146+
*/
147+
reset(item: T) {
148+
const retried = item as RetriedItem<T>;
149+
delete retried.retryInfo;
150+
}
151+
141152
// Takes a time delta and adds fuzz.
142153
private randomizeDelta(durationMs: number): number {
143154
// The fuzz distance should never exceed one second, but in the

Diff for: src/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ export {
174174
TopicMetadata,
175175
} from './topic';
176176
export {Duration, TotalOfUnit, DurationLike} from './temporal';
177+
export {DebugMessage} from './debug';
177178

178179
if (process.env.DEBUG_GRPC) {
179180
console.info('gRPC logging set to verbose');

Diff for: src/message-queues.ts

+11-8
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import {
3232
} from './subscriber';
3333
import {Duration} from './temporal';
3434
import {addToBucket} from './util';
35+
import {DebugMessage} from './debug';
3536

3637
/**
3738
* @private
@@ -65,15 +66,16 @@ export interface BatchOptions {
6566
* @param {string} message The error message.
6667
* @param {GoogleError} err The grpc error.
6768
*/
68-
export class BatchError extends Error {
69+
export class BatchError extends DebugMessage {
6970
ackIds: string[];
7071
code: grpc.status;
7172
details: string;
7273
constructor(err: GoogleError, ackIds: string[], rpc: string) {
7374
super(
7475
`Failed to "${rpc}" for ${ackIds.length} message(s). Reason: ${
7576
process.env.DEBUG_GRPC ? err.stack : err.message
76-
}`
77+
}`,
78+
err
7779
);
7880

7981
this.ackIds = ackIds;
@@ -278,7 +280,9 @@ export abstract class MessageQueue {
278280
// These queues are used for ack and modAck messages, which should
279281
// never surface an error to the user level. However, we'll emit
280282
// them onto this debug channel in case debug info is needed.
281-
this._subscriber.emit('debug', e);
283+
const err = e as Error;
284+
const debugMsg = new DebugMessage(err.message, err);
285+
this._subscriber.emit('debug', debugMsg);
282286
}
283287

284288
this.numInFlightRequests -= batchSize;
@@ -404,10 +408,8 @@ export abstract class MessageQueue {
404408
const others = toError.get(AckResponses.Other);
405409
if (others?.length) {
406410
const otherIds = others.map(e => e.ackId);
407-
this._subscriber.emit(
408-
'debug',
409-
new BatchError(rpcError, otherIds, operation)
410-
);
411+
const debugMsg = new BatchError(rpcError, otherIds, operation);
412+
this._subscriber.emit('debug', debugMsg);
411413
}
412414

413415
// Take care of following up on all the Promises.
@@ -492,7 +494,8 @@ export class AckQueue extends MessageQueue {
492494
return results.toRetry;
493495
} catch (e) {
494496
// This should only ever happen if there's a code failure.
495-
this._subscriber.emit('debug', e);
497+
const err = e as Error;
498+
this._subscriber.emit('debug', new DebugMessage(err.message, err));
496499
const exc = new AckError(AckResponses.Other, 'Code error');
497500
batch.forEach(m => {
498501
m.responsePromise?.reject(exc);

0 commit comments

Comments
 (0)