Skip to content

Commit 2fe2c80

Browse files
authored
Revert "Make nodePaths() singular (#72514)" (#78801)
This reverts commit 2dc796a. The revert was mostly clean, but it required adjusting NodeEnvironment.upgradeLegacyNodeFolders to retain #74921. relates #78525 relates #71205
1 parent f2360e8 commit 2fe2c80

File tree

9 files changed

+186
-95
lines changed

9 files changed

+186
-95
lines changed

server/src/main/java/org/elasticsearch/cluster/coordination/ElasticsearchNodeCommand.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import java.util.Arrays;
4444
import java.util.EnumSet;
4545
import java.util.Map;
46+
import java.util.Objects;
4647

4748
public abstract class ElasticsearchNodeCommand extends EnvironmentAwareCommand {
4849
private static final Logger logger = LogManager.getLogger(ElasticsearchNodeCommand.class);
@@ -130,11 +131,12 @@ public static Tuple<Long, ClusterState> loadTermAndClusterState(PersistedCluster
130131
protected void processNodePaths(Terminal terminal, OptionSet options, Environment env) throws IOException, UserException {
131132
terminal.println(Terminal.Verbosity.VERBOSE, "Obtaining lock for node");
132133
try (NodeEnvironment.NodeLock lock = new NodeEnvironment.NodeLock(logger, env, Files::exists)) {
133-
final NodeEnvironment.NodePath dataPath = lock.getNodePath();
134-
if (dataPath == null) {
134+
final Path[] dataPaths =
135+
Arrays.stream(lock.getNodePaths()).filter(Objects::nonNull).map(p -> p.path).toArray(Path[]::new);
136+
if (dataPaths.length == 0) {
135137
throw new ElasticsearchException(NO_NODE_FOLDER_FOUND_MSG);
136138
}
137-
processNodePaths(terminal, new Path[] { dataPath.path }, options, env);
139+
processNodePaths(terminal, dataPaths, options, env);
138140
} catch (LockObtainFailedException e) {
139141
throw new ElasticsearchException(FAILED_TO_OBTAIN_NODE_LOCK_MSG, e);
140142
}

server/src/main/java/org/elasticsearch/env/NodeEnvironment.java

Lines changed: 61 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -220,8 +220,8 @@ public NodeLock(final Logger logger,
220220
}
221221
}
222222

223-
public NodePath getNodePath() {
224-
return nodePaths[0];
223+
public NodePath[] getNodePaths() {
224+
return nodePaths;
225225
}
226226

227227
@Override
@@ -352,71 +352,73 @@ private static boolean upgradeLegacyNodeFolders(Logger logger, Settings settings
352352
}
353353

354354
// move contents from legacy path to new path
355+
assert nodeLock.getNodePaths().length == legacyNodeLock.getNodePaths().length;
355356
try {
356357
final List<CheckedRunnable<IOException>> upgradeActions = new ArrayList<>();
357-
final NodePath legacyNodePath = legacyNodeLock.getNodePath();
358-
final NodePath nodePath = nodeLock.getNodePath();
359-
360-
// determine folders to move and check that there are no extra files/folders
361-
final Set<String> folderNames = new HashSet<>();
362-
final Set<String> expectedFolderNames = new HashSet<>(Arrays.asList(
363-
364-
// node state directory, containing MetadataStateFormat-based node metadata as well as cluster state
365-
MetadataStateFormat.STATE_DIR_NAME,
366-
367-
// indices
368-
INDICES_FOLDER,
369-
370-
// searchable snapshot cache Lucene index
371-
SNAPSHOT_CACHE_FOLDER
372-
));
373-
374-
final Set<String> ignoredFileNames = new HashSet<>(Arrays.asList(
375-
NODE_LOCK_FILENAME,
376-
TEMP_FILE_NAME,
377-
TEMP_FILE_NAME + ".tmp",
378-
TEMP_FILE_NAME + ".final"
379-
));
380-
381-
try (DirectoryStream<Path> stream = Files.newDirectoryStream(legacyNodePath.path)) {
382-
for (Path subFolderPath : stream) {
383-
final String fileName = subFolderPath.getFileName().toString();
384-
if (FileSystemUtils.isDesktopServicesStore(subFolderPath)) {
385-
// ignore
386-
} else if (FileSystemUtils.isAccessibleDirectory(subFolderPath, logger)) {
387-
if (expectedFolderNames.contains(fileName) == false) {
388-
throw new IllegalStateException("unexpected folder encountered during data folder upgrade: " +
358+
for (int i = 0; i < legacyNodeLock.getNodePaths().length; i++) {
359+
final NodePath legacyNodePath = legacyNodeLock.getNodePaths()[i];
360+
final NodePath nodePath = nodeLock.getNodePaths()[i];
361+
362+
// determine folders to move and check that there are no extra files/folders
363+
final Set<String> folderNames = new HashSet<>();
364+
final Set<String> expectedFolderNames = new HashSet<>(Arrays.asList(
365+
366+
// node state directory, containing MetadataStateFormat-based node metadata as well as cluster state
367+
MetadataStateFormat.STATE_DIR_NAME,
368+
369+
// indices
370+
INDICES_FOLDER,
371+
372+
// searchable snapshot cache Lucene index
373+
SNAPSHOT_CACHE_FOLDER
374+
));
375+
376+
final Set<String> ignoredFileNames = new HashSet<>(Arrays.asList(
377+
NODE_LOCK_FILENAME,
378+
TEMP_FILE_NAME,
379+
TEMP_FILE_NAME + ".tmp",
380+
TEMP_FILE_NAME + ".final"
381+
));
382+
383+
try (DirectoryStream<Path> stream = Files.newDirectoryStream(legacyNodePath.path)) {
384+
for (Path subFolderPath : stream) {
385+
final String fileName = subFolderPath.getFileName().toString();
386+
if (FileSystemUtils.isDesktopServicesStore(subFolderPath)) {
387+
// ignore
388+
} else if (FileSystemUtils.isAccessibleDirectory(subFolderPath, logger)) {
389+
if (expectedFolderNames.contains(fileName) == false) {
390+
throw new IllegalStateException("unexpected folder encountered during data folder upgrade: " +
391+
subFolderPath);
392+
}
393+
final Path targetSubFolderPath = nodePath.path.resolve(fileName);
394+
if (Files.exists(targetSubFolderPath)) {
395+
throw new IllegalStateException("target folder already exists during data folder upgrade: " +
396+
targetSubFolderPath);
397+
}
398+
folderNames.add(fileName);
399+
} else if (ignoredFileNames.contains(fileName) == false) {
400+
throw new IllegalStateException("unexpected file/folder encountered during data folder upgrade: " +
389401
subFolderPath);
390402
}
391-
final Path targetSubFolderPath = nodePath.path.resolve(fileName);
392-
if (Files.exists(targetSubFolderPath)) {
393-
throw new IllegalStateException("target folder already exists during data folder upgrade: " +
394-
targetSubFolderPath);
395-
}
396-
folderNames.add(fileName);
397-
} else if (ignoredFileNames.contains(fileName) == false) {
398-
throw new IllegalStateException("unexpected file/folder encountered during data folder upgrade: " +
399-
subFolderPath);
400403
}
401404
}
402-
}
403405

404-
assert Sets.difference(folderNames, expectedFolderNames).isEmpty() :
405-
"expected indices and/or state dir folder but was " + folderNames;
406-
407-
upgradeActions.add(() -> {
408-
for (String folderName : folderNames) {
409-
final Path sourceSubFolderPath = legacyNodePath.path.resolve(folderName);
410-
final Path targetSubFolderPath = nodePath.path.resolve(folderName);
411-
Files.move(sourceSubFolderPath, targetSubFolderPath, StandardCopyOption.ATOMIC_MOVE);
412-
logger.info("data folder upgrade: moved from [{}] to [{}]", sourceSubFolderPath, targetSubFolderPath);
413-
}
414-
IOUtils.fsync(nodePath.path, true);
415-
});
406+
assert Sets.difference(folderNames, expectedFolderNames).isEmpty() :
407+
"expected indices and/or state dir folder but was " + folderNames;
416408

409+
upgradeActions.add(() -> {
410+
for (String folderName : folderNames) {
411+
final Path sourceSubFolderPath = legacyNodePath.path.resolve(folderName);
412+
final Path targetSubFolderPath = nodePath.path.resolve(folderName);
413+
Files.move(sourceSubFolderPath, targetSubFolderPath, StandardCopyOption.ATOMIC_MOVE);
414+
logger.info("data folder upgrade: moved from [{}] to [{}]", sourceSubFolderPath, targetSubFolderPath);
415+
}
416+
IOUtils.fsync(nodePath.path, true);
417+
});
418+
}
417419
// now do the actual upgrade. start by upgrading the node metadata file before moving anything, since a downgrade in an
418420
// intermediate state would be pretty disastrous
419-
loadNodeMetadata(settings, logger, legacyNodeLock.getNodePath());
421+
loadNodeMetadata(settings, logger, legacyNodeLock.getNodePaths());
420422
for (CheckedRunnable<IOException> upgradeAction : upgradeActions) {
421423
upgradeAction.run();
422424
}
@@ -924,12 +926,12 @@ public String nodeId() {
924926
/**
925927
* Returns an array of all of the {@link NodePath}s.
926928
*/
927-
public NodePath nodePath() {
929+
public NodePath[] nodePaths() {
928930
assertEnvIsLocked();
929931
if (nodePaths == null || locks == null) {
930932
throw new IllegalStateException("node is not configured to store local location");
931933
}
932-
return nodePaths[0];
934+
return nodePaths;
933935
}
934936

935937
/**

server/src/main/java/org/elasticsearch/index/shard/ShardPath.java

Lines changed: 79 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,20 @@
99

1010
import org.apache.logging.log4j.Logger;
1111
import org.apache.logging.log4j.util.Strings;
12+
import org.elasticsearch.core.internal.io.IOUtils;
1213
import org.elasticsearch.cluster.metadata.IndexMetadata;
1314
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
14-
import org.elasticsearch.core.internal.io.IOUtils;
1515
import org.elasticsearch.env.NodeEnvironment;
1616
import org.elasticsearch.env.ShardLock;
1717
import org.elasticsearch.index.IndexSettings;
1818

1919
import java.io.IOException;
20+
import java.math.BigInteger;
21+
import java.nio.file.FileStore;
2022
import java.nio.file.Files;
2123
import java.nio.file.Path;
24+
import java.util.Arrays;
25+
import java.util.HashMap;
2226
import java.util.Map;
2327
import java.util.Objects;
2428
import java.util.function.Consumer;
@@ -188,13 +192,85 @@ public static ShardPath selectNewPathForShard(NodeEnvironment env, ShardId shard
188192

189193
if (indexSettings.hasCustomDataPath()) {
190194
dataPath = env.resolveCustomLocation(indexSettings.customDataPath(), shardId);
191-
statePath = env.nodePath().resolve(shardId);
195+
statePath = env.nodePaths()[0].resolve(shardId);
192196
} else {
193-
dataPath = statePath = env.nodePath().resolve(shardId);
197+
BigInteger totFreeSpace = BigInteger.ZERO;
198+
for (NodeEnvironment.NodePath nodePath : env.nodePaths()) {
199+
totFreeSpace = totFreeSpace.add(BigInteger.valueOf(nodePath.fileStore.getUsableSpace()));
200+
}
201+
202+
// TODO: this is a hack!! We should instead keep track of incoming (relocated) shards since we know
203+
// how large they will be once they're done copying, instead of a silly guess for such cases:
204+
205+
// Very rough heuristic of how much disk space we expect the shard will use over its lifetime, the max of current average
206+
// shard size across the cluster and 5% of the total available free space on this node:
207+
BigInteger estShardSizeInBytes = BigInteger.valueOf(avgShardSizeInBytes).max(totFreeSpace.divide(BigInteger.valueOf(20)));
208+
209+
// TODO - do we need something more extensible? Yet, this does the job for now...
210+
final NodeEnvironment.NodePath[] paths = env.nodePaths();
211+
212+
// If no better path is chosen, use the one with the most space by default
213+
NodeEnvironment.NodePath bestPath = getPathWithMostFreeSpace(env);
214+
215+
if (paths.length != 1) {
216+
Map<NodeEnvironment.NodePath, Long> pathToShardCount = env.shardCountPerPath(shardId.getIndex());
217+
218+
// Compute how much space there is on each path
219+
final Map<NodeEnvironment.NodePath, BigInteger> pathsToSpace = new HashMap<>(paths.length);
220+
for (NodeEnvironment.NodePath nodePath : paths) {
221+
FileStore fileStore = nodePath.fileStore;
222+
BigInteger usableBytes = BigInteger.valueOf(fileStore.getUsableSpace());
223+
pathsToSpace.put(nodePath, usableBytes);
224+
}
225+
226+
bestPath = Arrays.stream(paths)
227+
// Filter out paths that have enough space
228+
.filter((path) -> pathsToSpace.get(path).subtract(estShardSizeInBytes).compareTo(BigInteger.ZERO) > 0)
229+
// Sort by the number of shards for this index
230+
.sorted((p1, p2) -> {
231+
int cmp = Long.compare(pathToShardCount.getOrDefault(p1, 0L),
232+
pathToShardCount.getOrDefault(p2, 0L));
233+
if (cmp == 0) {
234+
// if the number of shards is equal, tie-break with the number of total shards
235+
cmp = Integer.compare(dataPathToShardCount.getOrDefault(p1.path, 0),
236+
dataPathToShardCount.getOrDefault(p2.path, 0));
237+
if (cmp == 0) {
238+
// if the number of shards is equal, tie-break with the usable bytes
239+
cmp = pathsToSpace.get(p2).compareTo(pathsToSpace.get(p1));
240+
}
241+
}
242+
return cmp;
243+
})
244+
// Return the first result
245+
.findFirst()
246+
// Or the existing best path if there aren't any that fit the criteria
247+
.orElse(bestPath);
248+
}
249+
250+
statePath = bestPath.resolve(shardId);
251+
dataPath = statePath;
194252
}
195253
return new ShardPath(indexSettings.hasCustomDataPath(), dataPath, statePath, shardId);
196254
}
197255

256+
static NodeEnvironment.NodePath getPathWithMostFreeSpace(NodeEnvironment env) throws IOException {
257+
final NodeEnvironment.NodePath[] paths = env.nodePaths();
258+
NodeEnvironment.NodePath bestPath = null;
259+
long maxUsableBytes = Long.MIN_VALUE;
260+
for (NodeEnvironment.NodePath nodePath : paths) {
261+
FileStore fileStore = nodePath.fileStore;
262+
long usableBytes = fileStore.getUsableSpace(); // NB usable bytes doesn't account for reserved space (e.g. incoming recoveries)
263+
assert usableBytes >= 0 : "usable bytes must be >= 0, got: " + usableBytes;
264+
265+
if (bestPath == null || usableBytes > maxUsableBytes) {
266+
// This path has been determined to be "better" based on the usable bytes
267+
maxUsableBytes = usableBytes;
268+
bestPath = nodePath;
269+
}
270+
}
271+
return bestPath;
272+
}
273+
198274
@Override
199275
public boolean equals(Object o) {
200276
if (this == o) {

server/src/main/java/org/elasticsearch/monitor/fs/FsProbe.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,17 +42,22 @@ public FsInfo stats(FsInfo previous) throws IOException {
4242
if (nodeEnv.hasNodeFile() == false) {
4343
return new FsInfo(System.currentTimeMillis(), null, new FsInfo.Path[0]);
4444
}
45-
NodePath dataLocation = nodeEnv.nodePath();
46-
FsInfo.Path pathInfo = getFSInfo(dataLocation);
45+
NodePath[] dataLocations = nodeEnv.nodePaths();
46+
FsInfo.Path[] paths = new FsInfo.Path[dataLocations.length];
47+
for (int i = 0; i < dataLocations.length; i++) {
48+
paths[i] = getFSInfo(dataLocations[i]);
49+
}
4750
FsInfo.IoStats ioStats = null;
4851
if (Constants.LINUX) {
4952
Set<Tuple<Integer, Integer>> devicesNumbers = new HashSet<>();
50-
if (dataLocation.majorDeviceNumber != -1 && dataLocation.minorDeviceNumber != -1) {
51-
devicesNumbers.add(Tuple.tuple(dataLocation.majorDeviceNumber, dataLocation.minorDeviceNumber));
53+
for (int i = 0; i < dataLocations.length; i++) {
54+
if (dataLocations[i].majorDeviceNumber != -1 && dataLocations[i].minorDeviceNumber != -1) {
55+
devicesNumbers.add(Tuple.tuple(dataLocations[i].majorDeviceNumber, dataLocations[i].minorDeviceNumber));
56+
}
5257
}
5358
ioStats = ioStats(devicesNumbers, previous);
5459
}
55-
return new FsInfo(System.currentTimeMillis(), ioStats, new FsInfo.Path[] { pathInfo });
60+
return new FsInfo(System.currentTimeMillis(), ioStats, paths);
5661
}
5762

5863
final FsInfo.IoStats ioStats(final Set<Tuple<Integer, Integer>> devicesNumbers, final FsInfo previous) {

server/src/test/java/org/elasticsearch/env/NodeEnvironmentTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ public void testDeleteSafe() throws Exception {
212212
SetOnce<Path> listener = new SetOnce<>();
213213
env.deleteShardDirectorySafe(new ShardId(index, 1), idxSettings, listener::set);
214214
Path deletedPath = listener.get();
215-
assertThat(deletedPath, equalTo(env.nodePath().resolve(index).resolve("1")));
215+
assertThat(deletedPath, equalTo(env.nodePaths()[0].resolve(index).resolve("1")));
216216
}
217217

218218
path = env.indexPath(index);

server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandTests.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,9 @@
5555
import java.io.IOException;
5656
import java.nio.file.Files;
5757
import java.nio.file.Path;
58+
import java.util.Arrays;
5859
import java.util.Collections;
60+
import java.util.Objects;
5961
import java.util.Set;
6062
import java.util.regex.Matcher;
6163
import java.util.regex.Pattern;
@@ -131,8 +133,8 @@ public void setup() throws IOException {
131133
clusterState = ClusterState.builder(ClusterName.DEFAULT).metadata(Metadata.builder().put(indexMetadata, false).build()).build();
132134

133135
try (NodeEnvironment.NodeLock lock = new NodeEnvironment.NodeLock(logger, environment, Files::exists)) {
134-
final NodeEnvironment.NodePath dataPath = lock.getNodePath();
135-
try (PersistedClusterStateService.Writer writer = new PersistedClusterStateService(new Path[] { dataPath.path }, nodeId,
136+
final Path[] dataPaths = Arrays.stream(lock.getNodePaths()).filter(Objects::nonNull).map(p -> p.path).toArray(Path[]::new);
137+
try (PersistedClusterStateService.Writer writer = new PersistedClusterStateService(dataPaths, nodeId,
136138
xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE,
137139
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L).createWriter()) {
138140
writer.writeFullStateAndCommit(1L, clusterState);

x-pack/plugin/autoscaling/src/internalClusterTest/java/org/elasticsearch/xpack/autoscaling/action/TransportGetAutoscalingCapacityActionIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public class TransportGetAutoscalingCapacityActionIT extends AutoscalingIntegTes
3737
public void testCurrentCapacity() throws Exception {
3838
assertThat(capacity().results().keySet(), Matchers.empty());
3939
long memory = OsProbe.getInstance().getTotalPhysicalMemorySize();
40-
long storage = internalCluster().getInstance(NodeEnvironment.class).nodePath().fileStore.getTotalSpace();
40+
long storage = internalCluster().getInstance(NodeEnvironment.class).nodePaths()[0].fileStore.getTotalSpace();
4141
assertThat(memory, greaterThan(0L));
4242
assertThat(storage, greaterThan(0L));
4343
putAutoscalingPolicy("test");

0 commit comments

Comments
 (0)