Skip to content

Commit 355c6ea

Browse files
stIncMaleValentin Kovalenkovbabaninjyemin
authored
Introduce Client Bulk Write API (#1509)
Introduce the Client Bulk Write API to support write operations across multiple collections and databases. This update allows mixed operations and cross-namespace bulk writes to be executed in a single batch: https://www.mongodb.com/docs/manual/reference/command/bulkWrite/. JAVA-4586 --------- Co-authored-by: Valentin Kovalenko <[email protected]> Co-authored-by: Viacheslav Babanin <[email protected]> Co-authored-by: Jeff Yemin <[email protected]>
1 parent 7340b5c commit 355c6ea

File tree

166 files changed

+14983
-1118
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

166 files changed

+14983
-1118
lines changed

Diff for: bson/src/main/org/bson/AbstractBsonWriter.java

+9
Original file line numberDiff line numberDiff line change
@@ -748,6 +748,15 @@ protected void throwInvalidState(final String methodName, final State... validSt
748748
methodName, validStatesString, state));
749749
}
750750

751+
/**
752+
* {@inheritDoc}
753+
* <p>
754+
* The {@link #flush()} method of {@link AbstractBsonWriter} does nothing.</p>
755+
*/
756+
@Override
757+
public void flush() {
758+
}
759+
751760
@Override
752761
public void close() {
753762
closed = true;

Diff for: bson/src/main/org/bson/BSONCallbackAdapter.java

-5
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,6 @@ protected BSONCallbackAdapter(final BsonWriterSettings settings, final BSONCallb
3939
this.bsonCallback = bsonCallback;
4040
}
4141

42-
@Override
43-
public void flush() {
44-
//Looks like should be no-op?
45-
}
46-
4742
@Override
4843
public void doWriteStartDocument() {
4944
BsonContextType contextType = getState() == State.SCOPE_DOCUMENT

Diff for: bson/src/main/org/bson/BsonBinaryWriter.java

-4
Original file line numberDiff line numberDiff line change
@@ -108,10 +108,6 @@ public BsonBinaryWriterSettings getBinaryWriterSettings() {
108108
return binaryWriterSettings;
109109
}
110110

111-
@Override
112-
public void flush() {
113-
}
114-
115111
@Override
116112
protected Context getContext() {
117113
return (Context) super.getContext();

Diff for: bson/src/main/org/bson/BsonDocumentWriter.java

-4
Original file line numberDiff line numberDiff line change
@@ -194,10 +194,6 @@ public void doWriteUndefined() {
194194
write(new BsonUndefined());
195195
}
196196

197-
@Override
198-
public void flush() {
199-
}
200-
201197
@Override
202198
protected Context getContext() {
203199
return (Context) super.getContext();

Diff for: bson/src/main/org/bson/io/OutputBuffer.java

+10
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,16 @@ public void write(final byte[] b) {
4141
public void close() {
4242
}
4343

44+
/**
45+
* {@inheritDoc}
46+
* <p>
47+
* The {@link #flush()} method of {@link OutputBuffer} does nothing.</p>
48+
*/
49+
@Override
50+
public void flush() throws IOException {
51+
super.flush();
52+
}
53+
4454
@Override
4555
public void write(final byte[] bytes, final int offset, final int length) {
4656
writeBytes(bytes, offset, length);

Diff for: build.gradle

+2-1
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ ext {
5959
junitBomVersion = '5.10.2'
6060
logbackVersion = '1.3.14'
6161
graalSdkVersion = '24.0.0'
62+
reflectionsVersion = '0.9.10'
6263
gitVersion = getGitVersion()
6364
}
6465

@@ -128,7 +129,7 @@ configure(scalaProjects) {
128129
testImplementation('org.scalatestplus:junit-4-13_%%:3.2.9.0')
129130
testImplementation('org.scalatestplus:mockito-3-12_%%:3.2.10.0')
130131
testImplementation("ch.qos.logback:logback-classic:$logbackVersion")
131-
testImplementation('org.reflections:reflections:0.9.10')
132+
testImplementation("org.reflections:reflections:$reflectionsVersion")
132133
}
133134

134135
test{

Diff for: config/detekt/detekt.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ complexity:
159159
active: true
160160
excludes: ['**/test/**']
161161
thresholdInFiles: 25
162-
thresholdInClasses: 25
162+
thresholdInClasses: 27
163163
thresholdInInterfaces: 25
164164
thresholdInObjects: 25
165165
thresholdInEnums: 25

Diff for: driver-core/build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ dependencies {
6060

6161
testImplementation project(':bson').sourceSets.test.output
6262
testImplementation('org.junit.jupiter:junit-jupiter-api')
63+
testImplementation("org.reflections:reflections:$reflectionsVersion")
6364
testRuntimeOnly "io.netty:netty-tcnative-boringssl-static"
6465

6566
classifiers.forEach {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.mongodb;
17+
18+
import com.mongodb.bulk.WriteConcernError;
19+
import com.mongodb.client.model.bulk.ClientBulkWriteResult;
20+
import com.mongodb.client.model.bulk.ClientNamespacedWriteModel;
21+
import com.mongodb.lang.Nullable;
22+
23+
import java.util.List;
24+
import java.util.Map;
25+
import java.util.Optional;
26+
27+
import static com.mongodb.assertions.Assertions.isTrueArgument;
28+
import static com.mongodb.assertions.Assertions.notNull;
29+
import static com.mongodb.internal.operation.ClientBulkWriteOperation.Exceptions.serverAddressFromException;
30+
import static java.util.Collections.emptyList;
31+
import static java.util.Collections.emptyMap;
32+
import static java.util.Collections.unmodifiableList;
33+
import static java.util.Collections.unmodifiableMap;
34+
import static java.util.Optional.ofNullable;
35+
36+
/**
37+
* The result of an unsuccessful or partially unsuccessful client-level bulk write operation.
38+
* Note that the {@linkplain #getCode() code} and {@linkplain #getErrorLabels() labels} from this exception are not useful.
39+
* An application should use those from the {@linkplain #getCause() top-level error}.
40+
*
41+
* @see ClientBulkWriteResult
42+
* @since 5.3
43+
* @serial exclude
44+
*/
45+
public final class ClientBulkWriteException extends MongoServerException {
46+
private static final long serialVersionUID = 1;
47+
48+
private final List<WriteConcernError> writeConcernErrors;
49+
private final Map<Integer, WriteError> writeErrors;
50+
@Nullable
51+
private final ClientBulkWriteResult partialResult;
52+
53+
/**
54+
* Constructs a new instance.
55+
*
56+
* @param error The {@linkplain #getCause() top-level error}.
57+
* @param writeConcernErrors The {@linkplain #getWriteConcernErrors() write concern errors}.
58+
* @param writeErrors The {@linkplain #getWriteErrors() write errors}.
59+
* @param partialResult The {@linkplain #getPartialResult() partial result}.
60+
* @param serverAddress The {@linkplain MongoServerException#getServerAddress() server address}.
61+
* If {@code error} is a {@link MongoServerException} or a {@link MongoSocketException}, then {@code serverAddress}
62+
* must be equal to the {@link ServerAddress} they bear.
63+
*/
64+
public ClientBulkWriteException(
65+
@Nullable final MongoException error,
66+
@Nullable final List<WriteConcernError> writeConcernErrors,
67+
@Nullable final Map<Integer, WriteError> writeErrors,
68+
@Nullable final ClientBulkWriteResult partialResult,
69+
final ServerAddress serverAddress) {
70+
super(
71+
message(
72+
error, writeConcernErrors, writeErrors, partialResult,
73+
notNull("serverAddress", serverAddress)),
74+
validateServerAddress(error, serverAddress));
75+
initCause(error);
76+
isTrueArgument("At least one of `writeConcernErrors`, `writeErrors`, `partialResult` must be non-null or non-empty",
77+
!(writeConcernErrors == null || writeConcernErrors.isEmpty())
78+
|| !(writeErrors == null || writeErrors.isEmpty())
79+
|| partialResult != null);
80+
this.writeConcernErrors = writeConcernErrors == null ? emptyList() : unmodifiableList(writeConcernErrors);
81+
this.writeErrors = writeErrors == null ? emptyMap() : unmodifiableMap(writeErrors);
82+
this.partialResult = partialResult;
83+
}
84+
85+
private static String message(
86+
@Nullable final MongoException error,
87+
@Nullable final List<WriteConcernError> writeConcernErrors,
88+
@Nullable final Map<Integer, WriteError> writeErrors,
89+
@Nullable final ClientBulkWriteResult partialResult,
90+
final ServerAddress serverAddress) {
91+
return "Client-level bulk write operation error on server " + serverAddress + "."
92+
+ (error == null ? "" : " Top-level error: " + error + ".")
93+
+ (writeErrors == null || writeErrors.isEmpty() ? "" : " Write errors: " + writeErrors + ".")
94+
+ (writeConcernErrors == null || writeConcernErrors.isEmpty() ? "" : " Write concern errors: " + writeConcernErrors + ".")
95+
+ (partialResult == null ? "" : " Partial result: " + partialResult + ".");
96+
}
97+
98+
private static ServerAddress validateServerAddress(@Nullable final MongoException error, final ServerAddress serverAddress) {
99+
serverAddressFromException(error).ifPresent(serverAddressFromError ->
100+
isTrueArgument("`serverAddress` must be equal to that of the `error`", serverAddressFromError.equals(serverAddress)));
101+
return error instanceof MongoServerException
102+
? ((MongoServerException) error).getServerAddress()
103+
: serverAddress;
104+
}
105+
106+
/**
107+
* The top-level error. That is an error that is neither a {@linkplain #getWriteConcernErrors() write concern error},
108+
* nor is an {@linkplain #getWriteErrors() error of an individual write operation}.
109+
*
110+
* @return The top-level error. Non-{@code null} only if a top-level error occurred.
111+
*/
112+
@Override
113+
@Nullable
114+
public MongoException getCause() {
115+
return (MongoException) super.getCause();
116+
}
117+
118+
/**
119+
* The {@link WriteConcernError}s that occurred while executing the client-level bulk write operation.
120+
* <p>
121+
* There are no guarantees on mutability of the {@link List} returned.</p>
122+
*
123+
* @return The {@link WriteConcernError}s.
124+
*/
125+
public List<WriteConcernError> getWriteConcernErrors() {
126+
return writeConcernErrors;
127+
}
128+
129+
/**
130+
* The indexed {@link WriteError}s.
131+
* The {@linkplain Map#keySet() keys} are the indexes of the corresponding {@link ClientNamespacedWriteModel}s
132+
* in the corresponding client-level bulk write operation.
133+
* <p>
134+
* There are no guarantees on mutability or iteration order of the {@link Map} returned.</p>
135+
*
136+
* @return The indexed {@link WriteError}s.
137+
* @see ClientBulkWriteResult.VerboseResults#getInsertResults()
138+
* @see ClientBulkWriteResult.VerboseResults#getUpdateResults()
139+
* @see ClientBulkWriteResult.VerboseResults#getDeleteResults()
140+
*/
141+
public Map<Integer, WriteError> getWriteErrors() {
142+
return writeErrors;
143+
}
144+
145+
/**
146+
* The result of the part of a client-level bulk write operation that is known to be successful.
147+
*
148+
* @return The successful partial result. {@linkplain Optional#isPresent() Present} only if the client received a response indicating success
149+
* of at least one {@linkplain ClientNamespacedWriteModel individual write operation}.
150+
*/
151+
public Optional<ClientBulkWriteResult> getPartialResult() {
152+
return ofNullable(partialResult);
153+
}
154+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.client.model.bulk;
18+
19+
/**
20+
* The methods declared in this interface are part of the public API of subclasses or sub-interfaces.
21+
*/
22+
interface BaseClientDeleteOptions extends BaseClientWriteModelOptions {
23+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.mongodb.client.model.bulk;
17+
18+
import com.mongodb.lang.Nullable;
19+
import org.bson.conversions.Bson;
20+
21+
/**
22+
* The methods declared in this interface are part of the public API of subclasses or sub-interfaces.
23+
*/
24+
interface BaseClientUpdateOptions extends BaseClientWriteModelOptions, BaseClientUpsertableWriteModelOptions {
25+
26+
BaseClientUpdateOptions arrayFilters(@Nullable Iterable<? extends Bson> arrayFilters);
27+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.client.model.bulk;
18+
19+
import com.mongodb.lang.Nullable;
20+
21+
/**
22+
* The methods declared in this interface are part of the public API of subclasses or sub-interfaces.
23+
*/
24+
interface BaseClientUpsertableWriteModelOptions {
25+
BaseClientUpsertableWriteModelOptions upsert(@Nullable Boolean upsert);
26+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.client.model.bulk;
18+
19+
import com.mongodb.client.model.Collation;
20+
import com.mongodb.lang.Nullable;
21+
import org.bson.conversions.Bson;
22+
23+
/**
24+
* The methods declared in this interface are part of the public API of subclasses or sub-interfaces.
25+
*/
26+
interface BaseClientWriteModelOptions {
27+
BaseClientWriteModelOptions collation(@Nullable Collation collation);
28+
29+
BaseClientWriteModelOptions hint(@Nullable Bson hint);
30+
31+
BaseClientWriteModelOptions hintString(@Nullable String hintString);
32+
}

0 commit comments

Comments
 (0)