Skip to content

Commit 12657fd

Browse files
authored
if_seq_no and if_primary_term parameters aren't wired correctly in REST Client's CRUD API (#38411)
1 parent df4eb04 commit 12657fd

File tree

6 files changed

+142
-13
lines changed

6 files changed

+142
-13
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,8 @@ static Request delete(DeleteRequest deleteRequest) {
108108
parameters.withTimeout(deleteRequest.timeout());
109109
parameters.withVersion(deleteRequest.version());
110110
parameters.withVersionType(deleteRequest.versionType());
111+
parameters.withIfSeqNo(deleteRequest.ifSeqNo());
112+
parameters.withIfPrimaryTerm(deleteRequest.ifPrimaryTerm());
111113
parameters.withRefreshPolicy(deleteRequest.getRefreshPolicy());
112114
parameters.withWaitForActiveShards(deleteRequest.waitForActiveShards());
113115
return request;
@@ -191,6 +193,11 @@ static Request bulk(BulkRequest bulkRequest) throws IOException {
191193
}
192194
}
193195

196+
if (action.ifSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
197+
metadata.field("if_seq_no", action.ifSeqNo());
198+
metadata.field("if_primary_term", action.ifPrimaryTerm());
199+
}
200+
194201
if (opType == DocWriteRequest.OpType.INDEX || opType == DocWriteRequest.OpType.CREATE) {
195202
IndexRequest indexRequest = (IndexRequest) action;
196203
if (Strings.hasLength(indexRequest.getPipeline())) {
@@ -319,6 +326,8 @@ static Request index(IndexRequest indexRequest) {
319326
parameters.withTimeout(indexRequest.timeout());
320327
parameters.withVersion(indexRequest.version());
321328
parameters.withVersionType(indexRequest.versionType());
329+
parameters.withIfSeqNo(indexRequest.ifSeqNo());
330+
parameters.withIfPrimaryTerm(indexRequest.ifPrimaryTerm());
322331
parameters.withPipeline(indexRequest.getPipeline());
323332
parameters.withRefreshPolicy(indexRequest.getRefreshPolicy());
324333
parameters.withWaitForActiveShards(indexRequest.waitForActiveShards());

client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -104,11 +104,13 @@ public void testDelete() throws IOException {
104104
{
105105
// Testing deletion
106106
String docId = "id";
107-
highLevelClient().index(
107+
IndexResponse indexResponse = highLevelClient().index(
108108
new IndexRequest("index").id(docId).source(Collections.singletonMap("foo", "bar")), RequestOptions.DEFAULT);
109+
assertThat(indexResponse.getSeqNo(), greaterThanOrEqualTo(0L));
109110
DeleteRequest deleteRequest = new DeleteRequest("index", docId);
110111
if (randomBoolean()) {
111-
deleteRequest.version(1L);
112+
deleteRequest.setIfSeqNo(indexResponse.getSeqNo());
113+
deleteRequest.setIfPrimaryTerm(indexResponse.getPrimaryTerm());
112114
}
113115
DeleteResponse deleteResponse = execute(deleteRequest, highLevelClient()::delete, highLevelClient()::deleteAsync);
114116
assertEquals("index", deleteResponse.getIndex());
@@ -131,12 +133,13 @@ public void testDelete() throws IOException {
131133
String docId = "version_conflict";
132134
highLevelClient().index(
133135
new IndexRequest("index").id( docId).source(Collections.singletonMap("foo", "bar")), RequestOptions.DEFAULT);
134-
DeleteRequest deleteRequest = new DeleteRequest("index", docId).version(2);
136+
DeleteRequest deleteRequest = new DeleteRequest("index", docId).setIfSeqNo(2).setIfPrimaryTerm(2);
135137
ElasticsearchException exception = expectThrows(ElasticsearchException.class,
136138
() -> execute(deleteRequest, highLevelClient()::delete, highLevelClient()::deleteAsync));
137139
assertEquals(RestStatus.CONFLICT, exception.status());
138140
assertEquals("Elasticsearch exception [type=version_conflict_engine_exception, reason=[_doc][" + docId + "]: " +
139-
"version conflict, current version [1] is different than the one provided [2]]", exception.getMessage());
141+
"version conflict, required seqNo [2], primary term [2]. current document has seqNo [3] and primary term [1]]",
142+
exception.getMessage());
140143
assertEquals("index", exception.getMetadata("es.index").get(0));
141144
}
142145
{
@@ -519,13 +522,14 @@ public void testIndex() throws IOException {
519522
ElasticsearchStatusException exception = expectThrows(ElasticsearchStatusException.class, () -> {
520523
IndexRequest wrongRequest = new IndexRequest("index").id("id");
521524
wrongRequest.source(XContentBuilder.builder(xContentType.xContent()).startObject().field("field", "test").endObject());
522-
wrongRequest.version(5L);
525+
wrongRequest.setIfSeqNo(1L).setIfPrimaryTerm(5L);
523526

524527
execute(wrongRequest, highLevelClient()::index, highLevelClient()::indexAsync);
525528
});
526529
assertEquals(RestStatus.CONFLICT, exception.status());
527530
assertEquals("Elasticsearch exception [type=version_conflict_engine_exception, reason=[_doc][id]: " +
528-
"version conflict, current version [2] is different than the one provided [5]]", exception.getMessage());
531+
"version conflict, required seqNo [1], primary term [5]. current document has seqNo [2] and primary term [1]]",
532+
exception.getMessage());
529533
assertEquals("index", exception.getMetadata("es.index").get(0));
530534
}
531535
{
@@ -820,7 +824,8 @@ public void testBulk() throws IOException {
820824
if (opType == DocWriteRequest.OpType.INDEX) {
821825
IndexRequest indexRequest = new IndexRequest("index").id(id).source(source, xContentType);
822826
if (erroneous) {
823-
indexRequest.version(12L);
827+
indexRequest.setIfSeqNo(12L);
828+
indexRequest.setIfPrimaryTerm(12L);
824829
}
825830
bulkRequest.add(indexRequest);
826831

@@ -1130,7 +1135,8 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
11301135
if (opType == DocWriteRequest.OpType.INDEX) {
11311136
IndexRequest indexRequest = new IndexRequest("index").id(id).source(xContentType, "id", i);
11321137
if (erroneous) {
1133-
indexRequest.version(12L);
1138+
indexRequest.setIfSeqNo(12L);
1139+
indexRequest.setIfPrimaryTerm(12L);
11341140
}
11351141
processor.add(indexRequest);
11361142

client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,7 @@ public void testDelete() {
281281
setRandomRefreshPolicy(deleteRequest::setRefreshPolicy, expectedParams);
282282
setRandomVersion(deleteRequest, expectedParams);
283283
setRandomVersionType(deleteRequest::versionType, expectedParams);
284+
setRandomIfSeqNoAndTerm(deleteRequest, expectedParams);
284285

285286
if (frequently()) {
286287
if (randomBoolean()) {
@@ -631,6 +632,7 @@ public void testIndex() throws IOException {
631632
} else {
632633
setRandomVersion(indexRequest, expectedParams);
633634
setRandomVersionType(indexRequest::versionType, expectedParams);
635+
setRandomIfSeqNoAndTerm(indexRequest, expectedParams);
634636
}
635637

636638
if (frequently()) {
@@ -768,6 +770,7 @@ public void testUpdate() throws IOException {
768770
setRandomWaitForActiveShards(updateRequest::waitForActiveShards, expectedParams);
769771
setRandomVersion(updateRequest, expectedParams);
770772
setRandomVersionType(updateRequest::versionType, expectedParams);
773+
setRandomIfSeqNoAndTerm(updateRequest, new HashMap<>()); // if* params are passed in the body
771774
if (randomBoolean()) {
772775
int retryOnConflict = randomIntBetween(0, 5);
773776
updateRequest.retryOnConflict(retryOnConflict);
@@ -798,6 +801,7 @@ public void testUpdate() throws IOException {
798801
assertEquals(updateRequest.docAsUpsert(), parsedUpdateRequest.docAsUpsert());
799802
assertEquals(updateRequest.detectNoop(), parsedUpdateRequest.detectNoop());
800803
assertEquals(updateRequest.fetchSource(), parsedUpdateRequest.fetchSource());
804+
assertIfSeqNoAndTerm(updateRequest, parsedUpdateRequest);
801805
assertEquals(updateRequest.script(), parsedUpdateRequest.script());
802806
if (updateRequest.doc() != null) {
803807
assertToXContentEquivalent(updateRequest.doc().source(), parsedUpdateRequest.doc().source(), xContentType);
@@ -811,6 +815,22 @@ public void testUpdate() throws IOException {
811815
}
812816
}
813817

818+
private static void assertIfSeqNoAndTerm(DocWriteRequest<?>request, DocWriteRequest<?> parsedRequest) {
819+
assertEquals(request.ifSeqNo(), parsedRequest.ifSeqNo());
820+
assertEquals(request.ifPrimaryTerm(), parsedRequest.ifPrimaryTerm());
821+
}
822+
823+
private static void setRandomIfSeqNoAndTerm(DocWriteRequest<?> request, Map<String, String> expectedParams) {
824+
if (randomBoolean()) {
825+
final long seqNo = randomNonNegativeLong();
826+
request.setIfSeqNo(seqNo);
827+
expectedParams.put("if_seq_no", Long.toString(seqNo));
828+
final long primaryTerm = randomLongBetween(1, 200);
829+
request.setIfPrimaryTerm(primaryTerm);
830+
expectedParams.put("if_primary_term", Long.toString(primaryTerm));
831+
}
832+
}
833+
814834
public void testUpdateWithType() throws IOException {
815835
String index = randomAlphaOfLengthBetween(3, 10);
816836
String type = randomAlphaOfLengthBetween(3, 10);
@@ -892,10 +912,15 @@ public void testBulk() throws IOException {
892912
docWriteRequest.routing(randomAlphaOfLength(10));
893913
}
894914
if (randomBoolean()) {
895-
docWriteRequest.version(randomNonNegativeLong());
896-
}
897-
if (randomBoolean()) {
898-
docWriteRequest.versionType(randomFrom(VersionType.values()));
915+
if (randomBoolean()) {
916+
docWriteRequest.version(randomNonNegativeLong());
917+
}
918+
if (randomBoolean()) {
919+
docWriteRequest.versionType(randomFrom(VersionType.values()));
920+
}
921+
} else if (randomBoolean()) {
922+
docWriteRequest.setIfSeqNo(randomNonNegativeLong());
923+
docWriteRequest.setIfPrimaryTerm(randomLongBetween(1, 200));
899924
}
900925
bulkRequest.add(docWriteRequest);
901926
}
@@ -925,6 +950,8 @@ public void testBulk() throws IOException {
925950
assertEquals(originalRequest.routing(), parsedRequest.routing());
926951
assertEquals(originalRequest.version(), parsedRequest.version());
927952
assertEquals(originalRequest.versionType(), parsedRequest.versionType());
953+
assertEquals(originalRequest.ifSeqNo(), parsedRequest.ifSeqNo());
954+
assertEquals(originalRequest.ifPrimaryTerm(), parsedRequest.ifPrimaryTerm());
928955

929956
DocWriteRequest.OpType opType = originalRequest.opType();
930957
if (opType == DocWriteRequest.OpType.INDEX) {
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
---
2+
"Compare And Swap Sequence Numbers":
3+
4+
- skip:
5+
version: " - 6.99.99"
6+
reason: typeless API are add in 7.0.0
7+
8+
- do:
9+
index:
10+
index: test_1
11+
id: 1
12+
body: { foo: bar }
13+
- match: { _version: 1}
14+
- set: { _seq_no: seqno }
15+
- set: { _primary_term: primary_term }
16+
17+
- do:
18+
bulk:
19+
body:
20+
- index:
21+
_index: test_1
22+
_id: 1
23+
if_seq_no: 10000
24+
if_primary_term: $primary_term
25+
- foo: bar2
26+
27+
- match: { errors: true }
28+
- match: { items.0.index.status: 409 }
29+
- match: { items.0.index.error.type: version_conflict_engine_exception }
30+
31+
- do:
32+
bulk:
33+
body:
34+
- index:
35+
_index: test_1
36+
_id: 1
37+
if_seq_no: $seqno
38+
if_primary_term: $primary_term
39+
- foo: bar2
40+
41+
- match: { errors: false}
42+
- match: { items.0.index.status: 200 }
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
---
2+
"Compare And Swap Sequence Numbers":
3+
4+
- skip:
5+
version: " - 6.6.99"
6+
reason: cas operations with sequence numbers was added in 6.7
7+
8+
- do:
9+
index:
10+
index: test_1
11+
type: _doc
12+
id: 1
13+
body: { foo: bar }
14+
- match: { _version: 1}
15+
- set: { _seq_no: seqno }
16+
- set: { _primary_term: primary_term }
17+
18+
- do:
19+
bulk:
20+
body:
21+
- index:
22+
_index: test_1
23+
_type: _doc
24+
_id: 1
25+
if_seq_no: 10000
26+
if_primary_term: $primary_term
27+
- foo: bar2
28+
29+
- match: { errors: true }
30+
- match: { items.0.index.status: 409 }
31+
- match: { items.0.index.error.type: version_conflict_engine_exception }
32+
33+
- do:
34+
bulk:
35+
body:
36+
- index:
37+
_index: test_1
38+
_type: _doc
39+
_id: 1
40+
if_seq_no: $seqno
41+
if_primary_term: $primary_term
42+
- foo: bar2
43+
44+
- match: { errors: false}
45+
- match: { items.0.index.status: 200 }

rest-api-spec/src/main/resources/rest-api-spec/test/index/30_cas.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
- skip:
55
version: " - 6.99.99"
6-
reason: cas ops are introduced in 7.0.0
6+
reason: typesless api was introduces in 7.0
77

88
- do:
99
index:

0 commit comments

Comments
 (0)