Skip to content

Commit cf6f3fc

Browse files
Reindex resolve indices early
Resolve indices before starting to reindex. This ensures that the list of indices does not change when failing over (TBD). The one exception to this is aliases, which we still need to access through the alias. In addition, resolved index patterns are sorted by create-date and otherwise the listed order is preserved. This ensures that once we reindex one index at a time, we will get reasonable time locality for time based indices. The resolved list of indices will also by used to do searching one index (or index group) at a time, improving search performance (since we use sort) and allowing us to do more fine-grained checkpoint and track progress (TBD). Relates elastic#42612
1 parent 901a53a commit cf6f3fc

File tree

6 files changed

+427
-4
lines changed

6 files changed

+427
-4
lines changed

libs/x-content/src/main/java/org/elasticsearch/common/xcontent/AbstractObjectParser.java

+1
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,7 @@ private static <T> List<T> parseArray(XContentParser parser, IOSupplier<T> suppl
225225
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
226226
if (parser.currentToken().isValue()
227227
|| parser.currentToken() == XContentParser.Token.VALUE_NULL
228+
|| parser.currentToken() == XContentParser.Token.START_ARRAY
228229
|| parser.currentToken() == XContentParser.Token.START_OBJECT) {
229230
list.add(supplier.get());
230231
} else {

libs/x-content/src/test/java/org/elasticsearch/common/xcontent/ObjectParserTests.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -585,30 +585,39 @@ class TestStruct {
585585
public void testArraysOfGenericValues() throws IOException {
586586
XContentParser parser = createParser(
587587
JsonXContent.jsonXContent,
588-
"{ \"test_array\": [ 1, null, \"3\", 4.2], \"int_array\": [ 1, 2, 3] }"
588+
"{ \"test_array\": [ 1, null, \"3\", 4.2], \"int_array\": [ 1, 2, 3], \"multi_array\": [ [1, 2], [3, 4] ] }"
589589
);
590590
class TestStruct {
591591
List<Object> testArray = new ArrayList<>();
592592

593593
List<Integer> ints = new ArrayList<>();
594594

595+
List<List<Integer> > multis = new ArrayList<>();
596+
595597
public void setInts(List<Integer> ints) {
596598
this.ints = ints;
597599
}
598600

599601
public void setArray(List<Object> testArray) {
600602
this.testArray = testArray;
601603
}
604+
605+
public void setMultis(List<List<Integer>> multis) {
606+
this.multis = multis;
607+
}
602608
}
603609
ObjectParser<TestStruct, Void> objectParser = new ObjectParser<>("foo");
604610
TestStruct s = new TestStruct();
605611

606612
objectParser.declareFieldArray(TestStruct::setArray, (p, c) -> XContentParserUtils.parseFieldsValue(p),
607613
new ParseField("test_array"), ValueType.VALUE_ARRAY);
608614
objectParser.declareIntArray(TestStruct::setInts, new ParseField("int_array"));
615+
objectParser.declareFieldArray(TestStruct::setMultis, (p, c) -> (List<Integer>) XContentParserUtils.parseFieldsValue(p),
616+
new ParseField("multi_array"), ValueType.OBJECT_ARRAY);
609617
objectParser.parse(parser, s, null);
610618
assertEquals(s.testArray, Arrays.asList(1, null, "3", 4.2));
611619
assertEquals(s.ints, Arrays.asList(1, 2, 3));
620+
assertEquals(s.multis, Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4)));
612621

613622
parser = createParser(JsonXContent.jsonXContent, "{\"test_array\": 42}");
614623
s = new TestStruct();

modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexTaskParams.java

+16-2
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import org.elasticsearch.persistent.PersistentTaskParams;
3030

3131
import java.io.IOException;
32+
import java.util.Collection;
33+
import java.util.List;
3234
import java.util.Map;
3335

3436
public class ReindexTaskParams implements PersistentTaskParams {
@@ -37,26 +39,32 @@ public class ReindexTaskParams implements PersistentTaskParams {
3739

3840
@SuppressWarnings("unchecked")
3941
public static final ConstructingObjectParser<ReindexTaskParams, Void> PARSER
40-
= new ConstructingObjectParser<>(NAME, a -> new ReindexTaskParams((Boolean) a[0], (Map<String, String>) a[1]));
42+
= new ConstructingObjectParser<>(NAME, a -> new ReindexTaskParams((Boolean) a[0], (List<List<String>>) a[1],
43+
(Map<String, String>) a[2]));
4144

4245
private static String STORE_RESULT = "store_result";
4346
private static String HEADERS = "headers";
47+
private static String INDEX_GROUPS = "index_groups";
4448

4549
static {
4650
PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), new ParseField(STORE_RESULT));
51+
PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), (p, c) -> p.list(), new ParseField(INDEX_GROUPS));
4752
PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> p.mapStrings(), new ParseField(HEADERS));
4853
}
4954

