Stateless real-time mget #96763
Conversation
Force-pushed from d6ccc73 to 77e377f
Force-pushed from 77e377f to 8788899
@@ -8,11 +8,6 @@ routing:
        index:
          number_of_shards: 5
          number_of_routing_shards: 5
          number_of_replicas: 0
In Stateless, these tests need a search shard.
OK, but why do we remove the cluster health check below and in the other .yml file?
It's basically waiting for a green index, which is not needed. See #94385 for more detail.
I think we do need to wait for a green index here, since otherwise the mget could fail in stateless in case the search shard is not yet available. AFAICS, the default is to wait for one active shard.
The problem is that if I add wait for green, the test would never pass in stateful, since the default number of replicas is 1 and we have a one-node cluster. To make the test work for both stateful and stateless we need to do this. I've made the same change for a very similar (5-shard) test case for get, see 5010402. So far I haven't seen any failures. If it turns out to be an issue, I think we'd need to clone the test or play with some related settings.
Can we use auto-expand replicas 0-1 instead then? I think that would work in both setups.
I think this does introduce fragility into testing and we should try to avoid that if we can.
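A hedged sketch of the reviewer's suggestion expressed through the Java Settings API (the real test stays a YAML REST test; the setting keys are real index settings, the surrounding code is illustrative only):

    import org.elasticsearch.common.settings.Settings;

    // Instead of pinning number_of_replicas to 0, let replicas auto-expand between
    // 0 and 1: the index can go green on a one-node stateful cluster (0 replicas)
    // while still getting a search shard assigned in stateless (1 replica).
    Settings indexSettings = Settings.builder()
        .put("index.number_of_shards", 5)
        .put("index.number_of_routing_shards", 5)
        .put("index.auto_expand_replicas", "0-1")
        .build();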
Force-pushed from 8b9f1b9 to fd644d6
Pinging @elastic/es-distributed (Team:Distributed)
In general looks good to me. Just a couple of questions.
            listener.delegateFailureAndWrap((l, replicationResponse) -> super.asyncShardOperation(request, shardId, l))
        );
    } else if (request.realtime()) {
        TransportShardMultiGetFomTranslogAction.Request getFromTranslogRequest = new TransportShardMultiGetFomTranslogAction.Request(
nit: mgetFromTranslogRequest may be better
        }
    }

    // Returns the index of entries in response.locations that have a missing result with no failure on the promotable shard.
nit: the indices/indexes
@@ -163,6 +303,17 @@ private void asyncShardMultiGet(MultiGetShardRequest request, ShardId shardId, A
        }
    }

    private DiscoveryNode getCurrentNodeOfPrimary(ShardId shardId) {
nit: You can refactor this with the same one in TransportGetAction.
Done in 36d8892 plus the other comments.
    indexShard.waitForSegmentGeneration(
        r.segmentGeneration(),
        listener.delegateFailureAndWrap(
            (ll, aLong) -> threadPool.executor(getExecutor(request, shardId))
Is it necessary to execute() asynchronously here? It seems like once execution reaches this point (after the generation has been waited upon), we could also call the handleLocalGets() function directly.
I think it is needed as otherwise we'd run handleLocalGets on a REFRESH thread.
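For context, a hedged reconstruction of the dispatch being discussed (waitForSegmentGeneration, getExecutor and handleLocalGets appear in the quoted diff; the trailing call and its arguments are assumptions):

    // The listener passed to waitForSegmentGeneration may be completed on a REFRESH
    // thread, so the remaining local gets are forked back onto the mget executor
    // rather than being run inline on that thread.
    indexShard.waitForSegmentGeneration(
        r.segmentGeneration(),
        listener.delegateFailureAndWrap(
            (ll, aLong) -> threadPool.executor(getExecutor(request, shardId))
                // the exact handleLocalGets signature is an assumption in this sketch
                .execute(() -> handleLocalGets(request, r.multiGetShardResponse(), shardId, ll))
        )
    );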
Thanks, LGTM
This looks good, left a few comments.
@@ -9,11 +9,6 @@
      settings:
        index:
          refresh_interval: -1
          number_of_replicas: 0

  - do:
Likewise, I think we need the wait for green here for it to work in stateless.
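For illustration, a hedged Java-client equivalent of the wait_for green step being requested (the index name test_1 is hypothetical):

    // Wait for the index to go green so the search shard is assigned before the
    // test starts issuing realtime mgets against it.
    client().admin()
        .cluster()
        .prepareHealth("test_1")
        .setWaitForGreenStatus()
        .get();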
@@ -180,7 +182,7 @@ private void asyncGet(GetRequest request, ShardId shardId, ActionListener<GetRes
    private void handleGetOnUnpromotableShard(GetRequest request, IndexShard indexShard, ActionListener<GetResponse> listener)
        throws IOException {
        ShardId shardId = indexShard.shardId();
-       DiscoveryNode node = getCurrentNodeOfPrimary(shardId);
+       var node = getCurrentNodeOfPrimary(clusterService.state().routingTable(), clusterService.state().nodes(), shardId);
Let us grab the state() only once to avoid the routing table and nodes being out of sync.
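A minimal sketch of the call-site change being suggested (variable names are illustrative):

    // Capture the cluster state once so the routing table and the discovery nodes
    // are guaranteed to come from the same cluster state version.
    ClusterState state = clusterService.state();
    var node = getCurrentNodeOfPrimary(state.routingTable(), state.nodes(), shardId);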
Handled this and the rest in 2f4cb8b.
-   private DiscoveryNode getCurrentNodeOfPrimary(ShardId shardId) {
-       var clusterState = clusterService.state();
-       var shardRoutingTable = clusterState.routingTable().shardRoutingTable(shardId);
+   static DiscoveryNode getCurrentNodeOfPrimary(RoutingTable routingTable, DiscoveryNodes nodes, ShardId shardId) {
I would find it simpler to reason about this method if it received either the ClusterService or the ClusterState, since that avoids the possibility of the routingTable and the nodes being from different ClusterState versions.
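A hedged sketch of the signature this comment proposes, which makes it impossible for callers to mix a routing table and nodes from different states (the body is an assumption based on the quoted code):

    static DiscoveryNode getCurrentNodeOfPrimary(ClusterState state, ShardId shardId) {
        var shardRoutingTable = state.routingTable().shardRoutingTable(shardId);
        var primary = shardRoutingTable.primaryShard();
        // Resolve the node currently holding the primary from the same state.
        return state.nodes().get(primary.currentNodeId());
    }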
        ActionListener<MultiGetShardResponse> listener
    ) throws IOException {
        ShardId shardId = indexShard.shardId();
        var node = getCurrentNodeOfPrimary(clusterService.state().routingTable(), clusterService.state().nodes(), shardId);
Let us grab the state() only once.
    }

    // Returns the indices of entries in response.locations that have a missing result with no failure on the promotable shard.
    private static List<Integer> locationsWithMissingResults(TransportShardMultiGetFomTranslogAction.Response response) {
I would prefer looping over the response twice rather than collecting this list of Integer objects. I.e., we can first check whether we have all the results, terminating that pass early if we do not, and then loop again to collect the missing results.
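A hedged sketch of the two-pass shape being suggested (field access on MultiGetShardResponse mirrors the quoted code; treat the exact names as assumptions):

    // First pass: just detect whether anything is missing, terminating early on the
    // first hit instead of allocating a list of Integer locations.
    private static boolean hasMissingResults(MultiGetShardResponse response) {
        for (int i = 0; i < response.locations.size(); i++) {
            if (response.responses.get(i) == null && response.failures.get(i) == null) {
                return true;
            }
        }
        return false;
    }

A second loop would then handle only the positions that still need a local get, once we know there is at least one.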
try {
    GetResult getResult = indexShard.getService()
        .get(
            item.id(),
            item.storedFields(),
            request.realtime(),
            item.version(),
            item.versionType(),
            item.fetchSourceContext(),
            request.isForceSyntheticSource()
        );
    response.add(request.locations.get(l), new GetResponse(getResult));
} catch (RuntimeException e) {
    if (TransportActions.isShardNotAvailableException(e)) {
        throw e;
    } else {
        logger.debug(() -> format("%s failed to execute multi_get for [%s]", shardId, item.id()), e);
        response.add(request.locations.get(l), new MultiGetResponse.Failure(request.index(), item.id(), e));
    }
} catch (IOException e) {
    logger.debug(() -> format("%s failed to execute multi_get for [%s]", shardId, item.id()), e);
    response.add(request.locations.get(l), new MultiGetResponse.Failure(request.index(), item.id(), e));
}
I think we can refactor this into a method shared with the similar code from shardOperation? Unless I missed a detail, they look identical.
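A hedged sketch of the shared helper this comment suggests, lifted from the quoted block (the method name and exact parameters are assumptions; shardOperation would call the same helper for each item):

    private void getAndAddToResponse(ShardId shardId, IndexShard indexShard, MultiGetShardRequest request, int l, MultiGetShardResponse response) {
        MultiGetRequest.Item item = request.items.get(l);
        try {
            GetResult getResult = indexShard.getService()
                .get(
                    item.id(),
                    item.storedFields(),
                    request.realtime(),
                    item.version(),
                    item.versionType(),
                    item.fetchSourceContext(),
                    request.isForceSyntheticSource()
                );
            response.add(request.locations.get(l), new GetResponse(getResult));
        } catch (RuntimeException e) {
            if (TransportActions.isShardNotAvailableException(e)) {
                throw e;
            }
            logger.debug(() -> format("%s failed to execute multi_get for [%s]", shardId, item.id()), e);
            response.add(request.locations.get(l), new MultiGetResponse.Failure(request.index(), item.id(), e));
        } catch (IOException e) {
            logger.debug(() -> format("%s failed to execute multi_get for [%s]", shardId, item.id()), e);
            response.add(request.locations.get(l), new MultiGetResponse.Failure(request.index(), item.id(), e));
        }
    }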
@elasticmachine update branch
LGTM.
@elasticmachine please run elasticsearch-ci/part-3
As described in the issue, the change in elastic#96763 has made the MixedClusterClientYamlTestSuiteIT for mget fail very often. For now, let's take the same approach that we have for get. Closes elastic#97236
The mget counterpart of #93976.
Relates ES-5677