Skip to content

Commit 761e8c4

Browse files
sohaibiftikharnik9000
authored andcommitted
HLRC: Add delete by query API (#32782)
Adds the delete-by-query API to the High Level REST Client.
1 parent 1457b07 commit 761e8c4

File tree

17 files changed

+552
-65
lines changed

17 files changed

+552
-65
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/RequestConverters.java

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@
108108
import org.elasticsearch.index.VersionType;
109109
import org.elasticsearch.index.rankeval.RankEvalRequest;
110110
import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
111+
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
111112
import org.elasticsearch.index.reindex.ReindexRequest;
112113
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
113114
import org.elasticsearch.protocol.xpack.XPackInfoRequest;
@@ -866,6 +867,32 @@ static Request updateByQuery(UpdateByQueryRequest updateByQueryRequest) throws I
866867
return request;
867868
}
868869

870+
static Request deleteByQuery(DeleteByQueryRequest deleteByQueryRequest) throws IOException {
871+
String endpoint =
872+
endpoint(deleteByQueryRequest.indices(), deleteByQueryRequest.getDocTypes(), "_delete_by_query");
873+
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
874+
Params params = new Params(request)
875+
.withRouting(deleteByQueryRequest.getRouting())
876+
.withRefresh(deleteByQueryRequest.isRefresh())
877+
.withTimeout(deleteByQueryRequest.getTimeout())
878+
.withWaitForActiveShards(deleteByQueryRequest.getWaitForActiveShards())
879+
.withIndicesOptions(deleteByQueryRequest.indicesOptions());
880+
if (deleteByQueryRequest.isAbortOnVersionConflict() == false) {
881+
params.putParam("conflicts", "proceed");
882+
}
883+
if (deleteByQueryRequest.getBatchSize() != AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE) {
884+
params.putParam("scroll_size", Integer.toString(deleteByQueryRequest.getBatchSize()));
885+
}
886+
if (deleteByQueryRequest.getScrollTime() != AbstractBulkByScrollRequest.DEFAULT_SCROLL_TIMEOUT) {
887+
params.putParam("scroll", deleteByQueryRequest.getScrollTime());
888+
}
889+
if (deleteByQueryRequest.getSize() > 0) {
890+
params.putParam("size", Integer.toString(deleteByQueryRequest.getSize()));
891+
}
892+
request.setEntity(createEntity(deleteByQueryRequest, REQUEST_BODY_CONTENT_TYPE));
893+
return request;
894+
}
895+
869896
static Request rollover(RolloverRequest rolloverRequest) throws IOException {
870897
String endpoint = new EndpointBuilder().addPathPart(rolloverRequest.getAlias()).addPathPartAsIs("_rollover")
871898
.addPathPart(rolloverRequest.getNewIndexName()).build();
@@ -1174,10 +1201,10 @@ static Request xPackInfo(XPackInfoRequest infoRequest) {
11741201
static Request xPackGraphExplore(GraphExploreRequest exploreRequest) throws IOException {
11751202
String endpoint = endpoint(exploreRequest.indices(), exploreRequest.types(), "_xpack/graph/_explore");
11761203
Request request = new Request(HttpGet.METHOD_NAME, endpoint);
1177-
request.setEntity(createEntity(exploreRequest, REQUEST_BODY_CONTENT_TYPE));
1204+
request.setEntity(createEntity(exploreRequest, REQUEST_BODY_CONTENT_TYPE));
11781205
return request;
1179-
}
1180-
1206+
}
1207+
11811208
static Request xPackWatcherPutWatch(PutWatchRequest putWatchRequest) {
11821209
String endpoint = new EndpointBuilder()
11831210
.addPathPartAsIs("_xpack")

client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import org.elasticsearch.index.rankeval.RankEvalRequest;
6666
import org.elasticsearch.index.rankeval.RankEvalResponse;
6767
import org.elasticsearch.index.reindex.BulkByScrollResponse;
68+
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
6869
import org.elasticsearch.index.reindex.ReindexRequest;
6970
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
7071
import org.elasticsearch.plugins.spi.NamedXContentProvider;
@@ -328,7 +329,7 @@ public final XPackClient xpack() {
328329
* Watcher APIs on elastic.co</a> for more information.
329330
*/
330331
public WatcherClient watcher() { return watcherClient; }
331-
332+
332333
/**
333334
* Provides methods for accessing the Elastic Licensed Graph explore API that
334335
* is shipped with the default distribution of Elasticsearch. All of
@@ -337,7 +338,7 @@ public final XPackClient xpack() {
337338
* See the <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/graph-explore-api.html">
338339
* Graph API on elastic.co</a> for more information.
339340
*/
340-
public GraphClient graph() { return graphClient; }
341+
public GraphClient graph() { return graphClient; }
341342

342343
/**
343344
* Provides methods for accessing the Elastic Licensed Licensing APIs that
@@ -454,6 +455,35 @@ public final void updateByQueryAsync(UpdateByQueryRequest reindexRequest, Reques
454455
);
455456
}
456457

458+
/**
459+
* Executes a delete by query request.
460+
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html">
461+
* Delete By Query API on elastic.co</a>
462+
* @param deleteByQueryRequest the request
463+
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
464+
* @return the response
465+
* @throws IOException in case there is a problem sending the request or parsing back the response
466+
*/
467+
public final BulkByScrollResponse deleteByQuery(DeleteByQueryRequest deleteByQueryRequest, RequestOptions options) throws IOException {
468+
return performRequestAndParseEntity(
469+
deleteByQueryRequest, RequestConverters::deleteByQuery, options, BulkByScrollResponse::fromXContent, emptySet()
470+
);
471+
}
472+
473+
/**
474+
* Asynchronously executes a delete by query request.
475+
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html">
476+
* Delete By Query API on elastic.co</a>
477+
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
478+
* @param listener the listener to be notified upon request completion
479+
*/
480+
public final void deleteByQueryAsync(DeleteByQueryRequest reindexRequest, RequestOptions options,
481+
ActionListener<BulkByScrollResponse> listener) {
482+
performRequestAsyncAndParseEntity(
483+
reindexRequest, RequestConverters::deleteByQuery, options, BulkByScrollResponse::fromXContent, listener, emptySet()
484+
);
485+
}
486+
457487
/**
458488
* Pings the remote Elasticsearch cluster and returns true if the ping succeeded, false otherwise
459489
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized

client/rest-high-level/src/test/java/org/elasticsearch/client/CrudIT.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.elasticsearch.action.get.MultiGetResponse;
3737
import org.elasticsearch.action.index.IndexRequest;
3838
import org.elasticsearch.action.index.IndexResponse;
39+
import org.elasticsearch.action.search.SearchRequest;
3940
import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
4041
import org.elasticsearch.action.update.UpdateRequest;
4142
import org.elasticsearch.action.update.UpdateResponse;
@@ -50,6 +51,7 @@
5051
import org.elasticsearch.index.get.GetResult;
5152
import org.elasticsearch.index.query.IdsQueryBuilder;
5253
import org.elasticsearch.index.reindex.BulkByScrollResponse;
54+
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
5355
import org.elasticsearch.index.reindex.ReindexRequest;
5456
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
5557
import org.elasticsearch.rest.RestStatus;
@@ -758,6 +760,52 @@ public void testUpdateByQuery() throws IOException {
758760
}
759761
}
760762

763+
public void testDeleteByQuery() throws IOException {
764+
final String sourceIndex = "source1";
765+
{
766+
// Prepare
767+
Settings settings = Settings.builder()
768+
.put("number_of_shards", 1)
769+
.put("number_of_replicas", 0)
770+
.build();
771+
createIndex(sourceIndex, settings);
772+
assertEquals(
773+
RestStatus.OK,
774+
highLevelClient().bulk(
775+
new BulkRequest()
776+
.add(new IndexRequest(sourceIndex, "type", "1")
777+
.source(Collections.singletonMap("foo", 1), XContentType.JSON))
778+
.add(new IndexRequest(sourceIndex, "type", "2")
779+
.source(Collections.singletonMap("foo", 2), XContentType.JSON))
780+
.setRefreshPolicy(RefreshPolicy.IMMEDIATE),
781+
RequestOptions.DEFAULT
782+
).status()
783+
);
784+
}
785+
{
786+
// test1: delete one doc
787+
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest();
788+
deleteByQueryRequest.indices(sourceIndex);
789+
deleteByQueryRequest.setQuery(new IdsQueryBuilder().addIds("1").types("type"));
790+
deleteByQueryRequest.setRefresh(true);
791+
BulkByScrollResponse bulkResponse =
792+
execute(deleteByQueryRequest, highLevelClient()::deleteByQuery, highLevelClient()::deleteByQueryAsync);
793+
assertEquals(1, bulkResponse.getTotal());
794+
assertEquals(1, bulkResponse.getDeleted());
795+
assertEquals(0, bulkResponse.getNoops());
796+
assertEquals(0, bulkResponse.getVersionConflicts());
797+
assertEquals(1, bulkResponse.getBatches());
798+
assertTrue(bulkResponse.getTook().getMillis() > 0);
799+
assertEquals(1, bulkResponse.getBatches());
800+
assertEquals(0, bulkResponse.getBulkFailures().size());
801+
assertEquals(0, bulkResponse.getSearchFailures().size());
802+
assertEquals(
803+
1,
804+
highLevelClient().search(new SearchRequest(sourceIndex), RequestOptions.DEFAULT).getHits().totalHits
805+
);
806+
}
807+
}
808+
761809
public void testBulkProcessorIntegration() throws IOException {
762810
int nbItems = randomIntBetween(10, 100);
763811
boolean[] errors = new boolean[nbItems];

client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@
127127
import org.elasticsearch.index.rankeval.RankEvalSpec;
128128
import org.elasticsearch.index.rankeval.RatedRequest;
129129
import org.elasticsearch.index.rankeval.RestRankEvalAction;
130+
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
130131
import org.elasticsearch.index.reindex.ReindexRequest;
131132
import org.elasticsearch.index.reindex.RemoteInfo;
132133
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
@@ -526,6 +527,53 @@ public void testUpdateByQuery() throws IOException {
526527
assertToXContentBody(updateByQueryRequest, request.getEntity());
527528
}
528529

530+
public void testDeleteByQuery() throws IOException {
531+
DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest();
532+
deleteByQueryRequest.indices(randomIndicesNames(1, 5));
533+
Map<String, String> expectedParams = new HashMap<>();
534+
if (randomBoolean()) {
535+
deleteByQueryRequest.setDocTypes(generateRandomStringArray(5, 5, false, false));
536+
}
537+
if (randomBoolean()) {
538+
int batchSize = randomInt(100);
539+
deleteByQueryRequest.setBatchSize(batchSize);
540+
expectedParams.put("scroll_size", Integer.toString(batchSize));
541+
}
542+
if (randomBoolean()) {
543+
deleteByQueryRequest.setRouting("=cat");
544+
expectedParams.put("routing", "=cat");
545+
}
546+
if (randomBoolean()) {
547+
int size = randomIntBetween(100, 1000);
548+
deleteByQueryRequest.setSize(size);
549+
expectedParams.put("size", Integer.toString(size));
550+
}
551+
if (randomBoolean()) {
552+
deleteByQueryRequest.setAbortOnVersionConflict(false);
553+
expectedParams.put("conflicts", "proceed");
554+
}
555+
if (randomBoolean()) {
556+
String ts = randomTimeValue();
557+
deleteByQueryRequest.setScroll(TimeValue.parseTimeValue(ts, "scroll"));
558+
expectedParams.put("scroll", ts);
559+
}
560+
if (randomBoolean()) {
561+
deleteByQueryRequest.setQuery(new TermQueryBuilder("foo", "fooval"));
562+
}
563+
setRandomIndicesOptions(deleteByQueryRequest::setIndicesOptions, deleteByQueryRequest::indicesOptions, expectedParams);
564+
setRandomTimeout(deleteByQueryRequest::setTimeout, ReplicationRequest.DEFAULT_TIMEOUT, expectedParams);
565+
Request request = RequestConverters.deleteByQuery(deleteByQueryRequest);
566+
StringJoiner joiner = new StringJoiner("/", "/", "");
567+
joiner.add(String.join(",", deleteByQueryRequest.indices()));
568+
if (deleteByQueryRequest.getDocTypes().length > 0)
569+
joiner.add(String.join(",", deleteByQueryRequest.getDocTypes()));
570+
joiner.add("_delete_by_query");
571+
assertEquals(joiner.toString(), request.getEndpoint());
572+
assertEquals(HttpPost.METHOD_NAME, request.getMethod());
573+
assertEquals(expectedParams, request.getParameters());
574+
assertToXContentBody(deleteByQueryRequest, request.getEntity());
575+
}
576+
529577
public void testPutMapping() throws IOException {
530578
PutMappingRequest putMappingRequest = new PutMappingRequest();
531579

@@ -2720,7 +2768,7 @@ public void testXPackPutWatch() throws Exception {
27202768
request.getEntity().writeTo(bos);
27212769
assertThat(bos.toString("UTF-8"), is(body));
27222770
}
2723-
2771+
27242772
public void testGraphExplore() throws Exception {
27252773
Map<String, String> expectedParams = new HashMap<>();
27262774

@@ -2748,7 +2796,7 @@ public void testGraphExplore() throws Exception {
27482796
assertEquals(expectedParams, request.getParameters());
27492797
assertThat(request.getEntity().getContentType().getValue(), is(XContentType.JSON.mediaTypeWithoutParameters()));
27502798
assertToXContentBody(graphExploreRequest, request.getEntity());
2751-
}
2799+
}
27522800

27532801
public void testXPackDeleteWatch() {
27542802
DeleteWatchRequest deleteWatchRequest = new DeleteWatchRequest();

client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -649,7 +649,6 @@ public void testApiNamingConventions() throws Exception {
649649
"cluster.remote_info",
650650
"count",
651651
"create",
652-
"delete_by_query",
653652
"exists_source",
654653
"get_source",
655654
"indices.delete_alias",

client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CRUDDocumentationIT.java

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import org.elasticsearch.index.query.MatchAllQueryBuilder;
6666
import org.elasticsearch.index.query.TermQueryBuilder;
6767
import org.elasticsearch.index.reindex.BulkByScrollResponse;
68+
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
6869
import org.elasticsearch.index.reindex.ReindexRequest;
6970
import org.elasticsearch.index.reindex.RemoteInfo;
7071
import org.elasticsearch.index.reindex.ScrollableHitSource;
@@ -1020,6 +1021,113 @@ public void onFailure(Exception e) {
10201021
}
10211022
}
10221023

1024+
public void testDeleteByQuery() throws Exception {
1025+
RestHighLevelClient client = highLevelClient();
1026+
{
1027+
String mapping =
1028+
"\"doc\": {\n" +
1029+
" \"properties\": {\n" +
1030+
" \"user\": {\n" +
1031+
" \"type\": \"text\"\n" +
1032+
" },\n" +
1033+
" \"field1\": {\n" +
1034+
" \"type\": \"integer\"\n" +
1035+
" },\n" +
1036+
" \"field2\": {\n" +
1037+
" \"type\": \"integer\"\n" +
1038+
" }\n" +
1039+
" }\n" +
1040+
" }";
1041+
createIndex("source1", Settings.EMPTY, mapping);
1042+
createIndex("source2", Settings.EMPTY, mapping);
1043+
}
1044+
{
1045+
// tag::delete-by-query-request
1046+
DeleteByQueryRequest request = new DeleteByQueryRequest("source1", "source2"); // <1>
1047+
// end::delete-by-query-request
1048+
// tag::delete-by-query-request-conflicts
1049+
request.setConflicts("proceed"); // <1>
1050+
// end::delete-by-query-request-conflicts
1051+
// tag::delete-by-query-request-typeOrQuery
1052+
request.setDocTypes("doc"); // <1>
1053+
request.setQuery(new TermQueryBuilder("user", "kimchy")); // <2>
1054+
// end::delete-by-query-request-typeOrQuery
1055+
// tag::delete-by-query-request-size
1056+
request.setSize(10); // <1>
1057+
// end::delete-by-query-request-size
1058+
// tag::delete-by-query-request-scrollSize
1059+
request.setBatchSize(100); // <1>
1060+
// end::delete-by-query-request-scrollSize
1061+
// tag::delete-by-query-request-timeout
1062+
request.setTimeout(TimeValue.timeValueMinutes(2)); // <1>
1063+
// end::delete-by-query-request-timeout
1064+
// tag::delete-by-query-request-refresh
1065+
request.setRefresh(true); // <1>
1066+
// end::delete-by-query-request-refresh
1067+
// tag::delete-by-query-request-slices
1068+
request.setSlices(2); // <1>
1069+
// end::delete-by-query-request-slices
1070+
// tag::delete-by-query-request-scroll
1071+
request.setScroll(TimeValue.timeValueMinutes(10)); // <1>
1072+
// end::delete-by-query-request-scroll
1073+
// tag::delete-by-query-request-routing
1074+
request.setRouting("=cat"); // <1>
1075+
// end::delete-by-query-request-routing
1076+
// tag::delete-by-query-request-indicesOptions
1077+
request.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN); // <1>
1078+
// end::delete-by-query-request-indicesOptions
1079+
1080+
// tag::delete-by-query-execute
1081+
BulkByScrollResponse bulkResponse = client.deleteByQuery(request, RequestOptions.DEFAULT);
1082+
// end::delete-by-query-execute
1083+
assertSame(0, bulkResponse.getSearchFailures().size());
1084+
assertSame(0, bulkResponse.getBulkFailures().size());
1085+
// tag::delete-by-query-response
1086+
TimeValue timeTaken = bulkResponse.getTook(); // <1>
1087+
boolean timedOut = bulkResponse.isTimedOut(); // <2>
1088+
long totalDocs = bulkResponse.getTotal(); // <3>
1089+
long deletedDocs = bulkResponse.getDeleted(); // <4>
1090+
long batches = bulkResponse.getBatches(); // <5>
1091+
long noops = bulkResponse.getNoops(); // <6>
1092+
long versionConflicts = bulkResponse.getVersionConflicts(); // <7>
1093+
long bulkRetries = bulkResponse.getBulkRetries(); // <8>
1094+
long searchRetries = bulkResponse.getSearchRetries(); // <9>
1095+
TimeValue throttledMillis = bulkResponse.getStatus().getThrottled(); // <10>
1096+
TimeValue throttledUntilMillis = bulkResponse.getStatus().getThrottledUntil(); // <11>
1097+
List<ScrollableHitSource.SearchFailure> searchFailures = bulkResponse.getSearchFailures(); // <12>
1098+
List<BulkItemResponse.Failure> bulkFailures = bulkResponse.getBulkFailures(); // <13>
1099+
// end::delete-by-query-response
1100+
}
1101+
{
1102+
DeleteByQueryRequest request = new DeleteByQueryRequest();
1103+
request.indices("source1");
1104+
1105+
// tag::delete-by-query-execute-listener
1106+
ActionListener<BulkByScrollResponse> listener = new ActionListener<BulkByScrollResponse>() {
1107+
@Override
1108+
public void onResponse(BulkByScrollResponse bulkResponse) {
1109+
// <1>
1110+
}
1111+
1112+
@Override
1113+
public void onFailure(Exception e) {
1114+
// <2>
1115+
}
1116+
};
1117+
// end::delete-by-query-execute-listener
1118+
1119+
// Replace the empty listener by a blocking listener in test
1120+
final CountDownLatch latch = new CountDownLatch(1);
1121+
listener = new LatchedActionListener<>(listener, latch);
1122+
1123+
// tag::delete-by-query-execute-async
1124+
client.deleteByQueryAsync(request, RequestOptions.DEFAULT, listener); // <1>
1125+
// end::delete-by-query-execute-async
1126+
1127+
assertTrue(latch.await(30L, TimeUnit.SECONDS));
1128+
}
1129+
}
1130+
10231131
public void testGet() throws Exception {
10241132
RestHighLevelClient client = highLevelClient();
10251133
{

0 commit comments

Comments
 (0)