Skip to content

Commit 294e9cc

Browse files
authored
Always include the matching node when resolving point in time (#61658)
If shards are relocated to new nodes, then searches with a point in time will fail, although a pit keeps search contexts open. This commit solves this problem by reducing info used by SearchShardIterator and always including the matching nodes when resolving a point in time. Closes #61627
1 parent a70c00a commit 294e9cc

19 files changed

+346
-207
lines changed

server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

+14-15
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import org.elasticsearch.action.support.TransportActions;
3333
import org.elasticsearch.cluster.ClusterState;
3434
import org.elasticsearch.cluster.routing.GroupShardsIterator;
35-
import org.elasticsearch.cluster.routing.ShardRouting;
3635
import org.elasticsearch.common.Nullable;
3736
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
3837
import org.elasticsearch.common.util.concurrent.AtomicArray;
@@ -212,7 +211,7 @@ void skipShard(SearchShardIterator iterator) {
212211
successfulShardExecution(iterator);
213212
}
214213

215-
private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final ShardRouting shard) {
214+
private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final SearchShardTarget shard) {
216215
/*
217216
* We capture the thread that this phase is starting on. When we are called back after executing the phase, we are either on the
218217
* same thread (because we never went async, or the same thread was selected from the thread pool) or a different thread. If we
@@ -221,16 +220,16 @@ private void performPhaseOnShard(final int shardIndex, final SearchShardIterator
221220
* we can continue (cf. InitialSearchPhase#maybeFork).
222221
*/
223222
if (shard == null) {
224-
fork(() -> onShardFailure(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId())));
223+
fork(() -> onShardFailure(shardIndex, null, shardIt, new NoShardAvailableActionException(shardIt.shardId())));
225224
} else {
226225
final PendingExecutions pendingExecutions = throttleConcurrentRequests ?
227-
pendingExecutionsPerNode.computeIfAbsent(shard.currentNodeId(), n -> new PendingExecutions(maxConcurrentRequestsPerNode))
226+
pendingExecutionsPerNode.computeIfAbsent(shard.getNodeId(), n -> new PendingExecutions(maxConcurrentRequestsPerNode))
228227
: null;
229228
Runnable r = () -> {
230229
final Thread thread = Thread.currentThread();
231230
try {
232231
executePhaseOnShard(shardIt, shard,
233-
new SearchActionListener<Result>(shardIt.newSearchShardTarget(shard.currentNodeId()), shardIndex) {
232+
new SearchActionListener<Result>(shard, shardIndex) {
234233
@Override
235234
public void innerOnResponse(Result result) {
236235
try {
@@ -243,7 +242,7 @@ public void innerOnResponse(Result result) {
243242
@Override
244243
public void onFailure(Exception t) {
245244
try {
246-
onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, t);
245+
onShardFailure(shardIndex, shard, shardIt, t);
247246
} finally {
248247
executeNext(pendingExecutions, thread);
249248
}
@@ -255,7 +254,7 @@ public void onFailure(Exception t) {
255254
* It is possible to run into connection exceptions here because we are getting the connection early and might
256255
* run into nodes that are not connected. In this case, on shard failure will move us to the next shard copy.
257256
*/
258-
fork(() -> onShardFailure(shardIndex, shard, shard.currentNodeId(), shardIt, e));
257+
fork(() -> onShardFailure(shardIndex, shard, shardIt, e));
259258
} finally {
260259
executeNext(pendingExecutions, thread);
261260
}
@@ -275,7 +274,9 @@ public void onFailure(Exception t) {
275274
* @param shard the shard routing to send the request for
276275
* @param listener the listener to notify on response
277276
*/
278-
protected abstract void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard, SearchActionListener<Result> listener);
277+
protected abstract void executePhaseOnShard(SearchShardIterator shardIt,
278+
SearchShardTarget shard,
279+
SearchActionListener<Result> listener);
279280

280281
private void fork(final Runnable runnable) {
281282
executor.execute(new AbstractRunnable() {
@@ -370,18 +371,16 @@ ShardSearchFailure[] buildShardFailures() {
370371
return failures;
371372
}
372373

373-
private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard, @Nullable String nodeId,
374-
final SearchShardIterator shardIt, Exception e) {
374+
private void onShardFailure(final int shardIndex, @Nullable SearchShardTarget shard, final SearchShardIterator shardIt, Exception e) {
375375
// we always add the shard failure for a specific shard instance
376376
// we do make sure to clean it on a successful response from a shard
377-
SearchShardTarget shardTarget = shardIt.newSearchShardTarget(nodeId);
378-
onShardFailure(shardIndex, shardTarget, e);
379-
final ShardRouting nextShard = shardIt.nextOrNull();
377+
onShardFailure(shardIndex, shard, e);
378+
final SearchShardTarget nextShard = shardIt.nextOrNull();
380379
final boolean lastShard = nextShard == null;
381380
logger.debug(() -> new ParameterizedMessage("{}: Failed to execute [{}] lastShard [{}]",
382-
shard != null ? shard.shortSummary() : shardIt.shardId(), request, lastShard), e);
381+
shard != null ? shard : shardIt.shardId(), request, lastShard), e);
383382
if (lastShard) {
384-
onShardGroupFailure(shardIndex, shardTarget, e);
383+
onShardGroupFailure(shardIndex, shard, e);
385384
}
386385
final int totalOps = this.totalOps.incrementAndGet();
387386
if (totalOps == expectedTotalOps) {

server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@
2323
import org.elasticsearch.action.ActionListener;
2424
import org.elasticsearch.cluster.ClusterState;
2525
import org.elasticsearch.cluster.routing.GroupShardsIterator;
26-
import org.elasticsearch.cluster.routing.ShardRouting;
2726
import org.elasticsearch.search.SearchService.CanMatchResponse;
27+
import org.elasticsearch.search.SearchShardTarget;
2828
import org.elasticsearch.search.builder.SearchSourceBuilder;
2929
import org.elasticsearch.search.internal.AliasFilter;
3030
import org.elasticsearch.search.sort.FieldSortBuilder;
@@ -77,9 +77,9 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<CanMa
7777
}
7878

7979
@Override
80-
protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard,
80+
protected void executePhaseOnShard(SearchShardIterator shardIt, SearchShardTarget shard,
8181
SearchActionListener<CanMatchResponse> listener) {
82-
getSearchTransport().sendCanMatch(getConnection(shardIt.getClusterAlias(), shard.currentNodeId()),
82+
getSearchTransport().sendCanMatch(getConnection(shard.getClusterAlias(), shard.getNodeId()),
8383
buildShardSearchRequest(shardIt), getTask(), listener);
8484
}
8585

server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import org.elasticsearch.action.ActionListener;
2424
import org.elasticsearch.cluster.ClusterState;
2525
import org.elasticsearch.cluster.routing.GroupShardsIterator;
26-
import org.elasticsearch.cluster.routing.ShardRouting;
26+
import org.elasticsearch.search.SearchShardTarget;
2727
import org.elasticsearch.search.builder.SearchSourceBuilder;
2828
import org.elasticsearch.search.dfs.AggregatedDfs;
2929
import org.elasticsearch.search.dfs.DfsSearchResult;
@@ -65,9 +65,9 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
6565
}
6666

6767
@Override
68-
protected void executePhaseOnShard(final SearchShardIterator shardIt, final ShardRouting shard,
68+
protected void executePhaseOnShard(final SearchShardIterator shardIt, final SearchShardTarget shard,
6969
final SearchActionListener<DfsSearchResult> listener) {
70-
getSearchTransport().sendExecuteDfs(getConnection(shardIt.getClusterAlias(), shard.currentNodeId()),
70+
getSearchTransport().sendExecuteDfs(getConnection(shard.getClusterAlias(), shard.getNodeId()),
7171
buildShardSearchRequest(shardIt) , getTask(), listener);
7272
}
7373

server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java

+3-4
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.elasticsearch.action.ActionListener;
2525
import org.elasticsearch.cluster.ClusterState;
2626
import org.elasticsearch.cluster.routing.GroupShardsIterator;
27-
import org.elasticsearch.cluster.routing.ShardRouting;
2827
import org.elasticsearch.search.SearchPhaseResult;
2928
import org.elasticsearch.search.SearchShardTarget;
3029
import org.elasticsearch.search.builder.SearchSourceBuilder;
@@ -75,11 +74,11 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPh
7574
SearchProgressListener.buildSearchShards(toSkipShardsIts), clusters, sourceBuilder == null || sourceBuilder.size() != 0);
7675
}
7776

78-
protected void executePhaseOnShard(final SearchShardIterator shardIt, final ShardRouting shard,
77+
protected void executePhaseOnShard(final SearchShardIterator shardIt,
78+
final SearchShardTarget shard,
7979
final SearchActionListener<SearchPhaseResult> listener) {
8080
ShardSearchRequest request = rewriteShardSearchRequest(super.buildShardSearchRequest(shardIt));
81-
getSearchTransport().sendExecuteQuery(getConnection(shardIt.getClusterAlias(), shard.currentNodeId()),
82-
request, getTask(), listener);
81+
getSearchTransport().sendExecuteQuery(getConnection(shard.getClusterAlias(), shard.getNodeId()), request, getTask(), listener);
8382
}
8483

8584
@Override

server/src/main/java/org/elasticsearch/action/search/SearchShardIterator.java

+51-41
Original file line numberDiff line numberDiff line change
@@ -21,49 +21,56 @@
2121

2222
import org.elasticsearch.action.OriginalIndices;
2323
import org.elasticsearch.cluster.routing.PlainShardIterator;
24-
import org.elasticsearch.cluster.routing.ShardIterator;
2524
import org.elasticsearch.cluster.routing.ShardRouting;
2625
import org.elasticsearch.common.Nullable;
2726
import org.elasticsearch.common.unit.TimeValue;
27+
import org.elasticsearch.common.util.Countable;
28+
import org.elasticsearch.common.util.PlainIterator;
2829
import org.elasticsearch.index.shard.ShardId;
2930
import org.elasticsearch.search.SearchShardTarget;
3031
import org.elasticsearch.search.internal.ShardSearchContextId;
3132

33+
import java.util.Comparator;
3234
import java.util.List;
3335
import java.util.Objects;
36+
import java.util.stream.Collectors;
3437

3538
/**
3639
* Extension of {@link PlainShardIterator} used in the search api, which also holds the {@link OriginalIndices}
3740
* of the search request (useful especially with cross-cluster search, as each cluster has its own set of original indices) as well as
3841
* the cluster alias.
3942
* @see OriginalIndices
4043
*/
41-
public final class SearchShardIterator extends PlainShardIterator {
44+
public final class SearchShardIterator implements Comparable<SearchShardIterator>, Countable {
4245

4346
private final OriginalIndices originalIndices;
4447
private final String clusterAlias;
48+
private final ShardId shardId;
4549
private boolean skip = false;
4650

4751
private final ShardSearchContextId searchContextId;
4852
private final TimeValue searchContextKeepAlive;
53+
private final PlainIterator<String> targetNodesIterator;
4954

5055
/**
5156
* Creates a {@link PlainShardIterator} instance that iterates over a subset of the given shards
5257
* this the a given <code>shardId</code>.
5358
*
54-
* @param clusterAlias the alias of the cluster where the shard is located
55-
* @param shardId shard id of the group
56-
* @param shards shards to iterate
59+
* @param clusterAlias the alias of the cluster where the shard is located
60+
* @param shardId shard id of the group
61+
* @param shards shards to iterate
5762
* @param originalIndices the indices that the search request originally related to (before any rewriting happened)
5863
*/
5964
public SearchShardIterator(@Nullable String clusterAlias, ShardId shardId, List<ShardRouting> shards, OriginalIndices originalIndices) {
60-
this(clusterAlias, shardId, shards, originalIndices, null, null);
65+
this(clusterAlias, shardId, shards.stream().map(ShardRouting::currentNodeId).collect(Collectors.toList()),
66+
originalIndices, null, null);
6167
}
6268

6369
public SearchShardIterator(@Nullable String clusterAlias, ShardId shardId,
64-
List<ShardRouting> shards, OriginalIndices originalIndices,
70+
List<String> targetNodeIds, OriginalIndices originalIndices,
6571
ShardSearchContextId searchContextId, TimeValue searchContextKeepAlive) {
66-
super(shardId, shards);
72+
this.shardId = shardId;
73+
this.targetNodesIterator = new PlainIterator<>(targetNodeIds);
6774
this.originalIndices = originalIndices;
6875
this.clusterAlias = clusterAlias;
6976
this.searchContextId = searchContextId;
@@ -86,12 +93,16 @@ public String getClusterAlias() {
8693
return clusterAlias;
8794
}
8895

89-
/**
90-
* Creates a new shard target from this iterator, pointing at the node identified by the provided identifier.
91-
* @see SearchShardTarget
92-
*/
93-
SearchShardTarget newSearchShardTarget(String nodeId) {
94-
return new SearchShardTarget(nodeId, shardId(), clusterAlias, originalIndices);
96+
SearchShardTarget nextOrNull() {
97+
final String nodeId = targetNodesIterator.nextOrNull();
98+
if (nodeId != null) {
99+
return new SearchShardTarget(nodeId, shardId, clusterAlias, originalIndices);
100+
}
101+
return null;
102+
}
103+
104+
int remaining() {
105+
return targetNodesIterator.remaining();
95106
}
96107

97108
/**
@@ -105,6 +116,10 @@ TimeValue getSearchContextKeepAlive() {
105116
return searchContextKeepAlive;
106117
}
107118

119+
List<String> getTargetNodeIds() {
120+
return targetNodesIterator.asList();
121+
}
122+
108123
/**
109124
* Reset the iterator and mark it as skippable
110125
* @see #skip()
@@ -114,49 +129,44 @@ void resetAndSkip() {
114129
skip = true;
115130
}
116131

132+
void reset() {
133+
targetNodesIterator.reset();
134+
}
135+
117136
/**
118137
* Returns <code>true</code> if the search execution should skip this shard since it can not match any documents given the query.
119138
*/
120139
boolean skip() {
121140
return skip;
122141
}
123142

143+
144+
@Override
145+
public int size() {
146+
return targetNodesIterator.size();
147+
}
148+
149+
ShardId shardId() {
150+
return shardId;
151+
}
152+
124153
@Override
125154
public boolean equals(Object o) {
126-
if (this == o) {
127-
return true;
128-
}
129-
if (o == null || getClass() != o.getClass()) {
130-
return false;
131-
}
132-
if (super.equals(o) == false) {
133-
return false;
134-
}
155+
if (this == o) return true;
156+
if (o == null || getClass() != o.getClass()) return false;
135157
SearchShardIterator that = (SearchShardIterator) o;
136-
return Objects.equals(clusterAlias, that.clusterAlias);
158+
return shardId.equals(that.shardId) && Objects.equals(clusterAlias, that.clusterAlias);
137159
}
138160

139161
@Override
140162
public int hashCode() {
141-
return Objects.hash(super.hashCode(), clusterAlias);
163+
return Objects.hash(clusterAlias, shardId);
142164
}
143165

144166
@Override
145-
public int compareTo(ShardIterator o) {
146-
int superCompareTo = super.compareTo(o);
147-
if (superCompareTo != 0 || (o instanceof SearchShardIterator == false)) {
148-
return superCompareTo;
149-
}
150-
SearchShardIterator searchShardIterator = (SearchShardIterator)o;
151-
if (clusterAlias == null && searchShardIterator.getClusterAlias() == null) {
152-
return 0;
153-
}
154-
if (clusterAlias == null) {
155-
return -1;
156-
}
157-
if (searchShardIterator.getClusterAlias() == null) {
158-
return 1;
159-
}
160-
return clusterAlias.compareTo(searchShardIterator.getClusterAlias());
167+
public int compareTo(SearchShardIterator o) {
168+
return Comparator.comparing(SearchShardIterator::shardId)
169+
.thenComparing(SearchShardIterator::getClusterAlias, Comparator.nullsFirst(String::compareTo))
170+
.compare(this, o);
161171
}
162172
}

0 commit comments

Comments
 (0)