From 7e35372cbbe16ca34742624be7cd5592586129c8 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Tue, 5 Feb 2019 12:08:53 +0100 Subject: [PATCH 1/5] Wire if_seq_no and if_primary_term in rest client bulk --- .../client/RequestConverters.java | 5 +++ .../java/org/elasticsearch/client/CrudIT.java | 16 ++++--- .../client/RequestConvertersTests.java | 15 +++++-- .../rest-api-spec/test/bulk/80_cas.yml | 42 +++++++++++++++++ .../test/bulk/81_cas_with_types.yml | 45 +++++++++++++++++++ .../rest-api-spec/test/index/30_cas.yml | 2 +- 6 files changed, 114 insertions(+), 11 deletions(-) create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/test/bulk/80_cas.yml create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/test/bulk/81_cas_with_types.yml diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java index a30fec41b0bf3..7cfca939913e6 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java @@ -191,6 +191,11 @@ static Request bulk(BulkRequest bulkRequest) throws IOException { } } + if (action.ifSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) { + metadata.field("if_seq_no", action.ifSeqNo()); + metadata.field("if_primary_term", action.ifPrimaryTerm()); + } + if (opType == DocWriteRequest.OpType.INDEX || opType == DocWriteRequest.OpType.CREATE) { IndexRequest indexRequest = (IndexRequest) action; if (Strings.hasLength(indexRequest.getPipeline())) { diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java index 3bd3c79072dc9..6addd8558c879 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java @@ -104,11 +104,13 @@ public void testDelete() throws IOException { { // Testing deletion String docId = "id"; - highLevelClient().index( + IndexResponse indexResponse = highLevelClient().index( new IndexRequest("index").id(docId).source(Collections.singletonMap("foo", "bar")), RequestOptions.DEFAULT); + assertThat(indexResponse.getSeqNo(), greaterThanOrEqualTo(0L)); DeleteRequest deleteRequest = new DeleteRequest("index", docId); if (randomBoolean()) { - deleteRequest.version(1L); + deleteRequest.setIfSeqNo(indexResponse.getSeqNo()); + deleteRequest.setIfPrimaryTerm(indexResponse.getPrimaryTerm()); } DeleteResponse deleteResponse = execute(deleteRequest, highLevelClient()::delete, highLevelClient()::deleteAsync); assertEquals("index", deleteResponse.getIndex()); @@ -131,7 +133,7 @@ public void testDelete() throws IOException { String docId = "version_conflict"; highLevelClient().index( new IndexRequest("index").id( docId).source(Collections.singletonMap("foo", "bar")), RequestOptions.DEFAULT); - DeleteRequest deleteRequest = new DeleteRequest("index", docId).version(2); + DeleteRequest deleteRequest = new DeleteRequest("index", docId).setIfSeqNo(2).setIfPrimaryTerm(2); ElasticsearchException exception = expectThrows(ElasticsearchException.class, () -> execute(deleteRequest, highLevelClient()::delete, highLevelClient()::deleteAsync)); assertEquals(RestStatus.CONFLICT, exception.status()); @@ -519,7 +521,7 @@ public void testIndex() throws IOException { ElasticsearchStatusException exception = expectThrows(ElasticsearchStatusException.class, () -> { IndexRequest wrongRequest = new IndexRequest("index").id("id"); wrongRequest.source(XContentBuilder.builder(xContentType.xContent()).startObject().field("field", "test").endObject()); - wrongRequest.version(5L); + wrongRequest.setIfSeqNo(1L).setIfPrimaryTerm(5L); execute(wrongRequest, highLevelClient()::index, highLevelClient()::indexAsync); }); @@ -820,7 +822,8 @@ public void testBulk() throws IOException { if (opType == DocWriteRequest.OpType.INDEX) { IndexRequest indexRequest = new IndexRequest("index").id(id).source(source, xContentType); if (erroneous) { - indexRequest.version(12L); + indexRequest.setIfSeqNo(12L); + indexRequest.setIfPrimaryTerm(12L); } bulkRequest.add(indexRequest); @@ -1130,7 +1133,8 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) if (opType == DocWriteRequest.OpType.INDEX) { IndexRequest indexRequest = new IndexRequest("index").id(id).source(xContentType, "id", i); if (erroneous) { - indexRequest.version(12L); + indexRequest.setIfSeqNo(12L); + indexRequest.setIfPrimaryTerm(12L); } processor.add(indexRequest); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java index b58e5ae8852d3..327a12727d23e 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java @@ -892,10 +892,15 @@ public void testBulk() throws IOException { docWriteRequest.routing(randomAlphaOfLength(10)); } if (randomBoolean()) { - docWriteRequest.version(randomNonNegativeLong()); - } - if (randomBoolean()) { - docWriteRequest.versionType(randomFrom(VersionType.values())); + if (randomBoolean()) { + docWriteRequest.version(randomNonNegativeLong()); + } + if (randomBoolean()) { + docWriteRequest.versionType(randomFrom(VersionType.values())); + } + } else if (randomBoolean()) { + docWriteRequest.setIfSeqNo(randomNonNegativeLong()); + docWriteRequest.setIfPrimaryTerm(randomLongBetween(1, 200)); } bulkRequest.add(docWriteRequest); } @@ -925,6 +930,8 @@ public void testBulk() throws IOException { assertEquals(originalRequest.routing(), parsedRequest.routing()); assertEquals(originalRequest.version(), parsedRequest.version()); assertEquals(originalRequest.versionType(), parsedRequest.versionType()); + assertEquals(originalRequest.ifSeqNo(), parsedRequest.ifSeqNo()); + assertEquals(originalRequest.ifPrimaryTerm(), parsedRequest.ifPrimaryTerm()); DocWriteRequest.OpType opType = originalRequest.opType(); if (opType == DocWriteRequest.OpType.INDEX) { diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/bulk/80_cas.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/bulk/80_cas.yml new file mode 100644 index 0000000000000..902621cfba578 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/bulk/80_cas.yml @@ -0,0 +1,42 @@ +--- +"Compare And Swap Sequence Numbers": + + - skip: + version: " - 6.99.99" + reason: typeless API are add in 7.0.0 + + - do: + index: + index: test_1 + id: 1 + body: { foo: bar } + - match: { _version: 1} + - set: { _seq_no: seqno } + - set: { _primary_term: primary_term } + + - do: + bulk: + body: + - index: + _index: test_1 + _id: 1 + if_seq_no: 10000 + if_primary_term: $primary_term + - foo: bar2 + + - match: { errors: true } + - match: { items.0.index.status: 409 } + - match: { items.0.index.error.type: version_conflict_engine_exception } + + - do: + bulk: + body: + - index: + _index: test_1 + _id: 1 + if_seq_no: $seqno + if_primary_term: $primary_term + - foo: bar2 + + - match: { errors: false} + - match: { items.0.index.status: 200 } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/bulk/81_cas_with_types.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/bulk/81_cas_with_types.yml new file mode 100644 index 0000000000000..101316e7bf504 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/bulk/81_cas_with_types.yml @@ -0,0 +1,45 @@ +--- +"Compare And Swap Sequence Numbers": + + - skip: + version: " - 6.6.99" + reason: cas operations with sequence numbers was added in 6.7 + + - do: + index: + index: test_1 + type: _doc + id: 1 + body: { foo: bar } + - match: { _version: 1} + - set: { _seq_no: seqno } + - set: { _primary_term: primary_term } + + - do: + bulk: + body: + - index: + _index: test_1 + _type: _doc + _id: 1 + if_seq_no: 10000 + if_primary_term: $primary_term + - foo: bar2 + + - match: { errors: true } + - match: { items.0.index.status: 409 } + - match: { items.0.index.error.type: version_conflict_engine_exception } + + - do: + bulk: + body: + - index: + _index: test_1 + _type: _doc + _id: 1 + if_seq_no: $seqno + if_primary_term: $primary_term + - foo: bar2 + + - match: { errors: false} + - match: { items.0.index.status: 200 } diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/index/30_cas.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/index/30_cas.yml index a43ec1437a50b..550582e9816eb 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/index/30_cas.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/index/30_cas.yml @@ -3,7 +3,7 @@ - skip: version: " - 6.99.99" - reason: cas ops are introduced in 7.0.0 + reason: typesless api was introduces in 7.0 - do: index: From 2df14d48231d91af98b0aca112b438ac2fab91cb Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Tue, 5 Feb 2019 13:24:18 +0100 Subject: [PATCH 2/5] wire index and delete --- .../client/RequestConverters.java | 4 ++++ .../java/org/elasticsearch/client/CrudIT.java | 5 +++-- .../client/RequestConvertersTests.java | 22 +++++++++++++++++++ 3 files changed, 29 insertions(+), 2 deletions(-) diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java index 7cfca939913e6..860788e0157a5 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java @@ -108,6 +108,8 @@ static Request delete(DeleteRequest deleteRequest) { parameters.withTimeout(deleteRequest.timeout()); parameters.withVersion(deleteRequest.version()); parameters.withVersionType(deleteRequest.versionType()); + parameters.withIfSeqNo(deleteRequest.ifSeqNo()); + parameters.withIfPrimaryTerm(deleteRequest.ifPrimaryTerm()); parameters.withRefreshPolicy(deleteRequest.getRefreshPolicy()); parameters.withWaitForActiveShards(deleteRequest.waitForActiveShards()); return request; @@ -324,6 +326,8 @@ static Request index(IndexRequest indexRequest) { parameters.withTimeout(indexRequest.timeout()); parameters.withVersion(indexRequest.version()); parameters.withVersionType(indexRequest.versionType()); + parameters.withIfSeqNo(indexRequest.ifSeqNo()); + parameters.withIfPrimaryTerm(indexRequest.ifPrimaryTerm()); parameters.withPipeline(indexRequest.getPipeline()); parameters.withRefreshPolicy(indexRequest.getRefreshPolicy()); parameters.withWaitForActiveShards(indexRequest.waitForActiveShards()); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java index 6addd8558c879..8daea5ff415a8 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java @@ -138,7 +138,7 @@ public void testDelete() throws IOException { () -> execute(deleteRequest, highLevelClient()::delete, highLevelClient()::deleteAsync)); assertEquals(RestStatus.CONFLICT, exception.status()); assertEquals("Elasticsearch exception [type=version_conflict_engine_exception, reason=[_doc][" + docId + "]: " + - "version conflict, current version [1] is different than the one provided [2]]", exception.getMessage()); + "version conflict, required seqNo [2], primary term [2]. current document has seqNo [3] and primary term [1]]", exception.getMessage()); assertEquals("index", exception.getMetadata("es.index").get(0)); } { @@ -527,7 +527,8 @@ public void testIndex() throws IOException { }); assertEquals(RestStatus.CONFLICT, exception.status()); assertEquals("Elasticsearch exception [type=version_conflict_engine_exception, reason=[_doc][id]: " + - "version conflict, current version [2] is different than the one provided [5]]", exception.getMessage()); + "version conflict, required seqNo [1], primary term [5]. current document has seqNo [2] and primary term [1]]", + exception.getMessage()); assertEquals("index", exception.getMetadata("es.index").get(0)); } { diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java index 327a12727d23e..d347049edbf00 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.client; +import com.carrotsearch.randomizedtesting.annotations.Repeat; import org.apache.http.HttpEntity; import org.apache.http.client.methods.HttpDelete; import org.apache.http.client.methods.HttpGet; @@ -131,6 +132,7 @@ import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.nullValue; +@Repeat(iterations = 200) public class RequestConvertersTests extends ESTestCase { public void testPing() { Request request = RequestConverters.ping(); @@ -281,6 +283,7 @@ public void testDelete() { setRandomRefreshPolicy(deleteRequest::setRefreshPolicy, expectedParams); setRandomVersion(deleteRequest, expectedParams); setRandomVersionType(deleteRequest::versionType, expectedParams); + setRandomIfSeqNoAndTerm(deleteRequest, expectedParams); if (frequently()) { if (randomBoolean()) { @@ -631,6 +634,7 @@ public void testIndex() throws IOException { } else { setRandomVersion(indexRequest, expectedParams); setRandomVersionType(indexRequest::versionType, expectedParams); + setRandomIfSeqNoAndTerm(indexRequest, expectedParams); } if (frequently()) { @@ -768,6 +772,7 @@ public void testUpdate() throws IOException { setRandomWaitForActiveShards(updateRequest::waitForActiveShards, expectedParams); setRandomVersion(updateRequest, expectedParams); setRandomVersionType(updateRequest::versionType, expectedParams); + setRandomIfSeqNoAndTerm(updateRequest, expectedParams); if (randomBoolean()) { int retryOnConflict = randomIntBetween(0, 5); updateRequest.retryOnConflict(retryOnConflict); @@ -798,6 +803,7 @@ public void testUpdate() throws IOException { assertEquals(updateRequest.docAsUpsert(), parsedUpdateRequest.docAsUpsert()); assertEquals(updateRequest.detectNoop(), parsedUpdateRequest.detectNoop()); assertEquals(updateRequest.fetchSource(), parsedUpdateRequest.fetchSource()); + assertIfSeqNoAndTerm(updateRequest, parsedUpdateRequest); assertEquals(updateRequest.script(), parsedUpdateRequest.script()); if (updateRequest.doc() != null) { assertToXContentEquivalent(updateRequest.doc().source(), parsedUpdateRequest.doc().source(), xContentType); @@ -811,6 +817,22 @@ public void testUpdate() throws IOException { } } + private void assertIfSeqNoAndTerm(DocWriteRequestrequest, DocWriteRequest parsedRequest) { + assertEquals(request.ifSeqNo(), parsedRequest.ifSeqNo()); + assertEquals(request.ifPrimaryTerm(), parsedRequest.ifPrimaryTerm()); + } + + private void setRandomIfSeqNoAndTerm(DocWriteRequest request, Map expectedParams) { + if (randomBoolean()) { + final long seqNo = randomNonNegativeLong(); + request.setIfSeqNo(seqNo); + expectedParams.put("if_seq_no", Long.toString(seqNo)); + final long primaryTerm = randomLongBetween(1, 200); + request.setIfPrimaryTerm(primaryTerm); + expectedParams.put("if_primary_term", Long.toString(primaryTerm)); + } + } + public void testUpdateWithType() throws IOException { String index = randomAlphaOfLengthBetween(3, 10); String type = randomAlphaOfLengthBetween(3, 10); From 949ddb901d234b069f6ea2f7ae48f51392af928e Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Tue, 5 Feb 2019 13:36:14 +0100 Subject: [PATCH 3/5] feedback --- .../org/elasticsearch/client/RequestConvertersTests.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java index d347049edbf00..761b90e69a534 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java @@ -19,7 +19,6 @@ package org.elasticsearch.client; -import com.carrotsearch.randomizedtesting.annotations.Repeat; import org.apache.http.HttpEntity; import org.apache.http.client.methods.HttpDelete; import org.apache.http.client.methods.HttpGet; @@ -132,7 +131,6 @@ import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.Matchers.nullValue; -@Repeat(iterations = 200) public class RequestConvertersTests extends ESTestCase { public void testPing() { Request request = RequestConverters.ping(); @@ -817,12 +815,12 @@ public void testUpdate() throws IOException { } } - private void assertIfSeqNoAndTerm(DocWriteRequestrequest, DocWriteRequest parsedRequest) { + private static void assertIfSeqNoAndTerm(DocWriteRequestrequest, DocWriteRequest parsedRequest) { assertEquals(request.ifSeqNo(), parsedRequest.ifSeqNo()); assertEquals(request.ifPrimaryTerm(), parsedRequest.ifPrimaryTerm()); } - private void setRandomIfSeqNoAndTerm(DocWriteRequest request, Map expectedParams) { + private static void setRandomIfSeqNoAndTerm(DocWriteRequest request, Map expectedParams) { if (randomBoolean()) { final long seqNo = randomNonNegativeLong(); request.setIfSeqNo(seqNo); From 621457f508960da24a8a47966c82337659e7a30e Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Tue, 5 Feb 2019 13:48:49 +0100 Subject: [PATCH 4/5] lint --- .../src/test/java/org/elasticsearch/client/CrudIT.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java index 8daea5ff415a8..e2102236cc422 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java @@ -138,7 +138,8 @@ public void testDelete() throws IOException { () -> execute(deleteRequest, highLevelClient()::delete, highLevelClient()::deleteAsync)); assertEquals(RestStatus.CONFLICT, exception.status()); assertEquals("Elasticsearch exception [type=version_conflict_engine_exception, reason=[_doc][" + docId + "]: " + - "version conflict, required seqNo [2], primary term [2]. current document has seqNo [3] and primary term [1]]", exception.getMessage()); + "version conflict, required seqNo [2], primary term [2]. current document has seqNo [3] and primary term [1]]", + exception.getMessage()); assertEquals("index", exception.getMetadata("es.index").get(0)); } { From 57bafed7e7bfbde73b8e334af95d9c33f20cff74 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Tue, 5 Feb 2019 14:58:38 +0100 Subject: [PATCH 5/5] update requests don't have parameters in url --- .../java/org/elasticsearch/client/RequestConvertersTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java index 761b90e69a534..9364e2ce2d57c 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java @@ -770,7 +770,7 @@ public void testUpdate() throws IOException { setRandomWaitForActiveShards(updateRequest::waitForActiveShards, expectedParams); setRandomVersion(updateRequest, expectedParams); setRandomVersionType(updateRequest::versionType, expectedParams); - setRandomIfSeqNoAndTerm(updateRequest, expectedParams); + setRandomIfSeqNoAndTerm(updateRequest, new HashMap<>()); // if* params are passed in the body if (randomBoolean()) { int retryOnConflict = randomIntBetween(0, 5); updateRequest.retryOnConflict(retryOnConflict);