Skip to content

Commit 44c79ad

Browse files
committed
Ported core part of original logstash-plugins#410 PR
1 parent 23f96d6 commit 44c79ad

File tree

1 file changed

+79
-5
lines changed

1 file changed

+79
-5
lines changed

src/main/java/org/logstash/beats/BeatsParser.java

Lines changed: 79 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@
33

44
import io.netty.buffer.ByteBuf;
55
import io.netty.buffer.ByteBufOutputStream;
6+
import io.netty.buffer.PooledByteBufAllocator;
67
import io.netty.channel.ChannelHandlerContext;
78
import io.netty.handler.codec.ByteToMessageDecoder;
9+
import io.netty.handler.codec.DecoderException;
810
import org.apache.logging.log4j.LogManager;
911
import org.apache.logging.log4j.Logger;
1012

@@ -14,12 +16,14 @@
1416
import java.util.HashMap;
1517
import java.util.List;
1618
import java.util.Map;
19+
import java.util.concurrent.TimeUnit;
1720
import java.util.zip.Inflater;
1821
import java.util.zip.InflaterOutputStream;
1922

2023

2124
public class BeatsParser extends ByteToMessageDecoder {
2225
private final static Logger logger = LogManager.getLogger(BeatsParser.class);
26+
private final static long maxDirectMemory = io.netty.util.internal.PlatformDependent.maxDirectMemory();
2327

2428
private Batch batch;
2529

@@ -45,15 +49,18 @@ private enum States {
4549
private int requiredBytes = 0;
4650
private int sequence = 0;
4751
private boolean decodingCompressedBuffer = false;
52+
private long usedDirectMemory;
53+
private boolean closeCalled = false;
4854

4955
@Override
5056
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws InvalidFrameProtocolException, IOException {
51-
if(!hasEnoughBytes(in)) {
52-
if (decodingCompressedBuffer){
57+
if (!hasEnoughBytes(in)) {
58+
if (decodingCompressedBuffer) {
5359
throw new InvalidFrameProtocolException("Insufficient bytes in compressed content to decode: " + currentState);
5460
}
5561
return;
5662
}
63+
usedDirectMemory = ((PooledByteBufAllocator) ctx.alloc()).metric().usedDirectMemory();
5764

5865
switch (currentState) {
5966
case READ_HEADER: {
@@ -182,6 +189,13 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
182189

183190
case READ_COMPRESSED_FRAME: {
184191
logger.trace("Running: READ_COMPRESSED_FRAME");
192+
193+
if (usedDirectMemory + requiredBytes > maxDirectMemory * 0.90) {
194+
ctx.channel().config().setAutoRead(false);
195+
ctx.close();
196+
closeCalled = true;
197+
throw new IOException("not enough memory to decompress this from " + ctx.channel().id());
198+
}
185199
inflateCompressedFrame(ctx, in, (buffer) -> {
186200
transition(States.READ_HEADER);
187201

@@ -192,16 +206,20 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
192206
}
193207
} finally {
194208
decodingCompressedBuffer = false;
209+
ctx.channel().config().setAutoRead(false);
210+
ctx.channel().eventLoop().schedule(() -> {
211+
ctx.channel().config().setAutoRead(true);
212+
}, 5, TimeUnit.MILLISECONDS);
195213
transition(States.READ_HEADER);
196214
}
197215
});
198216
break;
199217
}
200218
case READ_JSON: {
201219
logger.trace("Running: READ_JSON");
202-
((V2Batch)batch).addMessage(sequence, in, requiredBytes);
203-
if(batch.isComplete()) {
204-
if(logger.isTraceEnabled()) {
220+
((V2Batch) batch).addMessage(sequence, in, requiredBytes);
221+
if (batch.isComplete()) {
222+
if (logger.isTraceEnabled()) {
205223
logger.trace("Sending batch size: " + this.batch.size() + ", windowSize: " + batch.getBatchSize() + " , seq: " + sequence);
206224
}
207225
out.add(batch);
@@ -260,6 +278,62 @@ private void batchComplete() {
260278
batch = null;
261279
}
262280

281+
@Override
282+
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
283+
//System.out.println("channelRead(" + ctx.channel().isActive() + ": " + ctx.channel().id() + ":" + currentState + ":" + decodingCompressedBuffer);
284+
if (closeCalled) {
285+
((ByteBuf) msg).release();
286+
//if(batch != null) batch.release();
287+
return;
288+
}
289+
usedDirectMemory = ((PooledByteBufAllocator) ctx.alloc()).metric().usedDirectMemory();
290+
291+
// If we're just beginning a new frame on this channel,
292+
// don't accumulate more data for 25 ms if usage of direct memory is above 20%
293+
//
294+
// The goal here is to avoid thundering herd: many beats connecting and sending data
295+
// at the same time. As some channels progress to other states they'll use more memory
296+
// but also give it back once a full batch is read.
297+
if ((!decodingCompressedBuffer) && (this.currentState != States.READ_COMPRESSED_FRAME)) {
298+
if (usedDirectMemory > (maxDirectMemory * 0.40)) {
299+
ctx.channel().config().setAutoRead(false);
300+
//System.out.println("pausing reads on " + ctx.channel().id());
301+
ctx.channel().eventLoop().schedule(() -> {
302+
//System.out.println("resuming reads on " + ctx.channel().id());
303+
ctx.channel().config().setAutoRead(true);
304+
}, 200, TimeUnit.MILLISECONDS);
305+
} else {
306+
//System.out.println("no need to pause reads on " + ctx.channel().id());
307+
}
308+
} else if (usedDirectMemory > maxDirectMemory * 0.90) {
309+
ctx.channel().config().setAutoRead(false);
310+
ctx.close();
311+
closeCalled = true;
312+
((ByteBuf) msg).release();
313+
if (batch != null) batch.release();
314+
throw new IOException("about to explode, cut them all down " + ctx.channel().id());
315+
}
316+
super.channelRead(ctx, msg);
317+
}
318+
319+
@Override
320+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
321+
System.out.println(cause.getClass().toString() + ":" + ctx.channel().id().toString() + ":" + this.currentState + "|" + cause.getMessage());
322+
if (cause instanceof DecoderException) {
323+
ctx.channel().config().setAutoRead(false);
324+
if (!closeCalled) ctx.close();
325+
} else if (cause instanceof OutOfMemoryError) {
326+
cause.printStackTrace();
327+
ctx.channel().config().setAutoRead(false);
328+
if (!closeCalled) ctx.close();
329+
} else if (cause instanceof IOException) {
330+
ctx.channel().config().setAutoRead(false);
331+
if (!closeCalled) ctx.close();
332+
} else {
333+
super.exceptionCaught(ctx, cause);
334+
}
335+
}
336+
263337
@FunctionalInterface
264338
private interface CheckedConsumer<T> {
265339
void accept(T t) throws IOException;

0 commit comments

Comments
 (0)