Skip to content

Commit bb92bb2

Browse files
retoos1monw
authored andcommitted
Use a dedicated ThreadGroup in rest sniffer (#26897)
This change adds a dedicated thread group, configures threads with a corresponding thread name and starts all threads as daemon threads.
1 parent 80b1e2c commit bb92bb2

File tree

1 file changed

+37
-1
lines changed
  • client/sniffer/src/main/java/org/elasticsearch/client/sniff

1 file changed

+37
-1
lines changed

client/sniffer/src/main/java/org/elasticsearch/client/sniff/Sniffer.java

+37-1
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,16 @@
2727

2828
import java.io.Closeable;
2929
import java.io.IOException;
30+
import java.security.AccessController;
31+
import java.security.PrivilegedAction;
3032
import java.util.List;
3133
import java.util.concurrent.Executors;
3234
import java.util.concurrent.ScheduledExecutorService;
3335
import java.util.concurrent.ScheduledFuture;
36+
import java.util.concurrent.ThreadFactory;
3437
import java.util.concurrent.TimeUnit;
3538
import java.util.concurrent.atomic.AtomicBoolean;
39+
import java.util.concurrent.atomic.AtomicInteger;
3640

3741
/**
3842
* Class responsible for sniffing nodes from some source (default is elasticsearch itself) and setting them to a provided instance of
@@ -45,6 +49,7 @@
4549
public class Sniffer implements Closeable {
4650

4751
private static final Log logger = LogFactory.getLog(Sniffer.class);
52+
private static final String SNIFFER_THREAD_NAME = "es_rest_client_sniffer";
4853

4954
private final Task task;
5055

@@ -79,7 +84,8 @@ private Task(HostsSniffer hostsSniffer, RestClient restClient, long sniffInterva
7984
this.restClient = restClient;
8085
this.sniffIntervalMillis = sniffIntervalMillis;
8186
this.sniffAfterFailureDelayMillis = sniffAfterFailureDelayMillis;
82-
this.scheduledExecutorService = Executors.newScheduledThreadPool(1);
87+
SnifferThreadFactory threadFactory = new SnifferThreadFactory(SNIFFER_THREAD_NAME);
88+
this.scheduledExecutorService = Executors.newScheduledThreadPool(1, threadFactory);
8389
scheduleNextRun(0);
8490
}
8591

@@ -151,4 +157,34 @@ synchronized void shutdown() {
151157
public static SnifferBuilder builder(RestClient restClient) {
152158
return new SnifferBuilder(restClient);
153159
}
160+
161+
private static class SnifferThreadFactory implements ThreadFactory {
162+
163+
private final AtomicInteger threadNumber = new AtomicInteger(1);
164+
private final String namePrefix;
165+
private final ThreadFactory originalThreadFactory;
166+
167+
private SnifferThreadFactory(String namePrefix) {
168+
this.namePrefix = namePrefix;
169+
this.originalThreadFactory = AccessController.doPrivileged(new PrivilegedAction<ThreadFactory>() {
170+
@Override
171+
public ThreadFactory run() {
172+
return Executors.defaultThreadFactory();
173+
}
174+
});
175+
}
176+
177+
@Override
178+
public Thread newThread(final Runnable r) {
179+
return AccessController.doPrivileged(new PrivilegedAction<Thread>() {
180+
@Override
181+
public Thread run() {
182+
Thread t = originalThreadFactory.newThread(r);
183+
t.setName(namePrefix + "[T#" + threadNumber.getAndIncrement() + "]");
184+
t.setDaemon(true);
185+
return t;
186+
}
187+
});
188+
}
189+
}
154190
}

0 commit comments

Comments
 (0)