Skip to content

Commit 6367839

Browse files
committed
real time get
1 parent 8411182 commit 6367839

File tree

2 files changed

+102
-5
lines changed

2 files changed

+102
-5
lines changed

server/src/main/java/org/elasticsearch/action/get/TransportGetAction.java

Lines changed: 92 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,21 +9,32 @@
99
package org.elasticsearch.action.get;
1010

1111
import org.elasticsearch.action.ActionListener;
12+
import org.elasticsearch.action.ActionListenerResponseHandler;
1213
import org.elasticsearch.action.ActionRunnable;
14+
import org.elasticsearch.action.NoShardAvailableActionException;
15+
import org.elasticsearch.action.admin.indices.refresh.TransportShardRefreshAction;
1316
import org.elasticsearch.action.support.ActionFilters;
17+
import org.elasticsearch.action.support.replication.BasicReplicationRequest;
1418
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
19+
import org.elasticsearch.client.internal.node.NodeClient;
1520
import org.elasticsearch.cluster.ClusterState;
1621
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
22+
import org.elasticsearch.cluster.node.DiscoveryNode;
23+
import org.elasticsearch.cluster.routing.PlainShardIterator;
1724
import org.elasticsearch.cluster.routing.ShardIterator;
25+
import org.elasticsearch.cluster.routing.ShardRouting;
1826
import org.elasticsearch.cluster.service.ClusterService;
1927
import org.elasticsearch.common.inject.Inject;
2028
import org.elasticsearch.common.io.stream.Writeable;
2129
import org.elasticsearch.index.IndexService;
2230
import org.elasticsearch.index.get.GetResult;
2331
import org.elasticsearch.index.shard.IndexShard;
2432
import org.elasticsearch.index.shard.ShardId;
33+
import org.elasticsearch.index.shard.ShardNotFoundException;
2534
import org.elasticsearch.indices.ExecutorSelector;
2635
import org.elasticsearch.indices.IndicesService;
36+
import org.elasticsearch.logging.LogManager;
37+
import org.elasticsearch.logging.Logger;
2738
import org.elasticsearch.threadpool.ThreadPool;
2839
import org.elasticsearch.transport.TransportService;
2940

