Skip to content

Commit da7fcb0

Browse files
mashhursjsvd
authored andcommitted
Name netty threads with plugin id and their purpose (logstash-plugins#229)
1 parent 3c0c18a commit da7fcb0

File tree

6 files changed

+43
-8
lines changed

6 files changed

+43
-8
lines changed

Diff for: CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 6.4.5
2+
- Name netty threads with plugin id and their purpose [229](https://github.com/logstash-plugins/logstash-input-tcp/pull/229)
3+
14
## 6.4.4
25
- update netty to 4.1.115 [#227](https://github.com/logstash-plugins/logstash-input-tcp/pull/227)
36

Diff for: lib/logstash/inputs/tcp.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ def register
181181
validate_ssl_config!
182182

183183
if server?
184-
@loop = InputLoop.new(@host, @port, DecoderImpl.new(@codec, self), @tcp_keep_alive, java_ssl_context)
184+
@loop = InputLoop.new(@id, @host, @port, DecoderImpl.new(@codec, self), @tcp_keep_alive, java_ssl_context)
185185
end
186186
end
187187

Diff for: logstash-input-tcp.gemspec

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ Gem::Specification.new do |s|
66
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
77
s.authors = ["Elastic"]
88
s.email = '[email protected]'
9-
s.homepage = "http://www.elastic.co/guide/en/logstash/current/index.html"
9+
s.homepage = "https://elastic.co/logstash"
1010
s.platform = "java"
1111
s.require_paths = ["lib", "vendor/jar-dependencies"]
1212

Diff for: src/main/java/org/logstash/tcp/InputLoop.java

+7-5
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import java.io.IOException;
2222
import java.net.InetSocketAddress;
2323

24+
import static org.logstash.tcp.util.DaemonThreadFactory.daemonThreadFactory;
25+
2426
/**
2527
* Plain TCP Server Implementation.
2628
*/
@@ -66,13 +68,13 @@ public final class InputLoop implements Runnable, Closeable {
6668
* @param decoder {@link Decoder} provided by Jruby
6769
* @param keepAlive set to true to instruct the socket to issue TCP keep alive
6870
*/
69-
public InputLoop(final String host, final int port, final Decoder decoder, final boolean keepAlive,
71+
public InputLoop(final String id, final String host, final int port, final Decoder decoder, final boolean keepAlive,
7072
final SslContext sslContext) {
7173
this.sslContext = sslContext;
7274
this.host = host;
7375
this.port = port;
74-
worker = new NioEventLoopGroup();
75-
boss = new NioEventLoopGroup(1);
76+
boss = new NioEventLoopGroup(1, daemonThreadFactory(id + "-bossGroup"));
77+
worker = new NioEventLoopGroup(daemonThreadFactory(id + "-workGroup"));
7678
serverBootstrap = new ServerBootstrap().group(boss, worker)
7779
.channel(NioServerSocketChannel.class)
7880
.option(ChannelOption.SO_BACKLOG, 1024)
@@ -152,7 +154,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
152154
}
153155

154156
/**
155-
* Listeners that flushes the the JRuby supplied {@link Decoder} when the socket is closed.
157+
* Listeners that flushes the JRuby supplied {@link Decoder} when the socket is closed.
156158
*/
157159
private static final class FlushOnCloseListener implements GenericFutureListener<Future<Void>> {
158160

@@ -199,7 +201,7 @@ private static final class DecoderAdapter extends ChannelInboundHandlerAdapter {
199201
this.decoder = decoder;
200202
}
201203

202-
// 6.07 updated to pass in the full netty ChannelHandlerContext instead of the remoteaddress field
204+
// 6.07 updated to pass in the full netty ChannelHandlerContext instead of the remote address field
203205
// corresponding interface updated
204206
@Override
205207
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package org.logstash.tcp.util;
2+
3+
import java.util.concurrent.ThreadFactory;
4+
import java.util.concurrent.atomic.AtomicInteger;
5+
6+
public class DaemonThreadFactory implements ThreadFactory {
7+
8+
final ThreadGroup group;
9+
final AtomicInteger threadNumber = new AtomicInteger(1);
10+
final String namePrefix;
11+
12+
DaemonThreadFactory(String namePrefix) {
13+
this.namePrefix = namePrefix;
14+
group = Thread.currentThread().getThreadGroup();
15+
}
16+
17+
@Override
18+
public Thread newThread(Runnable r) {
19+
Thread t = new Thread(group, r,
20+
namePrefix + "[T#" + threadNumber.getAndIncrement() + "]",
21+
0);
22+
t.setDaemon(true);
23+
return t;
24+
}
25+
26+
public static ThreadFactory daemonThreadFactory(String namePrefix) {
27+
return new DaemonThreadFactory(namePrefix);
28+
}
29+
30+
}

Diff for: version

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
6.4.4
1+
6.4.5

0 commit comments

Comments
 (0)