Skip to content

Commit b598944

Browse files
committed
[Initial DRAFT] Adds an FsHealthService that periodically tries to write to all data paths and emits an is_writable stat as part of the node stats API.
FsReadOnlyMonitor reads these stats and tries to remove a node if any of its paths is found not to be writable. Addresses elastic#45286.
1 parent 64815f1 commit b598944

14 files changed

+446
-56
lines changed

server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java

+18-3
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,10 @@ public class ClusterInfo implements ToXContentFragment, Writeable {
4444
final ImmutableOpenMap<String, Long> shardSizes;
4545
public static final ClusterInfo EMPTY = new ClusterInfo();
4646
final ImmutableOpenMap<ShardRouting, String> routingToDataPath;
47+
final ImmutableOpenMap<String, Boolean> nodeAllPathsWritable;
4748

4849
protected ClusterInfo() {
49-
this(ImmutableOpenMap.of(), ImmutableOpenMap.of(), ImmutableOpenMap.of(), ImmutableOpenMap.of());
50+
this(ImmutableOpenMap.of(), ImmutableOpenMap.of(), ImmutableOpenMap.of(), ImmutableOpenMap.of(), ImmutableOpenMap.of());
5051
}
5152

5253
/**
@@ -60,24 +61,27 @@ protected ClusterInfo() {
6061
*/
6162
public ClusterInfo(ImmutableOpenMap<String, DiskUsage> leastAvailableSpaceUsage,
6263
ImmutableOpenMap<String, DiskUsage> mostAvailableSpaceUsage, ImmutableOpenMap<String, Long> shardSizes,
63-
ImmutableOpenMap<ShardRouting, String> routingToDataPath) {
64+
ImmutableOpenMap<ShardRouting, String> routingToDataPath, ImmutableOpenMap<String, Boolean> nodeAllPathsWritable) {
6465
this.leastAvailableSpaceUsage = leastAvailableSpaceUsage;
6566
this.shardSizes = shardSizes;
6667
this.mostAvailableSpaceUsage = mostAvailableSpaceUsage;
6768
this.routingToDataPath = routingToDataPath;
69+
this.nodeAllPathsWritable = nodeAllPathsWritable;
6870
}
6971

7072
public ClusterInfo(StreamInput in) throws IOException {
7173
Map<String, DiskUsage> leastMap = in.readMap(StreamInput::readString, DiskUsage::new);
7274
Map<String, DiskUsage> mostMap = in.readMap(StreamInput::readString, DiskUsage::new);
75+
Map<String, Boolean> allPathsWritable = in.readMap(StreamInput::readString, StreamInput::readBoolean);
7376
Map<String, Long> sizeMap = in.readMap(StreamInput::readString, StreamInput::readLong);
7477
Map<ShardRouting, String> routingMap = in.readMap(ShardRouting::new, StreamInput::readString);
75-
7678
ImmutableOpenMap.Builder<String, DiskUsage> leastBuilder = ImmutableOpenMap.builder();
7779
this.leastAvailableSpaceUsage = leastBuilder.putAll(leastMap).build();
7880
ImmutableOpenMap.Builder<String, DiskUsage> mostBuilder = ImmutableOpenMap.builder();
7981
this.mostAvailableSpaceUsage = mostBuilder.putAll(mostMap).build();
8082
ImmutableOpenMap.Builder<String, Long> sizeBuilder = ImmutableOpenMap.builder();
83+
ImmutableOpenMap.Builder<String, Boolean> allPathsWritableBuilder = ImmutableOpenMap.builder();
84+
this.nodeAllPathsWritable = allPathsWritableBuilder.putAll(allPathsWritable).build();
8185
this.shardSizes = sizeBuilder.putAll(sizeMap).build();
8286
ImmutableOpenMap.Builder<ShardRouting, String> routingBuilder = ImmutableOpenMap.builder();
8387
this.routingToDataPath = routingBuilder.putAll(routingMap).build();
@@ -95,6 +99,11 @@ public void writeTo(StreamOutput out) throws IOException {
9599
out.writeString(c.key);
96100
c.value.writeTo(out);
97101
}
102+
out.writeVInt(this.nodeAllPathsWritable.size());
103+
for (ObjectObjectCursor<String, Boolean> c : this.nodeAllPathsWritable) {
104+
out.writeString(c.key);
105+
out.writeBoolean(c.value);
106+
}
98107
out.writeVInt(this.shardSizes.size());
99108
for (ObjectObjectCursor<String, Long> c : this.shardSizes) {
100109
out.writeString(c.key);
@@ -127,6 +136,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
127136
}
128137
}
129138
builder.endObject(); // end "most_available"
139+
builder.field("all_path_writable", this.nodeAllPathsWritable.get(c.key));
130140
}
131141
builder.endObject(); // end $nodename
132142
}
@@ -161,6 +171,11 @@ public ImmutableOpenMap<String, DiskUsage> getNodeMostAvailableDiskUsages() {
161171
return this.mostAvailableSpaceUsage;
162172
}
163173

