Skip to content

Commit ddcae44

Browse files
committed
feat(NODE-6329): client bulk write happy path
1 parent 643a875 commit ddcae44

29 files changed

+2762
-53
lines changed

src/cmap/command_monitoring_events.ts

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,12 @@ import {
77
LEGACY_HELLO_COMMAND_CAMEL_CASE
88
} from '../constants';
99
import { calculateDurationInMs, deepCopy } from '../utils';
10-
import { OpMsgRequest, type OpQueryRequest, type WriteProtocolMessageType } from './commands';
10+
import {
11+
DocumentSequence,
12+
OpMsgRequest,
13+
type OpQueryRequest,
14+
type WriteProtocolMessageType
15+
} from './commands';
1116
import type { Connection } from './connection';
1217

1318
/**
@@ -249,7 +254,16 @@ const OP_QUERY_KEYS = [
249254
/** Extract the actual command from the query, possibly up-converting if it's a legacy format */
250255
function extractCommand(command: WriteProtocolMessageType): Document {
251256
if (command instanceof OpMsgRequest) {
252-
return deepCopy(command.command);
257+
const cmd = deepCopy(command.command);
258+
// For OP_MSG with payload type 1 we need to pull the documents
259+
// array out of the document sequence for monitoring.
260+
if (cmd.ops instanceof DocumentSequence) {
261+
cmd.ops = cmd.ops.documents;
262+
}
263+
if (cmd.nsInfo instanceof DocumentSequence) {
264+
cmd.nsInfo = cmd.nsInfo.documents;
265+
}
266+
return cmd;
253267
}
254268

255269
if (command.query?.$query) {

src/cmap/commands.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -544,10 +544,10 @@ export class OpMsgRequest {
544544
for (const [key, value] of Object.entries(document)) {
545545
if (value instanceof DocumentSequence) {
546546
// Document sequences starts with type 1 at the first byte.
547-
const buffer = Buffer.allocUnsafe(1 + 4 + key.length);
547+
const buffer = Buffer.allocUnsafe(1 + 4 + key.length + 1);
548548
buffer[0] = 1;
549-
// Third part is the field name at offset 5.
550-
encodeUTF8Into(buffer, key, 5);
549+
// Third part is the field name at offset 5 with trailing null byte.
550+
encodeUTF8Into(buffer, `${key}\0`, 5);
551551
chunks.push(buffer);
552552
// Fourth part are the documents' bytes.
553553
let docsLength = 0;
@@ -557,7 +557,7 @@ export class OpMsgRequest {
557557
chunks.push(docBson);
558558
}
559559
// Second part of the sequence is the length at offset 1;
560-
buffer.writeInt32LE(key.length + docsLength, 1);
560+
buffer.writeInt32LE(4 + key.length + 1 + docsLength, 1);
561561
// Why are we removing the field from the command? This is because it needs to be
562562
// removed in the OP_MSG request first section, and DocumentSequence is not a
563563
// BSON type and is specific to the MongoDB wire protocol so there's nothing

src/cmap/wire_protocol/responses.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,3 +329,29 @@ export class ExplainedCursorResponse extends CursorResponse {
329329
return this.toObject(options);
330330
}
331331
}
332+
333+
/**
334+
* Client bulk writes have some extra metadata at the top level that needs to be
335+
* included in the result returned to the user.
336+
*/
337+
export class ClientBulkWriteCursorResponse extends CursorResponse {
338+
get insertedCount() {
339+
return this.get('nInserted', BSONType.int, true);
340+
}
341+
342+
get upsertedCount() {
343+
return this.get('nUpserted', BSONType.int, true);
344+
}
345+
346+
get matchedCount() {
347+
return this.get('nMatched', BSONType.int, true);
348+
}
349+
350+
get modifiedCount() {
351+
return this.get('nModified', BSONType.int, true);
352+
}
353+
354+
get deletedCount() {
355+
return this.get('nDeleted', BSONType.int, true);
356+
}
357+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import type { Document } from '../bson';
2+
import { type ClientBulkWriteCursorResponse } from '../cmap/wire_protocol/responses';
3+
import type { MongoClient } from '../mongo_client';
4+
import { ClientBulkWriteOperation } from '../operations/client_bulk_write/client_bulk_write';
5+
import { type ClientBulkWriteOptions } from '../operations/client_bulk_write/common';
6+
import { executeOperation } from '../operations/execute_operation';
7+
import type { ClientSession } from '../sessions';
8+
import { mergeOptions, MongoDBNamespace } from '../utils';
9+
import {
10+
AbstractCursor,
11+
type AbstractCursorOptions,
12+
type InitialCursorResponse
13+
} from './abstract_cursor';
14+
15+
/** @public */
16+
export interface ClientBulkWriteCursorOptions
17+
extends AbstractCursorOptions,
18+
ClientBulkWriteOptions {}
19+
20+
/**
21+
* This is the cursor that handles client bulk write operations. Note this is never
22+
* exposed directly to the user and is always immediately exhausted.
23+
* @internal
24+
*/
25+
export class ClientBulkWriteCursor extends AbstractCursor {
26+
public readonly command: Document;
27+
/** @internal */
28+
private cursorResponse?: ClientBulkWriteCursorResponse;
29+
/** @internal */
30+
private clientBulkWriteOptions: ClientBulkWriteOptions;
31+
32+
/** @internal */
33+
constructor(client: MongoClient, command: Document, options: ClientBulkWriteOptions = {}) {
34+
super(client, new MongoDBNamespace('admin', '$cmd'), options);
35+
36+
this.command = command;
37+
this.clientBulkWriteOptions = options;
38+
}
39+
40+
/**
41+
* We need a way to get the top level cursor response fields for
42+
* generating the bulk write result, so we expose this here.
43+
*/
44+
get response(): ClientBulkWriteCursorResponse {
45+
if (this.cursorResponse) return this.cursorResponse;
46+
throw new Error('no cursor response');
47+
}
48+
49+
clone(): ClientBulkWriteCursor {
50+
const clonedOptions = mergeOptions({}, this.clientBulkWriteOptions);
51+
delete clonedOptions.session;
52+
return new ClientBulkWriteCursor(this.client, this.command, {
53+
...clonedOptions
54+
});
55+
}
56+
57+
/** @internal */
58+
async _initialize(session: ClientSession): Promise<InitialCursorResponse> {
59+
const clientBulkWriteOperation = new ClientBulkWriteOperation(this.command, {
60+
...this.clientBulkWriteOptions,
61+
...this.cursorOptions,
62+
session
63+
});
64+
65+
const response = await executeOperation(this.client, clientBulkWriteOperation);
66+
this.cursorResponse = response;
67+
68+
return { server: clientBulkWriteOperation.server, session, response };
69+
}
70+
}

src/index.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -473,6 +473,21 @@ export type {
473473
AggregateOptions,
474474
DB_AGGREGATE_COLLECTION
475475
} from './operations/aggregate';
476+
export type {
477+
AnyClientBulkWriteModel,
478+
ClientBulkWriteOptions,
479+
ClientBulkWriteResult,
480+
ClientDeleteManyModel,
481+
ClientDeleteOneModel,
482+
ClientDeleteResult,
483+
ClientInsertOneModel,
484+
ClientInsertOneResult,
485+
ClientReplaceOneModel,
486+
ClientUpdateManyModel,
487+
ClientUpdateOneModel,
488+
ClientUpdateResult,
489+
ClientWriteModel
490+
} from './operations/client_bulk_write/common';
476491
export type {
477492
CollationOptions,
478493
CommandOperation,

src/mongo_client.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,12 @@ import {
3030
SeverityLevel
3131
} from './mongo_logger';
3232
import { TypedEventEmitter } from './mongo_types';
33+
import {
34+
type AnyClientBulkWriteModel,
35+
type ClientBulkWriteOptions,
36+
type ClientBulkWriteResult
37+
} from './operations/client_bulk_write/common';
38+
import { ClientBulkWriteExecutor } from './operations/client_bulk_write/executor';
3339
import { executeOperation } from './operations/execute_operation';
3440
import { RunAdminCommandOperation } from './operations/run_command';
3541
import type { ReadConcern, ReadConcernLevel, ReadConcernLike } from './read_concern';
@@ -477,6 +483,19 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> implements
477483
return this.s.bsonOptions;
478484
}
479485

486+
/**
487+
* Executes a client bulk write operation, available on server 8.0+.
488+
* @param models - The client bulk write models.
489+
* @param options - The client bulk write options.
490+
* @returns A ClientBulkWriteResult for acknowledged writes and ok: 1 for unacknowledged writes.
491+
*/
492+
async bulkWrite(
493+
models: AnyClientBulkWriteModel[],
494+
options?: ClientBulkWriteOptions
495+
): Promise<ClientBulkWriteResult | { ok: 1 }> {
496+
return await new ClientBulkWriteExecutor(this, models, options).execute();
497+
}
498+
480499
/**
481500
* Connect to MongoDB using a url
482501
*
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import { type Document } from 'bson';
2+
3+
import { ClientBulkWriteCursorResponse } from '../../cmap/wire_protocol/responses';
4+
import type { Server } from '../../sdam/server';
5+
import type { ClientSession } from '../../sessions';
6+
import { MongoDBNamespace } from '../../utils';
7+
import { CommandOperation } from '../command';
8+
import { Aspect, defineAspects } from '../operation';
9+
import { type ClientBulkWriteOptions } from './common';
10+
11+
/**
12+
* Executes a single vlient bulk write operation within a potential batch.
13+
* @internal
14+
*/
15+
export class ClientBulkWriteOperation extends CommandOperation<ClientBulkWriteCursorResponse> {
16+
command: Document;
17+
override options: ClientBulkWriteOptions;
18+
19+
override get commandName() {
20+
return 'bulkWrite' as const;
21+
}
22+
23+
constructor(command: Document, options: ClientBulkWriteOptions) {
24+
super(undefined, options);
25+
this.command = command;
26+
this.options = options;
27+
this.ns = new MongoDBNamespace('admin', '$cmd');
28+
}
29+
30+
/**
31+
* Execute the command. Superclass will handle write concern, etc.
32+
* @param server - The server.
33+
* @param session - The session.
34+
* @returns The response.
35+
*/
36+
override async execute(
37+
server: Server,
38+
session: ClientSession | undefined
39+
): Promise<ClientBulkWriteCursorResponse> {
40+
return await super.executeCommand(server, session, this.command, ClientBulkWriteCursorResponse);
41+
}
42+
}
43+
44+
// Skipping the collation as it goes on the individual ops.
45+
defineAspects(ClientBulkWriteOperation, [Aspect.WRITE_OPERATION, Aspect.SKIP_COLLATION]);

src/operations/client_bulk_write/command_builder.ts

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { type Document } from '../../bson';
1+
import { type Document, ObjectId } from '../../bson';
22
import { DocumentSequence } from '../../cmap/commands';
33
import type { Filter, OptionalId, UpdateFilter, WithoutId } from '../../mongo_types';
44
import { type CollationOptions } from '../command';
@@ -23,6 +23,7 @@ export interface ClientBulkWriteCommand {
2323
nsInfo: DocumentSequence;
2424
bypassDocumentValidation?: boolean;
2525
let?: Document;
26+
comment?: any;
2627
}
2728

2829
/** @internal */
@@ -88,6 +89,10 @@ export class ClientBulkWriteCommandBuilder {
8889
if (this.options.let) {
8990
command.let = this.options.let;
9091
}
92+
93+
if (this.options.comment != null) {
94+
command.comment = this.options.comment;
95+
}
9196
return [command];
9297
}
9398
}
@@ -112,6 +117,7 @@ export const buildInsertOneOperation = (
112117
insert: index,
113118
document: model.document
114119
};
120+
document.document._id = model.document._id ?? new ObjectId();
115121
return document;
116122
};
117123

@@ -175,6 +181,7 @@ export interface ClientUpdateOperation {
175181
hint?: Hint;
176182
upsert?: boolean;
177183
arrayFilters?: Document[];
184+
collation?: CollationOptions;
178185
}
179186

180187
/**
@@ -226,6 +233,9 @@ function createUpdateOperation(
226233
if (model.arrayFilters) {
227234
document.arrayFilters = model.arrayFilters;
228235
}
236+
if (model.collation) {
237+
document.collation = model.collation;
238+
}
229239
return document;
230240
}
231241

@@ -237,6 +247,7 @@ export interface ClientReplaceOneOperation {
237247
updateMods: WithoutId<Document>;
238248
hint?: Hint;
239249
upsert?: boolean;
250+
collation?: CollationOptions;
240251
}
241252

242253
/**
@@ -261,6 +272,9 @@ export const buildReplaceOneOperation = (
261272
if (model.upsert) {
262273
document.upsert = model.upsert;
263274
}
275+
if (model.collation) {
276+
document.collation = model.collation;
277+
}
264278
return document;
265279
};
266280

0 commit comments

Comments
 (0)