Skip to content

Commit 07f3ddb

Browse files
authored
Extract reindexing logic from transport action (#46033)
This commit extracts the reindexing logic from the transport action so that it can be incorporated into the persistent reindex work without requiring the usage of the client.
1 parent ff1acf3 commit 07f3ddb

15 files changed

+531
-445
lines changed

modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.elasticsearch.action.index.IndexRequest;
3737
import org.elasticsearch.action.support.TransportAction;
3838
import org.elasticsearch.client.ParentTaskAssigningClient;
39+
import org.elasticsearch.common.Nullable;
3940
import org.elasticsearch.common.unit.ByteSizeValue;
4041
import org.elasticsearch.common.unit.TimeValue;
4142
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@@ -90,8 +91,9 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
9091
protected final BulkByScrollTask task;
9192
protected final WorkerBulkByScrollTaskState worker;
9293
protected final ThreadPool threadPool;
94+
protected final ScriptService scriptService;
95+
protected final ReindexSslConfig sslConfig;
9396

94-
protected final Action mainAction;
9597
/**
9698
* The request for this action. Named mainRequest because we create lots of <code>request</code> variables all representing child
9799
* requests of this mainRequest.
@@ -114,12 +116,13 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
114116
private final BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>> scriptApplier;
115117
private int lastBatchSize;
116118

117-
public AbstractAsyncBulkByScrollAction(BulkByScrollTask task, boolean needsSourceDocumentVersions,
118-
boolean needsSourceDocumentSeqNoAndPrimaryTerm, Logger logger, ParentTaskAssigningClient client,
119-
ThreadPool threadPool, Action mainAction, Request mainRequest,
120-
ActionListener<BulkByScrollResponse> listener) {
121-
119+
AbstractAsyncBulkByScrollAction(BulkByScrollTask task, boolean needsSourceDocumentVersions,
120+
boolean needsSourceDocumentSeqNoAndPrimaryTerm, Logger logger, ParentTaskAssigningClient client,
121+
ThreadPool threadPool, Request mainRequest, ActionListener<BulkByScrollResponse> listener,
122+
@Nullable ScriptService scriptService, @Nullable ReindexSslConfig sslConfig) {
122123
this.task = task;
124+
this.scriptService = scriptService;
125+
this.sslConfig = sslConfig;
123126
if (!task.isWorker()) {
124127
throw new IllegalArgumentException("Given task [" + task.getId() + "] must have a child worker");
125128
}
@@ -128,7 +131,6 @@ public AbstractAsyncBulkByScrollAction(BulkByScrollTask task, boolean needsSourc
128131
this.logger = logger;
129132
this.client = client;
130133
this.threadPool = threadPool;
131-
this.mainAction = mainAction;
132134
this.mainRequest = mainRequest;
133135
this.listener = listener;
134136
BackoffPolicy backoffPolicy = buildBackoffPolicy();
@@ -158,7 +160,7 @@ public BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>>
158160
// The default script applier executes a no-op
159161
return (request, searchHit) -> request;
160162
}
161-
163+
162164
/**
163165
* Build the {@link RequestWrapper} for a single search hit. This shouldn't handle
164166
* metadata or scripting. That will be handled by copyMetadata and

modules/reindex/src/main/java/org/elasticsearch/index/reindex/AsyncDeleteByQueryAction.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,9 @@
3232
public class AsyncDeleteByQueryAction extends AbstractAsyncBulkByScrollAction<DeleteByQueryRequest, TransportDeleteByQueryAction> {
3333

3434
public AsyncDeleteByQueryAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
35-
ThreadPool threadPool, TransportDeleteByQueryAction action, DeleteByQueryRequest request,
36-
ScriptService scriptService, ActionListener<BulkByScrollResponse> listener) {
37-
super(task, false, true, logger, client, threadPool, action, request, listener);
35+
ThreadPool threadPool, DeleteByQueryRequest request, ScriptService scriptService,
36+
ActionListener<BulkByScrollResponse> listener) {
37+
super(task, false, true, logger, client, threadPool, request, listener, scriptService, null);
3838
}
3939

4040
@Override
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.index.reindex;
21+
22+
import org.apache.lucene.util.automaton.Automata;
23+
import org.apache.lucene.util.automaton.Automaton;
24+
import org.apache.lucene.util.automaton.CharacterRunAutomaton;
25+
import org.apache.lucene.util.automaton.MinimizationOperations;
26+
import org.apache.lucene.util.automaton.Operations;
27+
import org.elasticsearch.action.ActionRequestValidationException;
28+
import org.elasticsearch.action.index.IndexRequest;
29+
import org.elasticsearch.action.search.SearchRequest;
30+
import org.elasticsearch.action.support.AutoCreateIndex;
31+
import org.elasticsearch.cluster.ClusterState;
32+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
33+
import org.elasticsearch.cluster.service.ClusterService;
34+
import org.elasticsearch.common.Strings;
35+
import org.elasticsearch.common.regex.Regex;
36+
import org.elasticsearch.common.settings.Settings;
37+
38+
import java.util.List;
39+
40+
class ReindexValidator {
41+
42+
private final CharacterRunAutomaton remoteWhitelist;
43+
private final ClusterService clusterService;
44+
private final IndexNameExpressionResolver resolver;
45+
private final AutoCreateIndex autoCreateIndex;
46+
47+
ReindexValidator(Settings settings, ClusterService clusterService, IndexNameExpressionResolver resolver,
48+
AutoCreateIndex autoCreateIndex) {
49+
this.remoteWhitelist = buildRemoteWhitelist(TransportReindexAction.REMOTE_CLUSTER_WHITELIST.get(settings));
50+
this.clusterService = clusterService;
51+
this.resolver = resolver;
52+
this.autoCreateIndex = autoCreateIndex;
53+
}
54+
55+
void initialValidation(ReindexRequest request) {
56+
checkRemoteWhitelist(remoteWhitelist, request.getRemoteInfo());
57+
ClusterState state = clusterService.state();
58+
validateAgainstAliases(request.getSearchRequest(), request.getDestination(), request.getRemoteInfo(), resolver, autoCreateIndex,
59+
state);
60+
}
61+
62+
static void checkRemoteWhitelist(CharacterRunAutomaton whitelist, RemoteInfo remoteInfo) {
63+
if (remoteInfo == null) {
64+
return;
65+
}
66+
String check = remoteInfo.getHost() + ':' + remoteInfo.getPort();
67+
if (whitelist.run(check)) {
68+
return;
69+
}
70+
String whiteListKey = TransportReindexAction.REMOTE_CLUSTER_WHITELIST.getKey();
71+
throw new IllegalArgumentException('[' + check + "] not whitelisted in " + whiteListKey);
72+
}
73+
74+
/**
75+
* Build the {@link CharacterRunAutomaton} that represents the reindex-from-remote whitelist and make sure that it doesn't whitelist
76+
* the world.
77+
*/
78+
static CharacterRunAutomaton buildRemoteWhitelist(List<String> whitelist) {
79+
if (whitelist.isEmpty()) {
80+
return new CharacterRunAutomaton(Automata.makeEmpty());
81+
}
82+
Automaton automaton = Regex.simpleMatchToAutomaton(whitelist.toArray(Strings.EMPTY_ARRAY));
83+
automaton = MinimizationOperations.minimize(automaton, Operations.DEFAULT_MAX_DETERMINIZED_STATES);
84+
if (Operations.isTotal(automaton)) {
85+
throw new IllegalArgumentException("Refusing to start because whitelist " + whitelist + " accepts all addresses. "
86+
+ "This would allow users to reindex-from-remote any URL they like effectively having Elasticsearch make HTTP GETs "
87+
+ "for them.");
88+
}
89+
return new CharacterRunAutomaton(automaton);
90+
}
91+
92+
/**
93+
* Throws an ActionRequestValidationException if the request tries to index
94+
* back into the same index or into an index that points to two indexes.
95+
* This cannot be done during request validation because the cluster state
96+
* isn't available then. Package private for testing.
97+
*/
98+
static void validateAgainstAliases(SearchRequest source, IndexRequest destination, RemoteInfo remoteInfo,
99+
IndexNameExpressionResolver indexNameExpressionResolver, AutoCreateIndex autoCreateIndex,
100+
ClusterState clusterState) {
101+
if (remoteInfo != null) {
102+
return;
103+
}
104+
String target = destination.index();
105+
if (false == autoCreateIndex.shouldAutoCreate(target, clusterState)) {
106+
/*
107+
* If we're going to autocreate the index we don't need to resolve
108+
* it. This is the same sort of dance that TransportIndexRequest
109+
* uses to decide to autocreate the index.
110+
*/
111+
target = indexNameExpressionResolver.concreteWriteIndex(clusterState, destination).getName();
112+
}
113+
for (String sourceIndex : indexNameExpressionResolver.concreteIndexNames(clusterState, source)) {
114+
if (sourceIndex.equals(target)) {
115+
ActionRequestValidationException e = new ActionRequestValidationException();
116+
e.addValidationError("reindex cannot write into an index its reading from [" + target + ']');
117+
throw e;
118+
}
119+
}
120+
}
121+
}

0 commit comments

Comments
 (0)