174+
/**
175+
* Returns a mapping from node id to whether all of that node's paths are writable.
176+
*/
177+
public ImmutableOpenMap<String, Boolean> getNodeAllPathsWritable() { return this.nodeAllPathsWritable; }
178+
164179
/**
165180
* Returns the shard size for the given shard routing or <code>null</code> it that metric is not available.
166181
*/

server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java

+47-36
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.elasticsearch.common.settings.Settings;
4545
import org.elasticsearch.common.unit.TimeValue;
4646
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
47+
import org.elasticsearch.monitor.fs.FsHealthService;
4748
import org.elasticsearch.monitor.fs.FsInfo;
4849
import org.elasticsearch.threadpool.ThreadPool;
4950
import org.elasticsearch.transport.ReceiveTimeoutTransportException;
@@ -80,6 +81,7 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode
8081

8182
private volatile ImmutableOpenMap<String, DiskUsage> leastAvailableSpaceUsages;
8283
private volatile ImmutableOpenMap<String, DiskUsage> mostAvailableSpaceUsages;
84+
private volatile ImmutableOpenMap<String, Boolean> allPathsWritable;
8385
private volatile ImmutableOpenMap<ShardRouting, String> shardRoutingToDataPath;
8486
private volatile ImmutableOpenMap<String, Long> shardSizes;
8587
private volatile boolean isMaster = false;
@@ -94,6 +96,7 @@ public InternalClusterInfoService(Settings settings, ClusterService clusterServi
9496
this.leastAvailableSpaceUsages = ImmutableOpenMap.of();
9597
this.mostAvailableSpaceUsages = ImmutableOpenMap.of();
9698
this.shardRoutingToDataPath = ImmutableOpenMap.of();
99+
this.allPathsWritable = ImmutableOpenMap.of();
97100
this.shardSizes = ImmutableOpenMap.of();
98101
this.clusterService = clusterService;
99102
this.threadPool = threadPool;
@@ -105,16 +108,16 @@ public InternalClusterInfoService(Settings settings, ClusterService clusterServi
105108
clusterSettings.addSettingsUpdateConsumer(INTERNAL_CLUSTER_INFO_TIMEOUT_SETTING, this::setFetchTimeout);
106109
clusterSettings.addSettingsUpdateConsumer(INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL_SETTING, this::setUpdateFrequency);
107110
clusterSettings.addSettingsUpdateConsumer(DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED_SETTING,
108-
this::setEnabled);
111+
FsHealthService.ENABLED_SETTING, this::setEnabled);
109112

110113
// Add InternalClusterInfoService to listen for Master changes
111114
this.clusterService.addLocalNodeMasterListener(this);
112115
// Add to listen for state changes (when nodes are added)
113116
this.clusterService.addListener(this);
114117
}
115118

