Skip to content

Commit 4e5264c

Browse files
committed
Reroute shards automatically when high disk watermark is exceeded
This adds a Listener interface to the ClusterInfoService, this is used by the DiskThresholdDecider, which adds a listener to check for nodes passing the high watermark. If a node is past the high watermark an empty reroute is issued so shards can be reallocated if desired. A reroute will only be issued once every `cluster.routing.allocation.disk.reroute_interval`, which is "60s" by default. Refactors InternalClusterInfoService to delegate the nodes stats and indices stats gathering into separate methods so they have be overriden by extending classes. Each stat gathering method returns a CountDownLatch that can be used to wait until processing for that part is successful before calling the listeners. Fixes #8146
1 parent 909502b commit 4e5264c

File tree

11 files changed

+600
-21
lines changed

11 files changed

+600
-21
lines changed
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.action;
21+
22+
import java.util.concurrent.CountDownLatch;
23+
24+
/**
25+
* An action listener that allows passing in a {@link CountDownLatch} that
26+
* will be counted down after onResponse or onFailure is called
27+
*/
28+
public final class LatchedActionListener<T> implements ActionListener<T> {
29+
30+
private final ActionListener<T> delegate;
31+
private final CountDownLatch latch;
32+
33+
public LatchedActionListener(ActionListener<T> delegate, CountDownLatch latch) {
34+
this.delegate = delegate;
35+
this.latch = latch;
36+
}
37+
38+
@Override
39+
public void onResponse(T t) {
40+
try {
41+
delegate.onResponse(t);
42+
} finally {
43+
latch.countDown();
44+
}
45+
}
46+
47+
@Override
48+
public void onFailure(Throwable e) {
49+
try {
50+
delegate.onFailure(e);
51+
} finally {
52+
latch.countDown();
53+
}
54+
}
55+
}

src/main/java/org/elasticsearch/cluster/ClusterInfoService.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,25 @@
1919

2020
package org.elasticsearch.cluster;
2121

22+
/**
23+
* Interface for a class used to gather information about a cluster at
24+
* regular intervals
25+
*/
2226
public interface ClusterInfoService {
2327

2428
public static ClusterInfoService EMPTY = EmptyClusterInfoService.getInstance();
2529

30+
/** The latest cluster information */
2631
public ClusterInfo getClusterInfo();
2732

33+
/** Add a listener that will be called every time new information is gathered */
34+
public void addListener(Listener listener);
35+
36+
/**
37+
* Interface for listeners to implement in order to perform actions when
38+
* new information about the cluster has been gathered
39+
*/
40+
public interface Listener {
41+
public void onNewInfo(ClusterInfo info);
42+
}
2843
}