5055
private final boolean storeResult;
56+
private final List<? extends Collection<String>> indexGroups;
5157
private final Map<String, String> headers;
5258

53-
public ReindexTaskParams(boolean storeResult, Map<String, String> headers) {
59+
public ReindexTaskParams(boolean storeResult, List<? extends Collection<String>> indexGroups, Map<String, String> headers) {
5460
this.storeResult = storeResult;
61+
this.indexGroups = indexGroups;
5562
this.headers = headers;
5663
}
5764

5865
public ReindexTaskParams(StreamInput in) throws IOException {
5966
storeResult = in.readBoolean();
67+
indexGroups = in.readList(StreamInput::readStringList);
6068
headers = in.readMap(StreamInput::readString, StreamInput::readString);
6169
}
6270

@@ -74,13 +82,15 @@ public Version getMinimalSupportedVersion() {
7482
@Override
7583
public void writeTo(StreamOutput out) throws IOException {
7684
out.writeBoolean(storeResult);
85+
out.writeCollection(indexGroups, StreamOutput::writeStringCollection);
7786
out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString);
7887
}
7988

8089
@Override
8190
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
8291
builder.startObject();
8392
builder.field(STORE_RESULT, storeResult);
93+
builder.field(INDEX_GROUPS, indexGroups);
8494
builder.field(HEADERS, headers);
8595
return builder.endObject();
8696
}
@@ -89,6 +99,10 @@ public boolean shouldStoreResult() {
8999
return storeResult;
90100
}
91101

102+
public List<? extends Collection<String>> getIndexGroups() {
103+
return indexGroups;
104+
}
105+
92106
public Map<String, String> getHeaders() {
93107
return headers;
94108
}

modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportStartReindexTaskAction.java

+177-1
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,17 @@
2626
import org.elasticsearch.action.support.AutoCreateIndex;
2727
import org.elasticsearch.action.support.HandledTransportAction;
2828
import org.elasticsearch.client.Client;
29+
import org.elasticsearch.cluster.ClusterState;
30+
import org.elasticsearch.cluster.metadata.AliasOrIndex;
31+
import org.elasticsearch.cluster.metadata.IndexMetaData;
2932
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
3033
import org.elasticsearch.cluster.service.ClusterService;
3134
import org.elasticsearch.common.UUIDs;
3235
import org.elasticsearch.common.collect.Tuple;
3336
import org.elasticsearch.common.inject.Inject;
3437
import org.elasticsearch.common.settings.Settings;
3538
import org.elasticsearch.common.util.concurrent.ThreadContext;
39+
import org.elasticsearch.common.util.set.Sets;
3640
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
3741
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
3842
import org.elasticsearch.persistent.PersistentTasksService;
@@ -41,17 +45,27 @@
4145
import org.elasticsearch.threadpool.ThreadPool;
4246
import org.elasticsearch.transport.TransportService;
4347

48+
import java.util.ArrayList;
49+
import java.util.Arrays;
50+
import java.util.Collections;
51+
import java.util.Comparator;
4452
import java.util.List;
4553
import java.util.Map;
54+
import java.util.Objects;
55+
import java.util.Set;
56+
import java.util.SortedMap;
4657
import java.util.function.Predicate;
4758
import java.util.stream.Collectors;
59+
import java.util.stream.Stream;
4860

