Skip to content

Commit 89729eb

Browse files
Marky110 and fhussonnois
authored and committed
feat(plugin): add parquet file reader
add support for reading Parquet file #215
1 parent 13fac18 commit 89729eb

File tree

19 files changed

+1192
-24
lines changed

19 files changed

+1192
-24
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright 2023 StreamThoughts.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.streamthoughts.kafka.connect.filepulse.fs.reader;
17+
18+
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
19+
import io.streamthoughts.kafka.connect.filepulse.fs.reader.parquet.ParquetFileInputIterator;
20+
import io.streamthoughts.kafka.connect.filepulse.reader.FileInputIterator;
21+
import io.streamthoughts.kafka.connect.filepulse.reader.ReaderException;
22+
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
23+
import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
24+
import java.net.URI;
25+
26+
/**
27+
* The {@code AliyunOSSParquetFileInputReader} can be used to created records from a parquet file loaded from Aliyun OSS.
28+
*/
29+
public class AliyunOSSParquetFileInputReader extends BaseAliyunOSSInputReader {
30+
31+
@Override
32+
protected FileInputIterator<FileRecord<TypedStruct>> newIterator(final URI objectURI,
33+
final IteratorManager iteratorManager) {
34+
35+
try {
36+
final FileObjectMeta metadata = storage().getObjectMetadata(objectURI);
37+
return new ParquetFileInputIterator(
38+
metadata,
39+
iteratorManager,
40+
storage().getInputStream(objectURI)
41+
);
42+
43+
} catch (Exception e) {
44+
throw new ReaderException("Failed to create ParquetFileInputIterator for: " + objectURI, e);
45+
}
46+
}
47+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright 2019-2021 StreamThoughts.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.streamthoughts.kafka.connect.filepulse.fs.reader;
17+
18+
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
19+
import io.streamthoughts.kafka.connect.filepulse.fs.reader.parquet.ParquetFileInputIterator;
20+
import io.streamthoughts.kafka.connect.filepulse.reader.FileInputIterator;
21+
import io.streamthoughts.kafka.connect.filepulse.reader.ReaderException;
22+
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
23+
import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
24+
import java.net.URI;
25+
26+
/**
27+
* The {@code AmazonS3ParquetFileInputReader} can be used to created records from a parquet file loaded from Amazon S3.
28+
*/
29+
public class AmazonS3ParquetFileInputReader extends BaseAmazonS3InputReader {
30+
31+
/**
32+
* {@inheritDoc}
33+
*/
34+
@Override
35+
protected FileInputIterator<FileRecord<TypedStruct>> newIterator(final URI objectURI,
36+
final IteratorManager iteratorManager) {
37+
38+
try {
39+
final FileObjectMeta metadata = storage().getObjectMetadata(objectURI);
40+
return new ParquetFileInputIterator(
41+
metadata,
42+
iteratorManager,
43+
storage().getInputStream(objectURI)
44+
);
45+
} catch (Exception e) {
46+
throw new ReaderException("Failed to create ParquetFileInputIterator for: " + objectURI, e);
47+
}
48+
}
49+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Copyright 2019-2021 StreamThoughts.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.streamthoughts.kafka.connect.filepulse.fs.reader;
17+
18+
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
19+
import io.streamthoughts.kafka.connect.filepulse.fs.AmazonS3Storage;
20+
import io.streamthoughts.kafka.connect.filepulse.fs.BaseAmazonS3Test;
21+
import io.streamthoughts.kafka.connect.filepulse.fs.S3BucketKey;
22+
import io.streamthoughts.kafka.connect.filepulse.reader.FileInputIterator;
23+
import io.streamthoughts.kafka.connect.filepulse.reader.ReaderException;
24+
import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
25+
import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
26+
import io.streamthoughts.kafka.connect.filepulse.source.GenericFileObjectMeta;
27+
import java.io.File;
28+
import java.net.URI;
29+
import java.util.ArrayList;
30+
import java.util.List;
31+
import org.junit.Assert;
32+
import org.junit.Before;
33+
import org.junit.Test;
34+
35+
import static org.junit.jupiter.api.Assertions.assertThrows;
36+
import static org.mockito.ArgumentMatchers.any;
37+
import static org.mockito.Mockito.mock;
38+
import static org.mockito.Mockito.when;
39+
40+
public class AmazonS3ParquetInputReaderTest extends BaseAmazonS3Test {

    // Snappy-compressed Parquet fixture checked into the test resources.
    private static final String FILE_NAME = "src/test/resources/test.snappy.parquet";


