Skip to content

Commit 6752c23

Browse files
TheWorkshopComfhussonnois
TheWorkshopCom
authored andcommitted
feat(filters): add built-in ExcludeFieldsMatchingPatternsFilter
1 parent d5375b7 commit 6752c23

File tree

3 files changed

+256
-0
lines changed

3 files changed

+256
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright 2023 StreamThoughts.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package io.streamthoughts.kafka.connect.filepulse.config;
20+
21+
import java.util.Map;
22+
import java.util.regex.Pattern;
23+
import org.apache.kafka.common.config.ConfigDef;
24+
25+
public class ExcludeFieldsMatchingPatternsConfig extends CommonFilterConfig {
26+
public static final String EXCLUDE_FIELDS_REGEX_CONFIG = "regex";
27+
28+
private static final String EXCLUDE_FIELDS_REGEX_CONFIG_DOC = "Regexp pattern applied to a field value to determine if the fields should be propagated or not.";
29+
30+
public static final String EXCLUDE_FIELDS_BLOCK_FIELD_CONFIG = "block.field";
31+
32+
private static final String EXCLUDE_FIELDS_BLOCK_FIELD_CONFIG_DOC = "If true, omits propagating the field downstream. Otherwise, propagates the field with a null value";
33+
34+
public ExcludeFieldsMatchingPatternsConfig(Map<?, ?> originals) {
35+
super(configDef(), originals);
36+
}
37+
38+
public static ConfigDef configDef() {
39+
return new ConfigDef()
40+
.define(EXCLUDE_FIELDS_REGEX_CONFIG,
41+
ConfigDef.Type.STRING,
42+
ConfigDef.Importance.HIGH,
43+
EXCLUDE_FIELDS_REGEX_CONFIG_DOC)
44+
.define(EXCLUDE_FIELDS_BLOCK_FIELD_CONFIG,
45+
ConfigDef.Type.BOOLEAN,
46+
false,
47+
ConfigDef.Importance.HIGH,
48+
EXCLUDE_FIELDS_BLOCK_FIELD_CONFIG_DOC);
49+
}
50+
51+
public Pattern pattern() {
52+
return Pattern.compile(this.getString(EXCLUDE_FIELDS_REGEX_CONFIG));
53+
}
54+
55+
public boolean blockField() {
56+
return this.getBoolean(EXCLUDE_FIELDS_BLOCK_FIELD_CONFIG);
57+
}
58+
59+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Copyright 2019-2020 StreamThoughts.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package io.streamthoughts.kafka.connect.filepulse.filter;
20+
21+
import io.streamthoughts.kafka.connect.filepulse.config.ExcludeFieldsMatchingPatternsConfig;
22+
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
23+
import io.streamthoughts.kafka.connect.filepulse.data.TypedValue;
24+
import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
25+
import java.util.Map;
26+
import java.util.Optional;
27+
import java.util.regex.Matcher;
28+
import org.apache.kafka.common.config.ConfigDef;
29+
30+
public class ExcludeFieldsMatchingPatternsFilter extends AbstractRecordFilter<ExcludeFieldsMatchingPatternsFilter> {
31+
32+
private ExcludeFieldsMatchingPatternsConfig config;
33+
34+
@Override
35+
public ConfigDef configDef() {
36+
return ExcludeFieldsMatchingPatternsConfig.configDef();
37+
}
38+
39+
@Override
40+
public void configure(Map<String, ?> configs) {
41+
super.configure(configs);
42+
this.config = new ExcludeFieldsMatchingPatternsConfig(configs);
43+
}
44+
45+
@Override
46+
public RecordsIterable<TypedStruct> apply(FilterContext filterContext,
47+
TypedStruct record,
48+
boolean hasNext) throws FilterException {
49+
50+
return Optional.ofNullable(record)
51+
.stream()
52+
.peek(r -> r.schema().fields()
53+
.stream()
54+
.forEach(typedField -> {
55+
Optional.ofNullable(r.get(typedField.name()).getString())
56+
.ifPresentOrElse(fieldValue -> {
57+
Matcher matcher = this.config.pattern().matcher(fieldValue);
58+
if (matcher.matches() && !config.blockField()) {
59+
r.put(typedField.name(), TypedValue.string(null));
60+
} else if (matcher.matches()) {
61+
r.remove(typedField.name());
62+
}
63+
}, () -> {
64+
if (!config.blockField()) {
65+
r.put(typedField.name(), TypedValue.string(null));
66+
} else {
67+
r.remove(typedField.name());
68+
}
69+
});
70+
71+
72+
}))
73+
.findFirst()
74+
.map(RecordsIterable::of)
75+
.orElse(RecordsIterable.empty());
76+
}
77+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* Copyright 2023 StreamThoughts.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package io.streamthoughts.kafka.connect.filepulse.filter;
20+
21+
import static io.streamthoughts.kafka.connect.filepulse.config.ExcludeFieldsMatchingPatternsConfig.EXCLUDE_FIELDS_BLOCK_FIELD_CONFIG;
22+
import static io.streamthoughts.kafka.connect.filepulse.config.ExcludeFieldsMatchingPatternsConfig.EXCLUDE_FIELDS_REGEX_CONFIG;
23+
import static org.junit.jupiter.api.Assertions.assertEquals;
24+
import static org.junit.jupiter.api.Assertions.assertFalse;
25+
import static org.junit.jupiter.api.Assertions.assertNotNull;
26+
import static org.junit.jupiter.api.Assertions.assertNull;
27+
import static org.junit.jupiter.api.Assertions.assertTrue;
28+
import static org.mockito.Mockito.mock;
29+
30+
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
31+
import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
32+
import java.util.Map;
33+
import org.junit.jupiter.api.Test;
34+
35+
class ExcludeFieldsMatchingPatternsFilterTest {
36+
37+
@Test
38+
void when_record_null_apply_should_return_empty_iterable() {
39+
FilterContext context = mock(FilterContext.class);
40+
ExcludeFieldsMatchingPatternsFilter filter = new ExcludeFieldsMatchingPatternsFilter();
41+
filter.configure(Map.of(EXCLUDE_FIELDS_REGEX_CONFIG, "[a-z]"));
42+
43+
RecordsIterable<TypedStruct> iterable = filter.apply(context, null, false);
44+
assertNotNull(iterable);
45+
assertEquals(0, iterable.size());
46+
}
47+
48+
@Test
49+
void when_record_apply_should_propagate_null_for_fields_matching_regex_and_propagate_all_other_fields() {
50+
FilterContext context = mock(FilterContext.class);
51+
ExcludeFieldsMatchingPatternsFilter filter = new ExcludeFieldsMatchingPatternsFilter();
52+
filter.configure(Map.of(EXCLUDE_FIELDS_REGEX_CONFIG, Fixture.nullFieldRegex));
53+
54+
TypedStruct typedStruct = TypedStruct.create()
55+
.put(Fixture.fieldA, Fixture.fieldAValue)
56+
.put(Fixture.fieldB, Fixture.fieldBValue)
57+
.put(Fixture.fieldC, Fixture.fieldCValue)
58+
.put(Fixture.fieldD, Fixture.fieldDValue);
59+
60+
RecordsIterable<TypedStruct> res = filter.apply(context, typedStruct, false);
61+
assertEquals(1, res.size());
62+
63+
TypedStruct record = res.last();
64+
65+
assertTrue(record.has(Fixture.fieldA));
66+
assertTrue(record.has(Fixture.fieldB));
67+
assertTrue(record.has(Fixture.fieldC));
68+
assertTrue(record.has(Fixture.fieldD));
69+
70+
assertEquals(Fixture.fieldAValue, record.get(Fixture.fieldA).getString());
71+
assertNull(record.get(Fixture.fieldB).getString());
72+
assertEquals(Fixture.fieldCValue, record.get(Fixture.fieldC).getString());
73+
assertNull(record.get(Fixture.fieldD).getString());
74+
75+
}
76+
77+
@Test
78+
void when_record_apply_should_block_fields_matching_regex_and_propagate_all_other_fields() {
79+
FilterContext context = mock(FilterContext.class);
80+
ExcludeFieldsMatchingPatternsFilter filter = new ExcludeFieldsMatchingPatternsFilter();
81+
filter.configure(Map.of(EXCLUDE_FIELDS_REGEX_CONFIG, Fixture.nullFieldRegex,
82+
EXCLUDE_FIELDS_BLOCK_FIELD_CONFIG, "true"));
83+
84+
TypedStruct typedStruct = TypedStruct.create()
85+
.put(Fixture.fieldA, Fixture.fieldAValue)
86+
.put(Fixture.fieldB, Fixture.fieldBValue)
87+
.put(Fixture.fieldC, Fixture.fieldCValue)
88+
.put(Fixture.fieldD, Fixture.fieldDValue);
89+
90+
RecordsIterable<TypedStruct> res = filter.apply(context, typedStruct, false);
91+
assertEquals(1, res.size());
92+
93+
TypedStruct record = res.last();
94+
95+
assertTrue(record.has(Fixture.fieldA));
96+
assertFalse(record.has(Fixture.fieldB));
97+
assertTrue(record.has(Fixture.fieldC));
98+
assertFalse(record.has(Fixture.fieldD));
99+
100+
assertEquals(Fixture.fieldAValue, record.get(Fixture.fieldA).getString());
101+
assertEquals(Fixture.fieldCValue, record.get(Fixture.fieldC).getString());
102+
}
103+
104+
interface Fixture {
105+
String nullFieldRegex = "null";
106+
107+
String fieldA = "fieldA";
108+
String fieldB = "fieldB";
109+
String fieldC = "fieldC";
110+
111+
String fieldD = "fieldD";
112+
113+
String fieldAValue = "2021-01-010 14:12";
114+
String fieldBValue = "null";
115+
String fieldCValue = "Hello";
116+
117+
String fieldDValue = null;
118+
119+
}
120+
}

0 commit comments

Comments
 (0)