Skip to content

Commit 1b62782

Browse files
committed
Extract and use scheduleUnlessShuttingDown
1 parent 1ff3e0f commit 1b62782

File tree

3 files changed

+36
-40
lines changed

3 files changed

+36
-40
lines changed

server/src/main/java/org/elasticsearch/cluster/coordination/ElectionSchedulerFactory.java

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.elasticsearch.common.settings.Settings;
2929
import org.elasticsearch.common.unit.TimeValue;
3030
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
31-
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
3231
import org.elasticsearch.threadpool.ThreadPool;
3332
import org.elasticsearch.threadpool.ThreadPool.Names;
3433

@@ -181,15 +180,7 @@ public String toString() {
181180
};
182181

183182
logger.debug("scheduling {}", runnable);
184-
try {
185-
threadPool.schedule(TimeValue.timeValueMillis(delayMillis), Names.GENERIC, runnable);
186-
} catch (EsRejectedExecutionException e) {
187-
if (e.isExecutorShutdown()) {
188-
logger.debug("couldn't schedule next election, executor is shutting down", e);
189-
} else {
190-
throw e;
191-
}
192-
}
183+
threadPool.scheduleUnlessShuttingDown(TimeValue.timeValueMillis(delayMillis), Names.GENERIC, runnable);
193184
}
194185

195186
@Override

server/src/main/java/org/elasticsearch/discovery/PeerFinder.java

Lines changed: 21 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import org.elasticsearch.common.transport.TransportAddress;
3535
import org.elasticsearch.common.unit.TimeValue;
3636
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
37-
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
3837
import org.elasticsearch.threadpool.ThreadPool.Names;
3938
import org.elasticsearch.transport.TransportException;
4039
import org.elasticsearch.transport.TransportRequestOptions;
@@ -249,41 +248,33 @@ private boolean handleWakeUp() {
249248
}
250249
});
251250

252-
try {
253-
transportService.getThreadPool().schedule(findPeersInterval, Names.GENERIC, new AbstractRunnable() {
254-
@Override
255-
public boolean isForceExecution() {
256-
return true;
257-
}
251+
transportService.getThreadPool().scheduleUnlessShuttingDown(findPeersInterval, Names.GENERIC, new AbstractRunnable() {
252+
@Override
253+
public boolean isForceExecution() {
254+
return true;
255+
}
258256

259-
@Override
260-
public void onFailure(Exception e) {
261-
assert false : e;
262-
logger.debug("unexpected exception in wakeup", e);
263-
}
257+
@Override
258+
public void onFailure(Exception e) {
259+
assert false : e;
260+
logger.debug("unexpected exception in wakeup", e);
261+
}
264262

265-
@Override
266-
protected void doRun() {
267-
synchronized (mutex) {
268-
if (handleWakeUp() == false) {
269-
return;
270-
}
263+
@Override
264+
protected void doRun() {
265+
synchronized (mutex) {
266+
if (handleWakeUp() == false) {
267+
return;
271268
}
272-
onFoundPeersUpdated();
273269
}
270+
onFoundPeersUpdated();
271+
}
274272

275-
@Override
276-
public String toString() {
277-
return "PeerFinder handling wakeup";
278-
}
279-
});
280-
} catch (EsRejectedExecutionException e) {
281-
if (e.isExecutorShutdown()) {
282-
logger.debug("couldn't schedule new execution of peer finder, executor is shutting down", e);
283-
} else {
284-
throw e;
273+
@Override
274+
public String toString() {
275+
return "PeerFinder handling wakeup";
285276
}
286-
}
277+
});
287278

288279
return peersRemoved;
289280
}

server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.elasticsearch.common.unit.SizeValue;
3333
import org.elasticsearch.common.unit.TimeValue;
3434
import org.elasticsearch.common.util.concurrent.EsExecutors;
35+
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
3536
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
3637
import org.elasticsearch.common.util.concurrent.ThreadContext;
3738
import org.elasticsearch.common.util.concurrent.XRejectedExecutionHandler;
@@ -350,6 +351,19 @@ public ScheduledFuture<?> schedule(TimeValue delay, String executor, Runnable co
350351
return scheduler.schedule(new ThreadPool.LoggingRunnable(command), delay.millis(), TimeUnit.MILLISECONDS);
351352
}
352353

354+
public void scheduleUnlessShuttingDown(TimeValue delay, String executor, Runnable command) {
355+
try {
356+
schedule(delay, executor, command);
357+
} catch (EsRejectedExecutionException e) {
358+
if (e.isExecutorShutdown()) {
359+
logger.debug(new ParameterizedMessage("could not schedule execution of [{}] after [{}] on [{}] as executor is shut down",
360+
command, delay, executor), e);
361+
} else {
362+
throw e;
363+
}
364+
}
365+
}
366+
353367
@Override
354368
public Cancellable scheduleWithFixedDelay(Runnable command, TimeValue interval, String executor) {
355369
return new ReschedulingRunnable(command, interval, executor, this,

0 commit comments

Comments
 (0)