Skip to content

Commit c7ef318

Browse files
authored
6.x Backport of sequence number powered CAS PRs: #37977, #37857 and #37872 (#38155)
* Move update and delete by query to use seq# for optimistic concurrency control (#37857) The delete and update by query APIs both offer protection against overriding concurrent user changes to the documents they touch. They currently are using internal versioning. This PR changes that to rely on sequences numbers and primary terms. Relates #37639 Relates #36148 Relates #10708 * Add Seq# based optimistic concurrency control to UpdateRequest (#37872) The update request has a lesser known support for a one off update of a known document version. This PR adds an a seq# based alternative to power these operations. Relates #36148 Relates #10708 * Move watcher to use seq# and primary term for concurrency control (#37977) * Adapt minimum versions for seq# power operations After backporting #37977, #37857 and #37872
1 parent 2038221 commit c7ef318

File tree

76 files changed

+1321
-288
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

76 files changed

+1321
-288
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java

+19
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
7676
import org.elasticsearch.index.reindex.ReindexRequest;
7777
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
78+
import org.elasticsearch.index.seqno.SequenceNumbers;
7879
import org.elasticsearch.rest.BaseRestHandler;
7980
import org.elasticsearch.rest.action.search.RestSearchAction;
8081
import org.elasticsearch.script.mustache.MultiSearchTemplateRequest;
@@ -882,6 +883,24 @@ Params withWaitForActiveShards(ActiveShardCount currentActiveShardCount, ActiveS
882883
return this;
883884
}
884885

