Skip to content

Commit 80f638b

Browse files
committed
Support scheduled commands in current context
Adds support for scheduling commands to run at a later time on another thread pool in the current thread's context: ```java Runnable someCommand = () -> {System.err.println("Demo");}; someCommand = threadPool.getThreadContext().preserveContext(someCommand); threadPool.schedule(timeValueMinutes(1), Names.GENERAL, someCommand); ``` This happens automatically for calls to `threadPool.execute` but `schedule` and `scheduleWithFixedDelay` don't do that, presumably because scheduled tasks are usually context-less. Rather than preserve the current context on all scheduled tasks this just makes it possible to preserve it using the syntax above. To make this all go it moves the Runnables that wrap the commands from EsThreadPoolExecutor into ThreadContext. This, or something like it, is required to support reindex throttling.
1 parent 0543d46 commit 80f638b

File tree

5 files changed

+235
-107
lines changed

5 files changed

+235
-107
lines changed

buildSrc/src/main/resources/checkstyle_suppressions.xml

-1
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,6 @@
390390
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]util[/\\]CollectionUtils.java" checks="LineLength" />
391391
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]util[/\\]ExtensionPoint.java" checks="LineLength" />
392392
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]util[/\\]concurrent[/\\]EsExecutors.java" checks="LineLength" />
393-
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]util[/\\]concurrent[/\\]EsThreadPoolExecutor.java" checks="LineLength" />
394393
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]util[/\\]concurrent[/\\]PrioritizedEsThreadPoolExecutor.java" checks="LineLength" />
395394
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]util[/\\]concurrent[/\\]ThreadBarrier.java" checks="LineLength" />
396395
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]util[/\\]concurrent[/\\]ThreadContext.java" checks="LineLength" />

core/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java

+7-106
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,14 @@ public class EsThreadPoolExecutor extends ThreadPoolExecutor {
4040
*/
4141
private final String name;
4242

43-
EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, ThreadContext contextHolder) {
43+
EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
44+
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, ThreadContext contextHolder) {
4445
this(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new EsAbortPolicy(), contextHolder);
4546
}
4647

47-
EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, XRejectedExecutionHandler handler, ThreadContext contextHolder) {
48+
EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
49+
BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, XRejectedExecutionHandler handler,
50+
ThreadContext contextHolder) {
4851
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
4952
this.name = name;
5053
this.contextHolder = contextHolder;
@@ -133,112 +136,10 @@ public String toString() {
133136
}
134137

135138
protected Runnable wrapRunnable(Runnable command) {
136-
final Runnable wrappedCommand;
137-
if (command instanceof AbstractRunnable) {
138-
wrappedCommand = new FilterAbstractRunnable(contextHolder, (AbstractRunnable) command);
139-
} else {
140-
wrappedCommand = new FilterRunnable(contextHolder, command);
141-
}
142-
return wrappedCommand;
139+
return contextHolder.preserveContext(command);
143140
}
144141

145142
protected Runnable unwrap(Runnable runnable) {
146-
if (runnable instanceof FilterAbstractRunnable) {
147-
return ((FilterAbstractRunnable) runnable).in;
148-
} else if (runnable instanceof FilterRunnable) {
149-
return ((FilterRunnable) runnable).in;
150-
}
151-
return runnable;
143+
return contextHolder.unwrap(runnable);
152144
}
153-
154-
private class FilterAbstractRunnable extends AbstractRunnable {
155-
private final ThreadContext contextHolder;
156-
private final AbstractRunnable in;
157-
private final ThreadContext.StoredContext ctx;
158-
159-
FilterAbstractRunnable(ThreadContext contextHolder, AbstractRunnable in) {
160-
this.contextHolder = contextHolder;
161-
ctx = contextHolder.newStoredContext();
162-
this.in = in;
163-
}
164-
165-
@Override
166-
public boolean isForceExecution() {
167-
return in.isForceExecution();
168-
}
169-
170-
@Override
171-
public void onAfter() {
172-
in.onAfter();
173-
}
174-
175-
@Override
176-
public void onFailure(Throwable t) {
177-
in.onFailure(t);
178-
}
179-
180-
@Override
181-
public void onRejection(Throwable t) {
182-
in.onRejection(t);
183-
}
184-
185-
@Override
186-
protected void doRun() throws Exception {
187-
boolean whileRunning = false;
188-
try (ThreadContext.StoredContext ingore = contextHolder.stashContext()){
189-
ctx.restore();
190-
whileRunning = true;
191-
in.doRun();
192-
whileRunning = false;
193-
} catch (IllegalStateException ex) {
194-
if (whileRunning || isShutdown() == false) {
195-
throw ex;
196-
}
197-
// if we hit an ISE here we have been shutting down
198-
// this comes from the threadcontext and barfs if
199-
// our threadpool has been shutting down
200-
}
201-
}
202-
203-
@Override
204-
public String toString() {
205-
return in.toString();
206-
}
207-
208-
}
209-
210-
private class FilterRunnable implements Runnable {
211-
private final ThreadContext contextHolder;
212-
private final Runnable in;
213-
private final ThreadContext.StoredContext ctx;
214-
215-
FilterRunnable(ThreadContext contextHolder, Runnable in) {
216-
this.contextHolder = contextHolder;
217-
ctx = contextHolder.newStoredContext();
218-
this.in = in;
219-
}
220-
221-
@Override
222-
public void run() {
223-
boolean whileRunning = false;
224-
try (ThreadContext.StoredContext ingore = contextHolder.stashContext()){
225-
ctx.restore();
226-
whileRunning = true;
227-
in.run();
228-
whileRunning = false;
229-
} catch (IllegalStateException ex) {
230-
if (whileRunning || isShutdown() == false) {
231-
throw ex;
232-
}
233-
// if we hit an ISE here we have been shutting down
234-
// this comes from the threadcontext and barfs if
235-
// our threadpool has been shutting down
236-
}
237-
}
238-
@Override
239-
public String toString() {
240-
return in.toString();
241-
}
242-
}
243-
244145
}

