Skip to content

feat(NODE-6258): add signal support to find and aggregate #4364

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 33 commits into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
ab02d53
feat(NODE-6258): add signal support to cursor APIs
nbbeeken Jan 15, 2025
0d1c165
chore: readmany options
nbbeeken Jan 17, 2025
3de15cf
chore: drain options
nbbeeken Jan 17, 2025
96c3612
chore: explicit signal
nbbeeken Jan 17, 2025
ee00a38
docs: fix up api docs
nbbeeken Jan 17, 2025
8955d8e
test: better helper name
nbbeeken Jan 17, 2025
8e6ec04
test: update name
nbbeeken Jan 17, 2025
a73940e
test: improve iteration test organization
nbbeeken Jan 17, 2025
45a4b65
test: cruft
nbbeeken Jan 17, 2025
e9338cb
feat: make sure connections are closed after abort if aborted during …
nbbeeken Jan 17, 2025
152be95
test: remove redundant fle tests
nbbeeken Jan 17, 2025
5af48af
chore: make findLast simple
nbbeeken Jan 17, 2025
6e8bd46
test: no kill cursors on lb and don't wait on connection close
nbbeeken Jan 17, 2025
263b185
chore: lint
nbbeeken Jan 21, 2025
017c77e
chore: consistent cursor option types
nbbeeken Jan 22, 2025
7683fc9
docs: connection churn
nbbeeken Jan 22, 2025
ef231ee
fix: do not hang when the signal is aborted
nbbeeken Jan 22, 2025
ec2feee
fix: state checks
nbbeeken Jan 23, 2025
93b88fb
docs
nbbeeken Jan 23, 2025
89beb22
test: getaddrinfo error codes aren't on windows
nbbeeken Jan 23, 2025
c2ef0a6
experimental
nbbeeken Jan 23, 2025
f1e4f86
test filter
nbbeeken Jan 23, 2025
aabd070
check time pass
nbbeeken Jan 23, 2025
a46a919
where
nbbeeken Jan 24, 2025
b69286b
test: add where test
nbbeeken Jan 24, 2025
7f14566
test: stream test flaky
nbbeeken Jan 24, 2025
c24cf28
fix: reauth must finish before check in
nbbeeken Jan 24, 2025
dc52b67
test: flake
nbbeeken Jan 24, 2025
76fee52
test: lb
nbbeeken Jan 24, 2025
f05e8bc
test: flake
nbbeeken Jan 24, 2025
f30dc05
fix options name, docs for abortable, docs for signal
nbbeeken Jan 24, 2025
907a1a8
Merge branch 'main' into NODE-6258-abortsignal
W-A-James Jan 24, 2025
0e6f566
docs
nbbeeken Jan 24, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions src/operations/execute_operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import {
import type { Topology } from '../sdam/topology';
import type { ClientSession } from '../sessions';
import { TimeoutContext } from '../timeout';
import { supportsRetryableWrites } from '../utils';
import { abortable, supportsRetryableWrites } from '../utils';
import { AbstractOperation, Aspect } from './operation';

const MMAPv1_RETRY_WRITES_ERROR_CODE = MONGODB_ERROR_CODES.IllegalOperation;
Expand Down Expand Up @@ -65,9 +65,7 @@ export async function executeOperation<
}

// Like CSOT, an operation signal interruption does not relate to auto-connect
operation.options.signal?.throwIfAborted();
const topology = await autoConnect(client);
operation.options.signal?.throwIfAborted();
const topology = await abortable(autoConnect(client), operation.options);