886+
Params withIfSeqNo(long ifSeqNo) {
887+
if (ifSeqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
888+
return putParam("if_seq_no", Long.toString(ifSeqNo));
889+
}
890+
return this;
891+
}
892+
893+
Params withIfPrimaryTerm(long ifPrimaryTerm) {
894+
if (ifPrimaryTerm != SequenceNumbers.UNASSIGNED_PRIMARY_TERM) {
895+
return putParam("if_primary_term", Long.toString(ifPrimaryTerm));
896+
}
897+
return this;
898+
}
899+
900+
Params withWaitForActiveShards(ActiveShardCount activeShardCount) {
901+
return withWaitForActiveShards(activeShardCount, ActiveShardCount.DEFAULT);
902+
}
903+
885904
Params withIndicesOptions(IndicesOptions indicesOptions) {
886905
if (indicesOptions != null) {
887906
withIgnoreUnavailable(indicesOptions.ignoreUnavailable());

client/rest-high-level/src/main/java/org/elasticsearch/client/WatcherRequestConverters.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,10 @@ static Request putWatch(PutWatchRequest putWatchRequest) {
7171
.build();
7272

7373
Request request = new Request(HttpPut.METHOD_NAME, endpoint);
74-
RequestConverters.Params params = new RequestConverters.Params(request).withVersion(putWatchRequest.getVersion());
74+
RequestConverters.Params params = new RequestConverters.Params(request)
75+
.withVersion(putWatchRequest.getVersion())
76+
.withIfSeqNo(putWatchRequest.ifSeqNo())
77+
.withIfPrimaryTerm(putWatchRequest.ifPrimaryTerm());
7578
if (putWatchRequest.isActive() == false) {
7679
params.putParam("active", "false");
7780
}

client/rest-high-level/src/main/java/org/elasticsearch/client/watcher/GetWatchResponse.java

+25-4
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,14 @@
3131
import java.util.Map;
3232
import java.util.Objects;
3333

34+
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
35+
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
36+
3437
public class GetWatchResponse {
3538
private final String id;
3639
private final long version;
40+
private final long seqNo;
41+
private final long primaryTerm;
3742
private final WatchStatus status;
3843

3944
private final BytesReference source;
@@ -43,15 +48,18 @@ public class GetWatchResponse {
4348
* Ctor for missing watch
4449
*/
4550
public GetWatchResponse(String id) {
46-
this(id, Versions.NOT_FOUND, null, null, null);
51+
this(id, Versions.NOT_FOUND, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, null, null, null);
4752
}
4853

49-
public GetWatchResponse(String id, long version, WatchStatus status, BytesReference source, XContentType xContentType) {
54+
public GetWatchResponse(String id, long version, long seqNo, long primaryTerm, WatchStatus status,
55+
BytesReference source, XContentType xContentType) {
5056
this.id = id;
5157
this.version = version;
5258
this.status = status;
5359
this.source = source;
5460
this.xContentType = xContentType;
61+
this.seqNo = seqNo;
62+
this.primaryTerm = primaryTerm;
5563
}
5664

5765
public String getId() {
@@ -62,6 +70,14 @@ public long getVersion() {
6270
return version;
6371
}
6472

73+
public long getSeqNo() {
74+
return seqNo;
75+
}
76+
77+
public long getPrimaryTerm() {
78+
return primaryTerm;
79+
}
80+
6581
public boolean isFound() {
6682
return version != Versions.NOT_FOUND;
6783
}
@@ -111,6 +127,8 @@ public int hashCode() {
111127
private static final ParseField ID_FIELD = new ParseField("_id");
112128
private static final ParseField FOUND_FIELD = new ParseField("found");
113129
private static final ParseField VERSION_FIELD = new ParseField("_version");
130+
private static final ParseField SEQ_NO_FIELD = new ParseField("_seq_no");
131+
private static final ParseField PRIMARY_TERM_FIELD = new ParseField("_primary_term");
114132
private static final ParseField STATUS_FIELD = new ParseField("status");
115133
private static final ParseField WATCH_FIELD = new ParseField("watch");
116134

@@ -119,9 +137,10 @@ public int hashCode() {
119137
a -> {
120138
boolean isFound = (boolean) a[1];
121139
if (isFound) {
122-
XContentBuilder builder = (XContentBuilder) a[4];
140+
XContentBuilder builder = (XContentBuilder) a[6];
123141
BytesReference source = BytesReference.bytes(builder);
124-
return new GetWatchResponse((String) a[0], (long) a[2], (WatchStatus) a[3], source, builder.contentType());
142+
return new GetWatchResponse((String) a[0], (long) a[2], (long) a[3], (long) a[4], (WatchStatus) a[5],
143+
source, builder.contentType());
125144
} else {
126145
return new GetWatchResponse((String) a[0]);
127146
}
@@ -131,6 +150,8 @@ public int hashCode() {
131150
PARSER.declareString(ConstructingObjectParser.constructorArg(), ID_FIELD);
132151
PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), FOUND_FIELD);
133152
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), VERSION_FIELD);
153+
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), SEQ_NO_FIELD);
154+
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), PRIMARY_TERM_FIELD);
134155
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
135156
(parser, context) -> WatchStatus.parse(parser), STATUS_FIELD);
136157
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),

client/rest-high-level/src/main/java/org/elasticsearch/client/watcher/PutWatchRequest.java

+57
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,14 @@
2424
import org.elasticsearch.common.lucene.uid.Versions;
2525
import org.elasticsearch.common.unit.TimeValue;
2626
import org.elasticsearch.common.xcontent.XContentType;
27+
import org.elasticsearch.index.seqno.SequenceNumbers;
2728

2829
import java.util.Objects;
2930
import java.util.regex.Pattern;
3031

32+
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
33+
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
34+
3135
/**
3236
* This request class contains the data needed to create a watch along with the name of the watch.
3337
* The name of the watch will become the ID of the indexed document.
@@ -42,6 +46,9 @@ public final class PutWatchRequest implements Validatable {
4246
private final XContentType xContentType;
4347
private boolean active = true;
4448
private long version = Versions.MATCH_ANY;
49+
private long ifSeqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
50+
private long ifPrimaryTerm = UNASSIGNED_PRIMARY_TERM;
51+
4552

4653
public PutWatchRequest(String id, BytesReference source, XContentType xContentType) {
4754
Objects.requireNonNull(id, "watch id is missing");
@@ -98,6 +105,56 @@ public void setVersion(long version) {
98105
this.version = version;
99106
}
100107

108+
/**
109+
* only performs this put request if the watch's last modification was assigned the given
110+
* sequence number. Must be used in combination with {@link #setIfPrimaryTerm(long)}
111+
*
112+
* If the watch's last modification was assigned a different sequence number a
113+
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
114+
*/
115+
public PutWatchRequest setIfSeqNo(long seqNo) {
116+
if (seqNo < 0 && seqNo != UNASSIGNED_SEQ_NO) {
117+
throw new IllegalArgumentException("sequence numbers must be non negative. got [" + seqNo + "].");
118+
}
119+
ifSeqNo = seqNo;
120+
return this;
121+
}
122+
123+
/**
124+
* only performs this put request if the watch's last modification was assigned the given
125+
* primary term. Must be used in combination with {@link #setIfSeqNo(long)}
126+
*
127+
* If the watch last modification was assigned a different term a
128+
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
129+
*/
130+
public PutWatchRequest setIfPrimaryTerm(long term) {
131+
if (term < 0) {
132+
throw new IllegalArgumentException("primary term must be non negative. got [" + term + "]");
133+
}
134+
ifPrimaryTerm = term;
135+
return this;
136+
}
137+
138+
/**
139+
* If set, only perform this put watch request if the watch's last modification was assigned this sequence number.
140+
* If the watch last last modification was assigned a different sequence number a
141+
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
142+
*/
143+
public long ifSeqNo() {
144+
return ifSeqNo;
145+
}
146+
147+
/**
148+
* If set, only perform this put watch request if the watch's last modification was assigned this primary term.
149+
*
150+
* If the watch's last modification was assigned a different term a
151+
* {@link org.elasticsearch.index.engine.VersionConflictEngineException} will be thrown.
152+
*/
153+
public long ifPrimaryTerm() {
154+
return ifPrimaryTerm;
155+
}
156+
157+
101158
public static boolean isValidId(String id) {
102159
return Strings.isEmpty(id) == false && NO_WS_PATTERN.matcher(id).matches();
103160
}

client/rest-high-level/src/main/java/org/elasticsearch/client/watcher/PutWatchResponse.java

+28-3
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.common.ParseField;
2222
import org.elasticsearch.common.xcontent.ObjectParser;
2323
import org.elasticsearch.common.xcontent.XContentParser;
24+
import org.elasticsearch.index.seqno.SequenceNumbers;
2425

2526
import java.io.IOException;
2627
import java.util.Objects;
@@ -32,20 +33,26 @@ public class PutWatchResponse {
3233

3334
static {
3435
PARSER.declareString(PutWatchResponse::setId, new ParseField("_id"));
36+
PARSER.declareLong(PutWatchResponse::setSeqNo, new ParseField("_seq_no"));
37+
PARSER.declareLong(PutWatchResponse::setPrimaryTerm, new ParseField("_primary_term"));
3538
PARSER.declareLong(PutWatchResponse::setVersion, new ParseField("_version"));
3639
PARSER.declareBoolean(PutWatchResponse::setCreated, new ParseField("created"));
3740
}
3841

3942
private String id;
4043
private long version;
44+
private long seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
45+
private long primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
4146
private boolean created;
4247

4348
public PutWatchResponse() {
4449
}
4550

46-
public PutWatchResponse(String id, long version, boolean created) {
51+
public PutWatchResponse(String id, long version, long seqNo, long primaryTerm, boolean created) {
4752
this.id = id;
4853
this.version = version;
54+
this.seqNo = seqNo;
55+
this.primaryTerm = primaryTerm;
4956
this.created = created;
5057
}
5158

@@ -57,6 +64,14 @@ private void setVersion(long version) {
5764
this.version = version;
5865
}
5966

67+
private void setSeqNo(long seqNo) {
68+
this.seqNo = seqNo;
69+
}
70+
71+
private void setPrimaryTerm(long primaryTerm) {
72+
this.primaryTerm = primaryTerm;
73+
}
74+
6075
private void setCreated(boolean created) {
6176
this.created = created;
6277
}
@@ -69,6 +84,14 @@ public long getVersion() {
6984
return version;
7085
}
7186

87+
public long getSeqNo() {
88+
return seqNo;
89+
}
90+
91+
public long getPrimaryTerm() {
92+
return primaryTerm;
93+
}
94+
7295
public boolean isCreated() {
7396
return created;
7497
}
@@ -80,12 +103,14 @@ public boolean equals(Object o) {
80103

81104
PutWatchResponse that = (PutWatchResponse) o;
82105

83-
return Objects.equals(id, that.id) && Objects.equals(version, that.version) && Objects.equals(created, that.created);
106+
return Objects.equals(id, that.id) && Objects.equals(version, that.version)
107+
&& Objects.equals(seqNo, that.seqNo)
108+
&& Objects.equals(primaryTerm, that.primaryTerm) && Objects.equals(created, that.created);
84109
}
85110

86111
@Override
87112
public int hashCode() {
88-
return Objects.hash(id, version, created);
113+
return Objects.hash(id, version, seqNo, primaryTerm, created);
89114
}
90115

91116
public static PutWatchResponse fromXContent(XContentParser parser) throws IOException {

client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java

+40-16
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,10 @@
8484
import java.util.concurrent.atomic.AtomicReference;
8585

8686
import static java.util.Collections.singletonMap;
87+
import static org.hamcrest.Matchers.containsString;
8788
import static org.hamcrest.Matchers.empty;
8889
import static org.hamcrest.Matchers.equalTo;
90+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
8991
import static org.hamcrest.Matchers.hasSize;
9092
import static org.hamcrest.Matchers.instanceOf;
9193
import static org.hamcrest.Matchers.lessThan;
@@ -547,23 +549,45 @@ public void testUpdate() throws IOException {
547549
IndexResponse indexResponse = highLevelClient().index(indexRequest, RequestOptions.DEFAULT);
548550
assertEquals(RestStatus.CREATED, indexResponse.status());
549551

550-
UpdateRequest updateRequest = new UpdateRequest("index", "type", "id");
551-
updateRequest.doc(singletonMap("field", "updated"), randomFrom(XContentType.values()));
552-
553-
UpdateResponse updateResponse = execute(updateRequest, highLevelClient()::update, highLevelClient()::updateAsync);
554-
assertEquals(RestStatus.OK, updateResponse.status());
555-
assertEquals(indexResponse.getVersion() + 1, updateResponse.getVersion());
556-
557-
UpdateRequest updateRequestConflict = new UpdateRequest("index", "type", "id");
558-
updateRequestConflict.doc(singletonMap("field", "with_version_conflict"), randomFrom(XContentType.values()));
559-
updateRequestConflict.version(indexResponse.getVersion());
552+
long lastUpdateSeqNo;
553+
long lastUpdatePrimaryTerm;
554+
{
555+
UpdateRequest updateRequest = new UpdateRequest("index", "type", "id");
556+
updateRequest.doc(singletonMap("field", "updated"), randomFrom(XContentType.values()));
557+
final UpdateResponse updateResponse = execute(updateRequest, highLevelClient()::update, highLevelClient()::updateAsync);
558+
assertEquals(RestStatus.OK, updateResponse.status());
559+
assertEquals(indexResponse.getVersion() + 1, updateResponse.getVersion());
560+
lastUpdateSeqNo = updateResponse.getSeqNo();
561+
lastUpdatePrimaryTerm = updateResponse.getPrimaryTerm();
562+
assertThat(lastUpdateSeqNo, greaterThanOrEqualTo(0L));
563+
assertThat(lastUpdatePrimaryTerm, greaterThanOrEqualTo(1L));
564+
}
560565

561-
ElasticsearchStatusException exception = expectThrows(ElasticsearchStatusException.class, () ->
562-
execute(updateRequestConflict, highLevelClient()::update, highLevelClient()::updateAsync,
563-
highLevelClient()::update, highLevelClient()::updateAsync));
564-
assertEquals(RestStatus.CONFLICT, exception.status());
565-
assertEquals("Elasticsearch exception [type=version_conflict_engine_exception, reason=[type][id]: version conflict, " +
566-
"current version [2] is different than the one provided [1]]", exception.getMessage());
566+
{
567+
final UpdateRequest updateRequest = new UpdateRequest("index", "type", "id");
568+
updateRequest.doc(singletonMap("field", "with_seq_no_conflict"), randomFrom(XContentType.values()));
569+
if (randomBoolean()) {
570+
updateRequest.setIfSeqNo(lastUpdateSeqNo + 1);
571+
updateRequest.setIfPrimaryTerm(lastUpdatePrimaryTerm);
572+
} else {
573+
updateRequest.setIfSeqNo(lastUpdateSeqNo + (randomBoolean() ? 0 : 1));
574+
updateRequest.setIfPrimaryTerm(lastUpdatePrimaryTerm + 1);
575+
}
576+
ElasticsearchStatusException exception = expectThrows(ElasticsearchStatusException.class, () ->
577+
execute(updateRequest, highLevelClient()::update, highLevelClient()::updateAsync));
578+
assertEquals(exception.toString(),RestStatus.CONFLICT, exception.status());
579+
assertThat(exception.getMessage(), containsString("Elasticsearch exception [type=version_conflict_engine_exception"));
580+
}
581+
{
582+
final UpdateRequest updateRequest = new UpdateRequest("index", "type", "id");
583+
updateRequest.doc(singletonMap("field", "with_seq_no"), randomFrom(XContentType.values()));
584+
updateRequest.setIfSeqNo(lastUpdateSeqNo);
585+
updateRequest.setIfPrimaryTerm(lastUpdatePrimaryTerm);
586+
final UpdateResponse updateResponse = execute(updateRequest, highLevelClient()::update, highLevelClient()::updateAsync);
587+
assertEquals(RestStatus.OK, updateResponse.status());
588+
assertEquals(lastUpdateSeqNo + 1, updateResponse.getSeqNo());
589+
assertEquals(lastUpdatePrimaryTerm, updateResponse.getPrimaryTerm());
590+
}
567591
}
568592
{
569593
ElasticsearchStatusException exception = expectThrows(ElasticsearchStatusException.class, () -> {

client/rest-high-level/src/test/java/org/elasticsearch/client/watcher/PutWatchResponseTests.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,18 @@ private static XContentBuilder toXContent(PutWatchResponse response, XContentBui
4141
return builder.startObject()
4242
.field("_id", response.getId())
4343
.field("_version", response.getVersion())
44+
.field("_seq_no", response.getSeqNo())
45+
.field("_primary_term", response.getPrimaryTerm())
4446
.field("created", response.isCreated())
4547
.endObject();
4648
}
4749

4850
private static PutWatchResponse createTestInstance() {
4951
String id = randomAlphaOfLength(10);
52+
long seqNo = randomNonNegativeLong();
53+
long primaryTerm = randomLongBetween(1, 200);
5054
long version = randomLongBetween(1, 10);
5155
boolean created = randomBoolean();
52-
return new PutWatchResponse(id, version, created);
56+
return new PutWatchResponse(id, version, seqNo, primaryTerm, created);
5357
}
5458
}

docs/reference/docs/delete.asciidoc

+1-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ The result of the above delete operation is:
3939
[[optimistic-concurrency-control-delete]]
4040
=== Optimistic concurrency control
4141

42-
Delete operations can be made optional and only be performed if the last
42+
Delete operations can be made conditional and only be performed if the last
4343
modification to the document was assigned the sequence number and primary
4444
term specified by the `if_seq_no` and `if_primary_term` parameters. If a
4545
mismatch is detected, the operation will result in a `VersionConflictException`

0 commit comments

Comments
 (0)