Skip to content

Commit 0e7a4a1

Browse files
darrylsmithUGAspring-builds
authored andcommitted
GH-9272: Close ClientSession from SftpSession
Fixes: #9272 * close ClientSession when closing SftpSession * fix whitespace issues * stop the SshClient on bean destruction * use convenient assertions (cherry picked from commit a3fb68a)
1 parent 4ead954 commit 0e7a4a1

File tree

10 files changed

+137
-28
lines changed

10 files changed

+137
-28
lines changed

spring-integration-sftp/src/main/java/org/springframework/integration/sftp/session/DefaultSftpSessionFactory.java

+11-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2023 the original author or authors.
2+
* Copyright 2002-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -49,6 +49,7 @@
4949
import org.apache.sshd.sftp.client.impl.AbstractSftpClient;
5050
import org.apache.sshd.sftp.client.impl.DefaultSftpClient;
5151

52+
import org.springframework.beans.factory.DisposableBean;
5253
import org.springframework.core.io.Resource;
5354
import org.springframework.integration.context.IntegrationContextUtils;
5455
import org.springframework.integration.file.remote.session.SessionFactory;
@@ -74,10 +75,11 @@
7475
* @author Auke Zaaiman
7576
* @author Christian Tzolov
7677
* @author Adama Sorho
78+
* @author Darryl Smith
7779
*
7880
* @since 2.0
7981
*/
80-
public class DefaultSftpSessionFactory implements SessionFactory<SftpClient.DirEntry>, SharedSessionCapable {
82+
public class DefaultSftpSessionFactory implements SessionFactory<SftpClient.DirEntry>, SharedSessionCapable, DisposableBean {
8183

8284
private final Lock lock = new ReentrantLock();
8385

@@ -421,6 +423,13 @@ protected SftpClient createSftpClient(
421423
return new ConcurrentSftpClient(clientSession, initialVersionSelector, errorDataHandler);
422424
}
423425

426+
@Override
427+
public void destroy() throws Exception {
428+
if (this.isInnerClient && this.sshClient != null && this.sshClient.isStarted()) {
429+
this.sshClient.stop();
430+
}
431+
}
432+
424433
/**
425434
* The {@link DefaultSftpClient} extension to lock the {@link #send(int, Buffer)}
426435
* for concurrent interaction.

spring-integration-sftp/src/main/java/org/springframework/integration/sftp/session/SftpSession.java

+12
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.stream.Stream;
2626
import java.util.stream.StreamSupport;
2727

28+
import org.apache.sshd.client.session.ClientSession;
2829
import org.apache.sshd.common.util.net.SshdSocketAddress;
2930
import org.apache.sshd.sftp.SftpModuleProperties;
3031
import org.apache.sshd.sftp.client.SftpClient;
@@ -47,6 +48,7 @@
4748
* @author Gary Russell
4849
* @author Artem Bilan
4950
* @author Christian Tzolov
51+
* @author Darryl Smith
5052
* @since 2.0
5153
*/
5254
public class SftpSession implements Session<SftpClient.DirEntry> {
@@ -156,6 +158,16 @@ public void close() {
156158
catch (IOException ex) {
157159
throw new UncheckedIOException("failed to close an SFTP client", ex);
158160
}
161+
162+
try {
163+
ClientSession session = this.sftpClient.getSession();
164+
if (session != null && session.isOpen()) {
165+
session.close();
166+
}
167+
}
168+
catch (IOException ex) {
169+
throw new UncheckedIOException("failed to close an SFTP client (session)", ex);
170+
}
159171
}
160172

161173
@Override

spring-integration-sftp/src/test/java/org/springframework/integration/sftp/dsl/SftpTests.java

+22-11
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.junit.jupiter.api.condition.OS;
3232

3333
import org.springframework.beans.factory.annotation.Autowired;
34+
import org.springframework.context.annotation.Bean;
3435
import org.springframework.context.annotation.Configuration;
3536
import org.springframework.integration.IntegrationMessageHeaderAccessor;
3637
import org.springframework.integration.channel.QueueChannel;
@@ -43,6 +44,7 @@
4344
import org.springframework.integration.file.FileHeaders;
4445
import org.springframework.integration.file.remote.RemoteFileTemplate;
4546
import org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway;
47+
import org.springframework.integration.file.remote.session.SessionFactory;
4648
import org.springframework.integration.file.support.FileExistsMode;
4749
import org.springframework.integration.sftp.SftpTestSupport;
4850
import org.springframework.integration.sftp.session.SftpRemoteFileTemplate;
@@ -59,6 +61,7 @@
5961
* @author Gary Russell
6062
* @author Joaquin Santana
6163
* @author Deepak Gunasekaran
64+
* @author Darryl Smith
6265
*
6366
* @since 5.0
6467
*/
@@ -69,11 +72,14 @@ public class SftpTests extends SftpTestSupport {
6972
@Autowired
7073
private IntegrationFlowContext flowContext;
7174

75+
@Autowired
76+
private SessionFactory<SftpClient.DirEntry> sessionFactory;
77+
7278
@Test
7379
public void testSftpInboundFlow() {
7480
QueueChannel out = new QueueChannel();
7581
IntegrationFlow flow = IntegrationFlow
76-
.from(Sftp.inboundAdapter(sessionFactory())
82+
.from(Sftp.inboundAdapter(sessionFactory)
7783
.preserveTimestamp(true)
7884
.remoteDirectory("/sftpSource")
7985
.regexFilter(".*\\.txt$")
@@ -106,7 +112,7 @@ public void testSftpInboundFlow() {
106112
public void testSftpInboundStreamFlow() throws Exception {
107113
QueueChannel out = new QueueChannel();
108114
StandardIntegrationFlow flow = IntegrationFlow.from(
109-
Sftp.inboundStreamingAdapter(new SftpRemoteFileTemplate(sessionFactory()))
115+
Sftp.inboundStreamingAdapter(new SftpRemoteFileTemplate(sessionFactory))
110116
.remoteDirectory("sftpSource")
111117
.regexFilter(".*\\.txt$"),
112118
e -> e.id("sftpInboundAdapter").poller(Pollers.fixedDelay(100)))
@@ -133,7 +139,7 @@ public void testSftpInboundStreamFlow() throws Exception {
133139

134140
@Test
135141
public void testSftpOutboundFlow() {
136-
IntegrationFlow flow = f -> f.handle(Sftp.outboundAdapter(sessionFactory(), FileExistsMode.FAIL)
142+
IntegrationFlow flow = f -> f.handle(Sftp.outboundAdapter(sessionFactory, FileExistsMode.FAIL)
137143
.useTemporaryFileName(false)
138144
.fileNameExpression("headers['" + FileHeaders.FILENAME + "']")
139145
.remoteDirectory("sftpTarget"));
@@ -143,7 +149,7 @@ public void testSftpOutboundFlow() {
143149
.setHeader(FileHeaders.FILENAME, fileName)
144150
.build());
145151

146-
RemoteFileTemplate<SftpClient.DirEntry> template = new RemoteFileTemplate<>(sessionFactory());
152+
RemoteFileTemplate<SftpClient.DirEntry> template = new RemoteFileTemplate<>(sessionFactory);
147153
SftpClient.DirEntry[] files =
148154
template.execute(session -> session.list(getTargetRemoteDirectory().getName() + "/" + fileName));
149155
assertThat(files.length).isEqualTo(1);
@@ -154,7 +160,7 @@ public void testSftpOutboundFlow() {
154160

155161
@Test
156162
public void testSftpOutboundFlowSftpTemplate() {
157-
SftpRemoteFileTemplate sftpTemplate = new SftpRemoteFileTemplate(sessionFactory());
163+
SftpRemoteFileTemplate sftpTemplate = new SftpRemoteFileTemplate(sessionFactory);
158164
IntegrationFlow flow = f -> f.handle(Sftp.outboundAdapter(sftpTemplate)
159165
.useTemporaryFileName(false)
160166
.fileNameExpression("headers['" + FileHeaders.FILENAME + "']")
@@ -175,7 +181,7 @@ public void testSftpOutboundFlowSftpTemplate() {
175181

176182
@Test
177183
public void testSftpOutboundFlowSftpTemplateAndMode() {
178-
SftpRemoteFileTemplate sftpTemplate = new SftpRemoteFileTemplate(sessionFactory());
184+
SftpRemoteFileTemplate sftpTemplate = new SftpRemoteFileTemplate(sessionFactory);
179185
IntegrationFlow flow = f -> f.handle(Sftp.outboundAdapter(sftpTemplate, FileExistsMode.APPEND)
180186
.useTemporaryFileName(false)
181187
.fileNameExpression("headers['" + FileHeaders.FILENAME + "']")
@@ -201,7 +207,7 @@ public void testSftpOutboundFlowSftpTemplateAndMode() {
201207
@Test
202208
@DisabledOnOs(OS.WINDOWS)
203209
public void testSftpOutboundFlowWithChmod() {
204-
IntegrationFlow flow = f -> f.handle(Sftp.outboundAdapter(sessionFactory(), FileExistsMode.FAIL)
210+
IntegrationFlow flow = f -> f.handle(Sftp.outboundAdapter(sessionFactory, FileExistsMode.FAIL)
205211
.useTemporaryFileName(false)
206212
.fileNameExpression("headers['" + FileHeaders.FILENAME + "']")
207213
.chmod(0644)
@@ -212,7 +218,7 @@ public void testSftpOutboundFlowWithChmod() {
212218
.setHeader(FileHeaders.FILENAME, fileName)
213219
.build());
214220

215-
RemoteFileTemplate<SftpClient.DirEntry> template = new RemoteFileTemplate<>(sessionFactory());
221+
RemoteFileTemplate<SftpClient.DirEntry> template = new RemoteFileTemplate<>(sessionFactory);
216222
SftpClient.DirEntry[] files =
217223
template.execute(session -> session.list(getTargetRemoteDirectory().getName() + "/" + fileName));
218224
assertThat(files.length).isEqualTo(1);
@@ -231,7 +237,7 @@ public void testSftpOutboundFlowWithChmod() {
231237
public void testSftpMgetFlow() {
232238
QueueChannel out = new QueueChannel();
233239
IntegrationFlow flow = f -> f
234-
.handle(Sftp.outboundGateway(sessionFactory(), AbstractRemoteFileOutboundGateway.Command.MGET,
240+
.handle(Sftp.outboundGateway(sessionFactory, AbstractRemoteFileOutboundGateway.Command.MGET,
235241
"payload")
236242
.options(AbstractRemoteFileOutboundGateway.Option.RECURSIVE)
237243
.regexFileNameFilter("(subSftpSource|.*1.txt)")
@@ -260,7 +266,7 @@ public void testSftpMgetFlow() {
260266
public void testSftpSessionCallback() {
261267
QueueChannel out = new QueueChannel();
262268
IntegrationFlow flow = f -> f
263-
.<String>handle((p, h) -> new SftpRemoteFileTemplate(sessionFactory()).execute(s -> s.list(p)))
269+
.<String>handle((p, h) -> new SftpRemoteFileTemplate(sessionFactory).execute(s -> s.list(p)))
264270
.channel(out);
265271
IntegrationFlowRegistration registration = this.flowContext.registration(flow).register();
266272
registration.getInputChannel().send(new GenericMessage<>("sftpSource"));
@@ -279,7 +285,7 @@ public void testSftpSessionCallback() {
279285
public void testSftpMv() {
280286
QueueChannel out = new QueueChannel();
281287
IntegrationFlow flow = f -> f
282-
.handle(Sftp.outboundGateway(sessionFactory(), AbstractRemoteFileOutboundGateway.Command.MV, "payload")
288+
.handle(Sftp.outboundGateway(sessionFactory, AbstractRemoteFileOutboundGateway.Command.MV, "payload")
283289
.renameExpression("payload.concat('.done')")
284290
.remoteDirectoryExpression("'sftpSource'"))
285291
.channel(out);
@@ -303,6 +309,11 @@ public void testSftpMv() {
303309
@EnableIntegration
304310
public static class ContextConfiguration {
305311

312+
@Bean
313+
public SessionFactory<SftpClient.DirEntry> ftpsessionFactory() {
314+
return SftpTests.sessionFactory();
315+
}
316+
306317
}
307318

308319
}

spring-integration-sftp/src/test/java/org/springframework/integration/sftp/inbound/SftpInboundRemoteFileSystemSynchronizerTests.java

+3
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
* @author Gary Russell
6262
* @author Artem Bilan
6363
* @author Joaquin Santana
64+
* @author Darryl Smith
6465
*
6566
* @since 2.0
6667
*/
@@ -160,6 +161,8 @@ public void testCopyFileToLocalDir() throws Exception {
160161
ms.stop();
161162
verify(synchronizer).close();
162163
verify(store).close();
164+
165+
ftpSessionFactory.destroy();
163166
}
164167

165168
public static class TestSftpSessionFactory extends DefaultSftpSessionFactory {

spring-integration-sftp/src/test/java/org/springframework/integration/sftp/inbound/SftpMessageSourceTests.java

+14-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2022 the original author or authors.
2+
* Copyright 2018-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,12 +16,15 @@
1616

1717
package org.springframework.integration.sftp.inbound;
1818

19+
import org.apache.sshd.sftp.client.SftpClient;
1920
import org.junit.jupiter.api.Test;
2021

2122
import org.springframework.beans.factory.annotation.Autowired;
2223
import org.springframework.context.ApplicationContext;
24+
import org.springframework.context.annotation.Bean;
2325
import org.springframework.context.annotation.Configuration;
2426
import org.springframework.integration.file.FileHeaders;
27+
import org.springframework.integration.file.remote.session.SessionFactory;
2528
import org.springframework.integration.sftp.SftpTestSupport;
2629
import org.springframework.messaging.Message;
2730
import org.springframework.test.annotation.DirtiesContext;
@@ -32,6 +35,7 @@
3235
/**
3336
* @author Gary Russell
3437
* @author Artem bilan
38+
* @author Darryl Smith
3539
*
3640
* @since 5.0.7
3741
*
@@ -43,6 +47,9 @@ public class SftpMessageSourceTests extends SftpTestSupport {
4347
@Autowired
4448
private ApplicationContext context;
4549

50+
@Autowired
51+
private SessionFactory<SftpClient.DirEntry> sessionFactory;
52+
4653
@Test
4754
public void testMaxFetch() {
4855
SftpInboundFileSynchronizingMessageSource messageSource = buildSource();
@@ -53,7 +60,7 @@ public void testMaxFetch() {
5360
}
5461

5562
private SftpInboundFileSynchronizingMessageSource buildSource() {
56-
SftpInboundFileSynchronizer sync = new SftpInboundFileSynchronizer(sessionFactory());
63+
SftpInboundFileSynchronizer sync = new SftpInboundFileSynchronizer(sessionFactory);
5764
sync.setRemoteDirectory("/sftpSource/");
5865
sync.setBeanFactory(this.context);
5966
SftpInboundFileSynchronizingMessageSource messageSource = new SftpInboundFileSynchronizingMessageSource(sync);
@@ -68,6 +75,11 @@ private SftpInboundFileSynchronizingMessageSource buildSource() {
6875
@Configuration
6976
public static class Config {
7077

78+
@Bean
79+
public SessionFactory<SftpClient.DirEntry> ftpSessionFactory() {
80+
return SftpMessageSourceTests.sessionFactory();
81+
}
82+
7183
}
7284

7385
}

spring-integration-sftp/src/test/java/org/springframework/integration/sftp/outbound/SftpOutboundTests.java

+13-4
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
* @author Gary Russell
7777
* @author Gunnar Hillert
7878
* @author Artem Bilan
79+
* @author Darryl Smith
7980
*/
8081
public class SftpOutboundTests {
8182

@@ -84,7 +85,7 @@ public void testHandleFileMessage() throws Exception {
8485
File targetDir = new File("remote-target-dir");
8586
assertThat(targetDir.exists()).as("target directory does not exist: " + targetDir.getName()).isTrue();
8687

87-
SessionFactory<SftpClient.DirEntry> sessionFactory = new TestSftpSessionFactory();
88+
TestSftpSessionFactory sessionFactory = new TestSftpSessionFactory();
8889
FileTransferringMessageHandler<SftpClient.DirEntry> handler =
8990
new FileTransferringMessageHandler<>(sessionFactory);
9091
handler.setRemoteDirectoryExpression(new LiteralExpression(targetDir.getName()));
@@ -103,6 +104,8 @@ public void testHandleFileMessage() throws Exception {
103104

104105
handler.handleMessage(new GenericMessage<>(srcFile));
105106
assertThat(destFile.exists()).as("destination file was not created").isTrue();
107+
108+
sessionFactory.destroy();
106109
}
107110

108111
@Test
@@ -111,7 +114,7 @@ public void testHandleStringMessage() throws Exception {
111114
if (file.exists()) {
112115
file.delete();
113116
}
114-
SessionFactory<SftpClient.DirEntry> sessionFactory = new TestSftpSessionFactory();
117+
TestSftpSessionFactory sessionFactory = new TestSftpSessionFactory();
115118
FileTransferringMessageHandler<SftpClient.DirEntry> handler =
116119
new FileTransferringMessageHandler<>(sessionFactory);
117120
DefaultFileNameGenerator fGenerator = new DefaultFileNameGenerator();
@@ -127,6 +130,8 @@ public void testHandleStringMessage() throws Exception {
127130
byte[] inFile = FileCopyUtils.copyToByteArray(file);
128131
assertThat(new String(inFile)).isEqualTo("String data");
129132
file.delete();
133+
134+
sessionFactory.destroy();
130135
}
131136

132137
@Test
@@ -135,7 +140,7 @@ public void testHandleBytesMessage() throws Exception {
135140
if (file.exists()) {
136141
file.delete();
137142
}
138-
SessionFactory<SftpClient.DirEntry> sessionFactory = new TestSftpSessionFactory();
143+
TestSftpSessionFactory sessionFactory = new TestSftpSessionFactory();
139144
FileTransferringMessageHandler<SftpClient.DirEntry> handler =
140145
new FileTransferringMessageHandler<>(sessionFactory);
141146
DefaultFileNameGenerator fGenerator = new DefaultFileNameGenerator();
@@ -151,6 +156,8 @@ public void testHandleBytesMessage() throws Exception {
151156
byte[] inFile = FileCopyUtils.copyToByteArray(file);
152157
assertThat(new String(inFile)).isEqualTo("byte[] data");
153158
file.delete();
159+
160+
sessionFactory.destroy();
154161
}
155162

156163
@Test //INT-2275
@@ -225,7 +232,7 @@ public void testMkDir() throws Exception {
225232

226233
@ParameterizedTest
227234
@ValueSource(booleans = {true, false})
228-
public void testSharedSession(boolean sharedSession) throws IOException {
235+
public void testSharedSession(boolean sharedSession) throws Exception {
229236
try (SshServer server = SshServer.setUpDefaultServer()) {
230237
server.setPasswordAuthenticator((arg0, arg1, arg2) -> true);
231238
server.setPort(0);
@@ -253,6 +260,8 @@ public void testSharedSession(boolean sharedSession) throws IOException {
253260
assertThat(TestUtils.getPropertyValue(s2, "sftpClient"))
254261
.isNotSameAs(TestUtils.getPropertyValue(s1, "sftpClient"));
255262
}
263+
264+
f.destroy();
256265
}
257266
}
258267

0 commit comments

Comments
 (0)