Skip to content

Commit 5fb4b6e

Browse files
a.dekinfhussonnois
a.dekin
authored andcommitted
fix(filters): fix buffered records not being flushed (#667)
Resolves: #667
1 parent d8ca5ba commit 5fb4b6e

File tree

2 files changed

+109
-0
lines changed

2 files changed

+109
-0
lines changed

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/filter/DefaultRecordFilterPipeline.java

+13
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
1212
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectContext;
1313
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
14+
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectOffset;
1415
import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
1516
import io.streamthoughts.kafka.connect.filepulse.source.FileRecordOffset;
1617
import io.streamthoughts.kafka.connect.filepulse.source.TypedFileRecord;
@@ -89,6 +90,18 @@ public RecordsIterable<FileRecord<TypedStruct>> apply(final RecordsIterable<File
8990
// Apply the filter-chain on current record.
9091
results.addAll(apply(context, record.value(), doHasNext));
9192
}
93+
94+
// Flush all records buffered in the filter chain applying subsequent filters to each buffered record
95+
if (!hasNext && records.isEmpty()) {
96+
FilterNode node = rootNode;
97+
while (node != null) {
98+
List<FileRecord<TypedStruct>> flushed = node
99+
.flush(newContextFor(FileObjectOffset::empty, fileObjectObject.metadata()));
100+
results.addAll(flushed);
101+
node = node.onSuccess;
102+
}
103+
}
104+
92105
return new RecordsIterable<>(results);
93106
}
94107

connect-file-pulse-api/src/test/java/io/streamthoughts/kafka/connect/filepulse/filter/DefaultRecordFilterPipelineTest.java

+96
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.Map;
2525
import org.apache.kafka.common.config.ConfigDef;
2626
import org.junit.Test;
27+
import org.junit.jupiter.api.Assertions;
2728

2829

2930
public class DefaultRecordFilterPipelineTest {
@@ -190,6 +191,101 @@ public void shouldNotFlushBufferedRecordsGivenNoAcceptFilterAndThereIsNoRemainin
190191
assertEquals(record2, records.collect().get(0));
191192
}
192193