4961
public class TransportStartReindexTaskAction
5062
extends HandledTransportAction<StartReindexTaskAction.Request, StartReindexTaskAction.Response> {
5163

5264
private final List<String> headersToInclude;
5365
private final ThreadPool threadPool;
5466
private final PersistentTasksService persistentTasksService;
67+
private final IndexNameExpressionResolver indexNameExpressionResolver;
68+
private final ClusterService clusterService;
5569
private final ReindexValidator reindexValidator;
5670
private final ReindexIndexClient reindexIndexClient;
5771

@@ -63,6 +77,8 @@ public TransportStartReindexTaskAction(Settings settings, Client client, Transpo
6377
super(StartReindexTaskAction.NAME, transportService, actionFilters, StartReindexTaskAction.Request::new);
6478
this.headersToInclude = ReindexHeaders.REINDEX_INCLUDED_HEADERS.get(settings);
6579
this.threadPool = threadPool;
80+
this.indexNameExpressionResolver = indexNameExpressionResolver;
81+
this.clusterService = clusterService;
6682
this.reindexValidator = new ReindexValidator(settings, clusterService, indexNameExpressionResolver, autoCreateIndex);
6783
this.persistentTasksService = persistentTasksService;
6884
this.reindexIndexClient = new ReindexIndexClient(client, clusterService, xContentRegistry);
@@ -87,7 +103,7 @@ protected void doExecute(Task task, StartReindexTaskAction.Request request, Acti
87103

88104
// In the current implementation, we only need to store task results if we do not wait for completion
89105
boolean storeTaskResult = request.getWaitForCompletion() == false;
90-
ReindexTaskParams job = new ReindexTaskParams(storeTaskResult, included);
106+
ReindexTaskParams job = new ReindexTaskParams(storeTaskResult, resolveIndexPatterns(request.getReindexRequest()), included);
91107

92108
ReindexTaskStateDoc reindexState = new ReindexTaskStateDoc(request.getReindexRequest());
93109
reindexIndexClient.createReindexTaskDoc(generatedId, reindexState, new ActionListener<>() {
@@ -212,4 +228,164 @@ private boolean isDone(ReindexPersistentTaskState state) {
212228
return state != null && state.isDone();
213229
}
214230
}
231+
232+
/**
233+
* Resolve index patterns to ensure they do not start resolving differently during reindex failovers.
234+
* Do not resolve aliases, since accessing the underlying indices is not semantically equivalent to accessing the alias.
235+
* Within each index pattern, sort the resolved indices by create date, since this ensures that if we reindex from a pattern of indices,
236+
* destination will receive oldest data first. This is in particular important if destination does rollover and it is time-based data.
237+
*
238+
* @return list of groups of indices/aliases that must be searched together.
239+
*/
240+
private List<Set<String>> resolveIndexPatterns(ReindexRequest request) {
241+
return resolveIndexPatterns(request, clusterService.state(), indexNameExpressionResolver);
242+
}
243+
244+
// visible for testing
245+
static List<Set<String>> resolveIndexPatterns(ReindexRequest request, ClusterState clusterState,
246+
IndexNameExpressionResolver indexNameResolver) {
247+
if (request.getRemoteInfo() == null) {
248+
return resolveIndexPatterns(request.getSearchRequest().indices(), clusterState, indexNameResolver);
249+
} else {
250+
return Collections.emptyList();
251+
}
252+
}
253+
254+
private static List<Set<String>> resolveIndexPatterns(String[] indices, ClusterState clusterState,
255+
IndexNameExpressionResolver indexNameResolver) {
256+
Set<String> resolvedNames = indexNameResolver.resolveExpressions(clusterState, indices);
257+
258+
List<IndexGroup> groups = Arrays.stream(indices)
259+
.flatMap(expression -> resolveSingleIndexExpression(expression, resolvedNames::contains,clusterState, indexNameResolver))
260+
.collect(Collectors.toList());
261+
262+
return resolveGroups(groups).stream().map(IndexGroup::newResolvedGroup).collect(Collectors.toList());
263+
}
264+
265+
private static List<IndexGroup> resolveGroups(List<IndexGroup> groups) {
266+
List<IndexGroup> result = new ArrayList<>(groups);
267+
268+
// n^2, but OK since data volume is low.
269+
// reverse order since we bubble data towards the lower index end.
270+
for (int i = result.size() - 1; i >= 0; --i) {
271+
IndexGroup current = result.get(i);
272+
for (int j = i - 1; current != null && j >= 0; --j) {
273+
IndexGroup earlier = result.get(j);
274+
Tuple<IndexGroup, IndexGroup> collapsed = earlier.collapse(current);
275+
result.set(j, collapsed.v1());
276+
current = collapsed.v2();
277+
}
278+
result.set(i, current);
279+
}
280+
281+
return result.stream().filter(Objects::nonNull).collect(Collectors.toList());
282+
}
283+
284+
private static Stream<IndexGroup> resolveSingleIndexExpression(String expression, Predicate<String> predicate,
285+
ClusterState clusterState,
286+
IndexNameExpressionResolver indexNameExpressionResolver) {
287+
SortedMap<String, AliasOrIndex> lookup = clusterState.getMetaData().getAliasAndIndexLookup();
288+
Comparator<AliasOrIndex> createDateIndexOrder = (i1, i2) -> {
289+
if (i1.isAlias() && i2.isAlias()) {
290+
return ((AliasOrIndex.Alias) i1).getAliasName().compareTo(((AliasOrIndex.Alias) i2).getAliasName());
291+
}
292+
if (i1.isAlias() != i2.isAlias()) {
293+
return Boolean.compare(i1.isAlias(), i2.isAlias());
294+
}
295+
296+
assert i1.getIndices().size() == 1;
297+
assert i2.getIndices().size() == 1;
298+
IndexMetaData indexMetaData1 = i1.getIndices().get(0);
299+
IndexMetaData indexMetaData2 = i2.getIndices().get(0);
300+
int compare = Long.compare(indexMetaData1.getCreationDate(), indexMetaData2.getCreationDate());
301+
return compare != 0 ? compare : indexMetaData1.getIndex().getName().compareTo(indexMetaData2.getIndex().getName());
302+
};
303+
304+
return indexNameExpressionResolver.resolveExpressions(clusterState, expression).stream()
305+
.filter(predicate).map(lookup::get).sorted(createDateIndexOrder).map(IndexGroup::create);
306+
}
307+
308+
/**
309+
* Immutable group of indices and aliases.
310+
*/
311+
private static class IndexGroup {
312+
private final Set<String> indices;
313+
private final Set<String> allIndices;
314+
private final Set<AliasOrIndex.Alias> aliases;
315+
private final List<String> orderedIndices;
316+
317+
private IndexGroup(List<String> indices, Set<AliasOrIndex.Alias> aliases) {
318+
orderedIndices = indices;
319+
this.indices = indices.stream().collect(Collectors.toUnmodifiableSet());
320+
this.aliases = aliases;
321+
this.allIndices = Stream.concat(indices.stream(),
322+
aliases.stream().flatMap(aliasOrIndex -> aliasOrIndex.getIndices().stream())
323+
.map(imd -> imd.getIndex().getName())
324+
).collect(Collectors.toUnmodifiableSet());
325+
}
326+
327+
private IndexGroup(IndexGroup group1, IndexGroup group2) {
328+
this(Stream.concat(group1.orderedIndices.stream(), group2.orderedIndices.stream()).collect(Collectors.toList()),
329+
Stream.concat(group1.aliases.stream(), group2.aliases.stream())
330+
.collect(Collectors.toSet()));
331+
}
332+
333+
public static IndexGroup create(AliasOrIndex aliasOrIndex) {
334+
if (aliasOrIndex.isAlias()) {
335+
return new IndexGroup(Collections.emptyList(), Collections.singleton((AliasOrIndex.Alias) aliasOrIndex));
336+
} else {
337+
return new IndexGroup(Collections.singletonList(aliasOrIndex.getIndices().get(0).getIndex().getName()),
338+
Collections.emptySet());
339+
}
340+
}
341+
342+
private Tuple<IndexGroup, IndexGroup> collapse(IndexGroup other) {
343+
if (other == this) {
344+
return Tuple.tuple(this, this);
345+
}
346+
347+
if (aliasOverlap(this.aliases, other.allIndices) || aliasOverlap(other.aliases, this.allIndices)) {
348+
return Tuple.tuple(new IndexGroup(this, other), null);
349+
}
350+
351+
Set<String> intersection = Sets.intersection(indices, other.indices);
352+
assert intersection.isEmpty() == false || Sets.intersection(allIndices, other.allIndices).isEmpty();
353+
assert intersection.isEmpty() || Sets.intersection(allIndices, other.allIndices).isEmpty() == false;
354+
return Tuple.tuple(this.add(intersection, orderedIndices), other.subtract(intersection));
355+
}
356+
357+
private IndexGroup add(Set<String> intersection, List<String> order) {
358+
if (intersection.isEmpty()) {
359+
return this;
360+
}
361+
362+
List<String> indices =
363+
Stream.concat(this.orderedIndices.stream(), order.stream().filter(intersection::contains)).collect(Collectors.toList());
364+
return new IndexGroup(indices, aliases);
365+
}
366+
367+
private IndexGroup subtract(Set<String> intersection) {
368+
if (intersection.isEmpty()) {
369+
return this;
370+
}
371+
372+
List<String> indices =
373+
this.orderedIndices.stream().filter(Predicate.not(intersection::contains)).collect(Collectors.toList());
374+
375+
if (indices.isEmpty()) {
376+
return null;
377+
}
378+
return new IndexGroup(indices, aliases);
379+
}
380+
381+
private static boolean aliasOverlap(Set<AliasOrIndex.Alias> aliases, Set<String> indices) {
382+
return aliases.stream()
383+
.flatMap(aliasOrIndex -> aliasOrIndex.getIndices().stream()).map(imd -> imd.getIndex().getName())
384+
.anyMatch(indices::contains);
385+
}
386+
387+
public Set<String> newResolvedGroup() {
388+
return Stream.concat(indices.stream(), aliases.stream().map(AliasOrIndex.Alias::getAliasName)).collect(Collectors.toSet());
389+
}
390+
}
215391
}

0 commit comments

Comments
 (0)