Skip to content

Commit 2f68e50

Browse files
committed
Improve Back pressure Handling
Previously, when Logstash queues were blocked, this plugin would still accept new batches of events, but not send keep-alives back to the client if the work queue was full. This would cause each connected beats client to resend batches of work every time the TTL was breached. Also: Reduces the default number of beats executor threads from 4*number_of_processors to number_of_processors Includes fixes for #259 Fixes #299
1 parent 442f487 commit 2f68e50

File tree

8 files changed

+158
-75
lines changed

8 files changed

+158
-75
lines changed

lib/logstash/inputs/beats.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ class LogStash::Inputs::Beats < LogStash::Inputs::Base
120120
config :client_inactivity_timeout, :validate => :number, :default => 60
121121

122122
# Beats handler executor thread
123-
config :executor_threads, :validate => :number, :default => LogStash::Config::CpuCoreStrategy.maximum * 4
123+
config :executor_threads, :validate => :number, :default => LogStash::Config::CpuCoreStrategy.maximum
124124

125125
def register
126126
# For Logstash 2.4 we need to make sure that the logger is correctly set for the

lib/logstash/inputs/beats/message_listener.rb

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,22 +27,29 @@ def initialize(queue, input)
2727
end
2828

2929
def onNewMessage(ctx, message)
30-
hash = message.getData
31-
ip_address = ip_address(ctx)
30+
begin
31+
hash = message.getData
32+
ip_address = ip_address(ctx)
3233

33-
hash['@metadata']['ip_address'] = ip_address unless ip_address.nil? || hash['@metadata'].nil?
34-
target_field = extract_target_field(hash)
34+
hash['@metadata']['ip_address'] = ip_address unless ip_address.nil? || hash['@metadata'].nil?
35+
target_field = extract_target_field(hash)
3536

36-
if target_field.nil?
37-
event = LogStash::Event.new(hash)
38-
@nocodec_transformer.transform(event)
39-
@queue << event
40-
else
41-
codec(ctx).accept(CodecCallbackListener.new(target_field,
42-
hash,
43-
message.getIdentityStream(),
44-
@codec_transformer,
45-
@queue))
37+
if target_field.nil?
38+
event = LogStash::Event.new(hash)
39+
@nocodec_transformer.transform(event)
40+
@queue << event
41+
else
42+
codec(ctx).accept(CodecCallbackListener.new(target_field,
43+
hash,
44+
message.getIdentityStream(),
45+
@codec_transformer,
46+
@queue))
47+
end
48+
rescue => e
49+
logger.warn("Error handling message #{message}", e)
50+
raise e
51+
ensure
52+
message.release
4653
end
4754
end
4855

@@ -85,7 +92,7 @@ def ip_address(ctx)
8592
end
8693

8794
def register_connection(ctx)
88-
connections_list[ctx] = ConnectionState.new(ctx, input.codec.dup, ip_address_from_ctx(ctx))
95+
connections_list[ctx] = ConnectionState.new(ctx, input.codec.clone, ip_address_from_ctx(ctx))
8996
end
9097

9198
def ip_address_from_ctx(ctx)

src/main/java/org/logstash/beats/BeatsHandler.java

Lines changed: 65 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,59 +1,58 @@
11
package org.logstash.beats;
22

3-
import io.netty.channel.ChannelHandler;
43
import io.netty.channel.ChannelHandlerContext;
54
import io.netty.channel.SimpleChannelInboundHandler;
6-
import io.netty.handler.timeout.IdleState;
7-
import io.netty.handler.timeout.IdleStateEvent;
85
import io.netty.util.AttributeKey;
96
import org.apache.logging.log4j.LogManager;
107
import org.apache.logging.log4j.Logger;
11-
import org.apache.logging.log4j.ThreadContext;
128

139
import java.net.InetSocketAddress;
14-
import java.net.SocketAddress;
15-
import java.util.Map;
16-
import java.util.UUID;
17-
import java.util.concurrent.atomic.AtomicBoolean;
1810
import javax.net.ssl.SSLHandshakeException;
1911

