Skip to content

Commit c876305

Browse files
committed
Improved V2Batch: iterators don't change V2Batch internal state any more. It now implement Closable, for auto-release.
1 parent 95b94d3 commit c876305

File tree

2 files changed

+62
-50
lines changed

2 files changed

+62
-50
lines changed

src/main/java/org/logstash/beats/V2Batch.java

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,18 @@
11
package org.logstash.beats;
22

3+
import java.io.Closeable;
4+
import java.util.Iterator;
5+
import java.util.NoSuchElementException;
6+
37
import io.netty.buffer.ByteBuf;
48
import io.netty.buffer.PooledByteBufAllocator;
59

6-
import java.util.Iterator;
7-
810
/**
911
* Implementation of {@link Batch} for the v2 protocol backed by ByteBuf. *must* be released after use.
1012
*/
11-
public class V2Batch implements Batch {
13+
public class V2Batch implements Batch, Closeable {
1214
private ByteBuf internalBuffer = PooledByteBufAllocator.DEFAULT.buffer();
1315
private int written = 0;
14-
private int read = 0;
1516
private static final int SIZE_OF_INT = 4;
1617
private int batchSize;
1718

@@ -26,20 +27,27 @@ public byte getProtocol() {
2627
return Protocol.VERSION_2;
2728
}
2829

29-
public Iterator<Message> iterator(){
30-
internalBuffer.resetReaderIndex();
30+
public Iterator<Message> iterator() {
3131
return new Iterator<Message>() {
32+
private int read = 0;
33+
private ByteBuf readerBuffer = internalBuffer.asReadOnly();
34+
{
35+
readerBuffer.resetReaderIndex();
36+
}
3237
@Override
3338
public boolean hasNext() {
3439
return read < written;
3540
}
3641

3742
@Override
3843
public Message next() {
39-
int sequenceNumber = internalBuffer.readInt();
40-
int readableBytes = internalBuffer.readInt();
41-
Message message = new Message(sequenceNumber, internalBuffer.slice(internalBuffer.readerIndex(), readableBytes));
42-
internalBuffer.readerIndex(internalBuffer.readerIndex() + readableBytes);
44+
if (read >= written) {
45+
throw new NoSuchElementException();
46+
}
47+
int sequenceNumber = readerBuffer.readInt();
48+
int readableBytes = readerBuffer.readInt();
49+
Message message = new Message(sequenceNumber, readerBuffer.slice(readerBuffer.readerIndex(), readableBytes));
50+
readerBuffer.readerIndex(readerBuffer.readerIndex() + readableBytes);
4351
message.setBatch(V2Batch.this);
4452
read++;
4553
return message;
@@ -92,4 +100,10 @@ void addMessage(int sequenceNumber, ByteBuf buffer, int size) {
92100
public void release() {
93101
internalBuffer.release();
94102
}
103+
104+
@Override
105+
public void close() {
106+
release();
107+
}
108+
95109
}

src/test/java/org/logstash/beats/V2BatchTest.java

Lines changed: 38 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -19,48 +19,49 @@ public class V2BatchTest {
1919

2020
@Test
2121
public void testIsEmpty() {
22-
V2Batch batch = new V2Batch();
23-
assertTrue(batch.isEmpty());
24-
ByteBuf content = messageContents();
25-
batch.addMessage(1, content, content.readableBytes());
26-
assertFalse(batch.isEmpty());
22+
try (V2Batch batch = new V2Batch()){
23+
assertTrue(batch.isEmpty());
24+
ByteBuf content = messageContents();
25+
batch.addMessage(1, content, content.readableBytes());
26+
assertFalse(batch.isEmpty());
27+
}
2728
}
2829

2930
@Test
3031
public void testSize() {
31-
V2Batch batch = new V2Batch();
32-
assertEquals(0, batch.size());
33-
ByteBuf content = messageContents();
34-
batch.addMessage(1, content, content.readableBytes());
35-
assertEquals(1, batch.size());
32+
try (V2Batch batch = new V2Batch()) {
33+
assertEquals(0, batch.size());
34+
ByteBuf content = messageContents();
35+
batch.addMessage(1, content, content.readableBytes());
36+
assertEquals(1, batch.size());
37+
}
3638
}
3739

3840
@Test
39-
public void TestGetProtocol() {
40-
assertEquals(Protocol.VERSION_2, new V2Batch().getProtocol());
41+
public void testGetProtocol() {
42+
try (V2Batch batch = new V2Batch()) {
43+
assertEquals(Protocol.VERSION_2, batch.getProtocol());
44+
}
4145
}
4246

4347
@Test
44-
public void TestCompleteReturnTrueWhenIReceiveTheSameAmountOfEvent() {
45-
V2Batch batch = new V2Batch();
46-
int numberOfEvent = 2;
47-
48-
batch.setBatchSize(numberOfEvent);
49-
50-
for(int i = 1; i <= numberOfEvent; i++) {
51-
ByteBuf content = messageContents();
52-
batch.addMessage(i, content, content.readableBytes());
48+
public void testCompleteReturnTrueWhenIReceiveTheSameAmountOfEvent() {
49+
try (V2Batch batch = new V2Batch()) {
50+
int numberOfEvent = 2;
51+
batch.setBatchSize(numberOfEvent);
52+
for (int i = 1; i <= numberOfEvent; i++) {
53+
ByteBuf content = messageContents();
54+
batch.addMessage(i, content, content.readableBytes());
55+
}
56+
assertTrue(batch.isComplete());
5357
}
54-
55-
assertTrue(batch.isComplete());
5658
}
5759

5860
@Test
5961
public void testBigBatch() {
60-
V2Batch batch = new V2Batch();
61-
int size = 4096;
62-
assertEquals(0, batch.size());
63-
try {
62+
try (V2Batch batch = new V2Batch()) {
63+
int size = 4096;
64+
assertEquals(0, batch.size());
6465
ByteBuf content = messageContents();
6566
for (int i = 0; i < size; i++) {
6667
batch.addMessage(i, content, content.readableBytes());
@@ -70,22 +71,19 @@ public void testBigBatch() {
7071
for (Message message : batch) {
7172
assertEquals(message.getSequence(), i++);
7273
}
73-
}finally {
74-
batch.release();
7574
}
76-
}
7775

76+
}
7877

7978
@Test
80-
public void TestCompleteReturnWhenTheNumberOfEventDoesntMatchBatchSize() {
81-
V2Batch batch = new V2Batch();
82-
int numberOfEvent = 2;
83-
84-
batch.setBatchSize(numberOfEvent);
85-
ByteBuf content = messageContents();
86-
batch.addMessage(1, content, content.readableBytes());
87-
88-
assertFalse(batch.isComplete());
79+
public void testCompleteReturnWhenTheNumberOfEventDoesntMatchBatchSize() {
80+
try (V2Batch batch = new V2Batch()) {
81+
int numberOfEvent = 2;
82+
batch.setBatchSize(numberOfEvent);
83+
ByteBuf content = messageContents();
84+
batch.addMessage(1, content, content.readableBytes());
85+
assertFalse(batch.isComplete());
86+
}
8987
}
9088

9189
public static ByteBuf messageContents() {
@@ -98,4 +96,4 @@ public static ByteBuf messageContents() {
9896
throw new RuntimeException(e);
9997
}
10098
}
101-
}
99+
}

0 commit comments

Comments
 (0)