-
Notifications
You must be signed in to change notification settings - Fork 25.2k
Node level can match action #78765
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Node level can match action #78765
Changes from all commits
Commits
Show all changes
38 commits
Select commit
Hold shift + click to select a range
76b467a
WIP
ywelsch 07e9f51
happy path
ywelsch 53d1071
more work
ywelsch 2263bbd
more edits
ywelsch 66d2095
more stuff
ywelsch 4a8bf13
CCS multi-version test passing
ywelsch a69bcbc
unit tests
ywelsch fe56d5a
simpler diff?
ywelsch 75e2bb5
more renaming
ywelsch 3e1d6e1
Merge remote-tracking branch 'elastic/master' into node-level-can-match
ywelsch 44b5783
single list
ywelsch a65bd63
reset prefiltter settings
ywelsch 41b61bd
fix serialzation
ywelsch dd93c93
thread pool
ywelsch 1a718cf
Merge remote-tracking branch 'elastic/master' into node-level-can-match
ywelsch 36bb4a1
javadoc
ywelsch 9bf0259
Merge remote-tracking branch 'elastic/master' into node-level-can-match
ywelsch da5707f
merge conflict
ywelsch ea79ae2
docs
ywelsch 9c3196f
Merge remote-tracking branch 'elastic/master' into node-level-can-match
ywelsch 5a81c07
remove dead code
ywelsch e26160f
avoid cast
ywelsch 02c7193
simpler and rename
ywelsch ca49aa8
Merge remote-tracking branch 'elastic/master' into node-level-can-match
ywelsch 9670258
checkstyle
ywelsch 803068b
cosmetics
ywelsch a3457ef
original indices
ywelsch d032fef
Add can match qa test
dnhatn 24baa8b
implement indicesrequest
ywelsch 171e530
Merge branch 'node-level-can-match' of github.com:ywelsch/elasticsear…
ywelsch 259f049
merge originalindices
ywelsch 16f45ed
better
ywelsch c0e35ee
radnomize use of can-match phase
ywelsch bf6b194
Merge remote-tracking branch 'elastic/master' into node-level-can-match
ywelsch ade3b7b
Merge branch 'master' into node-level-can-match
elasticmachine cc83808
rename
ywelsch aada13a
Merge remote-tracking branch 'elastic/master' into node-level-can-match
ywelsch c1cc951
Merge branch 'node-level-can-match' of github.com:ywelsch/elasticsear…
ywelsch File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
219 changes: 219 additions & 0 deletions
219
server/src/main/java/org/elasticsearch/action/search/CanMatchNodeRequest.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,219 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License | ||
* 2.0 and the Server Side Public License, v 1; you may not use this file except | ||
* in compliance with, at your election, the Elastic License 2.0 or the Server | ||
* Side Public License, v 1. | ||
*/ | ||
|
||
package org.elasticsearch.action.search; | ||
|
||
import org.elasticsearch.action.IndicesRequest; | ||
import org.elasticsearch.action.OriginalIndices; | ||
import org.elasticsearch.action.support.IndicesOptions; | ||
import org.elasticsearch.common.io.stream.StreamInput; | ||
import org.elasticsearch.common.io.stream.StreamOutput; | ||
import org.elasticsearch.common.io.stream.Writeable; | ||
import org.elasticsearch.core.Nullable; | ||
import org.elasticsearch.core.TimeValue; | ||
import org.elasticsearch.index.shard.ShardId; | ||
import org.elasticsearch.search.Scroll; | ||
import org.elasticsearch.search.builder.SearchSourceBuilder; | ||
import org.elasticsearch.search.internal.AliasFilter; | ||
import org.elasticsearch.search.internal.ShardSearchContextId; | ||
import org.elasticsearch.search.internal.ShardSearchRequest; | ||
import org.elasticsearch.tasks.Task; | ||
import org.elasticsearch.tasks.TaskId; | ||
import org.elasticsearch.transport.TransportRequest; | ||
|
||
import java.io.IOException; | ||
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.stream.Collectors; | ||
|
||
/** | ||
* Node-level request used during can-match phase | ||
*/ | ||
public class CanMatchNodeRequest extends TransportRequest implements IndicesRequest { | ||
|
||
private final SearchSourceBuilder source; | ||
private final List<Shard> shards; | ||
private final SearchType searchType; | ||
private final Boolean requestCache; | ||
private final boolean allowPartialSearchResults; | ||
private final Scroll scroll; | ||
private final int numberOfShards; | ||
private final long nowInMillis; | ||
@Nullable | ||
private final String clusterAlias; | ||
private final String[] indices; | ||
private final IndicesOptions indicesOptions; | ||
private final TimeValue waitForCheckpointsTimeout; | ||
|
||
public static class Shard implements Writeable { | ||
private final String[] indices; | ||
private final ShardId shardId; | ||
private final int shardRequestIndex; | ||
private final AliasFilter aliasFilter; | ||
private final float indexBoost; | ||
private final ShardSearchContextId readerId; | ||
private final TimeValue keepAlive; | ||
private final long waitForCheckpoint; | ||
|
||
public Shard(String[] indices, | ||
ShardId shardId, | ||
int shardRequestIndex, | ||
AliasFilter aliasFilter, | ||
float indexBoost, | ||
ShardSearchContextId readerId, | ||
TimeValue keepAlive, | ||
long waitForCheckpoint) { | ||
this.indices = indices; | ||
this.shardId = shardId; | ||
this.shardRequestIndex = shardRequestIndex; | ||
this.aliasFilter = aliasFilter; | ||
this.indexBoost = indexBoost; | ||
this.readerId = readerId; | ||
this.keepAlive = keepAlive; | ||
this.waitForCheckpoint = waitForCheckpoint; | ||
assert keepAlive == null || readerId != null : "readerId: " + readerId + " keepAlive: " + keepAlive; | ||
} | ||
|
||
public Shard(StreamInput in) throws IOException { | ||
indices = in.readStringArray(); | ||
shardId = new ShardId(in); | ||
shardRequestIndex = in.readVInt(); | ||
aliasFilter = new AliasFilter(in); | ||
indexBoost = in.readFloat(); | ||
readerId = in.readOptionalWriteable(ShardSearchContextId::new); | ||
keepAlive = in.readOptionalTimeValue(); | ||
waitForCheckpoint = in.readLong(); | ||
assert keepAlive == null || readerId != null : "readerId: " + readerId + " keepAlive: " + keepAlive; | ||
} | ||
|
||
@Override | ||
public void writeTo(StreamOutput out) throws IOException { | ||
out.writeStringArray(indices); | ||
shardId.writeTo(out); | ||
out.writeVInt(shardRequestIndex); | ||
aliasFilter.writeTo(out); | ||
out.writeFloat(indexBoost); | ||
out.writeOptionalWriteable(readerId); | ||
out.writeOptionalTimeValue(keepAlive); | ||
out.writeLong(waitForCheckpoint); | ||
} | ||
|
||
public int getShardRequestIndex() { | ||
return shardRequestIndex; | ||
} | ||
|
||
public String[] getOriginalIndices() { | ||
return indices; | ||
} | ||
|
||
public ShardId shardId() { | ||
return shardId; | ||
} | ||
} | ||
|
||
public CanMatchNodeRequest( | ||
SearchRequest searchRequest, | ||
IndicesOptions indicesOptions, | ||
List<Shard> shards, | ||
int numberOfShards, | ||
long nowInMillis, | ||
@Nullable String clusterAlias | ||
) { | ||
this.source = searchRequest.source(); | ||
this.indicesOptions = indicesOptions; | ||
this.shards = new ArrayList<>(shards); | ||
this.searchType = searchRequest.searchType(); | ||
this.requestCache = searchRequest.requestCache(); | ||
// If allowPartialSearchResults is unset (ie null), the cluster-level default should have been substituted | ||
// at this stage. Any NPEs in the above are therefore an error in request preparation logic. | ||
assert searchRequest.allowPartialSearchResults() != null; | ||
this.allowPartialSearchResults = searchRequest.allowPartialSearchResults(); | ||
this.scroll = searchRequest.scroll(); | ||
this.numberOfShards = numberOfShards; | ||
this.nowInMillis = nowInMillis; | ||
this.clusterAlias = clusterAlias; | ||
this.waitForCheckpointsTimeout = searchRequest.getWaitForCheckpointsTimeout(); | ||
indices = shards.stream().map(Shard::getOriginalIndices).flatMap(Arrays::stream).distinct() | ||
.toArray(String[]::new); | ||
} | ||
|
||
public CanMatchNodeRequest(StreamInput in) throws IOException { | ||
super(in); | ||
source = in.readOptionalWriteable(SearchSourceBuilder::new); | ||
indicesOptions = IndicesOptions.readIndicesOptions(in); | ||
searchType = SearchType.fromId(in.readByte()); | ||
scroll = in.readOptionalWriteable(Scroll::new); | ||
requestCache = in.readOptionalBoolean(); | ||
allowPartialSearchResults = in.readBoolean(); | ||
numberOfShards = in.readVInt(); | ||
nowInMillis = in.readVLong(); | ||
clusterAlias = in.readOptionalString(); | ||
waitForCheckpointsTimeout = in.readTimeValue(); | ||
shards = in.readList(Shard::new); | ||
indices = shards.stream().map(Shard::getOriginalIndices).flatMap(Arrays::stream).distinct() | ||
.toArray(String[]::new); | ||
} | ||
|
||
@Override | ||
public void writeTo(StreamOutput out) throws IOException { | ||
super.writeTo(out); | ||
out.writeOptionalWriteable(source); | ||
indicesOptions.writeIndicesOptions(out); | ||
out.writeByte(searchType.id()); | ||
out.writeOptionalWriteable(scroll); | ||
out.writeOptionalBoolean(requestCache); | ||
out.writeBoolean(allowPartialSearchResults); | ||
out.writeVInt(numberOfShards); | ||
out.writeVLong(nowInMillis); | ||
out.writeOptionalString(clusterAlias); | ||
out.writeTimeValue(waitForCheckpointsTimeout); | ||
out.writeList(shards); | ||
} | ||
|
||
public List<Shard> getShardLevelRequests() { | ||
return shards; | ||
} | ||
|
||
public List<ShardSearchRequest> createShardSearchRequests() { | ||
return shards.stream().map(this::createShardSearchRequest).collect(Collectors.toList()); | ||
} | ||
|
||
public ShardSearchRequest createShardSearchRequest(Shard r) { | ||
ShardSearchRequest shardSearchRequest = new ShardSearchRequest( | ||
new OriginalIndices(r.indices, indicesOptions), r.shardId, r.shardRequestIndex, numberOfShards, searchType, | ||
source, requestCache, r.aliasFilter, r.indexBoost, allowPartialSearchResults, scroll, | ||
nowInMillis, clusterAlias, r.readerId, r.keepAlive, r.waitForCheckpoint, waitForCheckpointsTimeout | ||
); | ||
shardSearchRequest.setParentTask(getParentTask()); | ||
return shardSearchRequest; | ||
} | ||
|
||
@Override | ||
public String[] indices() { | ||
return indices; | ||
} | ||
|
||
@Override | ||
public IndicesOptions indicesOptions() { | ||
return indicesOptions; | ||
} | ||
|
||
@Override | ||
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) { | ||
return new SearchShardTask(id, type, action, getDescription(), parentTaskId, headers); | ||
} | ||
|
||
@Override | ||
public String getDescription() { | ||
// Shard id is enough here, the request itself can be found by looking at the parent task description | ||
return "shardIds[" + shards.stream().map(slr -> slr.shardId).collect(Collectors.toList()) + "]"; | ||
} | ||
|
||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.