core/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java

+130
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,36 @@ public <T> T getTransient(String key) {
200200
return (T) threadLocal.get().transientHeaders.get(key);
201201
}
202202

203+
/**
204+
* Saves the current thread context and wraps command in a Runnable that restores that context before running command. If
205+
* <code>command</code> has already been passed through this method then it is returned unaltered rather than wrapped twice.
206+
*/
207+
public Runnable preserveContext(Runnable command) {
208+
if (command instanceof ContextPreservingAbstractRunnable) {
209+
return command;
210+
}
211+
if (command instanceof ContextPreservingRunnable) {
212+
return command;
213+
}
214+
if (command instanceof AbstractRunnable) {
215+
return new ContextPreservingAbstractRunnable((AbstractRunnable) command);
216+
}
217+
return new ContextPreservingRunnable(command);
218+
}
219+
220+
/**
221+
* Unwraps a command that was previously wrapped by {@link #preserveContext(Runnable)}.
222+
*/
223+
public Runnable unwrap(Runnable command) {
224+
if (command instanceof ContextPreservingAbstractRunnable) {
225+
return ((ContextPreservingAbstractRunnable) command).unwrap();
226+
}
227+
if (command instanceof ContextPreservingRunnable) {
228+
return ((ContextPreservingRunnable) command).unwrap();
229+
}
230+
return command;
231+
}
232+
203233
public interface StoredContext extends AutoCloseable {
204234
@Override
205235
void close();
@@ -356,4 +386,104 @@ public void close() {
356386
}
357387
}
358388
}
389+
390+
/**
391+
* Wraps a Runnable to preserve the thread context.
392+
*/
393+
class ContextPreservingRunnable implements Runnable {
394+
private final Runnable in;
395+
private final ThreadContext.StoredContext ctx;
396+
397+
ContextPreservingRunnable(Runnable in) {
398+
ctx = newStoredContext();
399+
this.in = in;
400+
}
401+
402+
@Override
403+
public void run() {
404+
boolean whileRunning = false;
405+
try (ThreadContext.StoredContext ingore = stashContext()){
406+
ctx.restore();
407+
whileRunning = true;
408+
in.run();
409+
whileRunning = false;
410+
} catch (IllegalStateException ex) {
411+
if (whileRunning || threadLocal.closed.get() == false) {
412+
throw ex;
413+
}
414+
// if we hit an ISE here we have been shutting down
415+
// this comes from the threadcontext and barfs if
416+
// our threadpool has been shutting down
417+
}
418+
}
419+
420+
@Override
421+
public String toString() {
422+
return in.toString();
423+
}
424+
425+
public Runnable unwrap() {
426+
return in;
427+
}
428+
}
429+
430+
/**
431+
* Wraps an AbstractRunnable to preserve the thread context.
432+
*/
433+
public class ContextPreservingAbstractRunnable extends AbstractRunnable {
434+
private final AbstractRunnable in;
435+
private final ThreadContext.StoredContext ctx;
436+
437+
private ContextPreservingAbstractRunnable(AbstractRunnable in) {
438+
ctx = newStoredContext();
439+
this.in = in;
440+
}
441+
442+
@Override
443+
public boolean isForceExecution() {
444+
return in.isForceExecution();
445+
}
446+
447+
@Override
448+
public void onAfter() {
449+
in.onAfter();
450+
}
451+
452+
@Override
453+
public void onFailure(Throwable t) {
454+
in.onFailure(t);
455+
}
456+
457+
@Override
458+
public void onRejection(Throwable t) {
459+
in.onRejection(t);
460+
}
461+
462+
@Override
463+
protected void doRun() throws Exception {
464+
boolean whileRunning = false;
465+
try (ThreadContext.StoredContext ingore = stashContext()){
466+
ctx.restore();
467+
whileRunning = true;
468+
in.doRun();
469+
whileRunning = false;
470+
} catch (IllegalStateException ex) {
471+
if (whileRunning || threadLocal.closed.get() == false) {
472+
throw ex;
473+
}
474+
// if we hit an ISE here we have been shutting down
475+
// this comes from the threadcontext and barfs if
476+
// our threadpool has been shutting down
477+
}
478+
}
479+
480+
@Override
481+
public String toString() {
482+
return in.toString();
483+
}
484+
485+
public AbstractRunnable unwrap() {
486+
return in;
487+
}
488+
}
359489
}

