[RollupV2] Implement search resolution #67783

Closed
wants to merge 68 commits into from
Commits
b768fbf
Rollup V2 Search Resolution Setup
talevy Nov 12, 2020
793b807
Merge remote-tracking branch 'elastic/master' into rollupv2search
talevy Nov 12, 2020
8ececef
fix compile errors
talevy Nov 12, 2020
c099033
Merge branch 'master' into rollupv2-search
csoulios Dec 3, 2020
0d88290
Merge branch 'master' into rollupv2-search
csoulios Dec 8, 2020
7824cef
Merged with master
csoulios Dec 8, 2020
49d321e
Merge branch 'master' into rollupv2-search
csoulios Dec 14, 2020
0ac4438
Merge branch 'master' into rollupv2-search
csoulios Dec 15, 2020
c74629d
Merge branch 'master' into rollupv2-search
csoulios Dec 16, 2020
8f73c19
Merge branch 'master' into rollupv2-search
csoulios Dec 16, 2020
c889bb7
Fix build errors after merge with master
csoulios Dec 17, 2020
7489dd7
WIP
csoulios Dec 17, 2020
d1ad022
Merge branch 'master' into rollupv2-search
csoulios Jan 7, 2021
bf82de1
Remove preFilterRollup param
csoulios Jan 7, 2021
7512370
Merge branch 'master' into rollupv2-search
csoulios Jan 14, 2021
54d78b2
WIP
csoulios Jan 19, 2021
4e2ba19
resolve merge
talevy Jan 20, 2021
a21d676
add simple javaRestTest for rollup search
talevy Jan 20, 2021
54c51c1
Merge remote-tracking branch 'elastic/master' into rollupv2-search
talevy Jan 21, 2021
c8fda40
Changed license to test file
csoulios Jan 21, 2021
5de29cb
Merge branch 'master' into rollupv2-search
csoulios Jan 25, 2021
9c0132d
Seems that there is a bug in the rollup action when indexing
csoulios Jan 27, 2021
6f563f1
prepend rollup indices to backing datastream indices list
talevy Jan 27, 2021
b24f8ed
Merge remote-tracking branch 'elastic/master' into rollupv2-search
talevy Jan 27, 2021
39ead9c
Added more tests
csoulios Jan 29, 2021
bff0eab
Added more tests
csoulios Feb 1, 2021
3e5db50
Refactored RollupGroup class to include rollup metrics
csoulios Feb 2, 2021
7a5fca6
Moved rollup metadata inside the datastream metadata.
csoulios Feb 3, 2021
e741f22
Merge branch 'master' into rollup-metadata-ds
csoulios Feb 3, 2021
a22ade9
Moved rollup metadata inside the datastream metadata - Part 2.
csoulios Feb 3, 2021
387c587
Merge branch 'master' into rollup-metadata-ds
csoulios Feb 4, 2021
314618c
Checkstyle fixes
csoulios Feb 4, 2021
44defef
Made instance var final
csoulios Feb 4, 2021
454d4ed
Minor cleanup
csoulios Feb 4, 2021
ba70d3a
Merge branch 'master' into rollupv2-search
csoulios Feb 4, 2021
ca6f6cf
Changed license
csoulios Feb 5, 2021
787ad2d
Merge branch 'master' into rollupv2-search
csoulios Feb 9, 2021
50ba142
Merge branch 'rollup-metadata-ds' into rollupv2-search-change-meta
csoulios Feb 10, 2021
0a826d3
Merge branch 'master' into rollupv2-search-change-meta
csoulios Feb 10, 2021
9494006
Merge branch 'master' into rollupv2-search-change-meta
csoulios Feb 11, 2021
1c18dcd
Moved resolving optimal index to the coordinator part
csoulios Feb 11, 2021
7136a1b
Merge branch 'master' into rollupv2-search
csoulios Feb 16, 2021
f43e821
Added test for rolluping up datastream indices
csoulios Feb 16, 2021
2d61b3c
Code cleanup - Added more tests
csoulios Feb 16, 2021
e87c66f
Merge branch 'master' into rollupv2-search
csoulios Feb 16, 2021
f688baf
cleanup
csoulios Feb 16, 2021
28b7035
Removed method that was not used in AggregatorFactories
csoulios Feb 17, 2021
e4f85d6
Deleted RollupMetadata and RollupGroup classes
csoulios Feb 17, 2021
712ca29
Addressed reviewer comments
csoulios Feb 17, 2021
cc2bb5c
Added some unit tests for RollupShardDecider (more to follow)
csoulios Feb 17, 2021
53dd12a
checkstyle
csoulios Feb 17, 2021
9d5cefd
Added more unit tests for RollupShardDecider
csoulios Feb 18, 2021
69fe706
Merge branch 'master' into rollupv2-search
csoulios Feb 18, 2021
381fbed
Merge branch 'master' into rollupv2-search
csoulios Mar 1, 2021
50b0b89
Minor change
csoulios Mar 2, 2021
ad1c045
Modify can_match phase so that index metadata
csoulios Mar 2, 2021
14a537d
Modified CanMatchSearchPhaseResults class
csoulios Mar 2, 2021
b2aff7e
Added some unit tests for RollupShardDecider (more to follow)
csoulios Mar 2, 2021
697de67
Merge branch 'master' into rollupv2-search
csoulios Mar 10, 2021
7e7b3b4
Merge branch 'master' into rollupv2-search
csoulios Mar 11, 2021
3d832dc
Merge branch 'master' into rollupv2-search
csoulios Mar 16, 2021
ab5a1a9
Removed RollupIndexMetadata class
csoulios Mar 17, 2021
2ebbffa
Merge branch 'master' into rollupv2-search
csoulios Mar 22, 2021
a58df7a
Use IndexMetadata settings for determing rollup index
csoulios Mar 22, 2021
1b29db2
Replaced index name with index uuid
csoulios Mar 22, 2021
ea4b5b6
Merge branch 'master' into rollupv2-search
csoulios Mar 22, 2021
879a0fe
Fix typo
csoulios Mar 22, 2021
56c10be
Fix broken test: CanMatchPreFilterSearchPhaseTests.testSortShards()
csoulios Mar 23, 2021
@@ -72,7 +72,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
private final BiFunction<String, String, Transport.Connection> nodeIdToConnection;
private final SearchTask task;
protected final SearchPhaseResults<Result> results;
- private final ClusterState clusterState;
+ protected final ClusterState clusterState;
private final Map<String, AliasFilter> aliasFilter;
private final Map<String, Float> concreteIndexBoosts;
private final SetOnce<AtomicArray<ShardSearchFailure>> shardFailures = new SetOnce<>();

@@ -11,10 +11,13 @@
import org.apache.lucene.util.FixedBitSet;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.index.query.CoordinatorRewriteContext;
import org.elasticsearch.index.query.CoordinatorRewriteContextProvider;
import org.elasticsearch.rollup.RollupV2;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.SearchService.CanMatchResponse;
import org.elasticsearch.search.SearchShardTarget;
@@ -27,6 +30,7 @@
import org.elasticsearch.transport.Transport;

import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
@@ -107,6 +111,11 @@ private GroupShardsIterator<SearchShardIterator> getIterator(CanMatchSearchPhase
}
possibleMatches.set(shardIndexToQuery);
}

if (RollupV2.isEnabled()) {
possibleMatches = chooseOptimalRollupShards(results, shardsIts, possibleMatches);
}

SearchSourceBuilder source = getRequest().source();
int i = 0;
for (SearchShardIterator iter : shardsIts) {
@@ -123,6 +132,69 @@ private GroupShardsIterator<SearchShardIterator> getIterator(CanMatchSearchPhase
return new GroupShardsIterator<>(sortShards(shardsIts, results.minAndMaxes, fieldSort.order()));
}

/**
* Iterate over the shard iterator and search for overlapping rollup indices. If rollup indices exist,
* the method includes the shards from the optimal rollup index that can match the query and ignores
* the shards from other overlapping indices.
*
* @return A {@link FixedBitSet} with the shards of the optimal indices.
*/
private FixedBitSet chooseOptimalRollupShards(CanMatchSearchPhaseResults results,
GroupShardsIterator<SearchShardIterator> shardsIts,
FixedBitSet possibleMatches) {
// Maps the UUID of each index that will be replaced to its substitute, the optimal rollup index.
// The value is a tuple containing the UUID of the substitute index and its priority.
// In the end, all indices whose UUIDs appear as keys will be ignored.
Map<String, Tuple<String, Long>> indexSubstitute = new HashMap<>(shardsIts.size());

int i = 0;
for (SearchShardIterator iter : shardsIts) {
CanMatchResponse result = results.getResult(i++);
if (result.canMatch()) {
String indexName = iter.shardId().getIndexName();
String indexUuid = iter.shardId().getIndex().getUUID();
Long priority = result.priority();
String sourceIndexUuid = result.sourceIndexUuid();
IndexAbstraction originalIndex = clusterState.getMetadata().getIndicesLookup().get(indexName);

// If index is not a member of a data stream or is not a rollup index, it will not be replaced
// Also, if an index has already been marked to be replaced, it should be skipped.
if (originalIndex.getParentDataStream() == null || sourceIndexUuid == null || priority == null
|| indexSubstitute.containsKey(indexUuid)) {
continue;
}

if (indexSubstitute.containsKey(sourceIndexUuid)) {
// Retrieve the previously optimal index and compare it with the current index
// Find a new optimal index and replace source index and suboptimal rollup index
// with the new optimal index.
Tuple<String, Long> previousOptimalIndex = indexSubstitute.get(sourceIndexUuid);
String newOptimalIndex = previousOptimalIndex.v2() >= priority ? previousOptimalIndex.v1() : indexUuid;
Tuple<String, Long> optimalTuple = Tuple.tuple(newOptimalIndex, Math.max(priority, previousOptimalIndex.v2()));

// Replace original index with the new optimal index
indexSubstitute.put(sourceIndexUuid, optimalTuple);

// Replace suboptimal index with the new optimal index
String suboptimalIndex = indexUuid.equals(newOptimalIndex) == false ? indexUuid : previousOptimalIndex.v1();
indexSubstitute.put(suboptimalIndex, optimalTuple);
} else {
indexSubstitute.put(sourceIndexUuid, Tuple.tuple(indexUuid, priority));
}
}
}

FixedBitSet newMatches = possibleMatches.clone();
i = 0;
for (SearchShardIterator iter : shardsIts) {
if (newMatches.get(i) && indexSubstitute.containsKey(iter.shardId().getIndex().getUUID())) {
newMatches.clear(i);
}
i++;
}
return newMatches;
}

@Override
protected void performPhaseOnShard(int shardIndex, SearchShardIterator shardIt, SearchShardTarget shard) {
CoordinatorRewriteContext coordinatorRewriteContext =
@@ -193,17 +265,19 @@ private static final class CanMatchSearchPhaseResults extends SearchPhaseResults
private final FixedBitSet possibleMatches;
private final MinAndMax<?>[] minAndMaxes;
private int numPossibleMatches;
private final CanMatchResponse[] results;

CanMatchSearchPhaseResults(int size) {
super(size);
possibleMatches = new FixedBitSet(size);
minAndMaxes = new MinAndMax[size];
results = new CanMatchResponse[size];
}

@Override
void consumeResult(CanMatchResponse result, Runnable next) {
try {
- consumeResult(result.getShardIndex(), result.canMatch(), result.estimatedMinAndMax());
+ consumeResult(result.getShardIndex(), result);
} finally {
next.run();
}
@@ -217,15 +291,16 @@ boolean hasResult(int shardIndex) {
@Override
void consumeShardFailure(int shardIndex) {
// we have to carry over shard failures in order to account for them in the response.
- consumeResult(shardIndex, true, null);
+ consumeResult(shardIndex, new CanMatchResponse(true, null, null, null));
}

- synchronized void consumeResult(int shardIndex, boolean canMatch, MinAndMax<?> minAndMax) {
- if (canMatch) {
+ synchronized void consumeResult(int shardIndex, CanMatchResponse result) {
+ if (result.canMatch()) {
possibleMatches.set(shardIndex);
numPossibleMatches++;
}
- minAndMaxes[shardIndex] = minAndMax;
+ minAndMaxes[shardIndex] = result.estimatedMinAndMax();
+ results[shardIndex] = result;
}

synchronized int getNumPossibleMatches() {
@@ -240,5 +315,9 @@ synchronized FixedBitSet getPossibleMatches() {
Stream<CanMatchResponse> getSuccessfulResults() {
return Stream.empty();
}

synchronized CanMatchResponse getResult(int shardIndex) {
return results[shardIndex];
}
}
}
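
The substitution logic in `chooseOptimalRollupShards` can be read in isolation with a minimal sketch like the one below. It is not the PR's code: `ShardInfo` and `Substitute` are hypothetical stand-ins for `CanMatchResponse` and the `Tuple<String, Long>` values, `java.util.BitSet` replaces Lucene's `FixedBitSet`, and the data stream membership check is left out. The sketch also skips further shards of an index that is already the current substitute, which is an assumption of this example rather than something shown in the diff.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RollupShardSelectionSketch {

    /** Hypothetical stand-in for one shard's can_match result; not an Elasticsearch class. */
    record ShardInfo(String indexUuid, String sourceIndexUuid, Long priority, boolean canMatch) {}

    /** Hypothetical holder for the current best substitute of a replaced index. */
    record Substitute(String indexUuid, long priority) {}

    static BitSet chooseOptimal(List<ShardInfo> shards, BitSet possibleMatches) {
        // Key: UUID of an index that will be ignored. Value: the index that replaces it.
        Map<String, Substitute> substitutes = new HashMap<>();
        for (ShardInfo shard : shards) {
            // Only matching rollup shards that are not already replaced can become substitutes.
            if (shard.canMatch() == false || shard.sourceIndexUuid() == null || shard.priority() == null
                || substitutes.containsKey(shard.indexUuid())) {
                continue;
            }
            Substitute previous = substitutes.get(shard.sourceIndexUuid());
            if (previous == null) {
                substitutes.put(shard.sourceIndexUuid(), new Substitute(shard.indexUuid(), shard.priority()));
            } else if (previous.indexUuid().equals(shard.indexUuid()) == false) {
                // Two rollups overlap on the same source index: the higher priority wins,
                // and the losing rollup is marked as replaced as well.
                boolean keepPrevious = previous.priority() >= shard.priority();
                Substitute best = keepPrevious ? previous : new Substitute(shard.indexUuid(), shard.priority());
                String loser = keepPrevious ? shard.indexUuid() : previous.indexUuid();
                substitutes.put(shard.sourceIndexUuid(), best);
                substitutes.put(loser, best);
            }
        }
        // Clear the bit of every shard whose index has been replaced.
        BitSet result = (BitSet) possibleMatches.clone();
        for (int i = 0; i < shards.size(); i++) {
            if (result.get(i) && substitutes.containsKey(shards.get(i).indexUuid())) {
                result.clear(i);
            }
        }
        return result;
    }
}
```

With a source index and two rollups of it at priorities 1 and 2, only the shard of the priority-2 rollup stays set in the returned bit set.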

@@ -20,6 +20,8 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -44,6 +46,7 @@
import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.rollup.RollupV2;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.SearchShardTarget;
@@ -710,9 +713,26 @@ static boolean shouldPreFilterSearchShards(ClusterState clusterState,
} else if (preFilterShardSize == null) {
preFilterShardSize = SearchRequest.DEFAULT_PRE_FILTER_SHARD_SIZE;
}
if (RollupV2.isEnabled() && hasRollupDatastream(indices, searchRequest.indices(), clusterState)) {

Review comment (Contributor):
I'd prefer that we always run the can match phase if a datastream is present. No need to check the existence of rollups; the can match phase was made to handle datastreams before they existed ;).

return true;
}
return searchRequest.searchType() == QUERY_THEN_FETCH // we can't do this for DFS it needs to fan out to all shards all the time
- && (SearchService.canRewriteToMatchNone(source) || hasPrimaryFieldSort(source))
- && preFilterShardSize < numShards;
+ && (SearchService.canRewriteToMatchNone(source) || hasPrimaryFieldSort(source))
+ && preFilterShardSize < numShards;
}

private static boolean hasRollupDatastream(String[] indices, String[] requestIndices, ClusterState clusterState) {

Review comment (Contributor):
That's not the logic that we need, imo.
We want to know whether a datastream is explicitly requested, so it should use SearchRequest#indices to match datastreams?

Set<String> requestIndicesSet = Set.of(requestIndices);
for (String index : indices) {
IndexAbstraction originalIndex = clusterState.getMetadata().getIndicesLookup().get(index);
DataStream datastream = originalIndex.getParentDataStream() != null
? originalIndex.getParentDataStream().getDataStream() : null;
IndexMetadata indexMetadata = clusterState.getMetadata().index(index);
if (datastream != null && indexMetadata.isRollupIndex() && requestIndicesSet.contains(index) == false) {
return true;
}
}
return false;
}

private static boolean hasReadOnlyIndices(String[] indices, ClusterState clusterState) {
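
The review comments on `hasRollupDatastream` suggest checking whether a data stream is named in the request itself instead of inspecting rollup metadata. A minimal sketch of that idea follows, under stated assumptions: `requestIndices` plays the role of `SearchRequest#indices`, `dataStreamNames` is a hypothetical stand-in for a lookup into cluster metadata, and wildcard or alias resolution is not handled. It is not the change the PR ended up with.

```java
import java.util.Set;

public class DataStreamRequestSketch {

    /**
     * Returns true if any name in the request is itself a known data stream.
     * Both parameters are hypothetical stand-ins for the request indices and
     * for the data stream names held in cluster metadata.
     */
    static boolean requestsDataStream(String[] requestIndices, Set<String> dataStreamNames) {
        for (String name : requestIndices) {
            // Exact name match only; expressions such as "logs-*" are not expanded here.
            if (dataStreamNames.contains(name)) {
                return true;
            }
        }
        return false;
    }
}
```

A caller could use such a check to force the can match phase whenever a data stream is requested, and fall back to the existing shard-count condition otherwise.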

@@ -577,6 +577,10 @@ public Index getResizeSourceIndex() {
public static final Setting<String> INDEX_ROLLUP_SOURCE_NAME = Setting.simpleString(INDEX_ROLLUP_SOURCE_NAME_KEY,
Property.IndexScope, Property.PrivateIndex);

public boolean isRollupIndex() {
return INDEX_ROLLUP_SOURCE_UUID.exists(settings);
}

ImmutableOpenMap<String, DiffableStringMap> getCustomData() {
return this.customData;
}
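
The new `isRollupIndex()` method treats an index as a rollup index when its source-UUID setting is present. The sketch below illustrates the same presence check with a plain `Map` instead of Elasticsearch's `Settings`; the key string is hypothetical, since the diff shows only the constant name `INDEX_ROLLUP_SOURCE_UUID`.

```java
import java.util.Map;

public class RollupSettingSketch {

    // Hypothetical key, used only for this example.
    static final String ROLLUP_SOURCE_UUID_KEY = "example.rollup.source.uuid";

    /** Treats an index as a rollup index when the source-UUID setting is present at all. */
    static boolean isRollupIndex(Map<String, String> settings) {
        return settings.containsKey(ROLLUP_SOURCE_UUID_KEY);
    }
}
```

Because the check is about presence rather than value, callers do not need to parse or validate the stored UUID to decide whether an index is a rollup.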