Skip to content

Commit 0bedc86

Browse files
committed
Ignore disk watermarks on partial shards (elastic#68673)
Today the disk threshold decider applies even to partially-restored shards, which makes no sense since these shards effectively consume no disk space of their own. With this commit the disk threshold decider now freely permits the allocation of these shards.
1 parent 8237426 commit 0bedc86

File tree

6 files changed

+142
-4
lines changed

6 files changed

+142
-4
lines changed

server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,10 @@ public class DiskThresholdDecider extends AllocationDecider {
7272
public static final Setting<Boolean> ENABLE_FOR_SINGLE_DATA_NODE =
7373
Setting.boolSetting("cluster.routing.allocation.disk.watermark.enable_for_single_data_node", false, Setting.Property.NodeScope);
7474

75+
public static final Setting<Boolean> SETTING_IGNORE_DISK_WATERMARKS =
76+
Setting.boolSetting("index.routing.allocation.disk.watermark.ignore", false,
77+
Setting.Property.IndexScope, Setting.Property.PrivateIndex);
78+
7579
private final DiskThresholdSettings diskThresholdSettings;
7680
private final boolean enableForSingleDataNode;
7781

@@ -133,6 +137,9 @@ public static long sizeOfRelocatingShards(RoutingNode node, boolean subtractShar
133137
private static final Decision YES_UNALLOCATED_PRIMARY_BETWEEN_WATERMARKS = Decision.single(Decision.Type.YES, NAME, "the node " +
134138
"is above the low watermark, but less than the high watermark, and this primary shard has never been allocated before");
135139

140+
private static final Decision YES_DISK_WATERMARKS_IGNORED = Decision.single(Decision.Type.YES, NAME,
141+
"disk watermarks are ignored on this index");
142+
136143
@Override
137144
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
138145
ImmutableOpenMap<String, DiskUsage> usages = allocation.clusterInfo().getNodeMostAvailableDiskUsages();
@@ -141,6 +148,10 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
141148
return decision;
142149
}
143150

151+
if (SETTING_IGNORE_DISK_WATERMARKS.get(allocation.metadata().index(shardRouting.index()).getSettings())) {
152+
return YES_DISK_WATERMARKS_IGNORED;
153+
}
154+
144155
final double usedDiskThresholdLow = 100.0 - diskThresholdSettings.getFreeDiskThresholdLow();
145156
final double usedDiskThresholdHigh = 100.0 - diskThresholdSettings.getFreeDiskThresholdHigh();
146157

@@ -308,6 +319,10 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl
308319
return decision;
309320
}
310321

322+
if (SETTING_IGNORE_DISK_WATERMARKS.get(allocation.metadata().index(shardRouting.index()).getSettings())) {
323+
return YES_DISK_WATERMARKS_IGNORED;
324+
}
325+
311326
// subtractLeavingShards is passed as true here, since this is only for shards remaining, we will *eventually* have enough disk
312327
// since shards are moving away. No new shards will be incoming since in canAllocate we pass false for this check.
313328
final DiskUsageWithRelocations usage = getDiskUsage(node, allocation, usages, true);

server/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.elasticsearch.cluster.metadata.MetadataIndexStateService;
1212
import org.elasticsearch.cluster.routing.UnassignedInfo;
1313
import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator;
14+
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
1415
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
1516
import org.elasticsearch.cluster.routing.allocation.decider.MaxRetryAllocationDecider;
1617
import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
@@ -162,6 +163,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
162163
IndexSettings.FINAL_PIPELINE,
163164
MetadataIndexStateService.VERIFIED_BEFORE_CLOSE_SETTING,
164165
ExistingShardsAllocator.EXISTING_SHARDS_ALLOCATOR_SETTING,
166+
DiskThresholdDecider.SETTING_IGNORE_DISK_WATERMARKS,
165167

166168
// validate that built-in similarities don't get redefined
167169
Setting.groupSetting("index.similarity.", (s) -> {

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.elasticsearch.cluster.routing.RecoverySource;
4747
import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
4848
import org.elasticsearch.cluster.routing.ShardRouting;
49+
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
4950
import org.elasticsearch.common.Booleans;
5051
import org.elasticsearch.common.CheckedConsumer;
5152
import org.elasticsearch.common.CheckedFunction;
@@ -1085,9 +1086,16 @@ public GetStats getStats() {
10851086

10861087
public StoreStats storeStats() {
10871088
try {
1088-
final RecoveryState recoveryState = this.recoveryState;
1089-
final long bytesStillToRecover = recoveryState == null ? -1L : recoveryState.getIndex().bytesStillToRecover();
1090-
return store.stats(bytesStillToRecover == -1 ? StoreStats.UNKNOWN_RESERVED_BYTES : bytesStillToRecover);
1089+
final long reservedBytes;
1090+
if (DiskThresholdDecider.SETTING_IGNORE_DISK_WATERMARKS.get(indexSettings.getSettings())) {
1091+
// if this shard has no disk footprint then it also needs no reserved space
1092+
reservedBytes = 0L;
1093+
} else {
1094+
final RecoveryState recoveryState = this.recoveryState;
1095+
final long bytesStillToRecover = recoveryState == null ? -1L : recoveryState.getIndex().bytesStillToRecover();
1096+
reservedBytes = bytesStillToRecover == -1 ? StoreStats.UNKNOWN_RESERVED_BYTES : bytesStillToRecover;
1097+
}
1098+
return store.stats(reservedBytes);
10911099
} catch (IOException e) {
10921100
failShard("Failing shard because of exception during storeStats", e);
10931101
throw new ElasticsearchException("io exception while building 'store stats'", e);

server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -463,4 +463,59 @@ public void testDiskUsageWithRelocations() {
463463
assertThat(new DiskThresholdDecider.DiskUsageWithRelocations(
464464
new DiskUsage("n", "n", "/dev/null", Long.MAX_VALUE, Long.MAX_VALUE), -10).getFreeBytes(), equalTo(Long.MAX_VALUE));
465465
}
466+
467+
public void testDecidesYesIfWatermarksIgnored() {
468+
ClusterSettings nss = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
469+
DiskThresholdDecider decider = new DiskThresholdDecider(Settings.EMPTY, nss);
470+
471+
Metadata metadata = Metadata.builder()
472+
.put(IndexMetadata.builder("test")
473+
.settings(settings(Version.CURRENT).put(DiskThresholdDecider.SETTING_IGNORE_DISK_WATERMARKS.getKey(), true))
474+
.numberOfShards(1)
475+
.numberOfReplicas(1))
476+
.build();
477+
478+
final Index index = metadata.index("test").getIndex();
479+
480+
ShardRouting test_0 = ShardRouting.newUnassigned(new ShardId(index, 0), true, EmptyStoreRecoverySource.INSTANCE,
481+
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
482+
DiscoveryNode node_0 = new DiscoveryNode("node_0", buildNewFakeTransportAddress(), Collections.emptyMap(),
483+
new HashSet<>(DiscoveryNodeRole.BUILT_IN_ROLES), Version.CURRENT);
484+
DiscoveryNode node_1 = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Collections.emptyMap(),
485+
new HashSet<>(DiscoveryNodeRole.BUILT_IN_ROLES), Version.CURRENT);
486+
487+
RoutingTable routingTable = RoutingTable.builder()
488+
.addAsNew(metadata.index("test"))
489+
.build();
490+
491+
ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
492+
.metadata(metadata).routingTable(routingTable).build();
493+
494+
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder()
495+
.add(node_0)
496+
.add(node_1)
497+
).build();
498+
499+
// actual test -- after all that bloat :)
500+
ImmutableOpenMap.Builder<String, DiskUsage> allFullUsages = ImmutableOpenMap.builder();
501+
allFullUsages.put("node_0", new DiskUsage("node_0", "node_0", "_na_", 100, 0)); // all full
502+
allFullUsages.put("node_1", new DiskUsage("node_1", "node_1", "_na_", 100, 0)); // all full
503+
504+
ImmutableOpenMap.Builder<String, Long> shardSizes = ImmutableOpenMap.builder();
505+
shardSizes.put("[test][0][p]", 10L); // 10 bytes
506+
final ImmutableOpenMap<String, DiskUsage> usages = allFullUsages.build();
507+
final ClusterInfo clusterInfo = new ClusterInfo(usages, usages, shardSizes.build(), ImmutableOpenMap.of(), ImmutableOpenMap.of());
508+
RoutingAllocation allocation = new RoutingAllocation(new AllocationDeciders(Collections.singleton(decider)),
509+
clusterState.getRoutingNodes(), clusterState, clusterInfo, null, System.nanoTime());
510+
allocation.debugDecision(true);
511+
final RoutingNode routingNode = new RoutingNode("node_0", node_0);
512+
Decision decision = decider.canAllocate(test_0, routingNode, allocation);
513+
assertThat(decision.type(), equalTo(Decision.Type.YES));
514+
assertThat(decision.getExplanation(), containsString("disk watermarks are ignored on this index"));
515+
516+
decision = decider.canRemain(test_0.initialize(node_0.getId(), null, 0L).moveToStarted(), routingNode, allocation);
517+
assertThat(decision.type(), equalTo(Decision.Type.YES));
518+
assertThat(decision.getExplanation(), containsString("disk watermarks are ignored on this index"));
519+
}
520+
466521
}

x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
2929
import org.elasticsearch.cluster.node.DiscoveryNode;
3030
import org.elasticsearch.cluster.routing.ShardRouting;
31+
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
32+
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
3133
import org.elasticsearch.common.Priority;
3234
import org.elasticsearch.common.Strings;
3335
import org.elasticsearch.common.settings.Settings;
@@ -39,11 +41,13 @@
3941
import org.elasticsearch.env.NodeEnvironment;
4042
import org.elasticsearch.index.Index;
4143
import org.elasticsearch.index.IndexModule;
44+
import org.elasticsearch.index.IndexNotFoundException;
4245
import org.elasticsearch.index.IndexSettings;
4346
import org.elasticsearch.index.mapper.DateFieldMapper;
4447
import org.elasticsearch.index.shard.IndexLongFieldRange;
4548
import org.elasticsearch.index.shard.ShardId;
4649
import org.elasticsearch.index.shard.ShardPath;
50+
import org.elasticsearch.indices.IndexClosedException;
4751
import org.elasticsearch.indices.IndicesService;
4852
import org.elasticsearch.indices.recovery.RecoveryState;
4953
import org.elasticsearch.repositories.RepositoryData;
@@ -78,6 +82,7 @@
7882
import java.util.concurrent.CountDownLatch;
7983
import java.util.concurrent.CyclicBarrier;
8084
import java.util.concurrent.TimeUnit;
85+
import java.util.concurrent.atomic.AtomicBoolean;
8186
import java.util.stream.Collectors;
8287
import java.util.stream.IntStream;
8388
import java.util.stream.Stream;
@@ -100,6 +105,7 @@
100105
import static org.hamcrest.Matchers.hasSize;
101106
import static org.hamcrest.Matchers.lessThanOrEqualTo;
102107
import static org.hamcrest.Matchers.not;
108+
import static org.hamcrest.Matchers.oneOf;
103109
import static org.hamcrest.Matchers.sameInstance;
104110

105111
public class SearchableSnapshotsIntegTests extends BaseSearchableSnapshotsIntegTestCase {
@@ -482,6 +488,28 @@ public void testCreateAndRestorePartialSearchableSnapshot() throws Exception {
482488
expectedDataTiersPreference = getDataTiersPreference(MountSearchableSnapshotRequest.Storage.SHARED_CACHE);
483489
}
484490

491+
final AtomicBoolean statsWatcherRunning = new AtomicBoolean(true);
492+
final Thread statsWatcher = new Thread(() -> {
493+
while (statsWatcherRunning.get()) {
494+
final IndicesStatsResponse indicesStatsResponse;
495+
try {
496+
indicesStatsResponse = client().admin().indices().prepareStats(restoredIndexName).clear().setStore(true).get();
497+
} catch (IndexNotFoundException | IndexClosedException e) {
498+
continue;
499+
// ok
500+
}
501+
502+
for (ShardStats shardStats : indicesStatsResponse.getShards()) {
503+
assertThat(
504+
shardStats.getShardRouting().toString(),
505+
shardStats.getStats().getStore().getReservedSize().getBytes(),
506+
equalTo(0L)
507+
);
508+
}
509+
}
510+
}, "test-stats-watcher");
511+
statsWatcher.start();
512+
485513
final MountSearchableSnapshotRequest req = new MountSearchableSnapshotRequest(
486514
restoredIndexName,
487515
fsRepoName,
@@ -496,6 +524,9 @@ public void testCreateAndRestorePartialSearchableSnapshot() throws Exception {
496524
final RestoreSnapshotResponse restoreSnapshotResponse = client().execute(MountSearchableSnapshotAction.INSTANCE, req).get();
497525
assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(0));
498526

527+
statsWatcherRunning.set(false);
528+
statsWatcher.join();
529+
499530
final Settings settings = client().admin()
500531
.indices()
501532
.prepareGetSettings(restoredIndexName)
@@ -511,6 +542,8 @@ public void testCreateAndRestorePartialSearchableSnapshot() throws Exception {
511542
assertThat(IndexMetadata.INDEX_AUTO_EXPAND_REPLICAS_SETTING.get(settings).toString(), equalTo("false"));
512543
assertThat(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.get(settings), equalTo(expectedReplicas));
513544
assertThat(DataTierAllocationDecider.INDEX_ROUTING_PREFER_SETTING.get(settings), equalTo(expectedDataTiersPreference));
545+
assertTrue(SearchableSnapshots.SNAPSHOT_PARTIAL_SETTING.get(settings));
546+
assertTrue(DiskThresholdDecider.SETTING_IGNORE_DISK_WATERMARKS.get(settings));
514547

515548
assertTotalHits(restoredIndexName, originalAllHits, originalBarHits);
516549
assertRecoveryStats(restoredIndexName, false);
@@ -550,6 +583,29 @@ public void testCreateAndRestorePartialSearchableSnapshot() throws Exception {
550583
assertThat(client().admin().indices().prepareGetAliases(aliasName).get().getAliases().size(), equalTo(1));
551584
assertTotalHits(aliasName, originalAllHits, originalBarHits);
552585

586+
final Decision diskDeciderDecision = client().admin()
587+
.cluster()
588+
.prepareAllocationExplain()
589+
.setIndex(restoredIndexName)
590+
.setShard(0)
591+
.setPrimary(true)
592+
.setIncludeYesDecisions(true)
593+
.get()
594+
.getExplanation()
595+
.getShardAllocationDecision()
596+
.getMoveDecision()
597+
.getCanRemainDecision()
598+
.getDecisions()
599+
.stream()
600+
.filter(d -> d.label().equals(DiskThresholdDecider.NAME))
601+
.findFirst()
602+
.orElseThrow(() -> new AssertionError("not found"));
603+
assertThat(diskDeciderDecision.type(), equalTo(Decision.Type.YES));
604+
assertThat(
605+
diskDeciderDecision.getExplanation(),
606+
oneOf("disk watermarks are ignored on this index", "there is only a single data node present")
607+
);
608+
553609
internalCluster().fullRestart();
554610
assertTotalHits(restoredIndexName, originalAllHits, originalBarHits);
555611
assertRecoveryStats(restoredIndexName, false);

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/action/TransportMountSearchableSnapshotAction.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.elasticsearch.cluster.metadata.IndexMetadata;
2121
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
2222
import org.elasticsearch.cluster.routing.allocation.ExistingShardsAllocator;
23+
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
2324
import org.elasticsearch.cluster.service.ClusterService;
2425
import org.elasticsearch.common.inject.Inject;
2526
import org.elasticsearch.common.settings.Settings;
@@ -128,7 +129,8 @@ private static Settings buildIndexSettings(
128129
.put(INDEX_RECOVERY_TYPE_SETTING.getKey(), SearchableSnapshotsConstants.SNAPSHOT_RECOVERY_STATE_FACTORY_KEY);
129130

130131
if (storage == MountSearchableSnapshotRequest.Storage.SHARED_CACHE) {
131-
settings.put(SearchableSnapshots.SNAPSHOT_PARTIAL_SETTING.getKey(), true);
132+
settings.put(SearchableSnapshots.SNAPSHOT_PARTIAL_SETTING.getKey(), true)
133+
.put(DiskThresholdDecider.SETTING_IGNORE_DISK_WATERMARKS.getKey(), true);
132134
}
133135

134136
return settings.build();

0 commit comments

Comments
 (0)