Skip to content

Commit ba29da9

Browse files
artembilanspring-builds
authored andcommitted
GH-9909: Fix SftpSession for shared client
Fixes: #9909 Issue link: #9909 The `SftpSession.close()` closes `sftpClient` and its `clientSession` unconditionally. At the same time the `DefaultSftpSessionFactory.isSharedSession` is expected to expose only a single shared client. When this `DefaultSftpSessionFactory` is used concurrently, there is a chance that one thread would close that shared client and another won't be able to interact due to `clientSession` is closed. * Fix `SftpSession` accepting an `isSharedClient` flag and doing nothing in the `close()` if client is shared. * Propagate `isSharedSession` state down to the `SftpSession` from the `DefaultSftpSessionFactory` * Closed shared client and its `clientSession` in the `DefaultSftpSessionFactory.destroy()` (cherry picked from commit 2df9611)
1 parent 63dcc5c commit ba29da9

File tree

3 files changed

+107
-1
lines changed

3 files changed

+107
-1
lines changed

Diff for: spring-integration-sftp/src/main/java/org/springframework/integration/sftp/session/DefaultSftpSessionFactory.java

+22-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.io.IOException;
2020
import java.io.InputStream;
21+
import java.io.UncheckedIOException;
2122
import java.security.GeneralSecurityException;
2223
import java.security.KeyPair;
2324
import java.time.Duration;
@@ -316,7 +317,7 @@ public SftpSession getSession() {
316317
sftpClient = createSftpClient(initClientSession(), this.sftpVersionSelector, SftpErrorDataHandler.EMPTY);
317318
freshSftpClient = true;
318319
}
319-
sftpSession = new SftpSession(sftpClient);
320+
sftpSession = new SftpSession(sftpClient, this.isSharedSession);
320321
sftpSession.connect();
321322
if (this.isSharedSession && freshSftpClient) {
322323
this.sharedSftpClient = sftpClient;
@@ -449,6 +450,26 @@ public void destroy() {
449450
if (this.isInnerClient && this.sshClient != null && this.sshClient.isStarted()) {
450451
this.sshClient.stop();
451452
}
453+
454+
SftpClient sharedSftpClientToClose = this.sharedSftpClient;
455+
if (sharedSftpClientToClose != null) {
456+
try {
457+
sharedSftpClientToClose.close();
458+
}
459+
catch (IOException ex) {
460+
throw new UncheckedIOException("failed to close an SFTP client", ex);
461+
}
462+
463+
try {
464+
ClientSession session = sharedSftpClientToClose.getSession();
465+
if (session != null && session.isOpen()) {
466+
session.close();
467+
}
468+
}
469+
catch (IOException ex) {
470+
throw new UncheckedIOException("failed to close an SFTP client (session)", ex);
471+
}
472+
}
452473
}
453474

454475
/**

Diff for: spring-integration-sftp/src/main/java/org/springframework/integration/sftp/session/SftpSession.java

+18
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,23 @@ public class SftpSession implements Session<SftpClient.DirEntry> {
5555

5656
private final SftpClient sftpClient;
5757

58+
private final boolean isSharedClient;
59+
5860
public SftpSession(SftpClient sftpClient) {
61+
this(sftpClient, false);
62+
}
63+
64+
/**
65+
* Construct an instance based on a {@link SftpClient} and its {@code shared} status.
66+
* When {@code isSharedClient == true}, the {@link #close()} is void.
67+
* @param sftpClient the {@link SftpClient} to use.
68+
* @param isSharedClient whether the {@link SftpClient} is shared.
69+
* @since 6.3.9
70+
*/
71+
public SftpSession(SftpClient sftpClient, boolean isSharedClient) {
5972
Assert.notNull(sftpClient, "'sftpClient' must not be null");
6073
this.sftpClient = sftpClient;
74+
this.isSharedClient = isSharedClient;
6175
}
6276

6377
@Override
@@ -152,6 +166,10 @@ public void append(InputStream inputStream, String destination) throws IOExcepti
152166

153167
@Override
154168
public void close() {
169+
if (this.isSharedClient) {
170+
return;
171+
}
172+
155173
try {
156174
this.sftpClient.close();
157175
}

Diff for: spring-integration-sftp/src/test/java/org/springframework/integration/sftp/session/SftpSessionFactoryTests.java

+67
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@
2424
import java.util.ArrayList;
2525
import java.util.Collections;
2626
import java.util.List;
27+
import java.util.concurrent.CountDownLatch;
28+
import java.util.concurrent.ExecutorService;
29+
import java.util.concurrent.Executors;
30+
import java.util.concurrent.TimeUnit;
31+
import java.util.concurrent.atomic.AtomicInteger;
2732
import java.util.stream.IntStream;
2833

2934
import org.apache.sshd.client.SshClient;
@@ -37,6 +42,8 @@
3742
import org.apache.sshd.server.SshServer;
3843
import org.apache.sshd.server.keyprovider.SimpleGeneratorHostKeyProvider;
3944
import org.apache.sshd.sftp.client.SftpClient;
45+
import org.apache.sshd.sftp.client.SftpErrorDataHandler;
46+
import org.apache.sshd.sftp.client.SftpVersionSelector;
4047
import org.apache.sshd.sftp.client.impl.AbstractSftpClient;
4148
import org.apache.sshd.sftp.server.SftpSubsystemFactory;
4249
import org.junit.jupiter.api.Test;
@@ -268,4 +275,64 @@ void clientSessionIsClosedOnSessionClose() throws Exception {
268275
}
269276
}
270277

278+
@Test
279+
void sharedSessionConcurrentAccess() throws Exception {
280+
try (SshServer server = SshServer.setUpDefaultServer()) {
281+
server.setPasswordAuthenticator((arg0, arg1, arg2) -> true);
282+
server.setPort(0);
283+
server.setKeyPairProvider(new SimpleGeneratorHostKeyProvider(new File("hostkey.ser").toPath()));
284+
server.setSubsystemFactories(Collections.singletonList(new SftpSubsystemFactory()));
285+
server.start();
286+
287+
AtomicInteger clientInstances = new AtomicInteger();
288+
289+
DefaultSftpSessionFactory sftpSessionFactory = new DefaultSftpSessionFactory(true) {
290+
291+
@Override
292+
protected SftpClient createSftpClient(ClientSession clientSession,
293+
SftpVersionSelector initialVersionSelector, SftpErrorDataHandler errorDataHandler)
294+
throws IOException {
295+
296+
clientInstances.incrementAndGet();
297+
return super.createSftpClient(clientSession, initialVersionSelector, errorDataHandler);
298+
}
299+
300+
};
301+
sftpSessionFactory.setHost("localhost");
302+
sftpSessionFactory.setPort(server.getPort());
303+
sftpSessionFactory.setUser("user");
304+
sftpSessionFactory.setPassword("pass");
305+
sftpSessionFactory.setAllowUnknownKeys(true);
306+
307+
ExecutorService executorService = Executors.newFixedThreadPool(10);
308+
309+
CountDownLatch executionLatch = new CountDownLatch(20);
310+
List<Exception> errors = Collections.synchronizedList(new ArrayList<>());
311+
312+
for (int i = 0; i < 20; i++) {
313+
executorService.execute(() -> {
314+
try (SftpSession session = sftpSessionFactory.getSession()) {
315+
session.list(".");
316+
}
317+
catch (Exception e) {
318+
errors.add(e);
319+
}
320+
executionLatch.countDown();
321+
});
322+
}
323+
324+
assertThat(executionLatch.await(10, TimeUnit.SECONDS)).isTrue();
325+
synchronized (errors) {
326+
assertThat(errors).isEmpty();
327+
}
328+
329+
assertThat(clientInstances).hasValue(1);
330+
331+
executorService.shutdown();
332+
assertThat(executorService.awaitTermination(10, TimeUnit.SECONDS)).isTrue();
333+
334+
sftpSessionFactory.destroy();
335+
}
336+
}
337+
271338
}

0 commit comments

Comments
 (0)