Skip to content

Commit 256b838

Browse files
authored
Avoid async cache-size fetch on partial shards (#68644)
Today we perform an async fetch for every searchable snapshot shard while allocating it, so that we can prefer to allocate it to the node that holds the warmest cache for that shard. For partial shards, there is no persistently-cached data to reuse, so we can skip the async fetch.
1 parent 8ad476a commit 256b838

File tree

3 files changed

+71
-5
lines changed

3 files changed

+71
-5
lines changed

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotAllocator.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656

5757
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING;
5858
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_NAME_SETTING;
59+
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_PARTIAL_SETTING;
5960
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_REPOSITORY_NAME_SETTING;
6061
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING;
6162
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_NAME_SETTING;
@@ -262,6 +263,12 @@ public int getNumberOfInFlightFetches() {
262263
private AsyncShardFetch.FetchResult<NodeCacheFilesMetadata> fetchData(ShardRouting shard, RoutingAllocation allocation) {
263264
final ShardId shardId = shard.shardId();
264265
final Settings indexSettings = allocation.metadata().index(shard.index()).getSettings();
266+
267+
if (SNAPSHOT_PARTIAL_SETTING.get(indexSettings)) {
268+
// cached data for partial indices is not persistent, no need to fetch it
269+
return new AsyncShardFetch.FetchResult<>(shardId, Collections.emptyMap(), Collections.emptySet());
270+
}
271+
265272
final SnapshotId snapshotId = new SnapshotId(
266273
SNAPSHOT_SNAPSHOT_NAME_SETTING.get(indexSettings),
267274
SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings)

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/cache/TransportSearchableSnapshotCacheStoresAction.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@
3030

3131
import java.io.IOException;
3232
import java.util.List;
33+
import java.util.Optional;
34+
35+
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_PARTIAL_SETTING;
3336

3437
public class TransportSearchableSnapshotCacheStoresAction extends TransportNodesAction<
3538
TransportSearchableSnapshotCacheStoresAction.Request,
@@ -88,6 +91,9 @@ protected NodeCacheFilesMetadata newNodeResponse(StreamInput in) throws IOExcept
8891
@Override
8992
protected NodeCacheFilesMetadata nodeOperation(NodeRequest request, Task task) {
9093
assert cacheService != null;
94+
assert Optional.ofNullable(clusterService.state().metadata().index(request.shardId.getIndex()))
95+
.map(indexMetadata -> SNAPSHOT_PARTIAL_SETTING.get(indexMetadata.getSettings()))
96+
.orElse(false) == false : request.shardId + " is partial, should not be fetching its cached size";
9197
return new NodeCacheFilesMetadata(clusterService.localNode(), cacheService.getCachedSize(request.shardId, request.snapshotId));
9298
}
9399

x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotAllocatorTests.java

Lines changed: 58 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,11 @@
4646
import java.util.concurrent.TimeUnit;
4747
import java.util.concurrent.atomic.AtomicInteger;
4848
import java.util.function.Function;
49+
import java.util.function.UnaryOperator;
4950
import java.util.stream.Collectors;
5051

5152
import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
53+
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_PARTIAL_SETTING;
5254
import static org.hamcrest.Matchers.empty;
5355

5456
public class SearchableSnapshotAllocatorTests extends ESAllocationTestCase {
@@ -153,7 +155,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void doE
153155
Request request,
154156
ActionListener<Response> listener
155157
) {
156-
throw new AssertionError("Expecting no requests but received [" + action + "]");
158+
throw new AssertionError("Expecting no requests but received [" + action.name() + "]");
157159
}
158160
};
159161

@@ -167,15 +169,66 @@ public <Request extends ActionRequest, Response extends ActionResponse> void doE
167169
assertTrue(allocation.routingTable().index(shardId.getIndex()).allPrimaryShardsUnassigned());
168170
}
169171

172+
public void testNoFetchesForPartialIndex() {
173+
final ShardId shardId = new ShardId("test", "_na_", 0);
174+
final List<DiscoveryNode> nodes = randomList(1, 10, () -> newNode("node-" + UUIDs.randomBase64UUID(random())));
175+
final DiscoveryNode localNode = randomFrom(nodes);
176+
final Settings localNodeSettings = Settings.builder().put(NODE_NAME_SETTING.getKey(), localNode.getName()).build();
177+
178+
final DeterministicTaskQueue deterministicTaskQueue = new DeterministicTaskQueue(localNodeSettings, random());
179+
180+
final Metadata metadata = buildSingleShardIndexMetadata(shardId, builder -> builder.put(SNAPSHOT_PARTIAL_SETTING.getKey(), true));
181+
final RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
182+
routingTableBuilder.addAsRestore(metadata.index(shardId.getIndex()), randomSnapshotSource(shardId));
183+
184+
final ClusterState state = buildClusterState(nodes, metadata, routingTableBuilder);
185+
final RoutingAllocation allocation = buildAllocation(
186+
deterministicTaskQueue,
187+
state,
188+
randomNonNegativeLong(),
189+
yesAllocationDeciders()
190+
);
191+
192+
final Client client = new NoOpNodeClient(deterministicTaskQueue.getThreadPool()) {
193+
@Override
194+
public <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
195+
ActionType<Response> action,
196+
Request request,
197+
ActionListener<Response> listener
198+
) {
199+
throw new AssertionError("Expecting no requests but received [" + action.name() + "]");
200+
}
201+
};
202+
203+
final SearchableSnapshotAllocator allocator = new SearchableSnapshotAllocator(
204+
client,
205+
(reason, priority, listener) -> { throw new AssertionError("Expecting no reroutes"); }
206+
);
207+
allocateAllUnassigned(allocation, allocator);
208+
assertFalse(allocation.routingNodesChanged());
209+
assertThat(allocation.routingNodes().assignedShards(shardId), empty());
210+
assertTrue(allocation.routingTable().index(shardId.getIndex()).allPrimaryShardsUnassigned());
211+
}
212+
170213
private static Metadata buildSingleShardIndexMetadata(ShardId shardId) {
214+
return buildSingleShardIndexMetadata(shardId, UnaryOperator.identity());
215+
}
216+
217+
private static Metadata buildSingleShardIndexMetadata(ShardId shardId, UnaryOperator<Settings.Builder> extraSettings) {
171218
return Metadata.builder()
172219
.put(
173220
IndexMetadata.builder(shardId.getIndexName())
174221
.settings(
175-
settings(Version.CURRENT).put(
176-
ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_SETTING.getKey(),
177-
SearchableSnapshotAllocator.ALLOCATOR_NAME
178-
).put(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), SearchableSnapshotsConstants.SNAPSHOT_DIRECTORY_FACTORY_KEY)
222+
extraSettings.apply(
223+
settings(Version.CURRENT).put(
224+
ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_SETTING.getKey(),
225+
SearchableSnapshotAllocator.ALLOCATOR_NAME
226+
)
227+
.put(
228+
IndexModule.INDEX_STORE_TYPE_SETTING.getKey(),
229+
SearchableSnapshotsConstants.SNAPSHOT_DIRECTORY_FACTORY_KEY
230+
)
231+
)
179232
)
180233
.numberOfShards(1)
181234
.numberOfReplicas(0)

0 commit comments

Comments
 (0)