Skip to content

Commit c68da92

Browse files
Fix SearchableSnapshotAllocator for Replica Shards (#67135) (#67204)
The assertions in the existing `SnapshotSizeInfo` logic tripped for non-primary shards. This commit adds a path to the size data without those assertions and a test that the allocator works correctly for replica shards.
1 parent 7256e8b commit c68da92

File tree

2 files changed

+54
-2
lines changed

2 files changed

+54
-2
lines changed

x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotAllocationIntegTests.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,22 @@
77
package org.elasticsearch.xpack.searchablesnapshots;
88

99
import org.elasticsearch.cluster.ClusterState;
10+
import org.elasticsearch.cluster.metadata.IndexMetadata;
1011
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
1112
import org.elasticsearch.common.settings.Settings;
1213
import org.elasticsearch.common.unit.ByteSizeUnit;
1314
import org.elasticsearch.common.unit.ByteSizeValue;
15+
import org.elasticsearch.common.util.set.Sets;
1416
import org.elasticsearch.test.ESIntegTestCase;
1517
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;
1618

1719
import java.util.Collections;
20+
import java.util.Set;
1821

1922
import static org.elasticsearch.index.IndexSettings.INDEX_SOFT_DELETES_SETTING;
2023
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
24+
import static org.hamcrest.Matchers.in;
25+
import static org.hamcrest.Matchers.is;
2126

2227
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
2328
public class SearchableSnapshotAllocationIntegTests extends BaseSearchableSnapshotsIntegTestCase {
@@ -62,6 +67,46 @@ public void testAllocatesToBestAvailableNodeOnRestart() throws Exception {
6267
);
6368
}
6469

70+
public void testAllocatesReplicaToBestAvailableNodeOnRestart() throws Exception {
71+
internalCluster().startMasterOnlyNode();
72+
final String firstDataNode = internalCluster().startDataOnlyNode();
73+
final String secondDataNode = internalCluster().startDataOnlyNode();
74+
final String index = "test-idx";
75+
createIndexWithContent(index, indexSettingsNoReplicas(1).put(INDEX_SOFT_DELETES_SETTING.getKey(), true).build());
76+
final String repoName = "test-repo";
77+
createRepository(repoName, "fs");
78+
final String snapshotName = "test-snapshot";
79+
createSnapshot(repoName, snapshotName, Collections.singletonList(index));
80+
assertAcked(client().admin().indices().prepareDelete(index));
81+
final String restoredIndex = mountSnapshot(
82+
repoName,
83+
snapshotName,
84+
index,
85+
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build()
86+
);
87+
ensureGreen(restoredIndex);
88+
internalCluster().startDataOnlyNodes(randomIntBetween(1, 4));
89+
90+
setAllocation(EnableAllocationDecider.Allocation.NONE);
91+
92+
internalCluster().getInstance(CacheService.class, firstDataNode).synchronizeCache();
93+
internalCluster().getInstance(CacheService.class, secondDataNode).synchronizeCache();
94+
internalCluster().restartNode(firstDataNode);
95+
internalCluster().restartNode(secondDataNode);
96+
ensureStableCluster(internalCluster().numDataAndMasterNodes());
97+
98+
setAllocation(EnableAllocationDecider.Allocation.ALL);
99+
ensureGreen(restoredIndex);
100+
101+
final ClusterState state = client().admin().cluster().prepareState().get().getState();
102+
final Set<String> nodesWithCache = Sets.newHashSet(
103+
state.nodes().resolveNode(firstDataNode).getId(),
104+
state.nodes().resolveNode(secondDataNode).getId()
105+
);
106+
assertThat(state.routingTable().index(restoredIndex).shard(0).primaryShard().currentNodeId(), is(in(nodesWithCache)));
107+
assertThat(state.routingTable().index(restoredIndex).shard(0).replicaShards().get(0).currentNodeId(), is(in(nodesWithCache)));
108+
}
109+
65110
private void setAllocation(EnableAllocationDecider.Allocation allocation) {
66111
logger.info("--> setting allocation to [{}]", allocation);
67112
assertAcked(

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotAllocator.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.cluster.routing.allocation.NodeAllocationResult;
2727
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
2828
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
29+
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
2930
import org.elasticsearch.common.Nullable;
3031
import org.elasticsearch.common.Priority;
3132
import org.elasticsearch.common.collect.Tuple;
@@ -104,7 +105,6 @@ public void allocateUnassigned(
104105
&& (shardRouting.recoverySource().getType() == RecoverySource.Type.EXISTING_STORE
105106
|| shardRouting.recoverySource().getType() == RecoverySource.Type.EMPTY_STORE)) {
106107
// we always force snapshot recovery source to use the snapshot-based recovery process on the node
107-
108108
final Settings indexSettings = allocation.metadata().index(shardRouting.index()).getSettings();
109109
final IndexId indexId = new IndexId(
110110
SNAPSHOT_INDEX_NAME_SETTING.get(indexSettings),
@@ -136,7 +136,14 @@ public void allocateUnassigned(
136136
unassignedAllocationHandler.initialize(
137137
allocateUnassignedDecision.getTargetNode().getId(),
138138
allocateUnassignedDecision.getAllocationId(),
139-
allocation.snapshotShardSizeInfo().getShardSize(shardRouting, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE),
139+
DiskThresholdDecider.getExpectedShardSize(
140+
shardRouting,
141+
ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE,
142+
allocation.clusterInfo(),
143+
allocation.snapshotShardSizeInfo(),
144+
allocation.metadata(),
145+
allocation.routingTable()
146+
),
140147
allocation.changes()
141148
);
142149
} else {

0 commit comments

Comments
 (0)