Skip to content

Commit d17f346

Browse files
Merge branch 'main' into uri-validate-less
2 parents 79f41e4 + 91f3035 commit d17f346

26 files changed

+1248
-435
lines changed

Diff for: .evergreen/config.yml

+3-2
Original file line numberDiff line numberDiff line change
@@ -4708,7 +4708,7 @@ buildvariants:
47084708
display_name: rhel8 Node Latest
47094709
run_on: rhel80-large
47104710
expansions:
4711-
NODE_LTS_VERSION: 20
4711+
NODE_LTS_VERSION: latest
47124712
CLIENT_ENCRYPTION: true
47134713
tasks:
47144714
- test-latest-server
@@ -4749,9 +4749,10 @@ buildvariants:
47494749
- test-latest-load-balanced
47504750
- test-auth-kerberos
47514751
- test-auth-ldap
4752-
- test-socks5
47534752
- test-socks5-csfle
47544753
- test-socks5-tls
4754+
- test-zstd-compression
4755+
- test-snappy-compression
47554756
- test-tls-support-latest
47564757
- test-tls-support-8.0
47574758
- test-tls-support-7.0

Diff for: .evergreen/generate_evergreen_tasks.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -395,7 +395,7 @@ for (const {
395395
name: `${osName}-node-latest`,
396396
display_name: `${osDisplayName} Node Latest`,
397397
run_on,
398-
expansions: { NODE_LTS_VERSION: LATEST_LTS },
398+
expansions: { NODE_LTS_VERSION: 'latest' },
399399
tasks: tasks.map(({ name }) => name)
400400
};
401401
if (clientEncryption) {

Diff for: src/cmap/commands.ts

+53-17
Original file line numberDiff line numberDiff line change
@@ -429,10 +429,60 @@ export interface OpMsgOptions {
429429

430430
/** @internal */
431431
export class DocumentSequence {
432+
field: string;
432433
documents: Document[];
434+
serializedDocumentsLength: number;
435+
private chunks: Uint8Array[];
436+
private header: Buffer;
433437

434-
constructor(documents: Document[]) {
435-
this.documents = documents;
438+
/**
439+
* Create a new document sequence for the provided field.
440+
* @param field - The field it will replace.
441+
*/
442+
constructor(field: string, documents?: Document[]) {
443+
this.field = field;
444+
this.documents = [];
445+
this.chunks = [];
446+
this.serializedDocumentsLength = 0;
447+
// Document sequences starts with type 1 at the first byte.
448+
// Field strings must always be UTF-8.
449+
const buffer = Buffer.allocUnsafe(1 + 4 + this.field.length + 1);
450+
buffer[0] = 1;
451+
// Third part is the field name at offset 5 with trailing null byte.
452+
encodeUTF8Into(buffer, `${this.field}\0`, 5);
453+
this.chunks.push(buffer);
454+
this.header = buffer;
455+
if (documents) {
456+
for (const doc of documents) {
457+
this.push(doc, BSON.serialize(doc));
458+
}
459+
}
460+
}
461+
462+
/**
463+
* Push a document to the document sequence. Will serialize the document
464+
* as well and return the current serialized length of all documents.
465+
* @param document - The document to add.
466+
* @param buffer - The serialized document in raw BSON.
467+
* @returns The new total document sequence length.
468+
*/
469+
push(document: Document, buffer: Uint8Array): number {
470+
this.serializedDocumentsLength += buffer.length;
471+
// Push the document.
472+
this.documents.push(document);
473+
// Push the document raw bson.
474+
this.chunks.push(buffer);
475+
// Write the new length.
476+
this.header?.writeInt32LE(4 + this.field.length + 1 + this.serializedDocumentsLength, 1);
477+
return this.serializedDocumentsLength + this.header.length;
478+
}
479+
480+
/**
481+
* Get the fully serialized bytes for the document sequence section.
482+
* @returns The section bytes.
483+
*/
484+
toBin(): Uint8Array {
485+
return Buffer.concat(this.chunks);
436486
}
437487
}
438488

@@ -543,21 +593,7 @@ export class OpMsgRequest {
543593
const chunks = [];
544594
for (const [key, value] of Object.entries(document)) {
545595
if (value instanceof DocumentSequence) {
546-
// Document sequences starts with type 1 at the first byte.
547-
const buffer = Buffer.allocUnsafe(1 + 4 + key.length + 1);
548-
buffer[0] = 1;
549-
// Third part is the field name at offset 5 with trailing null byte.
550-
encodeUTF8Into(buffer, `${key}\0`, 5);
551-
chunks.push(buffer);
552-
// Fourth part are the documents' bytes.
553-
let docsLength = 0;
554-
for (const doc of value.documents) {
555-
const docBson = this.serializeBson(doc);
556-
docsLength += docBson.length;
557-
chunks.push(docBson);
558-
}
559-
// Second part of the sequence is the length at offset 1;
560-
buffer.writeInt32LE(4 + key.length + 1 + docsLength, 1);
596+
chunks.push(value.toBin());
561597
// Why are we removing the field from the command? This is because it needs to be
562598
// removed in the OP_MSG request first section, and DocumentSequence is not a
563599
// BSON type and is specific to the MongoDB wire protocol so there's nothing

Diff for: src/cmap/connection.ts

+4
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,8 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
237237
.on('error', this.onError.bind(this));
238238
this.socket.on('close', this.onClose.bind(this));
239239
this.socket.on('timeout', this.onTimeout.bind(this));
240+
241+
this.messageStream.pause();
240242
}
241243

242244
public get hello() {
@@ -651,6 +653,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
651653
private async *readMany(): AsyncGenerator<OpMsgResponse | OpReply> {
652654
try {
653655
this.dataEvents = onData(this.messageStream);
656+
this.messageStream.resume();
654657
for await (const message of this.dataEvents) {
655658
const response = await decompressResponse(message);
656659
yield response;
@@ -661,6 +664,7 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
661664
}
662665
} finally {
663666
this.dataEvents = null;
667+
this.messageStream.pause();
664668
this.throwIfAborted();
665669
}
666670
}

Diff for: src/cursor/client_bulk_write_cursor.ts

+21-8
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
1-
import type { Document } from '../bson';
1+
import { type Document } from 'bson';
2+
23
import { type ClientBulkWriteCursorResponse } from '../cmap/wire_protocol/responses';
3-
import { MongoBulkWriteCursorError } from '../error';
4+
import { MongoClientBulkWriteCursorError } from '../error';
45
import type { MongoClient } from '../mongo_client';
56
import { ClientBulkWriteOperation } from '../operations/client_bulk_write/client_bulk_write';
7+
import { type ClientBulkWriteCommandBuilder } from '../operations/client_bulk_write/command_builder';
68
import { type ClientBulkWriteOptions } from '../operations/client_bulk_write/common';
79
import { executeOperation } from '../operations/execute_operation';
810
import type { ClientSession } from '../sessions';
@@ -24,17 +26,21 @@ export interface ClientBulkWriteCursorOptions
2426
* @internal
2527
*/
2628
export class ClientBulkWriteCursor extends AbstractCursor {
27-
public readonly command: Document;
29+
commandBuilder: ClientBulkWriteCommandBuilder;
2830
/** @internal */
2931
private cursorResponse?: ClientBulkWriteCursorResponse;
3032
/** @internal */
3133
private clientBulkWriteOptions: ClientBulkWriteOptions;
3234

3335
/** @internal */
34-
constructor(client: MongoClient, command: Document, options: ClientBulkWriteOptions = {}) {
36+
constructor(
37+
client: MongoClient,
38+
commandBuilder: ClientBulkWriteCommandBuilder,
39+
options: ClientBulkWriteOptions = {}
40+
) {
3541
super(client, new MongoDBNamespace('admin', '$cmd'), options);
3642

37-
this.command = command;
43+
this.commandBuilder = commandBuilder;
3844
this.clientBulkWriteOptions = options;
3945
}
4046

@@ -44,22 +50,29 @@ export class ClientBulkWriteCursor extends AbstractCursor {
4450
*/
4551
get response(): ClientBulkWriteCursorResponse {
4652
if (this.cursorResponse) return this.cursorResponse;
47-
throw new MongoBulkWriteCursorError(
53+
throw new MongoClientBulkWriteCursorError(
4854
'No client bulk write cursor response returned from the server.'
4955
);
5056
}
5157

58+
/**
59+
* Get the last set of operations the cursor executed.
60+
*/
61+
get operations(): Document[] {
62+
return this.commandBuilder.lastOperations;
63+
}
64+
5265
clone(): ClientBulkWriteCursor {
5366
const clonedOptions = mergeOptions({}, this.clientBulkWriteOptions);
5467
delete clonedOptions.session;
55-
return new ClientBulkWriteCursor(this.client, this.command, {
68+
return new ClientBulkWriteCursor(this.client, this.commandBuilder, {
5669
...clonedOptions
5770
});
5871
}
5972

6073
/** @internal */
6174
async _initialize(session: ClientSession): Promise<InitialCursorResponse> {
62-
const clientBulkWriteOperation = new ClientBulkWriteOperation(this.command, {
75+
const clientBulkWriteOperation = new ClientBulkWriteOperation(this.commandBuilder, {
6376
...this.clientBulkWriteOptions,
6477
...this.cursorOptions,
6578
session

Diff for: src/error.ts

+29-2
Original file line numberDiff line numberDiff line change
@@ -622,7 +622,7 @@ export class MongoGCPError extends MongoOIDCError {
622622
* @public
623623
* @category Error
624624
*/
625-
export class MongoBulkWriteCursorError extends MongoRuntimeError {
625+
export class MongoClientBulkWriteCursorError extends MongoRuntimeError {
626626
/**
627627
* **Do not use this constructor!**
628628
*
@@ -639,7 +639,34 @@ export class MongoBulkWriteCursorError extends MongoRuntimeError {
639639
}
640640

641641
override get name(): string {
642-
return 'MongoBulkWriteCursorError';
642+
return 'MongoClientBulkWriteCursorError';
643+
}
644+
}
645+
646+
/**
647+
* An error indicating that an error occurred on the client when executing a client bulk write.
648+
*
649+
* @public
650+
* @category Error
651+
*/
652+
export class MongoClientBulkWriteExecutionError extends MongoRuntimeError {
653+
/**
654+
* **Do not use this constructor!**
655+
*
656+
* Meant for internal use only.
657+
*
658+
* @remarks
659+
* This class is only meant to be constructed within the driver. This constructor is
660+
* not subject to semantic versioning compatibility guarantees and may change at any time.
661+
*
662+
* @public
663+
**/
664+
constructor(message: string) {
665+
super(message);
666+
}
667+
668+
override get name(): string {
669+
return 'MongoClientBulkWriteExecutionError';
643670
}
644671
}
645672

Diff for: src/index.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,9 @@ export {
4444
MongoAWSError,
4545
MongoAzureError,
4646
MongoBatchReExecutionError,
47-
MongoBulkWriteCursorError,
4847
MongoChangeStreamError,
48+
MongoClientBulkWriteCursorError,
49+
MongoClientBulkWriteExecutionError,
4950
MongoCompatibilityError,
5051
MongoCursorExhaustedError,
5152
MongoCursorInUseError,
+43-7
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,28 @@
1-
import { type Document } from 'bson';
2-
1+
import { MongoClientBulkWriteExecutionError, ServerType } from '../../beta';
32
import { ClientBulkWriteCursorResponse } from '../../cmap/wire_protocol/responses';
43
import type { Server } from '../../sdam/server';
54
import type { ClientSession } from '../../sessions';
65
import { MongoDBNamespace } from '../../utils';
76
import { CommandOperation } from '../command';
87
import { Aspect, defineAspects } from '../operation';
8+
import { type ClientBulkWriteCommandBuilder } from './command_builder';
99
import { type ClientBulkWriteOptions } from './common';
1010

1111
/**
1212
* Executes a single client bulk write operation within a potential batch.
1313
* @internal
1414
*/
1515
export class ClientBulkWriteOperation extends CommandOperation<ClientBulkWriteCursorResponse> {
16-
command: Document;
16+
commandBuilder: ClientBulkWriteCommandBuilder;
1717
override options: ClientBulkWriteOptions;
1818

1919
override get commandName() {
2020
return 'bulkWrite' as const;
2121
}
2222

23-
constructor(command: Document, options: ClientBulkWriteOptions) {
23+
constructor(commandBuilder: ClientBulkWriteCommandBuilder, options: ClientBulkWriteOptions) {
2424
super(undefined, options);
25-
this.command = command;
25+
this.commandBuilder = commandBuilder;
2626
this.options = options;
2727
this.ns = new MongoDBNamespace('admin', '$cmd');
2828
}
@@ -37,9 +37,45 @@ export class ClientBulkWriteOperation extends CommandOperation<ClientBulkWriteCu
3737
server: Server,
3838
session: ClientSession | undefined
3939
): Promise<ClientBulkWriteCursorResponse> {
40-
return await super.executeCommand(server, session, this.command, ClientBulkWriteCursorResponse);
40+
let command;
41+
42+
if (server.description.type === ServerType.LoadBalancer) {
43+
if (session) {
44+
// Checkout a connection to build the command.
45+
const connection = await server.pool.checkOut();
46+
// Pin the connection to the session so it get used to execute the command and we do not
47+
// perform a double check-in/check-out.
48+
session.pin(connection);
49+
command = this.commandBuilder.buildBatch(
50+
connection.hello?.maxMessageSizeBytes,
51+
connection.hello?.maxWriteBatchSize
52+
);
53+
} else {
54+
throw new MongoClientBulkWriteExecutionError(
55+
'Session provided to the client bulk write operation must be present.'
56+
);
57+
}
58+
} else {
59+
// At this point we have a server and the auto connect code has already
60+
// run in executeOperation, so the server description will be populated.
61+
// We can use that to build the command.
62+
if (!server.description.maxWriteBatchSize || !server.description.maxMessageSizeBytes) {
63+
throw new MongoClientBulkWriteExecutionError(
64+
'In order to execute a client bulk write, both maxWriteBatchSize and maxMessageSizeBytes must be provided by the servers hello response.'
65+
);
66+
}
67+
command = this.commandBuilder.buildBatch(
68+
server.description.maxMessageSizeBytes,
69+
server.description.maxWriteBatchSize
70+
);
71+
}
72+
return await super.executeCommand(server, session, command, ClientBulkWriteCursorResponse);
4173
}
4274
}
4375

4476
// Skipping the collation as it goes on the individual ops.
45-
defineAspects(ClientBulkWriteOperation, [Aspect.WRITE_OPERATION, Aspect.SKIP_COLLATION]);
77+
defineAspects(ClientBulkWriteOperation, [
78+
Aspect.WRITE_OPERATION,
79+
Aspect.SKIP_COLLATION,
80+
Aspect.CURSOR_CREATING
81+
]);

0 commit comments

Comments
 (0)