20-
public class BeatsHandler extends SimpleChannelInboundHandler<Batch> {
12+
public class BeatsHandler extends SimpleChannelInboundHandler<NewBatch> {
2113
private final static Logger logger = LogManager.getLogger(BeatsHandler.class);
2214
public static AttributeKey<Boolean> PROCESSING_BATCH = AttributeKey.valueOf("processing-batch");
2315
private final IMessageListener messageListener;
2416
private ChannelHandlerContext context;
2517

2618

27-
2819
public BeatsHandler(IMessageListener listener) {
2920
messageListener = listener;
3021
}
3122

3223
@Override
33-
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
24+
public void channelActive(final ChannelHandlerContext ctx) throws Exception {
25+
if (logger.isTraceEnabled()){
26+
logger.trace(format("Channel Active"));
27+
}
28+
ctx.channel().attr(BeatsHandler.PROCESSING_BATCH).set(false);
29+
30+
super.channelActive(ctx);
3431
context = ctx;
3532
messageListener.onNewConnection(ctx);
36-
// Give some breathing room on new clients to receive the keep alive.
37-
ctx.channel().attr(PROCESSING_BATCH).set(false);
3833
}
3934

4035
@Override
41-
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
42-
ctx.channel().attr(PROCESSING_BATCH).set(false);
36+
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
37+
super.channelInactive(ctx);
38+
if (logger.isTraceEnabled()){
39+
logger.trace(format("Channel Inactive"));
40+
}
41+
ctx.channel().attr(BeatsHandler.PROCESSING_BATCH).set(false);
4342
messageListener.onConnectionClose(ctx);
4443
}
4544

4645

4746
@Override
4847
public void channelRead0(ChannelHandlerContext ctx, Batch batch) throws Exception {
49-
if(logger.isDebugEnabled()) {
50-
logger.debug(format("Received a new payload"));
48+
if(logger.isTraceEnabled()) {
49+
logger.trace(format("Received a new payload"));
5150
}
5251

53-
ctx.channel().attr(PROCESSING_BATCH).set(true);
52+
ctx.channel().attr(BeatsHandler.PROCESSING_BATCH).set(true);
5453
for(Message message : batch.getMessages()) {
55-
if(logger.isDebugEnabled()) {
56-
logger.debug(format("Sending a new message for the listener, sequence: " + message.getSequence()));
54+
if(logger.isTraceEnabled()) {
55+
logger.trace(format("Sending a new message for the listener, sequence: " + message.getSequence()));
5756
}
5857
messageListener.onNewMessage(ctx, message);
5958

@@ -65,6 +64,33 @@ public void channelRead0(ChannelHandlerContext ctx, Batch batch) throws Exceptio
6564
ctx.channel().attr(PROCESSING_BATCH).set(false);
6665
}
6766

67+
@Override
68+
public void channelRead0(ChannelHandlerContext ctx, NewBatch batch) throws Exception {
69+
if(logger.isDebugEnabled()) {
70+
logger.debug(format("Received a new payload"));
71+
}
72+
73+
ctx.channel().attr(PROCESSING_BATCH).set(true);
74+
batch.getMessageStream().forEach(e -> {
75+
messageListener.onNewMessage(ctx, e);
76+
if (needAck(e)){
77+
ack(ctx, e);
78+
}
79+
});
80+
// for(Message message : batch.getMessages()) {
81+
// if(logger.isDebugEnabled()) {
82+
// logger.debug(format("Sending a new message for the listener, sequence: " + message.getSequence()));
83+
// }
84+
// messageListener.onNewMessage(ctx, message);
85+
//
86+
// if(needAck(message)) {
87+
// ack(ctx, message);
88+
// }
89+
// }
90+
ctx.flush();
91+
ctx.channel().attr(PROCESSING_BATCH).set(false);
92+
}
93+
6894
/*
6995
* Do not propagate the SSL handshake exception down to the ruby layer handle it locally instead and close the connection
7096
* if the channel is still active. Calling `onException` will flush the content of the codec's buffer, this call
@@ -75,26 +101,33 @@ public void channelRead0(ChannelHandlerContext ctx, Batch batch) throws Exceptio
75101
* overlap Filebeat transmission; we were recommending multiline at the source in v5 and in v6 we enforce it.
76102
*/
77103
@Override
78-
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
79-
ctx.close();
80-
81-
if (!(cause instanceof SSLHandshakeException)) {
82-
messageListener.onException(ctx, cause);
83-
}
84-
85-
String causeMessage = cause.getMessage() == null ? cause.getClass().toString() : cause.getMessage();
104+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
105+
try {
106+
if (!(cause instanceof SSLHandshakeException)) {
107+
messageListener.onException(ctx, cause);
108+
}
109+
String causeMessage = cause.getMessage() == null ? cause.getClass().toString() : cause.getMessage();
86110

87-
if (logger.isDebugEnabled()){
88-
logger.debug(format("Handling exception: " + causeMessage), cause);
111+
if (logger.isDebugEnabled()){
112+
logger.debug(format("Handling exception: " + causeMessage), cause);
113+
}
114+
logger.info(format("Handling exception: " + causeMessage));
115+
} finally{
116+
super.exceptionCaught(ctx, cause);
117+
ctx.flush();
118+
ctx.close();
89119
}
90-
logger.info(format("Handling exception: " + causeMessage));
91120
}
92121

93122
private boolean needAck(Message message) {
94-
return message.getSequence() == message.getBatch().getBatchSize();
123+
return message.getSequence() == message.getNewBatch().getBatchSize();
95124
}
96125

97126
private void ack(ChannelHandlerContext ctx, Message message) {
127+
logger.warn(format("Acking message number " + message.getSequence()));
128+
if (logger.isTraceEnabled()){
129+
logger.trace(format("Acking message number " + message.getSequence()));
130+
}
98131
writeAck(ctx, message.getBatch().getProtocol(), message.getSequence());
99132
}
100133

src/main/java/org/logstash/beats/BeatsParser.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public class BeatsParser extends ByteToMessageDecoder {
2424
public final static ObjectMapper MAPPER = new ObjectMapper().registerModule(new AfterburnerModule());
2525
private final static Logger logger = LogManager.getLogger(BeatsParser.class);
2626

27-
private Batch batch = new Batch();
27+
private Batch batch;
2828

2929
private enum States {
3030
READ_HEADER(1),
@@ -56,6 +56,9 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
5656

5757
switch (currentState) {
5858
case READ_HEADER: {
59+
if (batch == null){
60+
batch = new Batch();
61+
}
5962
logger.trace("Running: READ_HEADER");
6063

6164
byte currentVersion = in.readByte();
@@ -67,7 +70,6 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
6770
logger.trace("Frame version 1 detected");
6871
batch.setProtocol(Protocol.VERSION_1);
6972
}
70-
7173
transition(States.READ_FRAME_TYPE);
7274
break;
7375
}
@@ -100,7 +102,6 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
100102
}
101103
case READ_WINDOW_SIZE: {
102104
logger.trace("Running: READ_WINDOW_SIZE");
103-
104105
batch.setBatchSize((int) in.readUnsignedInt());
105106

106107
// This is unlikely to happen but I have no way to known when a frame is
@@ -201,12 +202,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
201202
case READ_JSON: {
202203
logger.trace("Running: READ_JSON");
203204

204-
byte[] bytes = new byte[requiredBytes];
205-
in.readBytes(bytes);
206-
Message message = new Message(sequence, (Map) MAPPER.readValue(bytes, Object.class));
207-
208-
batch.addMessage(message);
209-
205+
batch.addMessage(new Message(sequence, in.readBytes(requiredBytes)));
210206
if(batch.size() == batch.getBatchSize()) {
211207
if(logger.isTraceEnabled()) {
212208
logger.trace("Sending batch size: " + this.batch.size() + ", windowSize: " + batch.getBatchSize() + " , seq: " + sequence);
@@ -241,7 +237,8 @@ private void transition(States nextState, int requiredBytes) {
241237
private void batchComplete() {
242238
requiredBytes = 0;
243239
sequence = 0;
244-
batch = new Batch();
240+
batch = null;
241+
logger.warn("batch compete");
245242
}
246243

247244
public class InvalidFrameProtocolException extends Exception {

src/main/java/org/logstash/beats/KeepAliveHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
2121
if (e.state() == IdleState.WRITER_IDLE) {
2222
if (isProcessing(ctx)) {
2323
ChannelFuture f = ctx.writeAndFlush(new Ack(Protocol.VERSION_2, 0));
24+
logger.warn("sending keep alive ack to libbeat");
2425
if (logger.isTraceEnabled()) {
2526
logger.trace("sending keep alive ack to libbeat");
2627
f.addListener((ChannelFutureListener) future -> {
@@ -51,6 +52,6 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
5152
}
5253

5354
public boolean isProcessing(ChannelHandlerContext ctx) {
54-
return ctx.channel().attr(BeatsHandler.PROCESSING_BATCH).get();
55+
return ctx.channel().hasAttr(BeatsHandler.PROCESSING_BATCH) && ctx.channel().attr(BeatsHandler.PROCESSING_BATCH).get();
5556
}
5657
}

src/main/java/org/logstash/beats/Message.java

Lines changed: 50 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,64 @@
11
package org.logstash.beats;
22

3+
import com.fasterxml.jackson.databind.ObjectMapper;
4+
import com.fasterxml.jackson.module.afterburner.AfterburnerModule;
5+
import io.netty.buffer.ByteBuf;
6+
import io.netty.buffer.ByteBufInputStream;
7+
import org.apache.logging.log4j.LogManager;
8+
import org.apache.logging.log4j.Logger;
9+
10+
import java.io.IOException;
311
import java.util.HashMap;
412
import java.util.Map;
513

614
public class Message implements Comparable<Message> {
715
private final int sequence;
816
private String identityStream;
9-
private final Map data;
17+
private Map data;
1018
private Batch batch;
19+
private ByteBuf buffer;
20+
private Logger logger = LogManager.getLogger(Message.class);
21+
22+
public final static ObjectMapper MAPPER = new ObjectMapper().registerModule(new AfterburnerModule());
1123

1224
public Message(int sequence, Map map) {
1325
this.sequence = sequence;
1426
this.data = map;
27+
}
1528

16-
identityStream = extractIdentityStream();
29+
public Message(int sequence, ByteBuf buffer){
30+
this.sequence = sequence;
31+
this.buffer = buffer;
1732
}
1833

1934
public int getSequence() {
2035
return sequence;
2136
}
2237

38+
public boolean release() {
39+
if (buffer != null){
40+
return buffer.release();
41+
}
42+
return true;
43+
}
44+
45+
public Map getData(){
46+
if (data == null && buffer != null){
47+
try (ByteBufInputStream byteBufInputStream = new ByteBufInputStream(buffer)){
48+
data = (Map)MAPPER.readValue(byteBufInputStream, Object.class);
49+
} catch (IOException e){
50+
throw new RuntimeException("Unable to parse beats payload ", e);
51+
}
52+
// finally{
53+
// release();
54+
// }
55+
}
2356

24-
public Map getData() {
2557
return data;
2658
}
2759

60+
private NewBatch newBatch;
61+
2862
@Override
2963
public int compareTo(Message o) {
3064
return Integer.compare(getSequence(), o.getSequence());
@@ -38,7 +72,19 @@ public void setBatch(Batch newBatch) {
3872
batch = newBatch;
3973
}
4074

75+
public NewBatch getNewBatch() {
76+
return newBatch;
77+
}
78+
79+
public void setNewBatch(NewBatch newnewBatch){
80+
this.newBatch = newnewBatch;
81+
}
82+
83+
4184
public String getIdentityStream() {
85+
if (identityStream == null){
86+
identityStream = extractIdentityStream();
87+
}
4288
return identityStream;
4389
}
4490

@@ -52,7 +98,7 @@ private String extractIdentityStream() {
5298
if(id != null && resourceId != null) {
5399
return id + "-" + resourceId;
54100
} else {
55-
return (String) beatsData.get("name") + "-" + (String) beatsData.get("source");
101+
return beatsData.get("name") + "-" + beatsData.get("source");
56102
}
57103
}
58104

0 commit comments

Comments
 (0)