Skip to content

Commit 7fec3c1

Browse files
TheWorkshopComfhussonnois
TheWorkshopCom
authored and committed
feat(filters): add built-in ExtractValueFilter
1 parent e854995 commit 7fec3c1

File tree

4 files changed

+424
-0
lines changed

4 files changed

+424
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Copyright 2023 StreamThoughts.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package io.streamthoughts.kafka.connect.filepulse.config;
20+
21+
import java.util.Map;
22+
import java.util.regex.Pattern;
23+
import org.apache.kafka.common.config.ConfigDef;
24+
25+
public class ExtractValueConfig extends CommonFilterConfig {
26+
public static final String REGEX_FIELD_CONFIG = "field";
27+
28+
private static final String REGEX_FIELD_CONFIG_DOC = "The field to extract the data from.";
29+
30+
public static final String EXTRACT_TARGET_CONFIG = "target";
31+
32+
private static final String EXTRACT_TARGET_DOC = "(Optional) The target field. If not defined, source field will be overwritten";
33+
34+
public static final String REGEX_CONFIG = "regex";
35+
36+
private static final String REGEX_CONFIG_DOC = "Regexp pattern applied to a field value to extract the desired value out of a specific field.";
37+
38+
public static final String REGEX_DEFAULT_VALUE_CONFIG = "default.value";
39+
40+
private static final String REGEX_DEFAULT_VALUE_CONFIG_DOC = "Default value applied when regex returns nothing.";
41+
42+
43+
public ExtractValueConfig(Map<?, ?> originals) {
44+
super(configDef(), originals);
45+
}
46+
47+
public static ConfigDef configDef() {
48+
return new ConfigDef()
49+
.define(REGEX_FIELD_CONFIG,
50+
ConfigDef.Type.STRING,
51+
ConfigDef.Importance.HIGH,
52+
REGEX_FIELD_CONFIG_DOC)
53+
.define(REGEX_CONFIG,
54+
ConfigDef.Type.STRING,
55+
ConfigDef.Importance.HIGH,
56+
REGEX_CONFIG_DOC)
57+
.define(REGEX_DEFAULT_VALUE_CONFIG,
58+
ConfigDef.Type.STRING,
59+
null,
60+
ConfigDef.Importance.HIGH,
61+
REGEX_DEFAULT_VALUE_CONFIG_DOC)
62+
.define(EXTRACT_TARGET_CONFIG,
63+
ConfigDef.Type.STRING,
64+
null,
65+
ConfigDef.Importance.MEDIUM,
66+
EXTRACT_TARGET_DOC);
67+
}
68+
69+
public Pattern pattern() {
70+
return Pattern.compile(getString(REGEX_CONFIG));
71+
}
72+
73+
public String getFieldName() {
74+
return getString(REGEX_FIELD_CONFIG);
75+
}
76+
77+
public String getDefaultValue() {
78+
return getString(REGEX_DEFAULT_VALUE_CONFIG);
79+
}
80+
81+
public String getTargetName() {
82+
return getString(EXTRACT_TARGET_CONFIG);
83+
}
84+
85+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Copyright 2023 StreamThoughts.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package io.streamthoughts.kafka.connect.filepulse.filter;
20+
21+
import io.streamthoughts.kafka.connect.filepulse.config.ExtractValueConfig;
22+
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
23+
import io.streamthoughts.kafka.connect.filepulse.data.TypedValue;
24+
import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
25+
import java.util.Map;
26+
import java.util.Optional;
27+
import java.util.regex.Matcher;
28+
import org.apache.kafka.common.config.ConfigDef;
29+
30+
public class ExtractValueFilter extends AbstractRecordFilter<ExtractValueFilter> {
31+
32+
private ExtractValueConfig config;
33+
34+
@Override
35+
public ConfigDef configDef() {
36+
return ExtractValueConfig.configDef();
37+
}
38+
39+
@Override
40+
public void configure(Map<String, ?> configs) {
41+
super.configure(configs);
42+
this.config = new ExtractValueConfig(configs);
43+
}
44+
45+
@Override
46+
public RecordsIterable<TypedStruct> apply(FilterContext filterContext,
47+
TypedStruct record,
48+
boolean hasNext) throws FilterException {
49+
50+
String targetField = Optional.ofNullable(config.getTargetName()).orElse(config.getFieldName());
51+
52+
return Optional.ofNullable(record)
53+
.map(r -> r.get(config.getFieldName()))
54+
.map(fieldValue -> {
55+
56+
Matcher matcher = config.pattern().matcher(fieldValue.getString());
57+
58+
if (matcher.matches() && matcher.groupCount() > 0) {
59+
record.put(targetField, TypedValue.string(matcher.group(1)));
60+
} else {
61+
record.put(targetField, TypedValue.string(config.getDefaultValue()));
62+
}
63+
return record;
64+
})
65+
.map(RecordsIterable::of)
66+
.orElse(RecordsIterable.empty());
67+
}
68+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* Copyright 2023 StreamThoughts.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package io.streamthoughts.kafka.connect.filepulse.config;
20+
21+
import static io.streamthoughts.kafka.connect.filepulse.config.ExtractValueConfig.EXTRACT_TARGET_CONFIG;
22+
import static io.streamthoughts.kafka.connect.filepulse.config.ExtractValueConfig.REGEX_CONFIG;
23+
import static io.streamthoughts.kafka.connect.filepulse.config.ExtractValueConfig.REGEX_DEFAULT_VALUE_CONFIG;
24+
import static io.streamthoughts.kafka.connect.filepulse.config.ExtractValueConfig.REGEX_FIELD_CONFIG;
25+
import static org.junit.jupiter.api.Assertions.assertEquals;
26+
import static org.junit.jupiter.api.Assertions.assertNull;
27+
import static org.junit.jupiter.api.Assertions.assertThrows;
28+
import static org.junit.jupiter.api.Assertions.assertTrue;
29+
30+
import java.util.Map;
31+
import org.apache.kafka.common.config.ConfigException;
32+
import org.junit.jupiter.api.Test;
33+
34+
class ExtractValueConfigTest {
35+
36+
37+
@Test
38+
void when_all_fields_config_specified_config_ok() {
39+
ExtractValueConfig config = new ExtractValueConfig(Map.of(
40+
REGEX_FIELD_CONFIG, Fixture.fieldName,
41+
REGEX_CONFIG, Fixture.regex,
42+
REGEX_DEFAULT_VALUE_CONFIG, Fixture.defaultValue,
43+
EXTRACT_TARGET_CONFIG, Fixture.targetName
44+
));
45+
assertEquals(Fixture.fieldName, config.getFieldName());
46+
assertEquals(Fixture.regex, config.pattern().pattern());
47+
assertEquals(Fixture.defaultValue, config.getDefaultValue());
48+
assertEquals(Fixture.targetName, config.getTargetName());
49+
}
50+
51+
@Test
52+
void when_regex_field_config_missing_exception_expected() {
53+
ConfigException configException = assertThrows(
54+
ConfigException.class,
55+
() -> new ExtractValueConfig(Map.of(
56+
REGEX_CONFIG, Fixture.regex,
57+
REGEX_DEFAULT_VALUE_CONFIG, Fixture.defaultValue)));
58+
assertTrue(configException.getMessage().contains(REGEX_FIELD_CONFIG));
59+
}
60+
61+
@Test
62+
void when_regex_config_missing_exception_expected() {
63+
ConfigException configException = assertThrows(
64+
ConfigException.class,
65+
() -> new ExtractValueConfig(Map.of(
66+
REGEX_FIELD_CONFIG, Fixture.fieldName,
67+
REGEX_DEFAULT_VALUE_CONFIG, Fixture.defaultValue))
68+
);
69+
assertTrue(configException.getMessage().contains(REGEX_CONFIG));
70+
}
71+
72+
@Test
73+
void when_default_value_config_field_config_missing_null_expected() {
74+
ExtractValueConfig config = new ExtractValueConfig(Map.of(
75+
REGEX_FIELD_CONFIG, Fixture.fieldName,
76+
REGEX_CONFIG, Fixture.regex));
77+
assertEquals(Fixture.fieldName, config.getFieldName());
78+
assertEquals(Fixture.regex, config.pattern().pattern());
79+
assertNull(config.getDefaultValue());
80+
assertNull(config.getTargetName());
81+
}
82+
83+
@Test
84+
void when_target_field_config_missing_null_expected() {
85+
ExtractValueConfig config = new ExtractValueConfig(Map.of(
86+
REGEX_FIELD_CONFIG, Fixture.fieldName,
87+
REGEX_CONFIG, Fixture.regex,
88+
REGEX_DEFAULT_VALUE_CONFIG, Fixture.defaultValue
89+
));
90+
assertEquals(Fixture.fieldName, config.getFieldName());
91+
assertEquals(Fixture.regex, config.pattern().pattern());
92+
assertEquals(Fixture.defaultValue, config.getDefaultValue());
93+
assertNull(config.getTargetName());
94+
}
95+
96+
interface Fixture {
97+
String fieldName = "fieldA";
98+
String regex = "[a-z]";
99+
String defaultValue = "default";
100+
String targetName = "targetA";
101+
}
102+
}

0 commit comments

Comments
 (0)