Skip to content

Commit d78219a

Browse files
TheWorkshopCom authored and fhussonnois committed
feat(plugin): add built-in DateInFilenameFileListFilter
1 parent 9b65463 commit d78219a

File tree

5 files changed

+421
-0
lines changed

5 files changed

+421
-0
lines changed

connect-file-pulse-plugin/pom.xml

+4
Original file line numberDiff line numberDiff line change
@@ -381,6 +381,10 @@
381381
<groupId>io.rest-assured</groupId>
382382
<artifactId>json-path</artifactId>
383383
</dependency>
384+
<!-- AssertJ assertion library added by this commit. No <version> or <scope> is declared here.
     NOTE(review): presumably both are inherited from a parent dependencyManagement section and
     resolve to test scope; confirm, otherwise AssertJ ends up on the plugin's compile classpath. -->
<dependency>
385+
<groupId>org.assertj</groupId>
386+
<artifactId>assertj-core</artifactId>
387+
</dependency>
384388
<dependency>
385389
<groupId>io.streamthoughts</groupId>
386390
<artifactId>kafka-connect-filepulse-local-fs</artifactId>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
/*
2+
* Copyright 2023 StreamThoughts.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package io.streamthoughts.kafka.connect.filepulse.fs.filter;
20+
21+
import static java.time.format.DateTimeFormatter.ISO_LOCAL_DATE;
22+
import static java.time.format.DateTimeFormatter.ofPattern;
23+
24+
import io.streamthoughts.kafka.connect.filepulse.fs.PredicateFileListFilter;
25+
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
26+
import java.time.LocalDate;
27+
import java.time.format.DateTimeFormatter;
28+
import java.time.format.DateTimeParseException;
29+
import java.util.Map;
30+
import java.util.regex.Matcher;
31+
import java.util.regex.Pattern;
32+
import org.apache.kafka.common.config.AbstractConfig;
33+
import org.apache.kafka.common.config.ConfigDef;
34+
import org.apache.kafka.common.config.ConfigException;
35+
import org.slf4j.Logger;
36+
import org.slf4j.LoggerFactory;
37+
38+
public class DateInFilenameFileListFilter extends PredicateFileListFilter {
39+
40+
private final static String GROUP = "DateInFilenameFileListFilter";
41+
42+
private static final Logger LOG = LoggerFactory.getLogger(DateInFilenameFileListFilter.class);
43+
44+
static final String FILE_FILTER_DATE_REGEX_EXTRACTOR_PATTERN_CONFIG =
45+
"file.filter.date.regex.extractor.pattern";
46+
47+
private static final String FILE_FILTER_DATE_REGEX_EXTRACTOR_PATTERN_DOC =
48+
"The regex pattern used to extract the date from the filename";
49+
50+
static final String FILE_FILTER_DATE_FORMATTER_PATTERN_CONFIG =
51+
"file.filter.date.formatter.pattern";
52+
53+
private static final String FILE_FILTER_DATE_FORMATTER_PATTERN_DOC =
54+
"The formatter pattern used to format the date extracted from the filename";
55+
56+
static final String FILE_FILTER_DATE_MIN_DATE_CONFIG =
57+
"file.filter.date.min.date";
58+
59+
private static final String FILE_FILTER_DATE_MIN_DATE_DOC =
60+
"The minimum date that the date extracted from the filename should match (filenameDate >= minDate)";
61+
62+
static final String FILE_FILTER_DATE_MAX_DATE_CONFIG =
63+
"file.filter.date.max.date";
64+
65+
private static final String FILE_FILTER_DATE_MAX_DATE_DOC =
66+
"The maximum date that the date extracted from the filename should match (filenameDate < maxDate)";
67+
68+
private Pattern pattern;
69+
70+
DateTimeFormatter dateFormatter;
71+
72+
LocalDate minDate;
73+
74+
LocalDate maxDate;
75+
76+
@Override
77+
public void configure(Map<String, ?> config) {
78+
AbstractConfig abstractConfig = new AbstractConfig(getConfigDef(), config, false);
79+
String pattern = abstractConfig.getString(FILE_FILTER_DATE_REGEX_EXTRACTOR_PATTERN_CONFIG);
80+
if (pattern == null) {
81+
throw new ConfigException("missing configuration: " + FILE_FILTER_DATE_REGEX_EXTRACTOR_PATTERN_CONFIG);
82+
}
83+
84+
setPattern(pattern);
85+
86+
dateFormatter = ofPattern(abstractConfig.getString(FILE_FILTER_DATE_FORMATTER_PATTERN_CONFIG));
87+
88+
String strMinDate = abstractConfig.getString(FILE_FILTER_DATE_MIN_DATE_CONFIG);
89+
String strMaxDate = abstractConfig.getString(FILE_FILTER_DATE_MAX_DATE_CONFIG);
90+
91+
if (strMinDate == null && strMaxDate == null) {
92+
throw new ConfigException("At least one of " + FILE_FILTER_DATE_MIN_DATE_CONFIG + " or " +
93+
FILE_FILTER_DATE_MAX_DATE_CONFIG + " should be specified");
94+
}
95+
96+
if (strMinDate != null) {
97+
minDate = DateValidator.parseDate(FILE_FILTER_DATE_MIN_DATE_CONFIG, strMinDate);
98+
}
99+
if (strMaxDate != null) {
100+
maxDate = DateValidator.parseDate(FILE_FILTER_DATE_MAX_DATE_CONFIG, strMaxDate);
101+
}
102+
}
103+
104+
private static ConfigDef getConfigDef() {
105+
DateValidator dateValidator = new DateValidator();
106+
int groupCounter = 0;
107+
return new ConfigDef()
108+
.define(FILE_FILTER_DATE_REGEX_EXTRACTOR_PATTERN_CONFIG,
109+
ConfigDef.Type.STRING,
110+
ConfigDef.Importance.HIGH,
111+
FILE_FILTER_DATE_REGEX_EXTRACTOR_PATTERN_DOC,
112+
GROUP,
113+
groupCounter++,
114+
ConfigDef.Width.NONE,
115+
FILE_FILTER_DATE_REGEX_EXTRACTOR_PATTERN_CONFIG)
116+
.define(FILE_FILTER_DATE_FORMATTER_PATTERN_CONFIG,
117+
ConfigDef.Type.STRING,
118+
"yyyy-MM-dd",
119+
ConfigDef.Importance.HIGH,
120+
FILE_FILTER_DATE_FORMATTER_PATTERN_DOC,
121+
GROUP,
122+
groupCounter++,
123+
ConfigDef.Width.NONE,
124+
FILE_FILTER_DATE_FORMATTER_PATTERN_CONFIG)
125+
.define(FILE_FILTER_DATE_MIN_DATE_CONFIG,
126+
ConfigDef.Type.STRING,
127+
null,
128+
dateValidator,
129+
ConfigDef.Importance.LOW,
130+
FILE_FILTER_DATE_MIN_DATE_DOC,
131+
GROUP,
132+
groupCounter++,
133+
ConfigDef.Width.NONE,
134+
FILE_FILTER_DATE_MIN_DATE_CONFIG)
135+
.define(FILE_FILTER_DATE_MAX_DATE_CONFIG,
136+
ConfigDef.Type.STRING,
137+
null,
138+
dateValidator,
139+
ConfigDef.Importance.LOW,
140+
FILE_FILTER_DATE_MAX_DATE_DOC,
141+
GROUP,
142+
groupCounter,
143+
ConfigDef.Width.NONE,
144+
FILE_FILTER_DATE_MAX_DATE_CONFIG)
145+
;
146+
}
147+
148+
@Override
149+
public boolean test(FileObjectMeta meta) {
150+
if (meta == null) {
151+
return false;
152+
}
153+
154+
Matcher matcher = pattern.matcher(meta.stringURI());
155+
boolean matched = false;
156+
157+
if (matcher.find() && matcher.groupCount() > 0) {
158+
matched = true;
159+
160+
String strDate = matcher.group(1);
161+
LocalDate filenameDate = LocalDate.parse(strDate, dateFormatter);
162+
163+
if (minDate != null) {
164+
matched = !filenameDate.isBefore(minDate);
165+
}
166+
if (maxDate != null) {
167+
matched &= filenameDate.isBefore(maxDate);
168+
}
169+
170+
if (!matched) {
171+
String msg = String.format("Date '%s' is not between boundaries [%s, %s)",
172+
filenameDate, minDate == null ? "-inf" : minDate, maxDate == null ? "+inf" : maxDate);
173+
LOG.debug(msg);
174+
}
175+
} else {
176+
LOG.debug("Cannot extract date from '" + meta.stringURI() + "' using regexp '" + pattern.toString() + "'");
177+
}
178+
179+
return matched;
180+
}
181+
182+
private void setPattern(String pattern) {
183+
this.pattern = Pattern.compile(pattern);
184+
}
185+
186+
public static class DateValidator implements ConfigDef.Validator {
187+
188+
@Override
189+
public void ensureValid(String name, Object value) {
190+
String date = (String)value;
191+
if (date != null) {
192+
if (date.isEmpty()) {
193+
throw new ConfigException(name, value, "Date must be non-empty");
194+
}
195+
196+
parseDate(name, date);
197+
}
198+
}
199+
200+
public static LocalDate parseDate(String configKey, String date) {
201+
try {
202+
return LocalDate.parse(date, ISO_LOCAL_DATE);
203+
} catch (DateTimeParseException e) {
204+
throw new ConfigException(configKey, date, "Cannot parse date: " + date);
205+
}
206+
}
207+
}
208+
}

0 commit comments

Comments
 (0)