116-
private void setEnabled(boolean enabled) {
117-
this.enabled = enabled;
119+
private void setEnabled(boolean diskThresholdEnabled, boolean fsHealthEnabled) {
120+
this.enabled = diskThresholdEnabled || fsHealthEnabled;
118121
}
119122

120123
private void setFetchTimeout(TimeValue fetchTimeout) {
@@ -200,7 +203,7 @@ public void clusterChanged(ClusterChangedEvent event) {
200203

201204
@Override
202205
public ClusterInfo getClusterInfo() {
203-
return new ClusterInfo(leastAvailableSpaceUsages, mostAvailableSpaceUsages, shardSizes, shardRoutingToDataPath);
206+
return new ClusterInfo(leastAvailableSpaceUsages, mostAvailableSpaceUsages, shardSizes, shardRoutingToDataPath, allPathsWritable);
204207
}
205208

206209
/**
@@ -242,7 +245,7 @@ public void run() {
242245
*/
243246
protected CountDownLatch updateNodeStats(final ActionListener<NodesStatsResponse> listener) {
244247
final CountDownLatch latch = new CountDownLatch(1);
245-
final NodesStatsRequest nodesStatsRequest = new NodesStatsRequest("data:true");
248+
final NodesStatsRequest nodesStatsRequest = new NodesStatsRequest();
246249
nodesStatsRequest.clear();
247250
nodesStatsRequest.fs(true);
248251
nodesStatsRequest.timeout(fetchTimeout);
@@ -293,10 +296,12 @@ public final ClusterInfo refresh() {
293296
public void onResponse(NodesStatsResponse nodesStatsResponse) {
294297
ImmutableOpenMap.Builder<String, DiskUsage> leastAvailableUsagesBuilder = ImmutableOpenMap.builder();
295298
ImmutableOpenMap.Builder<String, DiskUsage> mostAvailableUsagesBuilder = ImmutableOpenMap.builder();
296-
fillDiskUsagePerNode(logger, adjustNodesStats(nodesStatsResponse.getNodes()),
297-
leastAvailableUsagesBuilder, mostAvailableUsagesBuilder);
299+
ImmutableOpenMap.Builder<String, Boolean> allPathsWritableBuilder = ImmutableOpenMap.builder();
300+
fillDiskStatsPerNode(logger, adjustNodesStats(nodesStatsResponse.getNodes()), clusterService,
301+
leastAvailableUsagesBuilder, mostAvailableUsagesBuilder, allPathsWritableBuilder);
298302
leastAvailableSpaceUsages = leastAvailableUsagesBuilder.build();
299303
mostAvailableSpaceUsages = mostAvailableUsagesBuilder.build();
304+
allPathsWritable = allPathsWritableBuilder.build();
300305
}
301306

302307
@Override
@@ -396,51 +401,57 @@ static void buildShardLevelInfo(Logger logger, ShardStats[] stats, ImmutableOpen
396401
}
397402
}
398403

399-
static void fillDiskUsagePerNode(Logger logger, List<NodeStats> nodeStatsArray,
404+
static void fillDiskStatsPerNode(Logger logger, List<NodeStats> nodeStatsArray, ClusterService clusterService,
400405
ImmutableOpenMap.Builder<String, DiskUsage> newLeastAvaiableUsages,
401-
ImmutableOpenMap.Builder<String, DiskUsage> newMostAvaiableUsages) {
406+
ImmutableOpenMap.Builder<String, DiskUsage> newMostAvaiableUsages,
407+
ImmutableOpenMap.Builder<String, Boolean> allPathsWritableBuilder) {
402408
for (NodeStats nodeStats : nodeStatsArray) {
403409
if (nodeStats.getFs() == null) {
404410
logger.warn("Unable to retrieve node FS stats for {}", nodeStats.getNode().getName());
405411
} else {
406412
FsInfo.Path leastAvailablePath = null;
407413
FsInfo.Path mostAvailablePath = null;
408-
for (FsInfo.Path info : nodeStats.getFs()) {
409-
if (leastAvailablePath == null) {
410-
assert mostAvailablePath == null;
411-
mostAvailablePath = leastAvailablePath = info;
412-
} else if (leastAvailablePath.getAvailable().getBytes() > info.getAvailable().getBytes()) {
413-
leastAvailablePath = info;
414-
} else if (mostAvailablePath.getAvailable().getBytes() < info.getAvailable().getBytes()) {
415-
mostAvailablePath = info;
416-
}
417-
}
418414
String nodeId = nodeStats.getNode().getId();
419415
String nodeName = nodeStats.getNode().getName();
420-
if (logger.isTraceEnabled()) {
421-
logger.trace("node: [{}], most available: total disk: {}," +
422-
" available disk: {} / least available: total disk: {}, available disk: {}",
416+
Boolean allPathsWritable = nodeStats.getFs().getTotal().isWritable();
417+
if (clusterService.state().getNodes().getMasterNodes().containsKey(nodeStats.getNode().getId()) == false) {
418+
for (FsInfo.Path info : nodeStats.getFs()) {
419+
if (leastAvailablePath == null) {
420+
assert mostAvailablePath == null;
421+
mostAvailablePath = leastAvailablePath = info;
422+
} else if (leastAvailablePath.getAvailable().getBytes() > info.getAvailable().getBytes()) {
423+
leastAvailablePath = info;
424+
} else if (mostAvailablePath.getAvailable().getBytes() < info.getAvailable().getBytes()) {
425+
mostAvailablePath = info;
426+
}
427+
}
428+
if (logger.isTraceEnabled()) {
429+
logger.trace("node: [{}], most available: total disk: {}," +
430+
" available disk: {} / least available: total disk: {}, available disk: {}",
423431
nodeId, mostAvailablePath.getTotal(), leastAvailablePath.getAvailable(),
424432
leastAvailablePath.getTotal(), leastAvailablePath.getAvailable());
425-
}
426-
if (leastAvailablePath.getTotal().getBytes() < 0) {
427-
if (logger.isTraceEnabled()) {
428-
logger.trace("node: [{}] least available path has less than 0 total bytes of disk [{}], skipping",
433+
}
434+
if (leastAvailablePath.getTotal().getBytes() < 0) {
435+
if (logger.isTraceEnabled()) {
436+
logger.trace("node: [{}] least available path has less than 0 total bytes of disk [{}], skipping",
429437
nodeId, leastAvailablePath.getTotal().getBytes());
438+
}
439+
} else {
440+
newLeastAvaiableUsages.put(nodeId, new DiskUsage(nodeId, nodeName, leastAvailablePath.getPath(),
441+
leastAvailablePath.getTotal().getBytes(), leastAvailablePath.getAvailable().getBytes()));
430442
}
431-
} else {
432-
newLeastAvaiableUsages.put(nodeId, new DiskUsage(nodeId, nodeName, leastAvailablePath.getPath(),
433-
leastAvailablePath.getTotal().getBytes(), leastAvailablePath.getAvailable().getBytes()));
434-
}
435-
if (mostAvailablePath.getTotal().getBytes() < 0) {
436-
if (logger.isTraceEnabled()) {
437-
logger.trace("node: [{}] most available path has less than 0 total bytes of disk [{}], skipping",
443+
if (mostAvailablePath.getTotal().getBytes() < 0) {
444+
if (logger.isTraceEnabled()) {
445+
logger.trace("node: [{}] most available path has less than 0 total bytes of disk [{}], skipping",
438446
nodeId, mostAvailablePath.getTotal().getBytes());
447+
}
448+
} else {
449+
newMostAvaiableUsages.put(nodeId, new DiskUsage(nodeId, nodeName, mostAvailablePath.getPath(),
450+
mostAvailablePath.getTotal().getBytes(), mostAvailablePath.getAvailable().getBytes()));
439451
}
440-
} else {
441-
newMostAvaiableUsages.put(nodeId, new DiskUsage(nodeId, nodeName, mostAvailablePath.getPath(),
442-
mostAvailablePath.getTotal().getBytes(), mostAvailablePath.getAvailable().getBytes()));
452+
443453
}
454+
allPathsWritableBuilder.put(nodeId, allPathsWritable);
444455

445456
}
446457
}

server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

+19-3
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.cluster.ClusterStateTaskConfig;
3131
import org.elasticsearch.cluster.ClusterStateUpdateTask;
3232
import org.elasticsearch.cluster.LocalClusterUpdateTask;
33+
import org.elasticsearch.cluster.ClusterInfoService;
3334
import org.elasticsearch.cluster.block.ClusterBlocks;
3435
import org.elasticsearch.cluster.coordination.ClusterFormationFailureHelper.ClusterFormationState;
3536
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfigExclusion;
@@ -67,6 +68,8 @@
6768
import org.elasticsearch.discovery.PeerFinder;
6869
import org.elasticsearch.discovery.SeedHostsProvider;
6970
import org.elasticsearch.discovery.SeedHostsResolver;
71+
import org.elasticsearch.monitor.fs.FsReadOnlyMonitor;
72+
import org.elasticsearch.monitor.fs.FsService;
7073
import org.elasticsearch.threadpool.Scheduler;
7174
import org.elasticsearch.threadpool.ThreadPool.Names;
7275
import org.elasticsearch.transport.TransportResponse.Empty;
@@ -149,6 +152,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
149152
private Optional<Join> lastJoin;
150153
private JoinHelper.JoinAccumulator joinAccumulator;
151154
private Optional<CoordinatorPublication> currentPublication = Optional.empty();
155+
private final FsService fsService;
156+
private final FsReadOnlyMonitor fsReadOnlyMonitor;
152157

153158
/**
154159
* @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}.
@@ -158,7 +163,8 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe
158163
NamedWriteableRegistry namedWriteableRegistry, AllocationService allocationService, MasterService masterService,
159164
Supplier<CoordinationState.PersistedState> persistedStateSupplier, SeedHostsProvider seedHostsProvider,
160165
ClusterApplier clusterApplier, Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators, Random random,
161-
RerouteService rerouteService, ElectionStrategy electionStrategy) {
166+
RerouteService rerouteService, ElectionStrategy electionStrategy, FsService fsService,
167+
ClusterInfoService clusterInfoService) {
162168
this.settings = settings;
163169
this.transportService = transportService;
164170
this.masterService = masterService;
@@ -168,7 +174,7 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe
168174
this.electionStrategy = electionStrategy;
169175
this.joinHelper = new JoinHelper(settings, allocationService, masterService, transportService,
170176
this::getCurrentTerm, this::getStateForMasterService, this::handleJoinRequest, this::joinLeaderInTerm, this.onJoinValidators,
171-
rerouteService);
177+
rerouteService, fsService);
172178
this.persistedStateSupplier = persistedStateSupplier;
173179
this.noMasterBlockService = new NoMasterBlockService(settings, clusterSettings);
174180
this.lastKnownLeader = Optional.empty();
@@ -178,7 +184,7 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe
178184
this.publishInfoTimeout = PUBLISH_INFO_TIMEOUT_SETTING.get(settings);
179185
this.random = random;
180186
this.electionSchedulerFactory = new ElectionSchedulerFactory(settings, random, transportService.getThreadPool());
181-
this.preVoteCollector = new PreVoteCollector(transportService, this::startElection, this::updateMaxTermSeen, electionStrategy);
187+
this.preVoteCollector = new PreVoteCollector(transportService, this::startElection, this::updateMaxTermSeen, electionStrategy, fsService);
182188
configuredHostsResolver = new SeedHostsResolver(nodeName, settings, transportService, seedHostsProvider);
183189
this.peerFinder = new CoordinatorPeerFinder(settings, transportService,
184190
new HandshakingTransportAddressConnector(settings, transportService), configuredHostsResolver);
@@ -196,6 +202,10 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe
196202
transportService::getLocalNode);
197203
this.clusterFormationFailureHelper = new ClusterFormationFailureHelper(settings, this::getClusterFormationState,
198204
transportService.getThreadPool(), joinHelper::logLastFailedJoinAttempt);
205+
// TODO: check whether FsReadOnlyMonitor and LagDetector can be implemented behind a common interface
206+
this.fsReadOnlyMonitor = new FsReadOnlyMonitor(settings, clusterSettings, this::getStateForMasterService, transportService::getLocalNode,
207+
this::removeNode, clusterInfoService);
208+
this.fsService = fsService;
199209
}
200210

201211
private ClusterFormationState getClusterFormationState() {
@@ -1171,6 +1181,12 @@ public void run() {
11711181
return;
11721182
}
11731183

1184+
if(fsService.stats().getTotal().isWritable() == Boolean.FALSE){
1185+
logger.warn("skip prevoting as local node is not writable: {}",
1186+
lastAcceptedState.coordinationMetaData());
1187+
return;
1188+
}
1189+
11741190
if (prevotingRound != null) {
11751191
prevotingRound.close();
11761192
}

0 commit comments

Comments
 (0)