Skip to content

Commit ee5f8c4

Browse files
authored
Consolidate some reindex utility classes (elastic#22666)
Everything that extended `AbstractAsyncBulkByScrollAction` also extended `AbstractAsyncBulkIndexByScrollAction` so this removes `AbstractAsyncBulkIndexByScrollAction`, merging it into `AbstractAsyncBulkByScrollAction`.
1 parent 51e80e7 commit ee5f8c4

13 files changed

+591
-638
lines changed

modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java

Lines changed: 505 additions & 2 deletions
Large diffs are not rendered by default.

modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkIndexByScrollAction.java

Lines changed: 0 additions & 548 deletions
This file was deleted.

modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportDeleteByQueryAction.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,8 @@ public void doExecute(Task task, DeleteByQueryRequest request, ActionListener<Bu
5959
} else {
6060
ClusterState state = clusterService.state();
6161
ParentTaskAssigningClient client = new ParentTaskAssigningClient(this.client, clusterService.localNode(), task);
62-
new AsyncDeleteBySearchAction((WorkingBulkByScrollTask) task, logger, client, threadPool, request, listener, scriptService,
63-
state).start();
62+
new AsyncDeleteBySearchAction((WorkingBulkByScrollTask) task, logger, client, threadPool, request, scriptService, state,
63+
listener).start();
6464
}
6565
}
6666