core/src/main/java/org/elasticsearch/threadpool/ThreadPool.java

+29
Original file line numberDiff line numberDiff line change
@@ -341,10 +341,18 @@ public ThreadPoolStats stats() {
341341
return new ThreadPoolStats(stats);
342342
}
343343

344+
/**
345+
* Get the generic executor. This executor's {@link Executor#execute(Runnable)} method will run the Runnable it is given in
346+
* the {@link ThreadContext} of the thread that queues it.
347+
*/
344348
public Executor generic() {
345349
return executor(Names.GENERIC);
346350
}
347351

352+
/**
353+
* Get the executor with the given name. This executor's {@link Executor#execute(Runnable)} method will run the Runnable it is given in
354+
* the {@link ThreadContext} of the thread that queues it.
355+
*/
348356
public Executor executor(String name) {
349357
Executor executor = executors.get(name).executor();
350358
if (executor == null) {
@@ -357,10 +365,31 @@ public ScheduledExecutorService scheduler() {
357365
return this.scheduler;
358366
}
359367

368+
/**
369+
* Schedules a periodic action that always runs on the scheduler thread.
370+
*
371+
* @param command the action to take
372+
* @param interval the delay interval
373+
* @return a ScheduledFuture who's get will return when the task is complete and throw an exception if it is canceled
374+
*/
360375
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, TimeValue interval) {
361376
return scheduler.scheduleWithFixedDelay(new LoggingRunnable(command), interval.millis(), interval.millis(), TimeUnit.MILLISECONDS);
362377
}
363378

379+
/**
380+
* Schedules a one-shot command to run after a given delay. The command is not run in the context of the calling thread. To preserve the
381+
* context of the calling thread you may call <code>threadPool.getThreadContext().preserveContext</code> on the runnable before passing
382+
* it to this method.
383+
*
384+
* @param delay delay before the task executes
385+
* @param name the name of the thread pool on which to execute this task. SAME means "execute on the scheduler thread" which changes the
386+
* meaning of the ScheduledFuture returned by this method. In that case the ScheduledFuture will complete only when the command
387+
* completes.
388+
* @param command the command to run
389+
* @return a ScheduledFuture who's get will return when the task is has been added to its target thread pool and throw an exception if
390+
* the task is canceled before it was added to its target thread pool. Once the task has been added to its target thread pool
391+
* the ScheduledFuture will cannot interact with it.
392+
*/
364393
public ScheduledFuture<?> schedule(TimeValue delay, String name, Runnable command) {
365394
if (!Names.SAME.equals(name)) {
366395
command = new ThreadedRunnable(command, executor(name));

0 commit comments

Comments
 (0)