Skip to content

Commit 1aee27a

Browse files
TheWorkshopComfhussonnois
TheWorkshopCom
authored andcommitted
fix(filters): fix fields by index logic
1 parent 636e73c commit 1aee27a

File tree

2 files changed

+14
-8
lines changed

2 files changed

+14
-8
lines changed

connect-file-pulse-api/src/main/java/io/streamthoughts/kafka/connect/filepulse/data/StructSchema.java

+1-4
Original file line numberDiff line numberDiff line change
@@ -109,10 +109,7 @@ public List<TypedField> fields() {
109109

110110
public List<TypedField> fieldsByIndex() {
111111
ArrayList<TypedField> ordered = new ArrayList<>(fields.values());
112-
// order elements in array to match field column index
113-
for (TypedField field: fields.values()) {
114-
ordered.add(field.index(),field);
115-
}
112+
ordered.sort(Comparator.comparing(TypedField::index));
116113
return ordered;
117114
}
118115

connect-file-pulse-filters/src/test/java/io/streamthoughts/kafka/connect/filepulse/config/DelimitedRowFilterConfigTest.java

+13-4
Original file line numberDiff line numberDiff line change
@@ -67,20 +67,29 @@ public void shouldCreateConfigGivenOverrideValues() {
6767
public void shouldCreateConfigGivenSchema() {
6868

6969
Map<String, String> props = new HashMap<>() {{
70-
put(READER_FIELD_COLUMNS_CONFIG, "field1:BOOLEAN;field2:INT32;field3:STRING");
70+
put(READER_FIELD_COLUMNS_CONFIG, "x1:BOOLEAN;c2:INT32;y3:STRING");
7171
}};
7272

7373
DelimitedRowFilterConfig config = new DelimitedRowFilterConfig(filter.configDef(), props);
7474

7575
StructSchema schema = config.schema();
7676
Assert.assertNotNull(schema);
7777

78+
// Fields ordered by name (default)
7879
List<TypedField> fields = schema.fields();
7980
Assert.assertEquals(3, fields.size());
8081

81-
Assert.assertEquals("field1", fields.get(0).name());
82-
Assert.assertEquals("field2", fields.get(1).name());
83-
Assert.assertEquals("field3", fields.get(2).name());
82+
Assert.assertEquals("c2", fields.get(0).name());
83+
Assert.assertEquals("x1", fields.get(1).name());
84+
Assert.assertEquals("y3", fields.get(2).name());
85+
86+
// Fields ordered by index
87+
List<TypedField> fieldsByIndex = schema.fieldsByIndex();
88+
Assert.assertEquals(3, fieldsByIndex.size());
89+
90+
Assert.assertEquals("x1", fieldsByIndex.get(0).name());
91+
Assert.assertEquals("c2", fieldsByIndex.get(1).name());
92+
Assert.assertEquals("y3", fieldsByIndex.get(2).name());
8493
}
8594

8695
}

0 commit comments

Comments
 (0)