@@ -72,12 +72,11 @@ protected void doExecute(DeleteByQueryRequest request, ActionListener<BulkIndexB
7272
/**
7373
* Implementation of delete-by-query using scrolling and bulk.
7474
*/
75-
static class AsyncDeleteBySearchAction extends AbstractAsyncBulkIndexByScrollAction<DeleteByQueryRequest> {
76-
75+
static class AsyncDeleteBySearchAction extends AbstractAsyncBulkByScrollAction<DeleteByQueryRequest> {
7776
public AsyncDeleteBySearchAction(WorkingBulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
78-
ThreadPool threadPool, DeleteByQueryRequest request, ActionListener<BulkIndexByScrollResponse> listener,
79-
ScriptService scriptService, ClusterState clusterState) {
80-
super(task, logger, client, threadPool, request, listener, scriptService, clusterState);
77+
ThreadPool threadPool, DeleteByQueryRequest request, ScriptService scriptService, ClusterState clusterState,
78+
ActionListener<BulkIndexByScrollResponse> listener) {
79+
super(task, logger, client, threadPool, request, scriptService, clusterState, listener);
8180
}
8281

8382
@Override
@@ -107,8 +106,7 @@ protected RequestWrapper<DeleteRequest> buildRequest(ScrollableHitSource.Hit doc
107106
}
108107

109108
/**
110-
* Overrides the parent {@link AbstractAsyncBulkIndexByScrollAction#copyMetadata(RequestWrapper, ScrollableHitSource.Hit)}
111-
* method that is much more Update/Reindex oriented and so also copies things like timestamp/ttl which we
109+
* Overrides the parent's implementation is much more Update/Reindex oriented and so also copies things like timestamp/ttl which we
112110
* don't care for a deletion.
113111
*/
114112
@Override

modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportReindexAction.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,8 @@ protected void doExecute(Task task, ReindexRequest request, ActionListener<BulkI
113113
validateAgainstAliases(request.getSearchRequest(), request.getDestination(), request.getRemoteInfo(),
114114
indexNameExpressionResolver, autoCreateIndex, state);
115115
ParentTaskAssigningClient client = new ParentTaskAssigningClient(this.client, clusterService.localNode(), task);
116-
new AsyncIndexBySearchAction((WorkingBulkByScrollTask) task, logger, client, threadPool, request, listener, scriptService,
117-
state).start();
116+
new AsyncIndexBySearchAction((WorkingBulkByScrollTask) task, logger, client, threadPool, request, scriptService, state,
117+
listener).start();
118118
}
119119
}
120120

@@ -230,7 +230,7 @@ static RestClient buildRestClient(RemoteInfo remoteInfo, long taskId, List<Threa
230230
* but this makes no attempt to do any of them so it can be as simple
231231
* possible.
232232
*/
233-
static class AsyncIndexBySearchAction extends AbstractAsyncBulkIndexByScrollAction<ReindexRequest> {
233+
static class AsyncIndexBySearchAction extends AbstractAsyncBulkByScrollAction<ReindexRequest> {
234234
/**
235235
* List of threads created by this process. Usually actions don't create threads in Elasticsearch. Instead they use the builtin
236236
* {@link ThreadPool}s. But reindex-from-remote uses Elasticsearch's {@link RestClient} which doesn't use the
@@ -240,9 +240,9 @@ static class AsyncIndexBySearchAction extends AbstractAsyncBulkIndexByScrollActi
240240
private List<Thread> createdThreads = emptyList();
241241

242242
public AsyncIndexBySearchAction(WorkingBulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
243-
ThreadPool threadPool, ReindexRequest request, ActionListener<BulkIndexByScrollResponse> listener,
244-
ScriptService scriptService, ClusterState clusterState) {
245-
super(task, logger, client, threadPool, request, listener, scriptService, clusterState);
243+
ThreadPool threadPool, ReindexRequest request, ScriptService scriptService, ClusterState clusterState,
244+
ActionListener<BulkIndexByScrollResponse> listener) {
245+
super(task, logger, client, threadPool, request, scriptService, clusterState, listener);
246246
}
247247

248248
@Override

modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,8 @@ protected void doExecute(Task task, UpdateByQueryRequest request, ActionListener
7070
} else {
7171
ClusterState state = clusterService.state();
7272
ParentTaskAssigningClient client = new ParentTaskAssigningClient(this.client, clusterService.localNode(), task);
73-
new AsyncIndexBySearchAction((WorkingBulkByScrollTask) task, logger, client, threadPool, request, listener, scriptService,
74-
state).start();
73+
new AsyncIndexBySearchAction((WorkingBulkByScrollTask) task, logger, client, threadPool, request, scriptService, state,
74+
listener).start();
7575
}
7676
}
7777

@@ -83,12 +83,11 @@ protected void doExecute(UpdateByQueryRequest request, ActionListener<BulkIndexB
8383
/**
8484
* Simple implementation of update-by-query using scrolling and bulk.
8585
*/
86-
static class AsyncIndexBySearchAction extends AbstractAsyncBulkIndexByScrollAction<UpdateByQueryRequest> {
87-
86+
static class AsyncIndexBySearchAction extends AbstractAsyncBulkByScrollAction<UpdateByQueryRequest> {
8887
public AsyncIndexBySearchAction(WorkingBulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
89-
ThreadPool threadPool, UpdateByQueryRequest request, ActionListener<BulkIndexByScrollResponse> listener,
90-
ScriptService scriptService, ClusterState clusterState) {
91-
super(task, logger, client, threadPool, request, listener, scriptService, clusterState);
88+
ThreadPool threadPool, UpdateByQueryRequest request, ScriptService scriptService, ClusterState clusterState,
89+
ActionListener<BulkIndexByScrollResponse> listener) {
90+
super(task, logger, client, threadPool, request, scriptService, clusterState, listener);
9291
}
9392

9493
@Override
Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,14 @@
1919

2020
package org.elasticsearch.index.reindex;
2121

22-
public abstract class AbstractAsyncBulkIndexbyScrollActionMetadataTestCase<
22+
public abstract class AbstractAsyncBulkByScrollActionMetadataTestCase<
2323
Request extends AbstractBulkIndexByScrollRequest<Request>,
2424
Response extends BulkIndexByScrollResponse>
25-
extends AbstractAsyncBulkIndexByScrollActionTestCase<Request, Response> {
25+
extends AbstractAsyncBulkByScrollActionTestCase<Request, Response> {
2626

2727
protected ScrollableHitSource.BasicHit doc() {
2828
return new ScrollableHitSource.BasicHit("index", "type", "id", 0);
2929
}
3030

31-
protected abstract AbstractAsyncBulkIndexByScrollAction<Request> action();
31+
protected abstract AbstractAsyncBulkByScrollAction<Request> action();
3232
}
Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@
2222
import org.elasticsearch.action.ActionRequest;
2323
import org.elasticsearch.action.delete.DeleteRequest;
2424
import org.elasticsearch.action.index.IndexRequest;
25-
import org.elasticsearch.index.reindex.AbstractAsyncBulkIndexByScrollAction.OpType;
26-
import org.elasticsearch.index.reindex.AbstractAsyncBulkIndexByScrollAction.RequestWrapper;
25+
import org.elasticsearch.index.reindex.AbstractAsyncBulkByScrollAction.OpType;
26+
import org.elasticsearch.index.reindex.AbstractAsyncBulkByScrollAction.RequestWrapper;
2727
import org.elasticsearch.script.CompiledScript;
2828
import org.elasticsearch.script.ExecutableScript;
2929
import org.elasticsearch.script.Script;
@@ -40,10 +40,10 @@
4040
import static org.mockito.Mockito.mock;
4141
import static org.mockito.Mockito.when;
4242

43-
public abstract class AbstractAsyncBulkIndexByScrollActionScriptTestCase<
43+
public abstract class AbstractAsyncBulkByScrollActionScriptTestCase<
4444
Request extends AbstractBulkIndexByScrollRequest<Request>,
4545
Response extends BulkIndexByScrollResponse>
46-
extends AbstractAsyncBulkIndexByScrollActionTestCase<Request, Response> {
46+
extends AbstractAsyncBulkByScrollActionTestCase<Request, Response> {
4747

4848
private static final Script EMPTY_SCRIPT = new Script("");
4949

@@ -62,8 +62,8 @@ protected <T extends ActionRequest> T applyScript(Consumer<Map<String, Object>>
6262

6363
when(scriptService.executable(any(CompiledScript.class), Matchers.<Map<String, Object>>any()))
6464
.thenReturn(executableScript);
65-
AbstractAsyncBulkIndexByScrollAction<Request> action = action(scriptService, request().setScript(EMPTY_SCRIPT));
66-
RequestWrapper<?> result = action.buildScriptApplier().apply(AbstractAsyncBulkIndexByScrollAction.wrap(index), doc);
65+
AbstractAsyncBulkByScrollAction<Request> action = action(scriptService, request().setScript(EMPTY_SCRIPT));
66+
RequestWrapper<?> result = action.buildScriptApplier().apply(AbstractAsyncBulkByScrollAction.wrap(index), doc);
6767
return (result != null) ? (T) result.self() : null;
6868
}
6969

@@ -104,5 +104,5 @@ public void testSetOpTypeUnknown() throws Exception {
104104
assertThat(e.getMessage(), equalTo("Operation type [unknown] not allowed, only [noop, index, delete] are allowed"));
105105
}
106106

107-
protected abstract AbstractAsyncBulkIndexByScrollAction<Request> action(ScriptService scriptService, Request request);
107+
protected abstract AbstractAsyncBulkByScrollAction<Request> action(ScriptService scriptService, Request request);
108108
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import org.junit.After;
2828
import org.junit.Before;
2929

30-
public abstract class AbstractAsyncBulkIndexByScrollActionTestCase<
30+
public abstract class AbstractAsyncBulkByScrollActionTestCase<
3131
Request extends AbstractBulkIndexByScrollRequest<Request>,
3232
Response extends BulkIndexByScrollResponse>
3333
extends ESTestCase {

0 commit comments

Comments
 (0)