Skip to content

if_seq_no and if_primary_term parameters aren't wired correctly in REST Client's CRUD API #38411

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Feb 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ static Request delete(DeleteRequest deleteRequest) {
parameters.withTimeout(deleteRequest.timeout());
parameters.withVersion(deleteRequest.version());
parameters.withVersionType(deleteRequest.versionType());
parameters.withIfSeqNo(deleteRequest.ifSeqNo());
parameters.withIfPrimaryTerm(deleteRequest.ifPrimaryTerm());
parameters.withRefreshPolicy(deleteRequest.getRefreshPolicy());
parameters.withWaitForActiveShards(deleteRequest.waitForActiveShards());
return request;
Expand Down Expand Up @@ -191,6 +193,11 @@ static Request bulk(BulkRequest bulkRequest) throws IOException {
}
}

if (action.ifSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
metadata.field("if_seq_no", action.ifSeqNo());
metadata.field("if_primary_term", action.ifPrimaryTerm());
}

if (opType == DocWriteRequest.OpType.INDEX || opType == DocWriteRequest.OpType.CREATE) {
IndexRequest indexRequest = (IndexRequest) action;
if (Strings.hasLength(indexRequest.getPipeline())) {
Expand Down Expand Up @@ -319,6 +326,8 @@ static Request index(IndexRequest indexRequest) {
parameters.withTimeout(indexRequest.timeout());
parameters.withVersion(indexRequest.version());
parameters.withVersionType(indexRequest.versionType());
parameters.withIfSeqNo(indexRequest.ifSeqNo());
parameters.withIfPrimaryTerm(indexRequest.ifPrimaryTerm());
parameters.withPipeline(indexRequest.getPipeline());
parameters.withRefreshPolicy(indexRequest.getRefreshPolicy());
parameters.withWaitForActiveShards(indexRequest.waitForActiveShards());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,13 @@ public void testDelete() throws IOException {
{
// Testing deletion
String docId = "id";
highLevelClient().index(
IndexResponse indexResponse = highLevelClient().index(
new IndexRequest("index").id(docId).source(Collections.singletonMap("foo", "bar")), RequestOptions.DEFAULT);
assertThat(indexResponse.getSeqNo(), greaterThanOrEqualTo(0L));
DeleteRequest deleteRequest = new DeleteRequest("index", docId);
if (randomBoolean()) {
deleteRequest.version(1L);
deleteRequest.setIfSeqNo(indexResponse.getSeqNo());
deleteRequest.setIfPrimaryTerm(indexResponse.getPrimaryTerm());
}
DeleteResponse deleteResponse = execute(deleteRequest, highLevelClient()::delete, highLevelClient()::deleteAsync);
assertEquals("index", deleteResponse.getIndex());
Expand All @@ -131,12 +133,13 @@ public void testDelete() throws IOException {
String docId = "version_conflict";
highLevelClient().index(
new IndexRequest("index").id( docId).source(Collections.singletonMap("foo", "bar")), RequestOptions.DEFAULT);
DeleteRequest deleteRequest = new DeleteRequest("index", docId).version(2);
DeleteRequest deleteRequest = new DeleteRequest("index", docId).setIfSeqNo(2).setIfPrimaryTerm(2);
ElasticsearchException exception = expectThrows(ElasticsearchException.class,
() -> execute(deleteRequest, highLevelClient()::delete, highLevelClient()::deleteAsync));
assertEquals(RestStatus.CONFLICT, exception.status());
assertEquals("Elasticsearch exception [type=version_conflict_engine_exception, reason=[_doc][" + docId + "]: " +
"version conflict, current version [1] is different than the one provided [2]]", exception.getMessage());
"version conflict, required seqNo [2], primary term [2]. current document has seqNo [3] and primary term [1]]",
exception.getMessage());
assertEquals("index", exception.getMetadata("es.index").get(0));
}
{
Expand Down Expand Up @@ -519,13 +522,14 @@ public void testIndex() throws IOException {
ElasticsearchStatusException exception = expectThrows(ElasticsearchStatusException.class, () -> {
IndexRequest wrongRequest = new IndexRequest("index").id("id");
wrongRequest.source(XContentBuilder.builder(xContentType.xContent()).startObject().field("field", "test").endObject());
wrongRequest.version(5L);
wrongRequest.setIfSeqNo(1L).setIfPrimaryTerm(5L);

execute(wrongRequest, highLevelClient()::index, highLevelClient()::indexAsync);
});
assertEquals(RestStatus.CONFLICT, exception.status());
assertEquals("Elasticsearch exception [type=version_conflict_engine_exception, reason=[_doc][id]: " +
"version conflict, current version [2] is different than the one provided [5]]", exception.getMessage());
"version conflict, required seqNo [1], primary term [5]. current document has seqNo [2] and primary term [1]]",
exception.getMessage());
assertEquals("index", exception.getMetadata("es.index").get(0));
}
{
Expand Down Expand Up @@ -820,7 +824,8 @@ public void testBulk() throws IOException {
if (opType == DocWriteRequest.OpType.INDEX) {
IndexRequest indexRequest = new IndexRequest("index").id(id).source(source, xContentType);
if (erroneous) {
indexRequest.version(12L);
indexRequest.setIfSeqNo(12L);
indexRequest.setIfPrimaryTerm(12L);
}
bulkRequest.add(indexRequest);

Expand Down Expand Up @@ -1130,7 +1135,8 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
if (opType == DocWriteRequest.OpType.INDEX) {
IndexRequest indexRequest = new IndexRequest("index").id(id).source(xContentType, "id", i);
if (erroneous) {
indexRequest.version(12L);
indexRequest.setIfSeqNo(12L);
indexRequest.setIfPrimaryTerm(12L);
}
processor.add(indexRequest);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ public void testDelete() {
setRandomRefreshPolicy(deleteRequest::setRefreshPolicy, expectedParams);
setRandomVersion(deleteRequest, expectedParams);
setRandomVersionType(deleteRequest::versionType, expectedParams);
setRandomIfSeqNoAndTerm(deleteRequest, expectedParams);

if (frequently()) {
if (randomBoolean()) {
Expand Down Expand Up @@ -631,6 +632,7 @@ public void testIndex() throws IOException {
} else {
setRandomVersion(indexRequest, expectedParams);
setRandomVersionType(indexRequest::versionType, expectedParams);
setRandomIfSeqNoAndTerm(indexRequest, expectedParams);
}

if (frequently()) {
Expand Down Expand Up @@ -768,6 +770,7 @@ public void testUpdate() throws IOException {
setRandomWaitForActiveShards(updateRequest::waitForActiveShards, expectedParams);
setRandomVersion(updateRequest, expectedParams);
setRandomVersionType(updateRequest::versionType, expectedParams);
setRandomIfSeqNoAndTerm(updateRequest, new HashMap<>()); // if* params are passed in the body
if (randomBoolean()) {
int retryOnConflict = randomIntBetween(0, 5);
updateRequest.retryOnConflict(retryOnConflict);
Expand Down Expand Up @@ -798,6 +801,7 @@ public void testUpdate() throws IOException {
assertEquals(updateRequest.docAsUpsert(), parsedUpdateRequest.docAsUpsert());
assertEquals(updateRequest.detectNoop(), parsedUpdateRequest.detectNoop());
assertEquals(updateRequest.fetchSource(), parsedUpdateRequest.fetchSource());
assertIfSeqNoAndTerm(updateRequest, parsedUpdateRequest);
assertEquals(updateRequest.script(), parsedUpdateRequest.script());
if (updateRequest.doc() != null) {
assertToXContentEquivalent(updateRequest.doc().source(), parsedUpdateRequest.doc().source(), xContentType);
Expand All @@ -811,6 +815,22 @@ public void testUpdate() throws IOException {
}
}

private static void assertIfSeqNoAndTerm(DocWriteRequest<?>request, DocWriteRequest<?> parsedRequest) {
assertEquals(request.ifSeqNo(), parsedRequest.ifSeqNo());
assertEquals(request.ifPrimaryTerm(), parsedRequest.ifPrimaryTerm());
}

private static void setRandomIfSeqNoAndTerm(DocWriteRequest<?> request, Map<String, String> expectedParams) {
if (randomBoolean()) {
final long seqNo = randomNonNegativeLong();
request.setIfSeqNo(seqNo);
expectedParams.put("if_seq_no", Long.toString(seqNo));
final long primaryTerm = randomLongBetween(1, 200);
request.setIfPrimaryTerm(primaryTerm);
expectedParams.put("if_primary_term", Long.toString(primaryTerm));
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could these two methods be static?


public void testUpdateWithType() throws IOException {
String index = randomAlphaOfLengthBetween(3, 10);
String type = randomAlphaOfLengthBetween(3, 10);
Expand Down Expand Up @@ -892,10 +912,15 @@ public void testBulk() throws IOException {
docWriteRequest.routing(randomAlphaOfLength(10));
}
if (randomBoolean()) {
docWriteRequest.version(randomNonNegativeLong());
}
if (randomBoolean()) {
docWriteRequest.versionType(randomFrom(VersionType.values()));
if (randomBoolean()) {
docWriteRequest.version(randomNonNegativeLong());
}
if (randomBoolean()) {
docWriteRequest.versionType(randomFrom(VersionType.values()));
}
} else if (randomBoolean()) {
docWriteRequest.setIfSeqNo(randomNonNegativeLong());
docWriteRequest.setIfPrimaryTerm(randomLongBetween(1, 200));
}
bulkRequest.add(docWriteRequest);
}
Expand Down Expand Up @@ -925,6 +950,8 @@ public void testBulk() throws IOException {
assertEquals(originalRequest.routing(), parsedRequest.routing());
assertEquals(originalRequest.version(), parsedRequest.version());
assertEquals(originalRequest.versionType(), parsedRequest.versionType());
assertEquals(originalRequest.ifSeqNo(), parsedRequest.ifSeqNo());
assertEquals(originalRequest.ifPrimaryTerm(), parsedRequest.ifPrimaryTerm());

DocWriteRequest.OpType opType = originalRequest.opType();
if (opType == DocWriteRequest.OpType.INDEX) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
---
"Compare And Swap Sequence Numbers":

- skip:
version: " - 6.99.99"
reason: typeless API are add in 7.0.0

- do:
index:
index: test_1
id: 1
body: { foo: bar }
- match: { _version: 1}
- set: { _seq_no: seqno }
- set: { _primary_term: primary_term }

- do:
bulk:
body:
- index:
_index: test_1
_id: 1
if_seq_no: 10000
if_primary_term: $primary_term
- foo: bar2

- match: { errors: true }
- match: { items.0.index.status: 409 }
- match: { items.0.index.error.type: version_conflict_engine_exception }

- do:
bulk:
body:
- index:
_index: test_1
_id: 1
if_seq_no: $seqno
if_primary_term: $primary_term
- foo: bar2

- match: { errors: false}
- match: { items.0.index.status: 200 }
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
---
"Compare And Swap Sequence Numbers":

- skip:
version: " - 6.6.99"
reason: cas operations with sequence numbers was added in 6.7

- do:
index:
index: test_1
type: _doc
id: 1
body: { foo: bar }
- match: { _version: 1}
- set: { _seq_no: seqno }
- set: { _primary_term: primary_term }

- do:
bulk:
body:
- index:
_index: test_1
_type: _doc
_id: 1
if_seq_no: 10000
if_primary_term: $primary_term
- foo: bar2

- match: { errors: true }
- match: { items.0.index.status: 409 }
- match: { items.0.index.error.type: version_conflict_engine_exception }

- do:
bulk:
body:
- index:
_index: test_1
_type: _doc
_id: 1
if_seq_no: $seqno
if_primary_term: $primary_term
- foo: bar2

- match: { errors: false}
- match: { items.0.index.status: 200 }
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

- skip:
version: " - 6.99.99"
reason: cas ops are introduced in 7.0.0
reason: typesless api was introduces in 7.0
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/typesless/typeless s/introduces/introduced


- do:
index:
Expand Down