Skip to content

Commit d8e8df8

Browse files
TheWorkshopComfhussonnois
TheWorkshopCom
authored andcommitted
feat(plugin): add SFTP Filesystem
1 parent e2bf74f commit d8e8df8

25 files changed

+3055
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
~ Copyright 2019-2021 StreamThoughts.
4+
~
5+
~ Licensed to the Apache Software Foundation (ASF) under one or more
6+
~ contributor license agreements. See the NOTICE file distributed with
7+
~ this work for additional information regarding copyright ownership.
8+
~ The ASF licenses this file to You under the Apache License, Version 2.0
9+
~ (the "License"); you may not use this file except in compliance with
10+
~ the License. You may obtain a copy of the License at
11+
~
12+
~ http://www.apache.org/licenses/LICENSE-2.0
13+
~
14+
~ Unless required by applicable law or agreed to in writing, software
15+
~ distributed under the License is distributed on an "AS IS" BASIS,
16+
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
~ See the License for the specific language governing permissions and
18+
~ limitations under the License.
19+
-->
20+
<project xmlns="http://maven.apache.org/POM/4.0.0"
21+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
22+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
23+
<parent>
24+
<groupId>io.streamthoughts</groupId>
25+
<artifactId>kafka-connect-filepulse-filesystems</artifactId>
26+
<version>2.13.0-SNAPSHOT</version>
27+
</parent>
28+
<modelVersion>4.0.0</modelVersion>
29+
30+
<name>Kafka Connect Source File Pulse SFTP FS</name>
31+
<artifactId>kafka-connect-filepulse-sftp-fs</artifactId>
32+
33+
<properties>
34+
<checkstyle.config.location>${project.parent.basedir}/..</checkstyle.config.location>
35+
<license.header.file>${project.parent.basedir}/../license-header</license.header.file>
36+
<jsch.version>0.1.55</jsch.version>
37+
<mockito-junit-jupiter.version>2.28.2</mockito-junit-jupiter.version>
38+
<assertj-core.version>3.11.1</assertj-core.version>
39+
<lombok.version>1.18.12</lombok.version>
40+
</properties>
41+
42+
<dependencies>
43+
<dependency>
44+
<groupId>com.jcraft</groupId>
45+
<artifactId>jsch</artifactId>
46+
<version>${jsch.version}</version>
47+
</dependency>
48+
<dependency>
49+
<groupId>io.streamthoughts</groupId>
50+
<artifactId>kafka-connect-filepulse-commons-fs</artifactId>
51+
<version>${project.version}</version>
52+
</dependency>
53+
<dependency>
54+
<groupId>org.apache.commons</groupId>
55+
<artifactId>commons-compress</artifactId>
56+
</dependency>
57+
<dependency>
58+
<groupId>org.apache.avro</groupId>
59+
<artifactId>avro</artifactId>
60+
</dependency>
61+
<!-- Test Dependencies-->
62+
<dependency>
63+
<groupId>org.mockito</groupId>
64+
<artifactId>mockito-junit-jupiter</artifactId>
65+
<version>${mockito-junit-jupiter.version}</version>
66+
<scope>test</scope>
67+
</dependency>
68+
<dependency>
69+
<groupId>org.assertj</groupId>
70+
<artifactId>assertj-core</artifactId>
71+
<version>${assertj-core.version}</version>
72+
<scope>test</scope>
73+
</dependency>
74+
<dependency>
75+
<groupId>org.projectlombok</groupId>
76+
<artifactId>lombok</artifactId>
77+
<version>${lombok.version}</version>
78+
<scope>test</scope>
79+
</dependency>
80+
<dependency>
81+
<groupId>io.streamthoughts</groupId>
82+
<artifactId>kafka-connect-filepulse-plugin</artifactId>
83+
<version>${project.version}</version>
84+
<scope>test</scope>
85+
</dependency>
86+
</dependencies>
87+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Copyright 2019-2023 StreamThoughts.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package io.streamthoughts.kafka.connect.filepulse.fs;
20+
21+
import com.jcraft.jsch.SftpATTRS;
22+
import io.streamthoughts.kafka.connect.filepulse.errors.ConnectFilePulseException;
23+
import io.streamthoughts.kafka.connect.filepulse.fs.client.SftpClient;
24+
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
25+
import java.io.InputStream;
26+
import java.net.URI;
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
30+
public class SftpFileStorage implements Storage {
31+
private static final Logger log = LoggerFactory.getLogger(SftpFileStorage.class);
32+
private static final String CANNOT_STAT_FILE_ERROR_MSG_TEMPLATE = "Cannot stat file with uri: %s";
33+
34+
private final SftpClient sftpClient;
35+
36+
public SftpFileStorage(SftpFilesystemListingConfig config) {
37+
this.sftpClient = new SftpClient(config);
38+
}
39+
40+
SftpFileStorage(SftpClient sftpClient) {
41+
this.sftpClient = sftpClient;
42+
}
43+
44+
@Override
45+
public FileObjectMeta getObjectMetadata(URI uri) {
46+
log.debug("Getting object metadata for '{}'", uri);
47+
return sftpClient.getObjectMetadata(uri)
48+
.findFirst().orElseThrow(() -> new ConnectFilePulseException(buildCannotStatFileErrorMsg(uri)));
49+
}
50+
51+
@Override
52+
public boolean exists(URI uri) {
53+
log.info("Checking if '{}' exists", uri);
54+
SftpATTRS attrs = sftpClient.statFile(uri.toString());
55+
56+
return attrs.isReg();
57+
}
58+
59+
private String buildCannotStatFileErrorMsg(URI uri) {
60+
return String.format(CANNOT_STAT_FILE_ERROR_MSG_TEMPLATE, uri);
61+
}
62+
63+
/**
64+
* {@inheritDoc}
65+
*/
66+
@Override
67+
public boolean delete(URI uri) {
68+
return sftpClient.delete(uri);
69+
}
70+
71+
@Override
72+
public boolean move(URI source, URI dest) {
73+
return sftpClient.move(source, dest);
74+
}
75+
76+
@Override
77+
public InputStream getInputStream(URI uri) {
78+
return sftpClient.sftpFileInputStream(uri);
79+
}
80+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Copyright 2019-2023 StreamThoughts.
3+
*
4+
* Licensed to the Apache Software Foundation (ASF) under one or more
5+
* contributor license agreements. See the NOTICE file distributed with
6+
* this work for additional information regarding copyright ownership.
7+
* The ASF licenses this file to You under the Apache License, Version 2.0
8+
* (the "License"); you may not use this file except in compliance with
9+
* the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package io.streamthoughts.kafka.connect.filepulse.fs;
20+
21+
import io.streamthoughts.kafka.connect.filepulse.fs.client.SftpClient;
22+
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
23+
import java.util.Collection;
24+
import java.util.Collections;
25+
import java.util.List;
26+
import java.util.Map;
27+
import java.util.Objects;
28+
import java.util.stream.Collectors;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
31+
32+
public class SftpFilesystemListing implements FileSystemListing<SftpFileStorage> {
33+
34+
private static final Logger log = LoggerFactory.getLogger(SftpFilesystemListing.class);
35+
private FileListFilter filter;
36+
37+
private SftpFilesystemListingConfig config;
38+
39+
private SftpClient sftpClient;
40+
41+
public SftpFilesystemListing(final List<FileListFilter> filters) {
42+
Objects.requireNonNull(filters, "filters can't be null");
43+
this.filter = new CompositeFileListFilter(filters);
44+
}
45+
46+
@SuppressWarnings("unused")
47+
public SftpFilesystemListing() {
48+
this(Collections.emptyList());
49+
}
50+
51+
@Override
52+
public void configure(final Map<String, ?> configs) {
53+
log.debug("Configuring SftpFilesystemListing");
54+
config = new SftpFilesystemListingConfig(configs);
55+
sftpClient = new SftpClient(config);
56+
}
57+
58+
@Override
59+
public Collection<FileObjectMeta> listObjects() {
60+
String listingDirectoryPath = getConfig().getSftpListingDirectoryPath();
61+
62+
List<FileObjectMeta> filesMetadata = getSftpClient().listFiles(listingDirectoryPath)
63+
.collect(Collectors.toList());
64+
65+
return filter.filterFiles(filesMetadata);
66+
}
67+
68+
@Override
69+
public void setFilter(FileListFilter filter) {
70+
this.filter = filter;
71+
}
72+
73+
@Override
74+
public SftpFileStorage storage() {
75+
return new SftpFileStorage(config);
76+
}
77+
78+
SftpClient getSftpClient() {
79+
return sftpClient;
80+
}
81+
82+
SftpFilesystemListingConfig getConfig() {
83+
return config;
84+
}
85+
}

0 commit comments

Comments
 (0)