Skip to content

Commit 3e0819a

Browse files
authored
Backport of elastic#38411: if_seq_no and if_primary_term parameters aren't wired correctly in REST Client's CRUD API
1 parent 9dde53b commit 3e0819a

File tree

4 files changed

+130
-16
lines changed

4 files changed

+130
-16
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,8 @@ static Request delete(DeleteRequest deleteRequest) {
109109
parameters.withTimeout(deleteRequest.timeout());
110110
parameters.withVersion(deleteRequest.version());
111111
parameters.withVersionType(deleteRequest.versionType());
112+
parameters.withIfSeqNo(deleteRequest.ifSeqNo());
113+
parameters.withIfPrimaryTerm(deleteRequest.ifPrimaryTerm());
112114
parameters.withRefreshPolicy(deleteRequest.getRefreshPolicy());
113115
parameters.withWaitForActiveShards(deleteRequest.waitForActiveShards(), ActiveShardCount.DEFAULT);
114116
return request;
@@ -193,6 +195,11 @@ static Request bulk(BulkRequest bulkRequest) throws IOException {
193195
}
194196
}
195197

198+
if (action.ifSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
199+
metadata.field("if_seq_no", action.ifSeqNo());
200+
metadata.field("if_primary_term", action.ifPrimaryTerm());
201+
}
202+
196203
if (opType == DocWriteRequest.OpType.INDEX || opType == DocWriteRequest.OpType.CREATE) {
197204
IndexRequest indexRequest = (IndexRequest) action;
198205
if (Strings.hasLength(indexRequest.getPipeline())) {
@@ -309,6 +316,8 @@ static Request index(IndexRequest indexRequest) {
309316
parameters.withTimeout(indexRequest.timeout());
310317
parameters.withVersion(indexRequest.version());
311318
parameters.withVersionType(indexRequest.versionType());
319+
parameters.withIfSeqNo(indexRequest.ifSeqNo());
320+
parameters.withIfPrimaryTerm(indexRequest.ifPrimaryTerm());
312321
parameters.withPipeline(indexRequest.getPipeline());
313322
parameters.withRefreshPolicy(indexRequest.getRefreshPolicy());
314323
parameters.withWaitForActiveShards(indexRequest.waitForActiveShards(), ActiveShardCount.DEFAULT);

client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.elasticsearch.client.core.MultiTermVectorsResponse;
4949
import org.elasticsearch.client.core.TermVectorsRequest;
5050
import org.elasticsearch.client.core.TermVectorsResponse;
51+
import org.elasticsearch.client.indices.CreateIndexRequest;
5152
import org.elasticsearch.common.Strings;
5253
import org.elasticsearch.common.bytes.BytesReference;
5354
import org.elasticsearch.common.settings.Settings;
@@ -95,14 +96,20 @@
9596
public class CrudIT extends ESRestHighLevelClientTestCase {
9697

9798
public void testDelete() throws IOException {
99+
highLevelClient().indices().create(
100+
new CreateIndexRequest("index").settings(Collections.singletonMap("index.number_of_shards", "1")),
101+
RequestOptions.DEFAULT);
98102
{
99103
// Testing deletion
100104
String docId = "id";
101-
highLevelClient().index(
105+
IndexResponse indexResponse = highLevelClient().index(
102106
new IndexRequest("index", "type", docId).source(Collections.singletonMap("foo", "bar")), RequestOptions.DEFAULT);
103107
DeleteRequest deleteRequest = new DeleteRequest("index", "type", docId);
104108
if (randomBoolean()) {
105-
deleteRequest.version(1L);
109+
deleteRequest.setIfSeqNo(indexResponse.getSeqNo());
110+
deleteRequest.setIfPrimaryTerm(indexResponse.getPrimaryTerm());
111+
} else {
112+
deleteRequest.version(indexResponse.getVersion());
106113
}
107114
DeleteResponse deleteResponse = execute(deleteRequest, highLevelClient()::delete, highLevelClient()::deleteAsync,
108115
highLevelClient()::delete, highLevelClient()::deleteAsync);
@@ -127,13 +134,26 @@ public void testDelete() throws IOException {
127134
String docId = "version_conflict";
128135
highLevelClient().index(
129136
new IndexRequest("index", "type", docId).source(Collections.singletonMap("foo", "bar")), RequestOptions.DEFAULT);
130-
DeleteRequest deleteRequest = new DeleteRequest("index", "type", docId).version(2);
137+
DeleteRequest deleteRequest = new DeleteRequest("index", "type", docId);
138+
final boolean seqNos = randomBoolean();
139+
if (seqNos) {
140+
deleteRequest.setIfSeqNo(2).setIfPrimaryTerm(2);
141+
} else {
142+
deleteRequest.version(2);
143+
}
144+
131145
ElasticsearchException exception = expectThrows(ElasticsearchException.class,
132146
() -> execute(deleteRequest, highLevelClient()::delete, highLevelClient()::deleteAsync,
133147
highLevelClient()::delete, highLevelClient()::deleteAsync));
134148
assertEquals(RestStatus.CONFLICT, exception.status());
135-
assertEquals("Elasticsearch exception [type=version_conflict_engine_exception, reason=[type][" + docId + "]: " +
136-
"version conflict, current version [1] is different than the one provided [2]]", exception.getMessage());
149+
if (seqNos) {
150+
assertEquals("Elasticsearch exception [type=version_conflict_engine_exception, reason=[type][" + docId + "]: " +
151+
"version conflict, required seqNo [2], primary term [2]. current document has seqNo [3] and primary term [1]]",
152+
exception.getMessage());
153+
} else {
154+
assertEquals("Elasticsearch exception [type=version_conflict_engine_exception, reason=[type][" + docId + "]: " +
155+
"version conflict, current version [1] is different than the one provided [2]]", exception.getMessage());
156+
}
137157
assertEquals("index", exception.getMetadata("es.index").get(0));
138158
}
139159
{
@@ -453,18 +473,29 @@ public void testIndex() throws IOException {
453473
assertEquals("type", indexResponse.getType());
454474
assertEquals("id", indexResponse.getId());
455475
assertEquals(2L, indexResponse.getVersion());
476+
final boolean seqNosForConflict = randomBoolean();
456477

457478
ElasticsearchStatusException exception = expectThrows(ElasticsearchStatusException.class, () -> {
458479
IndexRequest wrongRequest = new IndexRequest("index", "type", "id");
459480
wrongRequest.source(XContentBuilder.builder(xContentType.xContent()).startObject().field("field", "test").endObject());
460-
wrongRequest.version(5L);
481+
if (seqNosForConflict) {
482+
wrongRequest.setIfSeqNo(2).setIfPrimaryTerm(2);
483+
} else {
484+
wrongRequest.version(5);
485+
}
461486

462487
execute(wrongRequest, highLevelClient()::index, highLevelClient()::indexAsync,
463488
highLevelClient()::index, highLevelClient()::indexAsync);
464489
});
465490
assertEquals(RestStatus.CONFLICT, exception.status());
466-
assertEquals("Elasticsearch exception [type=version_conflict_engine_exception, reason=[type][id]: " +
467-
"version conflict, current version [2] is different than the one provided [5]]", exception.getMessage());
491+
if (seqNosForConflict) {
492+
assertEquals("Elasticsearch exception [type=version_conflict_engine_exception, reason=[type][id]: " +
493+
"version conflict, required seqNo [1], primary term [5]. current document has seqNo [2] and primary term [1]]",
494+
exception.getMessage());
495+
} else {
496+
assertEquals("Elasticsearch exception [type=version_conflict_engine_exception, reason=[type][id]: " +
497+
"version conflict, current version [2] is different than the one provided [5]]", exception.getMessage());
498+
}
468499
assertEquals("index", exception.getMetadata("es.index").get(0));
469500
}
470501
{
@@ -763,7 +794,8 @@ public void testBulk() throws IOException {
763794
if (opType == DocWriteRequest.OpType.INDEX) {
764795
IndexRequest indexRequest = new IndexRequest("index", "test", id).source(source, xContentType);
765796
if (erroneous) {
766-
indexRequest.version(12L);
797+
indexRequest.setIfSeqNo(12L);
798+
indexRequest.setIfPrimaryTerm(12L);
767799
}
768800
bulkRequest.add(indexRequest);
769801

@@ -1075,7 +1107,8 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
10751107
if (opType == DocWriteRequest.OpType.INDEX) {
10761108
IndexRequest indexRequest = new IndexRequest("index", "test", id).source(xContentType, "id", i);
10771109
if (erroneous) {
1078-
indexRequest.version(12L);
1110+
indexRequest.setIfSeqNo(12L);
1111+
indexRequest.setIfPrimaryTerm(12L);
10791112
}
10801113
processor.add(indexRequest);
10811114

client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,11 @@
5353
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
5454
import org.elasticsearch.action.support.master.MasterNodeRequest;
5555
import org.elasticsearch.action.support.replication.ReplicationRequest;
56-
import org.elasticsearch.client.core.MultiTermVectorsRequest;
57-
import org.elasticsearch.client.core.TermVectorsRequest;
5856
import org.elasticsearch.action.update.UpdateRequest;
5957
import org.elasticsearch.client.RequestConverters.EndpointBuilder;
6058
import org.elasticsearch.client.core.CountRequest;
59+
import org.elasticsearch.client.core.MultiTermVectorsRequest;
60+
import org.elasticsearch.client.core.TermVectorsRequest;
6161
import org.elasticsearch.common.CheckedBiConsumer;
6262
import org.elasticsearch.common.Strings;
6363
import org.elasticsearch.common.bytes.BytesArray;
@@ -216,6 +216,7 @@ public void testDelete() {
216216
setRandomRefreshPolicy(deleteRequest::setRefreshPolicy, expectedParams);
217217
setRandomVersion(deleteRequest, expectedParams);
218218
setRandomVersionType(deleteRequest::versionType, expectedParams);
219+
setRandomIfSeqNoAndTerm(deleteRequest, expectedParams);
219220

220221
if (frequently()) {
221222
if (randomBoolean()) {
@@ -545,6 +546,7 @@ public void testIndex() throws IOException {
545546
} else {
546547
setRandomVersion(indexRequest, expectedParams);
547548
setRandomVersionType(indexRequest::versionType, expectedParams);
549+
setRandomIfSeqNoAndTerm(indexRequest, expectedParams);
548550
}
549551

550552
if (frequently()) {
@@ -650,6 +652,7 @@ public void testUpdate() throws IOException {
650652
setRandomWaitForActiveShards(updateRequest::waitForActiveShards, ActiveShardCount.DEFAULT, expectedParams);
651653
setRandomVersion(updateRequest, expectedParams);
652654
setRandomVersionType(updateRequest::versionType, expectedParams);
655+
setRandomIfSeqNoAndTerm(updateRequest, new HashMap<>()); // if* params are passed in the body
653656
if (randomBoolean()) {
654657
int retryOnConflict = randomIntBetween(0, 5);
655658
updateRequest.retryOnConflict(retryOnConflict);
@@ -680,6 +683,7 @@ public void testUpdate() throws IOException {
680683
assertEquals(updateRequest.docAsUpsert(), parsedUpdateRequest.docAsUpsert());
681684
assertEquals(updateRequest.detectNoop(), parsedUpdateRequest.detectNoop());
682685
assertEquals(updateRequest.fetchSource(), parsedUpdateRequest.fetchSource());
686+
assertIfSeqNoAndTerm(updateRequest, parsedUpdateRequest);
683687
assertEquals(updateRequest.script(), parsedUpdateRequest.script());
684688
if (updateRequest.doc() != null) {
685689
assertToXContentEquivalent(updateRequest.doc().source(), parsedUpdateRequest.doc().source(), xContentType);
@@ -693,6 +697,22 @@ public void testUpdate() throws IOException {
693697
}
694698
}
695699

700+
private static void assertIfSeqNoAndTerm(DocWriteRequest<?>request, DocWriteRequest<?> parsedRequest) {
701+
assertEquals(request.ifSeqNo(), parsedRequest.ifSeqNo());
702+
assertEquals(request.ifPrimaryTerm(), parsedRequest.ifPrimaryTerm());
703+
}
704+
705+
private static void setRandomIfSeqNoAndTerm(DocWriteRequest<?> request, Map<String, String> expectedParams) {
706+
if (randomBoolean()) {
707+
final long seqNo = randomNonNegativeLong();
708+
request.setIfSeqNo(seqNo);
709+
expectedParams.put("if_seq_no", Long.toString(seqNo));
710+
final long primaryTerm = randomLongBetween(1, 200);
711+
request.setIfPrimaryTerm(primaryTerm);
712+
expectedParams.put("if_primary_term", Long.toString(primaryTerm));
713+
}
714+
}
715+
696716
public void testUpdateWithDifferentContentTypes() {
697717
IllegalStateException exception = expectThrows(IllegalStateException.class, () -> {
698718
UpdateRequest updateRequest = new UpdateRequest();
@@ -767,10 +787,15 @@ public void testBulk() throws IOException {
767787
docWriteRequest.routing(randomAlphaOfLength(10));
768788
}
769789
if (randomBoolean()) {
770-
docWriteRequest.version(randomNonNegativeLong());
771-
}
772-
if (randomBoolean()) {
773-
docWriteRequest.versionType(randomFrom(VersionType.values()));
790+
if (randomBoolean()) {
791+
docWriteRequest.version(randomNonNegativeLong());
792+
}
793+
if (randomBoolean()) {
794+
docWriteRequest.versionType(randomFrom(VersionType.values()));
795+
}
796+
} else if (randomBoolean()) {
797+
docWriteRequest.setIfSeqNo(randomNonNegativeLong());
798+
docWriteRequest.setIfPrimaryTerm(randomLongBetween(1, 200));
774799
}
775800
bulkRequest.add(docWriteRequest);
776801
}
@@ -801,6 +826,8 @@ public void testBulk() throws IOException {
801826
assertEquals(originalRequest.parent(), parsedRequest.parent());
802827
assertEquals(originalRequest.version(), parsedRequest.version());
803828
assertEquals(originalRequest.versionType(), parsedRequest.versionType());
829+
assertEquals(originalRequest.ifSeqNo(), parsedRequest.ifSeqNo());
830+
assertEquals(originalRequest.ifPrimaryTerm(), parsedRequest.ifPrimaryTerm());
804831

805832
DocWriteRequest.OpType opType = originalRequest.opType();
806833
if (opType == DocWriteRequest.OpType.INDEX) {
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
---
2+
"Compare And Swap Sequence Numbers":
3+
4+
- skip:
5+
version: " - 6.5.99"
6+
reason: cas operations with sequence numbers was added in 6.6
7+
8+
- do:
9+
index:
10+
index: test_1
11+
type: _doc
12+
id: 1
13+
body: { foo: bar }
14+
- match: { _version: 1}
15+
- set: { _seq_no: seqno }
16+
- set: { _primary_term: primary_term }
17+
18+
- do:
19+
bulk:
20+
body:
21+
- index:
22+
_index: test_1
23+
_type: _doc
24+
_id: 1
25+
if_seq_no: 10000
26+
if_primary_term: $primary_term
27+
- foo: bar2
28+
29+
- match: { errors: true }
30+
- match: { items.0.index.status: 409 }
31+
- match: { items.0.index.error.type: version_conflict_engine_exception }
32+
33+
- do:
34+
bulk:
35+
body:
36+
- index:
37+
_index: test_1
38+
_type: _doc
39+
_id: 1
40+
if_seq_no: $seqno
41+
if_primary_term: $primary_term
42+
- foo: bar2
43+
44+
- match: { errors: false}
45+
- match: { items.0.index.status: 200 }

0 commit comments

Comments
 (0)