Skip to content

Commit d21df2a

Browse files
authored
Use Sequence number powered OCC for processing updates (#37308)
Updates perform realtime get, perform the requested update and then index the document again using optimistic concurrency control. This PR changes the logic to use sequence numbers instead of versioning. Note that the current versioning logic isn't suffering from the same problem as external OCC requests because the get and indexing is always done on the same primary. Relates #36148 Relates #10708
1 parent 19a7e0f commit d21df2a

File tree

2 files changed

+6
-37
lines changed

2 files changed

+6
-37
lines changed

server/src/main/java/org/elasticsearch/action/update/UpdateHelper.java

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -163,19 +163,6 @@ Result prepareUpsert(ShardId shardId, UpdateRequest request, final GetResult get
163163
return new Result(indexRequest, DocWriteResponse.Result.CREATED, null, null);
164164
}
165165

166-
/**
167-
* Calculate the version to use for the update request, using either the existing version if internal versioning is used, or the get
168-
* result document's version if the version type is "FORCE".
169-
*/
170-
static long calculateUpdateVersion(UpdateRequest request, GetResult getResult) {
171-
if (request.versionType() != VersionType.INTERNAL) {
172-
assert request.versionType() == VersionType.FORCE;
173-
return request.version(); // remember, match_any is excluded by the conflict test
174-
} else {
175-
return getResult.getVersion();
176-
}
177-
}
178-
179166
/**
180167
* Calculate a routing value to be used, either the included index request's routing, or retrieved document's routing when defined.
181168
*/
@@ -195,7 +182,6 @@ static String calculateRouting(GetResult getResult, @Nullable IndexRequest updat
195182
* containing a new {@code IndexRequest} to be executed on the primary and replicas.
196183
*/
197184
Result prepareUpdateIndexRequest(ShardId shardId, UpdateRequest request, GetResult getResult, boolean detectNoop) {
198-
final long updateVersion = calculateUpdateVersion(request, getResult);
199185
final IndexRequest currentRequest = request.doc();
200186
final String routing = calculateRouting(getResult, currentRequest);
201187
final Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(getResult.internalSourceRef(), true);
@@ -215,7 +201,8 @@ Result prepareUpdateIndexRequest(ShardId shardId, UpdateRequest request, GetResu
215201
} else {
216202
final IndexRequest finalIndexRequest = Requests.indexRequest(request.index())
217203
.type(request.type()).id(request.id()).routing(routing)
218-
.source(updatedSourceAsMap, updateSourceContentType).version(updateVersion).versionType(request.versionType())
204+
.source(updatedSourceAsMap, updateSourceContentType)
205+
.setIfSeqNo(getResult.getSeqNo()).setIfPrimaryTerm(getResult.getPrimaryTerm())
219206
.waitForActiveShards(request.waitForActiveShards()).timeout(request.timeout())
220207
.setRefreshPolicy(request.getRefreshPolicy());
221208
return new Result(finalIndexRequest, DocWriteResponse.Result.UPDATED, updatedSourceAsMap, updateSourceContentType);
@@ -228,7 +215,6 @@ Result prepareUpdateIndexRequest(ShardId shardId, UpdateRequest request, GetResu
228215
* primary and replicas.
229216
*/
230217
Result prepareUpdateScriptRequest(ShardId shardId, UpdateRequest request, GetResult getResult, LongSupplier nowInMillis) {
231-
final long updateVersion = calculateUpdateVersion(request, getResult);
232218
final IndexRequest currentRequest = request.doc();
233219
final String routing = calculateRouting(getResult, currentRequest);
234220
final Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(getResult.internalSourceRef(), true);
@@ -256,14 +242,16 @@ Result prepareUpdateScriptRequest(ShardId shardId, UpdateRequest request, GetRes
256242
case INDEX:
257243
final IndexRequest indexRequest = Requests.indexRequest(request.index())
258244
.type(request.type()).id(request.id()).routing(routing)
259-
.source(updatedSourceAsMap, updateSourceContentType).version(updateVersion).versionType(request.versionType())
245+
.source(updatedSourceAsMap, updateSourceContentType)
246+
.setIfSeqNo(getResult.getSeqNo()).setIfPrimaryTerm(getResult.getPrimaryTerm())
260247
.waitForActiveShards(request.waitForActiveShards()).timeout(request.timeout())
261248
.setRefreshPolicy(request.getRefreshPolicy());
262249
return new Result(indexRequest, DocWriteResponse.Result.UPDATED, updatedSourceAsMap, updateSourceContentType);
263250
case DELETE:
264251
DeleteRequest deleteRequest = Requests.deleteRequest(request.index())
265252
.type(request.type()).id(request.id()).routing(routing)
266-
.version(updateVersion).versionType(request.versionType()).waitForActiveShards(request.waitForActiveShards())
253+
.setIfSeqNo(getResult.getSeqNo()).setIfPrimaryTerm(getResult.getPrimaryTerm())
254+
.waitForActiveShards(request.waitForActiveShards())
267255
.timeout(request.timeout()).setRefreshPolicy(request.getRefreshPolicy());
268256
return new Result(deleteRequest, DocWriteResponse.Result.DELETED, updatedSourceAsMap, updateSourceContentType);
269257
default:

server/src/test/java/org/elasticsearch/action/update/UpdateRequestTests.java

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
import org.elasticsearch.common.xcontent.XContentType;
3939
import org.elasticsearch.common.xcontent.json.JsonXContent;
4040
import org.elasticsearch.env.Environment;
41-
import org.elasticsearch.index.VersionType;
4241
import org.elasticsearch.index.get.GetResult;
4342
import org.elasticsearch.index.shard.ShardId;
4443
import org.elasticsearch.script.MockScriptEngine;
@@ -570,24 +569,6 @@ public void testRoutingExtraction() throws Exception {
570569
assertThat(UpdateHelper.calculateRouting(getResult, indexRequest), equalTo("routing1"));
571570
}
572571

573-
@SuppressWarnings("deprecated") // VersionType.FORCE is deprecated
574-
public void testCalculateUpdateVersion() throws Exception {
575-
long randomVersion = randomIntBetween(0, 100);
576-
GetResult getResult = new GetResult("test", "type", "1", 0, 1, randomVersion, true, new BytesArray("{}"), null);
577-
578-
UpdateRequest request = new UpdateRequest("test", "type1", "1");
579-
long version = UpdateHelper.calculateUpdateVersion(request, getResult);
580-
581-
// Use the get result's version
582-
assertThat(version, equalTo(randomVersion));
583-
584-
request = new UpdateRequest("test", "type1", "1").versionType(VersionType.FORCE).version(1337);
585-
version = UpdateHelper.calculateUpdateVersion(request, getResult);
586-
587-
// Use the forced update request version
588-
assertThat(version, equalTo(1337L));
589-
}
590-
591572
public void testNoopDetection() throws Exception {
592573
ShardId shardId = new ShardId("test", "", 0);
593574
GetResult getResult = new GetResult("test", "type", "1", 0, 1, 0, true,

0 commit comments

Comments
 (0)