
Stateless real-time mget #96763


Merged: 10 commits, Jun 15, 2023

@@ -8,7 +8,7 @@ routing:
index:
number_of_shards: 5
number_of_routing_shards: 5
number_of_replicas: 0

pxsalehi (Member, Author), Jun 13, 2023:
In Stateless, these tests need a search shard.

Contributor:

OK, but why do we remove the cluster health check below and in the other .yml file?

pxsalehi (Member, Author):

It's basically waiting for a green index, which is not needed. See #94385 for more detail.

Contributor:

I think we do need to wait for a green index here, since otherwise the mget could fail in stateless in case the search shard is not yet available. AFAICS, the default is to wait for one active shard.

pxsalehi (Member, Author):

The problem is that if I add wait-for-green, the test would never pass in stateful, since the default replica count is 1 and we have a one-node cluster. To make the test work for both stateful and stateless, we need to do this. I've made the same change for a very similar (5-shard) test case for get; please see 5010402. So far I haven't seen any failures. If it turns out to be an issue, I think we'd need to clone the test or play with some related settings.

Contributor:

Can we use auto-expand replicas 0-1 instead then? I think that would work in both setups.

I think this does introduce fragility into testing and we should try to avoid that if we can.

auto_expand_replicas: 0-1

- do:
cluster.health:
@@ -51,11 +51,15 @@ requires routing:
settings:
index:
number_of_shards: 5
number_of_replicas: 0
auto_expand_replicas: 0-1
mappings:
_routing:
required: true

- do:
cluster.health:
wait_for_status: green

- do:
index:
index: test_1
@@ -9,7 +9,7 @@
settings:
index:
refresh_interval: -1
number_of_replicas: 0
auto_expand_replicas: 0-1

- do:
Contributor:

Likewise, I think we need the wait for green here for it to work in stateless.

cluster.health:
@@ -180,7 +180,7 @@ private void asyncGet(GetRequest request, ShardId shardId, ActionListener<GetResponse> listener)
private void handleGetOnUnpromotableShard(GetRequest request, IndexShard indexShard, ActionListener<GetResponse> listener)
throws IOException {
ShardId shardId = indexShard.shardId();
DiscoveryNode node = getCurrentNodeOfPrimary(shardId);
var node = getCurrentNodeOfPrimary(clusterService.state(), shardId);
if (request.refresh()) {
logger.trace("send refresh action for shard {} to node {}", shardId, node.getId());
var refreshRequest = new BasicReplicationRequest(shardId);
@@ -226,8 +226,7 @@ private void handleGetOnUnpromotableShard(GetRequest request, IndexShard indexShard, ActionListener<GetResponse> listener)
}
}

private DiscoveryNode getCurrentNodeOfPrimary(ShardId shardId) {
var clusterState = clusterService.state();
static DiscoveryNode getCurrentNodeOfPrimary(ClusterState clusterState, ShardId shardId) {
var shardRoutingTable = clusterState.routingTable().shardRoutingTable(shardId);
if (shardRoutingTable.primaryShard() == null || shardRoutingTable.primaryShard().active() == false) {
throw new NoShardAvailableActionException(shardId, "primary shard is not active");
@@ -9,14 +9,21 @@
package org.elasticsearch.action.get;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.admin.indices.refresh.TransportShardRefreshAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.action.support.replication.BasicReplicationRequest;
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.PlainShardIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.Writeable;
@@ -26,20 +26,25 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.ExecutorSelector;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;

import static org.elasticsearch.action.get.TransportGetAction.getCurrentNodeOfPrimary;
import static org.elasticsearch.core.Strings.format;

public class TransportShardMultiGetAction extends TransportSingleShardAction<MultiGetShardRequest, MultiGetShardResponse> {

private static final String ACTION_NAME = MultiGetAction.NAME + "[shard]";
public static final ActionType<MultiGetShardResponse> TYPE = new ActionType<>(ACTION_NAME, MultiGetShardResponse::new);
private static final Logger logger = LogManager.getLogger(TransportShardMultiGetAction.class);

private final IndicesService indicesService;
private final ExecutorSelector executorSelector;
private final NodeClient client;

@Inject
public TransportShardMultiGetAction(
@@ -49,7 +61,8 @@ public TransportShardMultiGetAction(
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
ExecutorSelector executorSelector
ExecutorSelector executorSelector,
NodeClient client
) {
super(
ACTION_NAME,
@@ -63,6 +76,7 @@ public TransportShardMultiGetAction(
);
this.indicesService = indicesService;
this.executorSelector = executorSelector;
this.client = client;
}

@Override
@@ -84,14 +98,23 @@ protected boolean resolveIndex(MultiGetShardRequest request) {
protected ShardIterator shards(ClusterState state, InternalRequest request) {
ShardIterator iterator = clusterService.operationRouting()
.getShards(state, request.request().index(), request.request().shardId(), request.request().preference());
return clusterService.operationRouting().useOnlyPromotableShardsForStateless(iterator);
if (iterator == null) {
return null;
}
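// Restrict the mget to shard copies that can serve reads: in stateless, only the search shards are searchable.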
return new PlainShardIterator(iterator.shardId(), iterator.getShardRoutings().stream().filter(ShardRouting::isSearchable).toList());
}

@Override
protected void asyncShardOperation(MultiGetShardRequest request, ShardId shardId, ActionListener<MultiGetShardResponse> listener)
throws IOException {
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.getShard(shardId.id());
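// An unpromotable (search-only) copy may lag behind the indexing shard, so refresh/realtime semantics need special handling.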
if (indexShard.routingEntry().isPromotableToPrimary() == false) {
handleMultiGetOnUnpromotableShard(request, indexShard, listener);
return;
}
assert DiscoveryNode.isStateless(clusterService.getSettings()) == false
: "A TransportShardMultiGetAction should always be handled by a search shard in Stateless";
if (request.realtime()) { // we are not tied to a refresh cycle here anyway
asyncShardMultiGet(request, shardId, listener);
} else {
@@ -107,35 +130,10 @@ protected void asyncShardOperation(MultiGetShardRequest request, ShardId shardId, ActionListener<MultiGetShardResponse> listener)

@Override
protected MultiGetShardResponse shardOperation(MultiGetShardRequest request, ShardId shardId) {
var indexShard = getIndexShard(shardId);
MultiGetShardResponse response = new MultiGetShardResponse();
for (int i = 0; i < request.locations.size(); i++) {
MultiGetRequest.Item item = request.items.get(i);
try {
GetResult getResult = indexShard.getService()
.get(
item.id(),
item.storedFields(),
request.realtime(),
item.version(),
item.versionType(),
item.fetchSourceContext(),
request.isForceSyntheticSource()
);
response.add(request.locations.get(i), new GetResponse(getResult));
} catch (RuntimeException e) {
if (TransportActions.isShardNotAvailableException(e)) {
throw e;
} else {
logger.debug(() -> format("%s failed to execute multi_get for [%s]", shardId, item.id()), e);
response.add(request.locations.get(i), new MultiGetResponse.Failure(request.index(), item.id(), e));
}
} catch (IOException e) {
logger.debug(() -> format("%s failed to execute multi_get for [%s]", shardId, item.id()), e);
response.add(request.locations.get(i), new MultiGetResponse.Failure(request.index(), item.id(), e));
}
getAndAddToResponse(shardId, i, request, response);
}

return response;
}

@@ -151,6 +149,110 @@ protected String getExecutor(MultiGetShardRequest request, ShardId shardId) {
}
}

private void handleMultiGetOnUnpromotableShard(
MultiGetShardRequest request,
IndexShard indexShard,
ActionListener<MultiGetShardResponse> listener
) throws IOException {
ShardId shardId = indexShard.shardId();
var node = getCurrentNodeOfPrimary(clusterService.state(), shardId);
if (request.refresh()) {
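// An explicit refresh was requested: refresh via the replication action first, then serve the mget from this shard.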
logger.trace("send refresh action for shard {} to node {}", shardId, node.getId());
var refreshRequest = new BasicReplicationRequest(shardId);
refreshRequest.setParentTask(request.getParentTask());
client.executeLocally(
TransportShardRefreshAction.TYPE,
refreshRequest,
listener.delegateFailureAndWrap((l, replicationResponse) -> super.asyncShardOperation(request, shardId, l))
);
} else if (request.realtime()) {
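// Real-time mget: ask the node holding the promotable (indexing) shard to resolve the ids from its translog.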
TransportShardMultiGetFomTranslogAction.Request mgetFromTranslogRequest = new TransportShardMultiGetFomTranslogAction.Request(
request,
shardId
);
mgetFromTranslogRequest.setParentTask(request.getParentTask());
transportService.sendRequest(
node,
TransportShardMultiGetFomTranslogAction.NAME,
mgetFromTranslogRequest,
new ActionListenerResponseHandler<>(listener.delegateFailure((l, r) -> {
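// Determine whether the translog lookup answered every location; any ids it could not answer must be fetched locally.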
var responseHasMissingLocations = false;
for (int i = 0; i < r.multiGetShardResponse().locations.size(); i++) {
if (r.multiGetShardResponse().responses.get(i) == null && r.multiGetShardResponse().failures.get(i) == null) {
responseHasMissingLocations = true;
break;
}
}
if (responseHasMissingLocations == false) {
logger.debug("received result of all ids in real-time mget[shard] from the promotable shard.");
l.onResponse(r.multiGetShardResponse());
} else {
logger.debug(
"no result for some ids from the promotable shard (segment generation to wait for: {})",
r.segmentGeneration()
);
if (r.segmentGeneration() == -1) {
// Nothing to wait for (no previous unsafe generation), just handle the rest locally.
ActionRunnable.supply(l, () -> handleLocalGets(request, r.multiGetShardResponse(), shardId)).run();
} else {
assert r.segmentGeneration() > -1L;
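// Wait until this search shard has seen the indicated segment generation, then serve the remaining ids locally.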
indexShard.waitForSegmentGeneration(
r.segmentGeneration(),
listener.delegateFailureAndWrap(
(ll, aLong) -> threadPool.executor(getExecutor(request, shardId))
Contributor:

Is it necessary to execute() asynchronously here? It seems like once execution reaches this point (after the generation has been waited on), we could call handleLocalGets() directly.

pxsalehi (Member, Author), Jun 14, 2023:

I think it is needed as otherwise we'd run handleLocalGets on a REFRESH thread.
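// Fork to the executor selected for this request so the local gets do not run on the thread that completed the wait (e.g. a REFRESH thread).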

.execute(
ActionRunnable.supply(ll, () -> handleLocalGets(request, r.multiGetShardResponse(), shardId))
)
)
);
}
}
}), TransportShardMultiGetFomTranslogAction.Response::new, getExecutor(request, shardId))
);
} else {
// A non-real-time mget with no explicit refresh requested.
super.asyncShardOperation(request, shardId, listener);
}
}

private MultiGetShardResponse handleLocalGets(MultiGetShardRequest request, MultiGetShardResponse response, ShardId shardId) {
logger.trace("handling local gets for missing locations");
for (int i = 0; i < response.locations.size(); i++) {
if (response.responses.get(i) == null && response.failures.get(i) == null) {
getAndAddToResponse(shardId, i, request, response);
}
}
return response;
}

private void getAndAddToResponse(ShardId shardId, int location, MultiGetShardRequest request, MultiGetShardResponse response) {
var indexShard = getIndexShard(shardId);
MultiGetRequest.Item item = request.items.get(location);
try {
GetResult getResult = indexShard.getService()
.get(
item.id(),
item.storedFields(),
request.realtime(),
item.version(),
item.versionType(),
item.fetchSourceContext(),
request.isForceSyntheticSource()
);
response.add(request.locations.get(location), new GetResponse(getResult));
} catch (RuntimeException e) {
if (TransportActions.isShardNotAvailableException(e)) {
throw e;
} else {
logger.debug(() -> format("%s failed to execute multi_get for [%s]", shardId, item.id()), e);
response.add(request.locations.get(location), new MultiGetResponse.Failure(request.index(), item.id(), e));
}
} catch (IOException e) {
logger.debug(() -> format("%s failed to execute multi_get for [%s]", shardId, item.id()), e);
response.add(request.locations.get(location), new MultiGetResponse.Failure(request.index(), item.id(), e));
}
}

private void asyncShardMultiGet(MultiGetShardRequest request, ShardId shardId, ActionListener<MultiGetShardResponse> listener)
throws IOException {
if (request.refresh() && request.realtime() == false) {
@@ -165,6 +165,7 @@ public TaskManager getTaskManager()
when(clusterService.localNode()).thenReturn(transportService.getLocalNode());
when(clusterService.state()).thenReturn(clusterState);
when(clusterService.operationRouting()).thenReturn(operationRouting);
final NodeClient client = new NodeClient(Settings.EMPTY, threadPool);

shardAction = new TransportShardMultiGetAction(
clusterService,
@@ -173,7 +174,8 @@
threadPool,
new ActionFilters(emptySet()),
new Resolver(),
EmptySystemIndices.INSTANCE.getExecutorSelector()
EmptySystemIndices.INSTANCE.getExecutorSelector(),
client
) {
@Override
protected void doExecute(Task task, MultiGetShardRequest request, ActionListener<MultiGetShardResponse> listener) {}