@@ -34,8 +45,11 @@
3445
*/
3546
public class TransportGetAction extends TransportSingleShardAction<GetRequest, GetResponse> {
3647

48+
private static final Logger logger = LogManager.getLogger(TransportGetAction.class);
49+
3750
private final IndicesService indicesService;
3851
private final ExecutorSelector executorSelector;
52+
private final NodeClient client;
3953

4054
@Inject
4155
public TransportGetAction(
@@ -45,7 +59,8 @@ public TransportGetAction(
4559
ThreadPool threadPool,
4660
ActionFilters actionFilters,
4761
IndexNameExpressionResolver indexNameExpressionResolver,
48-
ExecutorSelector executorSelector
62+
ExecutorSelector executorSelector,
63+
NodeClient client
4964
) {
5065
super(
5166
GetAction.NAME,
@@ -59,6 +74,7 @@ public TransportGetAction(
5974
);
6075
this.indicesService = indicesService;
6176
this.executorSelector = executorSelector;
77+
this.client = client;
6278
// register the internal TransportGetFromTranslogAction
6379
new TransportGetFromTranslogAction(transportService, indicesService, actionFilters);
6480
}
@@ -78,7 +94,10 @@ protected ShardIterator shards(ClusterState state, InternalRequest request) {
7894
request.request().routing(),
7995
request.request().preference()
8096
);
81-
return clusterService.operationRouting().useOnlyPromotableShardsForStateless(iterator);
97+
if (iterator == null) {
98+
return null;
99+
}
100+
return new PlainShardIterator(iterator.shardId(), iterator.getShardRoutings().stream().filter(ShardRouting::isSearchable).toList());
82101
}
83102

84103
@Override
@@ -91,6 +110,16 @@ protected void resolveRequest(ClusterState state, InternalRequest request) {
91110
protected void asyncShardOperation(GetRequest request, ShardId shardId, ActionListener<GetResponse> listener) throws IOException {
92111
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
93112
IndexShard indexShard = indexService.getShard(shardId.id());
113+
if (indexShard.routingEntry() == null) {
114+
listener.onFailure(new ShardNotFoundException(shardId));
115+
return;
116+
}
117+
if (indexShard.routingEntry().isPromotableToPrimary() == false) {
118+
handleGetOnUnpromotableShard(request, indexShard, listener);
119+
return;
120+
}
121+
assert DiscoveryNode.isStateless(clusterService.getSettings()) == false
122+
: "A TransportGetAction should always be handled by a search shard in Stateless";
94123
if (request.realtime()) { // we are not tied to a refresh cycle here anyway
95124
asyncGet(request, shardId, listener);
96125
} else {
@@ -148,6 +177,67 @@ private void asyncGet(GetRequest request, ShardId shardId, ActionListener<GetRes
148177
}
149178
}
150179

180+
private void handleGetOnUnpromotableShard(GetRequest request, IndexShard indexShard, ActionListener<GetResponse> listener)
181+
throws IOException {
182+
ShardId shardId = indexShard.shardId();
183+
DiscoveryNode node = getCurrentNodeOfPrimary(shardId);
184+
if (request.refresh()) {
185+
logger.trace("send refresh action for shard {} to node {}", shardId, node.getId());
186+
var refreshRequest = new BasicReplicationRequest(shardId);
187+
refreshRequest.setParentTask(request.getParentTask());
188+
client.executeLocally(
189+
TransportShardRefreshAction.TYPE,
190+
refreshRequest,
191+
ActionListener.wrap(replicationResponse -> super.asyncShardOperation(request, shardId, listener), listener::onFailure)
192+
);
193+
} else if (request.realtime()) {
194+
TransportGetFromTranslogAction.Request getFromTranslogRequest = new TransportGetFromTranslogAction.Request(request, shardId);
195+
getFromTranslogRequest.setParentTask(request.getParentTask());
196+
transportService.sendRequest(
197+
node,
198+
TransportGetFromTranslogAction.NAME,
199+
getFromTranslogRequest,
200+
new ActionListenerResponseHandler<>(listener.delegateFailure((l, r) -> {
201+
if (r.getResult() != null) {
202+
logger.debug("received result for real-time get for id '{}' from promotable shard", request.id());
203+
l.onResponse(new GetResponse(r.getResult()));
204+
} else {
205+
logger.debug(
206+
"no result for real-time get for id '{}' from promotable shard (segment generation to wait for: {})",
207+
request.id(),
208+
r.segmentGeneration()
209+
);
210+
if (r.segmentGeneration() == -1) {
211+
// Nothing to wait for (no previous unsafe generation), just handle the Get locally.
212+
ActionRunnable.supply(listener, () -> shardOperation(request, shardId)).run();
213+
} else {
214+
assert r.segmentGeneration() > -1L;
215+
indexShard.waitForSegmentGeneration(
216+
r.segmentGeneration(),
217+
ActionListener.wrap(aLong -> super.asyncShardOperation(request, shardId, listener), listener::onFailure)
218+
);
219+
}
220+
}
221+
}), TransportGetFromTranslogAction.Response::new, getExecutor(request, shardId))
222+
);
223+
} else {
224+
// A non-real-time get with no explicit refresh requested.
225+
super.asyncShardOperation(request, shardId, listener);
226+
}
227+
}
228+
229+
private DiscoveryNode getCurrentNodeOfPrimary(ShardId shardId) {
230+
var shardRoutingTable = clusterService.state().routingTable().shardRoutingTable(shardId);
231+
if (shardRoutingTable.primaryShard() == null || shardRoutingTable.primaryShard().active() == false) {
232+
throw new NoShardAvailableActionException(shardId, "primary shard is not active");
233+
}
234+
DiscoveryNode node = clusterService.state().nodes().get(shardRoutingTable.primaryShard().currentNodeId());
235+
if (node == null) {
236+
throw new NoShardAvailableActionException(shardId);
237+
}
238+
return node;
239+
}
240+
151241
private IndexShard getIndexShard(ShardId shardId) {
152242
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
153243
return indexService.getShard(shardId.id());

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,9 @@ public class InternalEngine extends Engine {
214214

215215
private final ByteSizeValue totalDiskSpace;
216216

217+
protected static final String REAL_TIME_GET_REFRESH_SOURCE = "realtime_get";
218+
protected static final String UNSAFE_VERSION_MAP_REFRESH_SOURCE = "unsafe_version_map";
219+
217220
public InternalEngine(EngineConfig engineConfig) {
218221
this(engineConfig, IndexWriter.MAX_DOCS, LocalCheckpointTracker::new);
219222
}
@@ -848,7 +851,7 @@ protected GetResult realtimeGetUnderLock(
848851
}
849852
}
850853
assert versionValue.seqNo >= 0 : versionValue;
851-
refreshIfNeeded("realtime_get", versionValue.seqNo);
854+
refreshIfNeeded(REAL_TIME_GET_REFRESH_SOURCE, versionValue.seqNo);
852855
}
853856
if (getFromSearcherIfNotInTranslog) {
854857
return getFromSearcher(get, acquireSearcher("realtime_get", SearcherScope.INTERNAL, searcherWrapper), false);
@@ -960,7 +963,7 @@ private VersionValue getVersionFromMap(BytesRef id) {
960963
// map so once we pass this point we can safely lookup from the version map.
961964
if (versionMap.isUnsafe()) {
962965
lastUnsafeSegmentGenerationForGets.set(lastCommittedSegmentInfos.getGeneration() + 1);
963-
refresh("unsafe_version_map", SearcherScope.INTERNAL, true);
966+
refreshInternalSearcher(UNSAFE_VERSION_MAP_REFRESH_SOURCE, true);
964967
}
965968
versionMap.enforceSafeAccess();
966969
}
@@ -1929,6 +1932,10 @@ public RefreshResult maybeRefresh(String source) throws EngineException {
19291932
return refresh(source, SearcherScope.EXTERNAL, false);
19301933
}
19311934

1935+
protected RefreshResult refreshInternalSearcher(String source, boolean block) throws EngineException {
1936+
return refresh(source, SearcherScope.INTERNAL, block);
1937+
}
1938+
19321939
final RefreshResult refresh(String source, SearcherScope scope, boolean block) throws EngineException {
19331940
// both refresh types will result in an internal refresh but only the external will also
19341941
// pass the new reader reference to the external reader manager.
@@ -3052,7 +3059,7 @@ protected final void refreshIfNeeded(String source, long requestingSeqNo) {
30523059
if (lastRefreshedCheckpoint() < requestingSeqNo) {
30533060
synchronized (refreshIfNeededMutex) {
30543061
if (lastRefreshedCheckpoint() < requestingSeqNo) {
3055-
refresh(source, SearcherScope.INTERNAL, true);
3062+
refreshInternalSearcher(source, true);
30563063
}
30573064
}
30583065
}

0 commit comments

Comments
 (0)