
Reindex max_docs parameter name #41894

Merged
@@ -584,8 +584,8 @@ static Request updateByQuery(UpdateByQueryRequest updateByQueryRequest) throws I
if (updateByQueryRequest.getScrollTime() != AbstractBulkByScrollRequest.DEFAULT_SCROLL_TIMEOUT) {
params.putParam("scroll", updateByQueryRequest.getScrollTime());
}
if (updateByQueryRequest.getSize() > 0) {
params.putParam("size", Integer.toString(updateByQueryRequest.getSize()));
if (updateByQueryRequest.getMaxDocs() > 0) {
params.putParam("max_docs", Integer.toString(updateByQueryRequest.getMaxDocs()));
}
request.addParameters(params.asMap());
request.setEntity(createEntity(updateByQueryRequest, REQUEST_BODY_CONTENT_TYPE));
@@ -611,8 +611,8 @@ static Request deleteByQuery(DeleteByQueryRequest deleteByQueryRequest) throws I
if (deleteByQueryRequest.getScrollTime() != AbstractBulkByScrollRequest.DEFAULT_SCROLL_TIMEOUT) {
params.putParam("scroll", deleteByQueryRequest.getScrollTime());
}
if (deleteByQueryRequest.getSize() > 0) {
params.putParam("size", Integer.toString(deleteByQueryRequest.getSize()));
if (deleteByQueryRequest.getMaxDocs() > 0) {
params.putParam("max_docs", Integer.toString(deleteByQueryRequest.getMaxDocs()));
}
request.addParameters(params.asMap());
request.setEntity(createEntity(deleteByQueryRequest, REQUEST_BODY_CONTENT_TYPE));
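For orientation, a minimal client-side sketch of what the converter change above means in practice (an editor's sketch, not part of this change set; it assumes the high-level REST client is on the classpath, and the index name and the 500 limit are illustrative only):

["source","java"]
--------------------------------------------------
import org.elasticsearch.index.reindex.UpdateByQueryRequest;

public class MaxDocsParamSketch {
    public static void main(String[] args) {
        UpdateByQueryRequest request = new UpdateByQueryRequest("twitter");
        request.setMaxDocs(500);  // cap on the number of documents the request may process
        // With this change, RequestConverters.updateByQuery(request) emits the limit as the
        // "max_docs" URL parameter instead of "size".
        System.out.println("max docs: " + request.getMaxDocs());
    }
}
--------------------------------------------------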
@@ -427,7 +427,11 @@ public void testReindex() throws IOException {
reindexRequest.setDestRouting("=cat");
}
if (randomBoolean()) {
reindexRequest.setSize(randomIntBetween(100, 1000));
if (randomBoolean()) {
reindexRequest.setMaxDocs(randomIntBetween(100, 1000));
} else {
reindexRequest.setSize(randomIntBetween(100, 1000));
}
}
if (randomBoolean()) {
reindexRequest.setAbortOnVersionConflict(false);
@@ -476,8 +480,12 @@ public void testUpdateByQuery() throws IOException {
}
if (randomBoolean()) {
int size = randomIntBetween(100, 1000);
updateByQueryRequest.setSize(size);
expectedParams.put("size", Integer.toString(size));
if (randomBoolean()) {
updateByQueryRequest.setMaxDocs(size);
} else {
updateByQueryRequest.setSize(size);
}
expectedParams.put("max_docs", Integer.toString(size));
}
if (randomBoolean()) {
updateByQueryRequest.setAbortOnVersionConflict(false);
@@ -521,8 +529,12 @@ public void testDeleteByQuery() throws IOException {
}
if (randomBoolean()) {
int size = randomIntBetween(100, 1000);
deleteByQueryRequest.setSize(size);
expectedParams.put("size", Integer.toString(size));
if (randomBoolean()) {
deleteByQueryRequest.setMaxDocs(size);
} else {
deleteByQueryRequest.setSize(size);
}
expectedParams.put("max_docs", Integer.toString(size));
}
if (randomBoolean()) {
deleteByQueryRequest.setAbortOnVersionConflict(false);
@@ -824,9 +824,9 @@ public void testReindex() throws Exception {
// tag::reindex-request-conflicts
request.setConflicts("proceed"); // <1>
// end::reindex-request-conflicts
// tag::reindex-request-size
request.setSize(10); // <1>
// end::reindex-request-size
// tag::reindex-request-maxDocs
request.setMaxDocs(10); // <1>
// end::reindex-request-maxDocs
// tag::reindex-request-sourceSize
request.setSourceBatchSize(100); // <1>
// end::reindex-request-sourceSize
@@ -1026,9 +1026,9 @@ public void testUpdateByQuery() throws Exception {
// tag::update-by-query-request-query
request.setQuery(new TermQueryBuilder("user", "kimchy")); // <1>
// end::update-by-query-request-query
// tag::update-by-query-request-size
request.setSize(10); // <1>
// end::update-by-query-request-size
// tag::update-by-query-request-maxDocs
request.setMaxDocs(10); // <1>
// end::update-by-query-request-maxDocs
// tag::update-by-query-request-scrollSize
request.setBatchSize(100); // <1>
// end::update-by-query-request-scrollSize
@@ -1148,9 +1148,9 @@ public void testDeleteByQuery() throws Exception {
// tag::delete-by-query-request-query
request.setQuery(new TermQueryBuilder("user", "kimchy")); // <1>
// end::delete-by-query-request-query
// tag::delete-by-query-request-size
request.setSize(10); // <1>
// end::delete-by-query-request-size
// tag::delete-by-query-request-maxDocs
request.setMaxDocs(10); // <1>
// end::delete-by-query-request-maxDocs
// tag::delete-by-query-request-scrollSize
request.setBatchSize(100); // <1>
// end::delete-by-query-request-scrollSize
4 changes: 2 additions & 2 deletions docs/java-rest/high-level/document/delete-by-query.asciidoc
@@ -39,11 +39,11 @@ include-tagged::{doc-tests-file}[{api}-request-query]
--------------------------------------------------
<1> Only copy documents which have field `user` set to `kimchy`

It’s also possible to limit the number of processed documents by setting size.
It’s also possible to limit the number of processed documents by setting `maxDocs`.

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests-file}[{api}-request-size]
include-tagged::{doc-tests-file}[{api}-request-maxDocs]
--------------------------------------------------
<1> Only copy 10 documents

6 changes: 3 additions & 3 deletions docs/java-rest/high-level/document/reindex.asciidoc
@@ -65,11 +65,11 @@ include-tagged::{doc-tests-file}[{api}-request-query]
--------------------------------------------------
<1> Only copy documents which have field `user` set to `kimchy`

It’s also possible to limit the number of processed documents by setting size.
It’s also possible to limit the number of processed documents by setting `maxDocs`.

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests-file}[{api}-request-size]
include-tagged::{doc-tests-file}[{api}-request-maxDocs]
--------------------------------------------------
<1> Only copy 10 documents

@@ -90,7 +90,7 @@ include-tagged::{doc-tests-file}[{api}-request-pipeline]
<1> set pipeline to `my_pipeline`

If you want a particular set of documents from the source index you’ll need to use sort. If possible, prefer a more
selective query to size and sort.
selective query to maxDocs and sort.

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
4 changes: 2 additions & 2 deletions docs/java-rest/high-level/document/update-by-query.asciidoc
@@ -40,11 +40,11 @@ include-tagged::{doc-tests-file}[{api}-request-query]
--------------------------------------------------
<1> Only copy documents which have field `user` set to `kimchy`

It’s also possible to limit the number of processed documents by setting size.
It’s also possible to limit the number of processed documents by setting `maxDocs`.

["source","java",subs="attributes,callouts,macros"]
--------------------------------------------------
include-tagged::{doc-tests-file}[{api}-request-size]
include-tagged::{doc-tests-file}[{api}-request-maxDocs]
--------------------------------------------------
<1> Only copy 10 documents

10 changes: 5 additions & 5 deletions docs/reference/docs/delete-by-query.asciidoc
@@ -571,11 +571,11 @@ sub-request proportionally.
* Due to the nature of `slices` each sub-request won't get a perfectly even
portion of the documents. All documents will be addressed, but some slices may
be larger than others. Expect larger slices to have a more even distribution.
* Parameters like `requests_per_second` and `size` on a request with `slices`
are distributed proportionally to each sub-request. Combine that with the point
above about distribution being uneven and you should conclude that the using
`size` with `slices` might not result in exactly `size` documents being
deleted.
* Parameters like `requests_per_second` and `max_docs` on a request with
`slices` are distributed proportionally to each sub-request. Combine that with
the point above about distribution being uneven and you should conclude that
using `max_docs` with `slices` might not result in exactly `max_docs` documents
being deleted.
* Each sub-request gets a slightly different snapshot of the source index
though these are all taken at approximately the same time.
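To make the `max_docs`/`slices` point in the list above concrete, here is a minimal editor's sketch (not part of this change set; it assumes the high-level REST client, and the index name and numbers are illustrative): a cap of 10 documents split over 5 slices gives each sub-request a budget of roughly 2 documents, and since slices rarely hold equal shares of the matching documents, fewer than 10 may end up deleted.

["source","java"]
--------------------------------------------------
import org.elasticsearch.index.reindex.DeleteByQueryRequest;

public class SlicedMaxDocsSketch {
    public static void main(String[] args) {
        DeleteByQueryRequest request = new DeleteByQueryRequest("twitter");
        request.setMaxDocs(10);  // overall cap for the whole request
        request.setSlices(5);    // the work is split into 5 sub-requests
        // Each sub-request receives roughly max_docs / slices documents to work with;
        // a slice that holds fewer matches does not hand its unused budget to the others.
        System.out.println("per-slice budget ~ " + request.getMaxDocs() / request.getSlices());
    }
}
--------------------------------------------------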

20 changes: 10 additions & 10 deletions docs/reference/docs/reindex.asciidoc
@@ -190,14 +190,14 @@ not a good idea to rely on this behavior. Instead, make sure that IDs are unique
using a script.

It's also possible to limit the number of processed documents by setting
`size`. This will only copy a single document from `twitter` to
`max_docs`. This will only copy a single document from `twitter` to
`new_twitter`:

[source,js]
--------------------------------------------------
POST _reindex
{
"size": 1,
"max_docs": 1,
"source": {
"index": "twitter"
},
@@ -211,14 +211,14 @@

If you want a particular set of documents from the `twitter` index you'll
need to use `sort`. Sorting makes the scroll less efficient but in some contexts
it's worth it. If possible, prefer a more selective query to `size` and `sort`.
it's worth it. If possible, prefer a more selective query to `max_docs` and `sort`.
This will copy 10000 documents from `twitter` into `new_twitter`:

[source,js]
--------------------------------------------------
POST _reindex
{
"size": 10000,
"max_docs": 10000,
"source": {
"index": "twitter",
"sort": { "date": "desc" }
@@ -1115,11 +1115,11 @@ sub-request proportionally.
* Due to the nature of `slices` each sub-request won't get a perfectly even
portion of the documents. All documents will be addressed, but some slices may
be larger than others. Expect larger slices to have a more even distribution.
* Parameters like `requests_per_second` and `size` on a request with `slices`
are distributed proportionally to each sub-request. Combine that with the point
above about distribution being uneven and you should conclude that the using
`size` with `slices` might not result in exactly `size` documents being
reindexed.
* Parameters like `requests_per_second` and `max_docs` on a request with
`slices` are distributed proportionally to each sub-request. Combine that with
the point above about distribution being uneven and you should conclude that
using `max_docs` with `slices` might not result in exactly `max_docs` documents
being reindexed.
* Each sub-request gets a slightly different snapshot of the source index,
though these are all taken at approximately the same time.

@@ -1236,7 +1236,7 @@ to load only the existing data into the new index and rename any fields if neede
----------------------------------------------------------------
POST _reindex
{
"size": 10,
"max_docs": 10,
"source": {
"index": "twitter",
"query": {
10 changes: 5 additions & 5 deletions docs/reference/docs/update-by-query.asciidoc
@@ -602,11 +602,11 @@ sub-request proportionally.
* Due to the nature of `slices` each sub-request won't get a perfectly even
portion of the documents. All documents will be addressed, but some slices may
be larger than others. Expect larger slices to have a more even distribution.
* Parameters like `requests_per_second` and `size` on a request with `slices`
are distributed proportionally to each sub-request. Combine that with the point
above about distribution being uneven and you should conclude that the using
`size` with `slices` might not result in exactly `size` documents being
updated.
* Parameters like `requests_per_second` and `max_docs` on a request with
`slices` are distributed proportionally to each sub-request. Combine that with
the point above about distribution being uneven and you should conclude that
using `max_docs` with `slices` might not result in exactly `max_docs` documents
being updated.
* Each sub-request gets a slightly different snapshot of the source index
though these are all taken at approximately the same time.

@@ -75,7 +75,7 @@
import static java.util.Collections.unmodifiableList;
import static org.elasticsearch.action.bulk.BackoffPolicy.exponentialBackoff;
import static org.elasticsearch.common.unit.TimeValue.timeValueNanos;
import static org.elasticsearch.index.reindex.AbstractBulkByScrollRequest.SIZE_ALL_MATCHES;
import static org.elasticsearch.index.reindex.AbstractBulkByScrollRequest.MAX_DOCS_ALL_MATCHES;
import static org.elasticsearch.rest.RestStatus.CONFLICT;
import static org.elasticsearch.search.sort.SortBuilders.fieldSort;

@@ -263,8 +263,8 @@ void onScrollResponse(TimeValue lastBatchStartTime, int lastBatchSize, Scrollabl
return;
}
long total = response.getTotalHits();
if (mainRequest.getSize() > 0) {
total = min(total, mainRequest.getSize());
if (mainRequest.getMaxDocs() > 0) {
total = min(total, mainRequest.getMaxDocs());
}
worker.setTotal(total);
AbstractRunnable prepareBulkRequestRunnable = new AbstractRunnable() {
@@ -304,9 +304,9 @@ void prepareBulkRequest(TimeValue thisBatchStartTime, ScrollableHitSource.Respon
}
worker.countBatch();
List<? extends ScrollableHitSource.Hit> hits = response.getHits();
if (mainRequest.getSize() != SIZE_ALL_MATCHES) {
// Truncate the hits if we have more than the request size
long remaining = max(0, mainRequest.getSize() - worker.getSuccessfullyProcessed());
if (mainRequest.getMaxDocs() != MAX_DOCS_ALL_MATCHES) {
// Truncate the hits if we have more than the request max docs
long remaining = max(0, mainRequest.getMaxDocs() - worker.getSuccessfullyProcessed());
if (remaining < hits.size()) {
hits = hits.subList(0, (int) remaining);
}
@@ -395,7 +395,7 @@ void onBulkResponse(TimeValue thisBatchStartTime, BulkResponse response) {
return;
}

if (mainRequest.getSize() != SIZE_ALL_MATCHES && worker.getSuccessfullyProcessed() >= mainRequest.getSize()) {
if (mainRequest.getMaxDocs() != MAX_DOCS_ALL_MATCHES && worker.getSuccessfullyProcessed() >= mainRequest.getMaxDocs()) {
// We've processed all the requested docs.
refreshAndFinish(emptyList(), emptyList(), false);
return;
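The `max_docs` bookkeeping in the hunks above boils down to a little arithmetic; a self-contained editor's sketch in plain Java (made-up numbers) of how a scroll batch is clipped to the remaining budget:

["source","java"]
--------------------------------------------------
import java.util.List;
import static java.lang.Math.max;

public class HitTruncationSketch {
    public static void main(String[] args) {
        int maxDocs = 7;           // stands in for mainRequest.getMaxDocs()
        int alreadyProcessed = 5;  // stands in for worker.getSuccessfullyProcessed()
        List<String> hits = List.of("doc1", "doc2", "doc3", "doc4");  // current scroll batch

        long remaining = max(0, maxDocs - alreadyProcessed);
        if (remaining < hits.size()) {
            hits = hits.subList(0, (int) remaining);  // keep only the documents still allowed
        }
        System.out.println(hits);  // [doc1, doc2] - the batch is truncated to the budget
    }
}
--------------------------------------------------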
@@ -19,8 +19,8 @@

package org.elasticsearch.index.reindex;

import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
@@ -105,6 +105,11 @@ protected Request setCommonOptions(RestRequest restRequest, Request request) {
if (requestsPerSecond != null) {
request.setRequestsPerSecond(requestsPerSecond);
}

if (restRequest.hasParam("max_docs")) {
setMaxDocsValidateIdentical(request, restRequest.paramAsInt("max_docs", -1));
}

return request;
}

@@ -170,4 +175,13 @@ public static Float parseRequestsPerSecond(RestRequest request) {
}
return requestsPerSecond;
}

static void setMaxDocsValidateIdentical(AbstractBulkByScrollRequest<?> request, int maxDocs) {
if (request.getMaxDocs() != AbstractBulkByScrollRequest.MAX_DOCS_ALL_MATCHES && request.getMaxDocs() != maxDocs) {
throw new IllegalArgumentException("[max_docs] set to two different values [" + request.getMaxDocs() + "]" +
" and [" + maxDocs + "]");
} else {
request.setMaxDocs(maxDocs);
}
}
}
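A standalone sketch of the guard that `setMaxDocsValidateIdentical` adds (this mirrors the logic in the diff rather than calling the Elasticsearch classes; the sentinel value and message follow the code above):

["source","java"]
--------------------------------------------------
public class MaxDocsValidationSketch {
    static final int MAX_DOCS_ALL_MATCHES = -1;  // stand-in for AbstractBulkByScrollRequest.MAX_DOCS_ALL_MATCHES

    int maxDocs = MAX_DOCS_ALL_MATCHES;

    void setMaxDocsValidateIdentical(int newValue) {
        if (maxDocs != MAX_DOCS_ALL_MATCHES && maxDocs != newValue) {
            throw new IllegalArgumentException("[max_docs] set to two different values ["
                + maxDocs + "] and [" + newValue + "]");
        }
        maxDocs = newValue;
    }

    public static void main(String[] args) {
        MaxDocsValidationSketch request = new MaxDocsValidationSketch();
        request.setMaxDocsValidateIdentical(10);  // e.g. from the max_docs URL parameter
        request.setMaxDocsValidateIdentical(10);  // the same value again (e.g. from the body) is accepted
        request.setMaxDocsValidateIdentical(20);  // a conflicting value throws IllegalArgumentException
    }
}
--------------------------------------------------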
@@ -23,6 +23,7 @@
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
@@ -52,7 +53,7 @@ protected void parseInternalRequest(Request internal, RestRequest restRequest,
SearchRequest searchRequest = internal.getSearchRequest();

try (XContentParser parser = extractRequestSpecificFields(restRequest, bodyConsumers)) {
RestSearchAction.parseSearchRequest(searchRequest, restRequest, parser, internal::setSize);
RestSearchAction.parseSearchRequest(searchRequest, restRequest, parser, size -> setMaxDocsFromSearchSize(internal, size));
}

searchRequest.source().size(restRequest.paramAsInt("scroll_size", searchRequest.source().size()));
@@ -94,4 +95,9 @@ private XContentParser extractRequestSpecificFields(RestRequest restRequest,
parser.getDeprecationHandler(), BytesReference.bytes(builder.map(body)).streamInput());
}
}

private void setMaxDocsFromSearchSize(Request request, int size) {
LoggingDeprecationHandler.INSTANCE.usedDeprecatedName("size", "max_docs");
setMaxDocsValidateIdentical(request, size);
}
}
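With this change a `size` value in the search body is no longer applied directly; it is routed into the `max_docs` bookkeeping and flagged as deprecated. A plain-Java editor's sketch of that redirection (the println stands in for the real deprecation logging):

["source","java"]
--------------------------------------------------
import java.util.function.IntConsumer;

public class SizeToMaxDocsSketch {
    static int maxDocs = -1;

    static void setMaxDocsFromSearchSize(int size) {
        // stand-in for LoggingDeprecationHandler reporting that "size" is deprecated in favour of "max_docs"
        System.out.println("Deprecated field [size] used, expected [max_docs] instead");
        maxDocs = size;
    }

    public static void main(String[] args) {
        // The search-body parser hands the top-level "size" value to the consumer it is given;
        // after this change that consumer forwards the value to max_docs instead of calling setSize.
        IntConsumer sizeConsumer = SizeToMaxDocsSketch::setMaxDocsFromSearchSize;
        sizeConsumer.accept(100);
        System.out.println("max_docs = " + maxDocs);
    }
}
--------------------------------------------------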
@@ -58,6 +58,7 @@ protected DeleteByQueryRequest buildRequest(RestRequest request) throws IOExcept

Map<String, Consumer<Object>> consumers = new HashMap<>();
consumers.put("conflicts", o -> internal.setConflicts((String) o));
consumers.put("max_docs", s -> setMaxDocsValidateIdentical(internal, ((Number) s).intValue()));

parseInternalRequest(internal, request, consumers);

@@ -92,7 +92,7 @@ public class RestReindexAction extends AbstractBaseReindexRestHandler<ReindexReq

PARSER.declareField(sourceParser::parse, new ParseField("source"), ValueType.OBJECT);
PARSER.declareField((p, v, c) -> destParser.parse(p, v.getDestination(), c), new ParseField("dest"), ValueType.OBJECT);
PARSER.declareInt(ReindexRequest::setSize, new ParseField("size"));
PARSER.declareInt(RestReindexAction::setMaxDocsValidateIdentical, new ParseField("max_docs", "size"));
PARSER.declareField((p, v, c) -> v.setScript(Script.parse(p)), new ParseField("script"),
ValueType.OBJECT);
PARSER.declareString(ReindexRequest::setConflicts, new ParseField("conflicts"));
@@ -66,6 +66,7 @@ protected UpdateByQueryRequest buildRequest(RestRequest request) throws IOExcept
Map<String, Consumer<Object>> consumers = new HashMap<>();
consumers.put("conflicts", o -> internal.setConflicts((String) o));
consumers.put("script", o -> internal.setScript(parseScript(o)));
consumers.put("max_docs", s -> setMaxDocsValidateIdentical(internal, ((Number) s).intValue()));

parseInternalRequest(internal, request, consumers);

@@ -116,7 +116,7 @@ public void testUpdateByQuery() {
new UpdateByQueryRequestBuilder(client, UpdateByQueryAction.INSTANCE);
updateByQuery.source("source_index")
.filter(QueryBuilders.termQuery("level", "awesome"))
.size(1000)
.maxDocs(1000)
.script(new Script(ScriptType.INLINE,
"ctx._source.awesome = 'absolutely'",
"painless",
@@ -139,7 +139,7 @@
UpdateByQueryRequestBuilder updateByQuery =
new UpdateByQueryRequestBuilder(client, UpdateByQueryAction.INSTANCE);
updateByQuery.source("source_index")
.size(100)
.maxDocs(100)
.source()
.addSort("cat", SortOrder.DESC);
BulkByScrollResponse response = updateByQuery.get();