src/main/java/org/elasticsearch/cluster/ClusterModule.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
public class ClusterModule extends AbstractModule implements SpawnModules {
4848

4949
private final Settings settings;
50+
public static final String CLUSTER_SERVICE_IMPL = "cluster.info.service.type";
5051

5152
private Set<Class<? extends IndexTemplateFilter>> indexTemplateFilters = new HashSet<>();
5253

@@ -87,7 +88,7 @@ protected void configure() {
8788
bind(NodeMappingRefreshAction.class).asEagerSingleton();
8889
bind(MappingUpdatedAction.class).asEagerSingleton();
8990

90-
bind(ClusterInfoService.class).to(InternalClusterInfoService.class).asEagerSingleton();
91+
bind(ClusterInfoService.class).to(settings.getAsClass(CLUSTER_SERVICE_IMPL, InternalClusterInfoService.class)).asEagerSingleton();
9192

9293
Multibinder<IndexTemplateFilter> mbinder = Multibinder.newSetBinder(binder(), IndexTemplateFilter.class);
9394
for (Class<? extends IndexTemplateFilter> indexTemplateFilter : indexTemplateFilters) {

src/main/java/org/elasticsearch/cluster/DiskUsage.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,6 @@ public long getUsedBytes() {
5555
}
5656

5757
public String toString() {
58-
return "[" + nodeId + "] free: " + getFreeBytes() + "[" + getFreeDiskAsPercentage() + "]";
58+
return "[" + nodeId + "] free: " + getFreeBytes() + "[" + getFreeDiskAsPercentage() + "%]";
5959
}
6060
}

src/main/java/org/elasticsearch/cluster/EmptyClusterInfoService.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,4 +46,9 @@ public static EmptyClusterInfoService getInstance() {
4646
public ClusterInfo getClusterInfo() {
4747
return emptyClusterInfo;
4848
}
49+
50+
@Override
51+
public void addListener(Listener listener) {
52+
// no-op, no new info is ever gathered, so adding listeners is useless
53+
}
4954
}

src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java

Lines changed: 59 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import com.google.common.collect.ImmutableMap;
2323
import org.elasticsearch.action.ActionListener;
24+
import org.elasticsearch.action.LatchedActionListener;
2425
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
2526
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsRequest;
2627
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
@@ -42,8 +43,9 @@
4243
import org.elasticsearch.node.settings.NodeSettingsService;
4344
import org.elasticsearch.threadpool.ThreadPool;
4445

45-
import java.util.HashMap;
46-
import java.util.Map;
46+
import java.util.*;
47+
import java.util.concurrent.CountDownLatch;
48+
import java.util.concurrent.TimeUnit;
4749

4850
/**
4951
* InternalClusterInfoService provides the ClusterInfoService interface,
@@ -56,7 +58,7 @@
5658
* Every time the timer runs, gathers information about the disk usage and
5759
* shard sizes across the cluster.
5860
*/
59-
public final class InternalClusterInfoService extends AbstractComponent implements ClusterInfoService, LocalNodeMasterListener, ClusterStateListener {
61+
public class InternalClusterInfoService extends AbstractComponent implements ClusterInfoService, LocalNodeMasterListener, ClusterStateListener {
6062

6163
public static final String INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL = "cluster.info.update.interval";
6264

@@ -70,6 +72,7 @@ public final class InternalClusterInfoService extends AbstractComponent implemen
7072
private final TransportIndicesStatsAction transportIndicesStatsAction;
7173
private final ClusterService clusterService;
7274
private final ThreadPool threadPool;
75+
private final Set<Listener> listeners = Collections.synchronizedSet(new HashSet<Listener>());
7376

7477
@Inject
7578
public InternalClusterInfoService(Settings settings, NodeSettingsService nodeSettingsService,
@@ -188,6 +191,11 @@ public ClusterInfo getClusterInfo() {
188191
return new ClusterInfo(usages, shardSizes);
189192
}
190193

194+
@Override
195+
public void addListener(Listener listener) {
196+
this.listeners.add(listener);
197+
}
198+
191199
/**
192200
* Class used to submit {@link ClusterInfoUpdateJob}s on the
193201
* {@link InternalClusterInfoService} threadpool, these jobs will
@@ -210,6 +218,34 @@ public void run() {
210218
}
211219
}
212220

221+
/**
222+
* Retrieve the latest nodes stats, calling the listener when complete
223+
* @return a latch that can be used to wait for the nodes stats to complete if desired
224+
*/
225+
protected CountDownLatch updateNodeStats(final ActionListener<NodesStatsResponse> listener) {
226+
final CountDownLatch latch = new CountDownLatch(1);
227+
final NodesStatsRequest nodesStatsRequest = new NodesStatsRequest("data:true");
228+
nodesStatsRequest.clear();
229+
nodesStatsRequest.fs(true);
230+
nodesStatsRequest.timeout(TimeValue.timeValueSeconds(15));
231+
232+
transportNodesStatsAction.execute(nodesStatsRequest, new LatchedActionListener<>(listener, latch));
233+
return latch;
234+
}
235+
236+
/**
237+
* Retrieve the latest indices stats, calling the listener when complete
238+
* @return a latch that can be used to wait for the indices stats to complete if desired
239+
*/
240+
protected CountDownLatch updateIndicesStats(final ActionListener<IndicesStatsResponse> listener) {
241+
final CountDownLatch latch = new CountDownLatch(1);
242+
final IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
243+
indicesStatsRequest.clear();
244+
indicesStatsRequest.store(true);
245+
246+
transportIndicesStatsAction.execute(indicesStatsRequest, new LatchedActionListener<>(listener, latch));
247+
return latch;
248+
}
213249

214250
/**
215251
* Runnable class that performs a {@Link NodesStatsRequest} to retrieve
@@ -252,12 +288,7 @@ public void run() {
252288
return;
253289
}
254290

255-
NodesStatsRequest nodesStatsRequest = new NodesStatsRequest("data:true");
256-
nodesStatsRequest.clear();
257-
nodesStatsRequest.fs(true);
258-
nodesStatsRequest.timeout(TimeValue.timeValueSeconds(15));
259-
260-
transportNodesStatsAction.execute(nodesStatsRequest, new ActionListener<NodesStatsResponse>() {
291+
CountDownLatch nodeLatch = updateNodeStats(new ActionListener<NodesStatsResponse>() {
261292
@Override
262293
public void onResponse(NodesStatsResponse nodeStatses) {
263294
Map<String, DiskUsage> newUsages = new HashMap<>();
@@ -294,10 +325,7 @@ public void onFailure(Throwable e) {
294325
}
295326
});
296327

297-
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
298-
indicesStatsRequest.clear();
299-
indicesStatsRequest.store(true);
300-
transportIndicesStatsAction.execute(indicesStatsRequest, new ActionListener<IndicesStatsResponse>() {
328+
CountDownLatch indicesLatch = updateIndicesStats(new ActionListener<IndicesStatsResponse>() {
301329
@Override
302330
public void onResponse(IndicesStatsResponse indicesStatsResponse) {
303331
ShardStats[] stats = indicesStatsResponse.getShards();
@@ -325,8 +353,24 @@ public void onFailure(Throwable e) {
325353
}
326354
});
327355

328-
if (logger.isTraceEnabled()) {
329-
logger.trace("Finished ClusterInfoUpdateJob");
356+
try {
357+
nodeLatch.await(15, TimeUnit.SECONDS);
358+
} catch (InterruptedException e) {
359+
logger.warn("Failed to update node information for ClusterInfoUpdateJob within 15s timeout");
360+
}
361+
362+
try {
363+
indicesLatch.await(15, TimeUnit.SECONDS);
364+
} catch (InterruptedException e) {
365+
logger.warn("Failed to update shard information for ClusterInfoUpdateJob within 15s timeout");
366+
}
367+
368+
for (Listener l : listeners) {
369+
try {
370+
l.onNewInfo(getClusterInfo());
371+
} catch (Exception e) {
372+
logger.info("Failed executing ClusterInfoService listener", e);
373+
}
330374
}
331375
}
332376
}

0 commit comments

Comments
 (0)