// The driver sessions spec mandates that we implicitly create sessions for operations
// that are not explicitly provided with a session.
Expand Down
3 changes: 2 additions & 1 deletion src/sdam/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import type { ClientSession } from '../sessions';
import { type TimeoutContext } from '../timeout';
import { isTransactionCommand } from '../transactions';
import {
abortable,
type EventEmitterWithState,
makeStateMachine,
maxWireVersion,
Expand Down Expand Up @@ -345,7 +346,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
operationError instanceof MongoError &&
operationError.code === MONGODB_ERROR_CODES.Reauthenticate
) {
await this.pool.reauthenticate(conn);
await abortable(this.pool.reauthenticate(conn), options);
try {
const res = await conn.command(ns, cmd, options, responseType);
throwIfWriteConcernError(res);
Expand Down
53 changes: 53 additions & 0 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1475,11 +1475,28 @@ export function decorateDecryptionResult(
}
}

/** @internal */
export const kDispose: unique symbol = (Symbol.dispose as any) ?? Symbol('dispose');

/** @internal */
export interface Disposable {
[kDispose](): void;
}

/**
* A utility that helps with writing listener code idiomatically
*
* @example
* ```js
* using listener = addAbortListener(signal, function () {
* console.log('aborted', this.reason);
* });
* ```
*
* @param signal - if exists adds an abort listener
* @param listener - the listener to be added to signal
* @returns A disposable that will remove the abort listener
*/
export function addAbortListener(
signal: AbortSignal | undefined | null,
listener: (this: AbortSignal, event: Event) => void
Expand All @@ -1488,3 +1505,39 @@ export function addAbortListener(
signal.addEventListener('abort', listener, { once: true });
return { [kDispose]: () => signal.removeEventListener('abort', listener) };
}

/**
* Takes a promise and races it with a promise wrapping the abort event of the optionally provided signal.
* The given promise is _always_ ordered before the signal's abort promise.
* When given an already rejected promise and an already aborted signal, the promise's rejection takes precedence.
*
* This is useful **ONLY** when you do not care about cancelling the operation represented by `promise`.
* Proper cancellation must be implemented by the promise returning API.
*
* @see https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Promise/race
*
* @param promise - A promise to discard if the signal aborts
* @param options - An options object carrying an optional signal
*/
export async function abortable<T>(
promise: Promise<T>,
{ signal }: { signal?: AbortSignal }
): Promise<T> {
if (signal == null) {
return await promise;
}

const { promise: aborted, reject } = promiseWithResolvers<never>();

const abortListener = signal.aborted
? reject(signal.reason)
: addAbortListener(signal, function () {
reject(this.reason);
});

try {
return await Promise.race([promise, aborted]);
} finally {
abortListener?.[kDispose]();
}
}
156 changes: 156 additions & 0 deletions test/integration/node-specific/abort_signal.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ import {
type AbstractCursor,
AggregationCursor,
type Collection,
Connection,
type Db,
FindCursor,
ListCollectionsCursor,
type Log,
type MongoClient,
MongoServerError,
promiseWithResolvers,
ReadPreference,
setDifference,
Expand Down Expand Up @@ -545,6 +547,160 @@ describe('AbortSignal support', () => {
});
});

describe('when auto connecting and the signal aborts', () => {
let client: MongoClient;
let db: Db;
let collection: Collection<{ a: number; ssn: string }>;
const logs: Log[] = [];
let connectStarted;
let controller: AbortController;
let signal: AbortSignal;
let cursor: AbstractCursor<{ a: number }>;

describe('when connect succeeds', () => {
beforeEach(async function () {
logs.length = 0;

const promise = promiseWithResolvers<void>();
connectStarted = promise.promise;

client = this.configuration.newClient(
{},
{
__enableMongoLogger: true,
__internalLoggerConfig: { MONGODB_LOG_SERVER_SELECTION: 'debug' },
mongodbLogPath: {
write: log => {
if (log.c === 'serverSelection' && log.operation === 'ping') {
controller.abort();
promise.resolve();
}
logs.push(log);
}
},
serverSelectionTimeoutMS: 1000
}
);
db = client.db('abortSignal');
collection = db.collection('support');

controller = new AbortController();
signal = controller.signal;

cursor = collection.find({}, { signal });
});

afterEach(async function () {
logs.length = 0;
await client?.close();
});

it('escapes auto connect without interrupting it', async () => {
const toArray = cursor.toArray().catch(error => error);
await connectStarted;
expect(await toArray).to.be.instanceOf(DOMException);
await sleep(1100);
expect(client.topology).to.exist;
expect(client.topology.description).to.have.property('type').not.equal('Unknown');
});
});

describe('when connect fails', () => {
beforeEach(async function () {
logs.length = 0;

const promise = promiseWithResolvers<void>();
connectStarted = promise.promise;

client = this.configuration.newClient('mongodb://iLoveJavaScript', {
__enableMongoLogger: true,
__internalLoggerConfig: { MONGODB_LOG_SERVER_SELECTION: 'debug' },
mongodbLogPath: {
write: log => {
if (log.c === 'serverSelection' && log.operation === 'ping') {
controller.abort();
promise.resolve();
}
logs.push(log);
}
},
serverSelectionTimeoutMS: 200,
maxPoolSize: 1
});
db = client.db('abortSignal');
collection = db.collection('support');

controller = new AbortController();
signal = controller.signal;

cursor = collection.find({}, { signal });
});

afterEach(async function () {
logs.length = 0;
await client?.close();
});

it('escapes auto connect without interrupting it', async () => {
const toArray = cursor.toArray().catch(error => error);
await connectStarted;
expect(await toArray).to.be.instanceOf(DOMException);
await sleep(500);
expect(client.topology).to.exist;
expect(client.topology.description).to.have.property('type', 'Unknown');
expect(findLast(logs, l => l.failure.includes('ENOTFOUND'))).to.exist;
});
});
});

describe('when reauthenticating and the signal aborts', () => {
let client: MongoClient;
let collection: Collection;
let cursor;
let controller: AbortController;
let signal: AbortSignal;

class ReAuthenticationError extends MongoServerError {
override code = 391; // reauth code.
}

beforeEach(async function () {
client = this.configuration.newClient();
await client.connect();

const db = client.db('abortSignal');
collection = db.collection('support');

controller = new AbortController();
signal = controller.signal;

cursor = collection.find({}, { signal });

const commandStub = sinon.stub(Connection.prototype, 'command').callsFake(async function (
...args
) {
if (args[1].find != null) {
sinon.restore();
controller.abort();
throw new ReAuthenticationError({});
}
return commandStub.wrappedMethod.apply(this, args);
});
});

afterEach(async function () {
logs.length = 0;
await client?.close();
});

it('escapes reauth without interrupting it', async () => {
const checkIn = events.once(client, 'connectionCheckedIn');
const toArray = cursor.toArray().catch(error => error);
expect(await toArray).to.be.instanceOf(DOMException);
await checkIn; // checks back in despite the abort
});
});

describe('KMS requests', function () {
const stateMachine = new StateMachine({} as any);
const request = {
Expand Down
Loading
Loading