    // Local handle on the fixture file uploaded to the (mocked) S3 bucket.
    private File objectFile;

    // Reader under test; configured in setUp() and closed in tearDown().
    private AmazonS3ParquetFileInputReader reader;

    @Before
    public void setUp() throws Exception {
        super.setUp();
        objectFile = new File(FILE_NAME);
        reader = new AmazonS3ParquetFileInputReader();
        // 'client' and 'unmodifiableCommonsProperties' are provided by BaseAmazonS3Test.
        reader.setStorage(new AmazonS3Storage(client));
        reader.configure(unmodifiableCommonsProperties);
    }

    @Override
    public void tearDown() throws Exception {
        super.tearDown();
        reader.close();
    }

    @Test
    public void should_read_all_lines() {
        // Stage the Parquet fixture in the test bucket.
        client.createBucket(S3_TEST_BUCKET);
        client.putObject(S3_TEST_BUCKET, "my_key", objectFile);

        final GenericFileObjectMeta meta = new GenericFileObjectMeta.Builder()
                .withUri(new S3BucketKey(S3_TEST_BUCKET, "my_key").toURI())
                .build();
        // NOTE(review): the iterator is never closed here; if FileInputIterator is
        // AutoCloseable, consider try-with-resources — TODO confirm.
        final FileInputIterator<FileRecord<TypedStruct>> iterator = reader.newIterator(meta.uri());
        List<FileRecord<TypedStruct>> results = new ArrayList<>();
        while (iterator.hasNext()) {
            final RecordsIterable<FileRecord<TypedStruct>> next = iterator.next();
            results.addAll(next.collect());
        }
        // Presumably the fixture file contains exactly 4 rows — verify against the resource.
        Assert.assertEquals(4, results.size());
    }

    @Test
    public void should_throw_reader_exception() {
        // Stubs a mock reader to throw, then verifies the exception propagates.
        try (AmazonS3ParquetFileInputReader reader = mock(AmazonS3ParquetFileInputReader.class)) {
            when(reader.newIterator(any())).thenThrow(new ReaderException("exception"));

            assertThrows(ReaderException.class, () -> reader.newIterator(new URI("test")));
        }
    }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright 2019-2021 StreamThoughts.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.streamthoughts.kafka.connect.filepulse.fs.reader;
17+
18+
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
19+
import io.streamthoughts.kafka.connect.filepulse.fs.reader.parquet.ParquetFileInputIterator;
20+
import io.streamthoughts.kafka.connect.filepulse.reader.FileInputIterator;
21+
import io.streamthoughts.kafka.connect.filepulse.reader.ReaderException;
22+
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
23+
import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
24+
import java.net.URI;
25+
26+
public class AzureBlobStorageParquetFileInputReader extends AzureBlobStorageInputReader {
27+
28+
@Override
29+
protected FileInputIterator<FileRecord<TypedStruct>> newIterator(URI objectURI, IteratorManager iteratorManager) {
30+
try {
31+
final FileObjectMeta metadata = storage.getObjectMetadata(objectURI);
32+
return new ParquetFileInputIterator(
33+
metadata,
34+
iteratorManager,
35+
storage().getInputStream(objectURI)
36+
);
37+
38+
} catch (Exception e) {
39+
throw new ReaderException("Failed to create ParquetFileInputIterator for: " + objectURI, e);
40+
}
41+
}
42+
}

connect-file-pulse-filesystems/filepulse-commons-fs/pom.xml

+8-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
<?xml version="1.0" encoding="UTF-8"?>
21
<!--
32
Copyright 2023 StreamThoughts
43
@@ -23,7 +22,6 @@ limitations under the License.
2322
<version>2.14.0-SNAPSHOT</version>
2423
</parent>
2524
<modelVersion>4.0.0</modelVersion>
26-
2725
<artifactId>kafka-connect-filepulse-commons-fs</artifactId>
2826
<name>Kafka Connect Source File Pulse Common FS</name>
2927

@@ -37,6 +35,14 @@ limitations under the License.
3735
<groupId>org.apache.avro</groupId>
3836
<artifactId>avro</artifactId>
3937
</dependency>
38+
<dependency>
39+
<groupId>org.apache.parquet</groupId>
40+
<artifactId>parquet-hadoop</artifactId>
41+
</dependency>
42+
<dependency>
43+
<groupId>org.apache.hadoop</groupId>
44+
<artifactId>hadoop-common</artifactId>
45+
</dependency>
4046
<dependency>
4147
<groupId>net.sf.saxon</groupId>
4248
<artifactId>Saxon-HE</artifactId>

0 commit comments

Comments
 (0)