194+
@Test
195+
public void shouldFlushBufferedRecordsGivenAcceptFilterEmptyRecordsIterableAndNoRemainingRecords() {
196+
197+
final FileRecord<TypedStruct> record1 = createWithOffsetAndValue(FileRecordOffset.invalid(), "value1");
198+
final FileRecord<TypedStruct> record2 = createWithOffsetAndValue(FileRecordOffset.invalid(), "value2");
199+
200+
List<FileRecord<TypedStruct>> bufferedRecords = List.of(record1, record2);
201+
TestFilter filter1 = new TestFilter()
202+
.setBuffer(bufferedRecords);
203+
204+
DefaultRecordFilterPipeline pipeline = new DefaultRecordFilterPipeline(Collections.singletonList(filter1));
205+
pipeline.init(context);
206+
207+
RecordsIterable<FileRecord<TypedStruct>> records = pipeline.apply(new RecordsIterable<>(), false);
208+
209+
assertNotNull(records);
210+
List<FileRecord<TypedStruct>> filteredRecords = records.collect();
211+
Assertions.assertIterableEquals(bufferedRecords, filteredRecords);
212+
}
213+
214+
@Test
215+
public void shouldFlushBufferedRecordsFromFirstFilterGivenAcceptFiltersEmptyRecordsIterableAndNoRemainingRecods() {
216+
217+
final FileRecord<TypedStruct> record1 = createWithOffsetAndValue(FileRecordOffset.invalid(), "value1");
218+
final FileRecord<TypedStruct> record2 = createWithOffsetAndValue(FileRecordOffset.invalid(), "value2");
219+
220+
List<FileRecord<TypedStruct>> bufferedRecords = List.of(record1, record2);
221+
TestFilter filter1 = new TestFilter()
222+
.setBuffer(bufferedRecords);
223+
TestFilter filter2 = new TestFilter()
224+
.setFunction(((context1, record, hasNext) -> RecordsIterable.of(record)));
225+
226+
DefaultRecordFilterPipeline pipeline = new DefaultRecordFilterPipeline(List.of(filter1, filter2));
227+
pipeline.init(context);
228+
229+
RecordsIterable<FileRecord<TypedStruct>> records = pipeline.apply(new RecordsIterable<>(), false);
230+
231+
assertNotNull(records);
232+
List<FileRecord<TypedStruct>> filteredRecords = records.collect();
233+
Assertions.assertIterableEquals(bufferedRecords, filteredRecords);
234+
}
235+
236+
@Test
237+
public void shouldFlushBufferedRecordsFromLastFilterGivenAcceptFiltersEmptyRecordsIterableAndNoRemainingRecods() {
238+
239+
final FileRecord<TypedStruct> record1 = createWithOffsetAndValue(FileRecordOffset.invalid(), "value1");
240+
final FileRecord<TypedStruct> record2 = createWithOffsetAndValue(FileRecordOffset.invalid(), "value2");
241+
242+
List<FileRecord<TypedStruct>> bufferedRecords = List.of(record1, record2);
243+
TestFilter filter1 = new TestFilter();
244+
TestFilter filter2 = new TestFilter()
245+
.setBuffer(bufferedRecords);
246+
DefaultRecordFilterPipeline pipeline = new DefaultRecordFilterPipeline(List.of(filter1, filter2));
247+
pipeline.init(context);
248+
249+
RecordsIterable<FileRecord<TypedStruct>> records = pipeline.apply(new RecordsIterable<>(), false);
250+
251+
assertNotNull(records);
252+
List<FileRecord<TypedStruct>> filteredRecords = records.collect();
253+
Assertions.assertIterableEquals(bufferedRecords, filteredRecords);
254+
}
255+
256+
@Test
257+
public void shouldFlushBufferedRecordsFromAllFiltersGivenAcceptFiltersEmptyRecordsIterableAndNoRemainingRecods() {
258+
259+
final FileRecord<TypedStruct> record1 = createWithOffsetAndValue(FileRecordOffset.invalid(), "value1");
260+
final FileRecord<TypedStruct> record2 = createWithOffsetAndValue(FileRecordOffset.invalid(), "value2");
261+
final FileRecord<TypedStruct> record3 = createWithOffsetAndValue(FileRecordOffset.invalid(), "value3");
262+
final FileRecord<TypedStruct> record4 = createWithOffsetAndValue(FileRecordOffset.invalid(), "value4");
263+
final FileRecord<TypedStruct> record5 = createWithOffsetAndValue(FileRecordOffset.invalid(), "value5");
264+
265+
List<FileRecord<TypedStruct>> allBuffered = List.of(record1, record2, record3, record4, record5);
266+
267+
List<FileRecord<TypedStruct>> bufferedRecords1 = List.of(record1);
268+
List<FileRecord<TypedStruct>> bufferedRecords2 = List.of(record2, record3);
269+
List<FileRecord<TypedStruct>> bufferedRecords3 = List.of(record4, record5);
270+
TestFilter filter1 = new TestFilter()
271+
.setFunction(((context1, record, hasNext) -> RecordsIterable.of(record)))
272+
.setBuffer(bufferedRecords1);
273+
TestFilter filter2 = new TestFilter()
274+
.setFunction(((context1, record, hasNext) -> RecordsIterable.of(record)))
275+
.setBuffer(bufferedRecords2);
276+
TestFilter filter3 = new TestFilter()
277+
.setFunction(((context1, record, hasNext) -> RecordsIterable.of(record)))
278+
.setBuffer(bufferedRecords3);
279+
DefaultRecordFilterPipeline pipeline = new DefaultRecordFilterPipeline(List.of(filter1, filter2, filter3));
280+
pipeline.init(context);
281+
282+
RecordsIterable<FileRecord<TypedStruct>> records = pipeline.apply(new RecordsIterable<>(), false);
283+
284+
assertNotNull(records);
285+
List<FileRecord<TypedStruct>> filteredRecords = records.collect();
286+
Assertions.assertIterableEquals(allBuffered, filteredRecords);
287+
}
288+
193289
@Test
194290
public void shouldReturnRecordUnchangedGivenNoFilter() {
195291

0 commit comments

Comments
 (0)