Skip to content

Commit 3a12e7e

Browse files
committed
chore: fix checkstyle
1 parent dbe079c commit 3a12e7e

File tree

7 files changed

+77
-44
lines changed

7 files changed

+77
-44
lines changed

connect-file-pulse-filesystems/filepulse-aliyunoss-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/AliyunOSSParquetFileInputReader.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@
2424
import java.net.URI;
2525

2626
/**
27-
* The {@code AliyunOSSParquetFileInputReader} can be used to created records from a parquet file loaded from Aliyun OSS.
27+
* The {@code AliyunOSSParquetFileInputReader} can be used to
28+
* created records from a parquet file loaded from Aliyun OSS.
2829
*/
2930
public class AliyunOSSParquetFileInputReader extends BaseAliyunOSSInputReader {
3031

connect-file-pulse-filesystems/filepulse-amazons3-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/AmazonS3ParquetInputReaderTest.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@
1515
*/
1616
package io.streamthoughts.kafka.connect.filepulse.fs.reader;
1717

18+
import static org.junit.jupiter.api.Assertions.assertThrows;
19+
import static org.mockito.ArgumentMatchers.any;
20+
import static org.mockito.Mockito.mock;
21+
import static org.mockito.Mockito.when;
22+
1823
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
1924
import io.streamthoughts.kafka.connect.filepulse.fs.AmazonS3Storage;
2025
import io.streamthoughts.kafka.connect.filepulse.fs.BaseAmazonS3Test;
@@ -32,11 +37,6 @@
3237
import org.junit.Before;
3338
import org.junit.Test;
3439

35-
import static org.junit.jupiter.api.Assertions.assertThrows;
36-
import static org.mockito.ArgumentMatchers.any;
37-
import static org.mockito.Mockito.mock;
38-
import static org.mockito.Mockito.when;
39-
4040
public class AmazonS3ParquetInputReaderTest extends BaseAmazonS3Test {
4141

4242
private static final String FILE_NAME = "src/test/resources/test.snappy.parquet";

connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/parquet/ParquetFileInputIterator.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@
3434
import org.apache.parquet.example.data.simple.SimpleGroup;
3535
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
3636
import org.apache.parquet.hadoop.ParquetFileReader;
37-
import org.apache.parquet.io.*;
37+
import org.apache.parquet.io.ColumnIOFactory;
38+
import org.apache.parquet.io.MessageColumnIO;
39+
import org.apache.parquet.io.RecordReader;
3840
import org.apache.parquet.schema.MessageType;
3941
import org.slf4j.Logger;
4042
import org.slf4j.LoggerFactory;

connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/parquet/ParquetInputFile.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,9 @@
1515
*/
1616
package io.streamthoughts.kafka.connect.filepulse.fs.reader.parquet;
1717

18-
import java.io.*;
19-
18+
import java.io.ByteArrayInputStream;
19+
import java.io.IOException;
20+
import java.io.InputStream;
2021
import org.apache.hadoop.fs.FSDataInputStream;
2122
import org.apache.hadoop.fs.PositionedReadable;
2223
import org.apache.hadoop.fs.Seekable;

connect-file-pulse-filesystems/filepulse-commons-fs/src/main/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/parquet/ParquetTypedStructConverter.java

+18-6
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,21 @@
1515
*/
1616
package io.streamthoughts.kafka.connect.filepulse.fs.reader.parquet;
1717

18-
import static io.streamthoughts.kafka.connect.filepulse.data.TypedValue.*;
18+
import static io.streamthoughts.kafka.connect.filepulse.data.TypedValue.bool;
19+
import static io.streamthoughts.kafka.connect.filepulse.data.TypedValue.float32;
20+
import static io.streamthoughts.kafka.connect.filepulse.data.TypedValue.float64;
21+
import static io.streamthoughts.kafka.connect.filepulse.data.TypedValue.int32;
22+
import static io.streamthoughts.kafka.connect.filepulse.data.TypedValue.int64;
23+
import static io.streamthoughts.kafka.connect.filepulse.data.TypedValue.string;
1924

2025
import io.streamthoughts.kafka.connect.filepulse.data.Type;
2126
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
2227
import io.streamthoughts.kafka.connect.filepulse.data.TypedValue;
23-
24-
import java.util.*;
25-
28+
import java.util.ArrayList;
29+
import java.util.HashMap;
30+
import java.util.List;
31+
import java.util.Map;
32+
import java.util.Optional;
2633
import org.apache.commons.lang3.function.TriFunction;
2734
import org.apache.parquet.example.data.Group;
2835
import org.apache.parquet.example.data.simple.SimpleGroup;
@@ -66,7 +73,9 @@ public static TypedStruct fromParquetFileReader(final SimpleGroup simpleGroup) {
6673
GroupType group = simpleGroup.getType();
6774
for (int i = 0; i < group.getFieldCount(); i++) {
6875
org.apache.parquet.schema.Type field = group.getType(i);
69-
String filedType = field instanceof PrimitiveType ? field.asPrimitiveType().getPrimitiveTypeName().name() : field.getLogicalTypeAnnotation().toString();
76+
String filedType = field instanceof PrimitiveType ?
77+
field.asPrimitiveType().getPrimitiveTypeName().name() :
78+
field.getLogicalTypeAnnotation().toString();
7079
struct.put(field.getName(), getTypedValueFromSimpleGroup(filedType, simpleGroup, i));
7180
}
7281
return struct;
@@ -91,7 +100,10 @@ private static TypedValue list(String fieldName, SimpleGroup simpleGroup, int i)
91100
//Get a group of a list element
92101
Group subGroup = group.getGroup(0, k);
93102
//Get the name of the field type of the list element
94-
String fieldTypeString = subGroup.getType().getType(0).asPrimitiveType().getPrimitiveTypeName().name();
103+
String fieldTypeString = subGroup
104+
.getType()
105+
.getType(0)
106+
.asPrimitiveType().getPrimitiveTypeName().name();
95107
//Convert list element into TypedValue
96108
TypedValue value = getTypedValueFromSimpleGroup(fieldTypeString, (SimpleGroup) subGroup, 0);
97109
//Get the array element type, which will be the list type

connect-file-pulse-filesystems/filepulse-commons-fs/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/reader/parquet/ParquetTypedStructConverterTest.java

+28-21
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,16 @@
1515
*/
1616
package io.streamthoughts.kafka.connect.filepulse.fs.reader.parquet;
1717

18+
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
19+
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
20+
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
21+
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
22+
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
23+
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
24+
import static org.apache.parquet.schema.Type.Repetition.REPEATED;
25+
1826
import java.util.ArrayList;
1927
import java.util.List;
20-
2128
import org.apache.parquet.example.data.simple.SimpleGroup;
2229
import org.apache.parquet.schema.*;
2330
import org.junit.Assert;
@@ -37,19 +44,19 @@ public class ParquetTypedStructConverterTest {
3744

3845
@Test
3946
public void check_string_value_converter() {
40-
var stringValue = new PrimitiveType(Type.Repetition.REPEATED, PrimitiveType.PrimitiveTypeName.BINARY, "test");
47+
var stringValue = new PrimitiveType(REPEATED, BINARY, "test");
4148
listType.add(stringValue);
42-
simpleGroup = new SimpleGroup(new GroupType(Type.Repetition.REPEATED, "name", listType));
49+
simpleGroup = new SimpleGroup(new GroupType(REPEATED, "name", listType));
4350
simpleGroup.add(0, STRING_VALUE);
4451
var typedStruct = ParquetTypedStructConverter.fromParquetFileReader(simpleGroup);
4552

4653
Assert.assertEquals(STRING_VALUE, typedStruct.get("test").getString());
4754
}
4855
@Test
4956
public void check_int_value_converter() {
50-
var integerValue = new PrimitiveType(Type.Repetition.REPEATED, PrimitiveType.PrimitiveTypeName.INT32, "integer");
57+
var integerValue = new PrimitiveType(REPEATED, INT32, "integer");
5158
listType.add(0, integerValue);
52-
simpleGroup = new SimpleGroup(new GroupType(Type.Repetition.REPEATED, "name", listType));
59+
simpleGroup = new SimpleGroup(new GroupType(REPEATED, "name", listType));
5360
simpleGroup.add(0, INT_VALUE);
5461
var typedStruct = ParquetTypedStructConverter.fromParquetFileReader(simpleGroup);
5562

@@ -58,9 +65,9 @@ public void check_int_value_converter() {
5865

5966
@Test
6067
public void check_double_value_converter() {
61-
var doubleValue = new PrimitiveType(Type.Repetition.REPEATED, PrimitiveType.PrimitiveTypeName.DOUBLE, "double");
68+
var doubleValue = new PrimitiveType(REPEATED, DOUBLE, "double");
6269
listType.add(0, doubleValue);
63-
simpleGroup = new SimpleGroup(new GroupType(Type.Repetition.REPEATED, "name", listType));
70+
simpleGroup = new SimpleGroup(new GroupType(REPEATED, "name", listType));
6471
simpleGroup.add(0, DOUBLE_VALUE);
6572
var typedStruct = ParquetTypedStructConverter.fromParquetFileReader(simpleGroup);
6673

@@ -69,9 +76,9 @@ public void check_double_value_converter() {
6976

7077
@Test
7178
public void check_long_value_converter() {
72-
var longValue = new PrimitiveType(Type.Repetition.REPEATED, PrimitiveType.PrimitiveTypeName.INT64, "long");
79+
var longValue = new PrimitiveType(REPEATED, INT64, "long");
7380
listType.add(0, longValue);
74-
simpleGroup = new SimpleGroup(new GroupType(Type.Repetition.REPEATED, "name", listType));
81+
simpleGroup = new SimpleGroup(new GroupType(REPEATED, "name", listType));
7582
simpleGroup.add(0, LONG_VALUE);
7683
var typedStruct = ParquetTypedStructConverter.fromParquetFileReader(simpleGroup);
7784

@@ -80,9 +87,9 @@ public void check_long_value_converter() {
8087

8188
@Test
8289
public void check_boolean_value_converter() {
83-
var booleanValue = new PrimitiveType(Type.Repetition.REPEATED, PrimitiveType.PrimitiveTypeName.BOOLEAN, "boolean");
90+
var booleanValue = new PrimitiveType(REPEATED, BOOLEAN, "boolean");
8491
listType.add(0, booleanValue);
85-
simpleGroup = new SimpleGroup(new GroupType(Type.Repetition.REPEATED, "name", listType));
92+
simpleGroup = new SimpleGroup(new GroupType(REPEATED, "name", listType));
8693
simpleGroup.add(0, BOOLEAN_VALUE);
8794
var typedStruct = ParquetTypedStructConverter.fromParquetFileReader(simpleGroup);
8895

@@ -91,9 +98,9 @@ public void check_boolean_value_converter() {
9198

9299
@Test
93100
public void check_float_value_converter() {
94-
var floatValue = new PrimitiveType(Type.Repetition.REPEATED, PrimitiveType.PrimitiveTypeName.FLOAT, "float");
101+
var floatValue = new PrimitiveType(REPEATED, FLOAT, "float");
95102
listType.add(0, floatValue);
96-
simpleGroup = new SimpleGroup(new GroupType(Type.Repetition.REPEATED, "name", listType));
103+
simpleGroup = new SimpleGroup(new GroupType(REPEATED, "name", listType));
97104
simpleGroup.add(0, FLOAT_VALUE);
98105
var typedStruct = ParquetTypedStructConverter.fromParquetFileReader(simpleGroup);
99106

@@ -103,7 +110,7 @@ public void check_float_value_converter() {
103110
@Test
104111
public void check_array_value_converter() {
105112
listType.add(0, generateArray());
106-
simpleGroup = new SimpleGroup(new GroupType(Type.Repetition.REPEATED, "name", listType));
113+
simpleGroup = new SimpleGroup(new GroupType(REPEATED, "name", listType));
107114
simpleGroup.add(0, baseArraySimpleGroup);
108115

109116
var typedStruct = ParquetTypedStructConverter.fromParquetFileReader(simpleGroup);
@@ -114,7 +121,7 @@ public void check_array_value_converter() {
114121
@Test
115122
public void check_array_value_converter_when_array_is_empty() {
116123
listType.add(0, generateEmptyArray());
117-
simpleGroup = new SimpleGroup(new GroupType(Type.Repetition.REPEATED, "name", listType));
124+
simpleGroup = new SimpleGroup(new GroupType(REPEATED, "name", listType));
118125
simpleGroup.add(0, baseArraySimpleGroup);
119126

120127
var typedStruct = ParquetTypedStructConverter.fromParquetFileReader(simpleGroup);
@@ -125,15 +132,15 @@ public void check_array_value_converter_when_array_is_empty() {
125132
@Test
126133
public void check_value_converter_when_repetition_count_equals_0() {
127134
listType.add(0, generateFieldRepetitionCountEmpty());
128-
simpleGroup = new SimpleGroup(new GroupType(Type.Repetition.REPEATED, "name", listType));
135+
simpleGroup = new SimpleGroup(new GroupType(REPEATED, "name", listType));
129136

130137
var typedStruct = ParquetTypedStructConverter.fromParquetFileReader(simpleGroup);
131138

132139
Assert.assertTrue(typedStruct.get("REPLICATION_EMPTY").isNull());
133140
}
134141

135142
private GroupType generateArray() {
136-
var elementList = new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveType.PrimitiveTypeName.INT32, "element");
143+
var elementList = new PrimitiveType(Type.Repetition.OPTIONAL, INT32, "element");
137144

138145
var dataList = new GroupType(Type.Repetition.OPTIONAL, "LIST", elementList);
139146

@@ -147,20 +154,20 @@ private GroupType generateArray() {
147154
baseArraySimpleGroup = new SimpleGroup(schemaGroup);
148155
baseArraySimpleGroup.add(0, dataGroup1);
149156
baseArraySimpleGroup.add(0, dataGroup2);
150-
return ConversionPatterns.listOfElements(Type.Repetition.REPEATED, "LIST", schemaGroup);
157+
return ConversionPatterns.listOfElements(REPEATED, "LIST", schemaGroup);
151158
}
152159

153160
private GroupType generateEmptyArray() {
154-
var elementList = new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveType.PrimitiveTypeName.INT32, "element");
161+
var elementList = new PrimitiveType(Type.Repetition.OPTIONAL, INT32, "element");
155162
var dataList = new GroupType(Type.Repetition.OPTIONAL, "LIST", elementList);
156163

157164
var schemaGroup = new GroupType(Type.Repetition.OPTIONAL, "element", List.of(dataList));
158165
baseArraySimpleGroup = new SimpleGroup(schemaGroup);
159-
return ConversionPatterns.listOfElements(Type.Repetition.REPEATED, "EMPTY_LIST", schemaGroup);
166+
return ConversionPatterns.listOfElements(REPEATED, "EMPTY_LIST", schemaGroup);
160167
}
161168

162169
private GroupType generateFieldRepetitionCountEmpty() {
163170
var schemaGroup = new GroupType(Type.Repetition.OPTIONAL, "element");
164-
return ConversionPatterns.listOfElements(Type.Repetition.REPEATED, "REPLICATION_EMPTY", schemaGroup);
171+
return ConversionPatterns.listOfElements(REPEATED, "REPLICATION_EMPTY", schemaGroup);
165172
}
166173
}

connect-file-pulse-plugin/src/test/java/io/streamthoughts/kafka/connect/filepulse/fs/DefaultFileSystemMonitorTest.java

+18-8
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,30 @@
1515
*/
1616
package io.streamthoughts.kafka.connect.filepulse.fs;
1717

18-
import io.streamthoughts.kafka.connect.filepulse.source.*;
18+
import static org.junit.Assert.assertEquals;
19+
import static org.junit.Assert.assertNotNull;
20+
21+
import io.streamthoughts.kafka.connect.filepulse.source.FileObject;
22+
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
23+
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectOffset;
24+
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectStatus;
25+
import io.streamthoughts.kafka.connect.filepulse.source.LocalFileObjectMeta;
26+
import io.streamthoughts.kafka.connect.filepulse.source.SourceOffsetPolicy;
1927
import io.streamthoughts.kafka.connect.filepulse.state.InMemoryFileObjectStateBackingStore;
2028
import io.streamthoughts.kafka.connect.filepulse.storage.KafkaStateBackingStore;
2129
import io.streamthoughts.kafka.connect.filepulse.storage.StateBackingStore;
2230
import io.streamthoughts.kafka.connect.filepulse.storage.StateSnapshot;
2331
import io.streamthoughts.kafka.connect.filepulse.utils.MockFileCleaner;
2432
import io.streamthoughts.kafka.connect.filepulse.utils.TemporaryFileInput;
33+
import java.io.File;
34+
import java.util.ArrayList;
35+
import java.util.Collection;
36+
import java.util.Collections;
37+
import java.util.HashMap;
38+
import java.util.List;
39+
import java.util.Map;
40+
import java.util.Optional;
41+
import java.util.stream.Collectors;
2542
import org.apache.kafka.connect.connector.ConnectorContext;
2643
import org.apache.kafka.connect.source.SourceTaskContext;
2744
import org.junit.Assert;
@@ -32,13 +49,6 @@
3249
import org.junit.rules.TestRule;
3350
import org.mockito.Mockito;
3451

35-
import java.io.File;
36-
import java.util.*;
37-
import java.util.stream.Collectors;
38-
39-
import static org.junit.Assert.assertEquals;
40-
import static org.junit.Assert.assertNotNull;
41-
4252
public class DefaultFileSystemMonitorTest {
4353

4454
private static final SourceOffsetPolicy OFFSET_MANAGER = new SourceOffsetPolicy() {

0 commit comments

Comments
 (0)