Allow AbstractRemoteFileStreamingMessageSource#remoteFileToMessage Retry #8562

Closed
jayChrono opened this issue Feb 26, 2023 · 6 comments · Fixed by #8564

@jayChrono

jayChrono commented Feb 26, 2023

Expected Behavior
When using SmbStreamingMessageSource, it should be possible to retry the remoteFileToMessage() call in doReceive() with a RetryTemplate that can be set via SmbStreamingInboundChannelAdapterSpec.

Current Behavior
AbstractRemoteFileStreamingMessageSource#remoteFileToMessage is called only once. A network error while resolving the SmbFileInfo makes the call fail, and the file is then dropped from processing.

Context
I wrote an integration flow that streams and processes zip files from a shared folder in a defined order. Occasionally, the process hits a network error while resolving the SmbFileInfo for a remote file, and that file gets dropped from processing. Handling the exception in the errorChannel would cause the zip files to be processed out of order.

I could not simply extend SmbStreamingMessageSource and add a RetryTemplate, because the fields used by doReceive() are private. Instead, I had to copy AbstractRemoteFileStreamingMessageSource into my project, add a RetryTemplate, and modify the call to remoteFileToMessage() (line 214):

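```java
return remoteFileToMessage(file);
```

->

```java
// `file` is not effectively final in doReceive(), so copy it before using it in the lambda
var processFile = file;
// `retry` is the RetryTemplate field added to the copied class
return retry.execute(ctx -> {
	// log the retry context on every attempt after the first one
	if (ctx.getRetryCount() > 0) {
		logger.info(ctx::toString);
	}
	return remoteFileToMessage(processFile);
});
```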
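To make the requested semantics concrete without a Spring Retry dependency, here is a minimal plain-Java sketch of what the reporter's `retry.execute(...)` call does. `SimpleRetry` is a hypothetical helper, not a Spring API: it invokes the callback up to `maxAttempts` times, reports each retry (the equivalent of the `ctx.getRetryCount() > 0` check), and rethrows the last failure once all attempts are used up. Back-off between attempts is deliberately omitted.

```java
import java.util.function.IntConsumer;
import java.util.function.Supplier;

// Hypothetical stand-in for RetryTemplate.execute(...); not a Spring API.
final class SimpleRetry {

    private SimpleRetry() {
    }

    static <T> T execute(int maxAttempts, IntConsumer onRetry, Supplier<T> callback) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException lastFailure = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (attempt > 0) {
                onRetry.accept(attempt); // like logging when ctx.getRetryCount() > 0
            }
            try {
                return callback.get();
            }
            catch (RuntimeException ex) {
                lastFailure = ex; // remember the failure and try again
            }
        }
        throw lastFailure; // every attempt failed: surface the last error
    }
}
```

A callback that fails twice with a transient error and then succeeds returns its result on the third attempt, with `onRetry` called for attempts 1 and 2; a callback that always fails propagates its exception after `maxAttempts` calls.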

@jayChrono jayChrono added status: waiting-for-triage The issue need to be evaluated and its future decided type: enhancement labels Feb 26, 2023
@artembilan
Member

Could you please share a stack trace for the error? It would help determine what is going on and why the next poll no longer takes that file into account.

You can also look into a RetryInterceptorBuilder to provide a MethodInterceptor for the adviceChain() of the PollerSpec: https://docs.spring.io/spring-integration/docs/current/reference/html/core.html#conditional-pollers.

I don't think it is the MessageSource's responsibility to do that retry...

@artembilan artembilan added status: waiting-for-reporter Needs a feedback from the reporter and removed status: waiting-for-triage The issue need to be evaluated and its future decided labels Feb 26, 2023
@jayChrono
Author

jayChrono commented Feb 26, 2023

From the source code, the list of SMB files to process is fetched once and stored in the toBeReceived queue (listFiles() function). The poll() function takes from that queue and only performs a 'real' listing of files to receive when toBeReceived is empty. So the skipped file is picked up again only after the current queue has been emptied.

I'll look into the MethodInterceptor, but wouldn't that run into the same out-of-order issue as handling the error in the errorChannel? Also, as described above, the actual listing of remote files only happens once the toBeReceived queue is empty, so I doubt that retrying via the interceptor would replay the dropped file.

2023-02-25T21:41:06.569-05:00 ERROR 28924 --- [scheduling-1] o.s.integration.smb.session.SmbFileInfo : Unable to determine if this SmbFile represents a directory

jcifs.smb.SmbException: Failed to connect: somesite.com/x.x.x.x
	at jcifs.smb.SmbTransportImpl.ensureConnected(SmbTransportImpl.java:689)
	at jcifs.smb.SmbTransportPoolImpl.getSmbTransport(SmbTransportPoolImpl.java:220)
	at jcifs.smb.SmbTransportPoolImpl.getSmbTransport(SmbTransportPoolImpl.java:48)
	at jcifs.smb.SmbTreeConnection.connectHost(SmbTreeConnection.java:565)
	at jcifs.smb.SmbTreeConnection.connectHost(SmbTreeConnection.java:489)
	at jcifs.smb.SmbTreeConnection.connect(SmbTreeConnection.java:465)
	at jcifs.smb.SmbTreeConnection.connectWrapException(SmbTreeConnection.java:426)
	at jcifs.smb.SmbFile.ensureTreeConnected(SmbFile.java:310)
	at jcifs.smb.SmbFile.exists(SmbFile.java:610)
	at jcifs.smb.SmbFile.isDirectory(SmbFile.java:828)
	at org.springframework.integration.smb.session.SmbFileInfo.isDirectory(SmbFileInfo.java:53)
	at org.springframework.integration.file.remote.AbstractFileInfo.toString(AbstractFileInfo.java:60)
	at java.base/java.util.Formatter$FormatSpecifier.printString(Formatter.java:3056)
	at java.base/java.util.Formatter$FormatSpecifier.print(Formatter.java:2933)
	at java.base/java.util.Formatter.format(Formatter.java:2689)
	at java.base/java.util.Formatter.format(Formatter.java:2625)
	at java.base/java.lang.String.format(String.java:4140)
	at org.springframework.core.log.LogMessage$FormatMessage1.buildString(LogMessage.java:182)
	at org.springframework.core.log.LogMessage.toString(LogMessage.java:70)
	at java.base/java.lang.String.valueOf(String.java:4215)
	at org.apache.commons.logging.impl.SLF4JLocationAwareLog.info(SLF4JLocationAwareLog.java:164)
	at org.springframework.core.log.LogAccessor.info(LogAccessor.java:174)
	at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.resetFilterIfNecessary(AbstractRemoteFileStreamingMessageSource.java:253)
	at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.remoteFileToMessage(AbstractRemoteFileStreamingMessageSource.java:239)
	at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.doReceive(AbstractRemoteFileStreamingMessageSource.java:214)
	at org.springframework.integration.endpoint.AbstractFetchLimitingMessageSource.doReceive(AbstractFetchLimitingMessageSource.java:47)
	at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:142)
	at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:212)
	at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:443)
	at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:412)
	at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:348)
	at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
	at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
	at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
	at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$5(AbstractPollingEndpoint.java:341)
	at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
	at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:96)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: jcifs.util.transport.TransportException: java.net.ConnectException: Connection timed out: no further information
	at jcifs.util.transport.Transport.run(Transport.java:769)
	... 1 common frames omitted
Caused by: java.net.ConnectException: Connection timed out: no further information
	at java.base/sun.nio.ch.Net.pollConnect(Native Method)
	at java.base/sun.nio.ch.Net.pollConnectNow(Net.java:672)
	at java.base/sun.nio.ch.NioSocketImpl.timedFinishConnect(NioSocketImpl.java:549)
	at java.base/sun.nio.ch.NioSocketImpl.connect(NioSocketImpl.java:597)
	at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:327)
	at java.base/java.net.Socket.connect(Socket.java:633)
	at jcifs.smb.SmbTransportImpl.negotiate(SmbTransportImpl.java:494)
	at jcifs.smb.SmbTransportImpl.doConnect(SmbTransportImpl.java:706)
	at jcifs.util.transport.Transport.run(Transport.java:742)
	... 1 common frames omitted

2023-02-25T21:41:27.664-05:00 ERROR 28924 --- [scheduling-1] o.s.integration.smb.session.SmbFileInfo  : Unable to determine file size

jcifs.smb.SmbException: Failed to connect: somesite.com/x.x.x.x
	at jcifs.smb.SmbTransportImpl.ensureConnected(SmbTransportImpl.java:689)
	at jcifs.smb.SmbTransportPoolImpl.getSmbTransport(SmbTransportPoolImpl.java:220)
	at jcifs.smb.SmbTransportPoolImpl.getSmbTransport(SmbTransportPoolImpl.java:48)
	at jcifs.smb.SmbTreeConnection.connectHost(SmbTreeConnection.java:565)
	at jcifs.smb.SmbTreeConnection.connectHost(SmbTreeConnection.java:489)
	at jcifs.smb.SmbTreeConnection.connect(SmbTreeConnection.java:465)
	at jcifs.smb.SmbTreeConnection.connectWrapException(SmbTreeConnection.java:426)
	at jcifs.smb.SmbFile.ensureTreeConnected(SmbFile.java:310)
	at jcifs.smb.SmbFile.length(SmbFile.java:1307)
	at org.springframework.integration.smb.session.SmbFileInfo.getSize(SmbFileInfo.java:74)
	at org.springframework.integration.file.remote.AbstractFileInfo.toString(AbstractFileInfo.java:61)
	at java.base/java.util.Formatter$FormatSpecifier.printString(Formatter.java:3056)
	at java.base/java.util.Formatter$FormatSpecifier.print(Formatter.java:2933)
	at java.base/java.util.Formatter.format(Formatter.java:2689)
	at java.base/java.util.Formatter.format(Formatter.java:2625)
	at java.base/java.lang.String.format(String.java:4140)
	at org.springframework.core.log.LogMessage$FormatMessage1.buildString(LogMessage.java:182)
	at org.springframework.core.log.LogMessage.toString(LogMessage.java:70)
	at java.base/java.lang.String.valueOf(String.java:4215)
	at org.apache.commons.logging.impl.SLF4JLocationAwareLog.info(SLF4JLocationAwareLog.java:164)
	at org.springframework.core.log.LogAccessor.info(LogAccessor.java:174)
	at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.resetFilterIfNecessary(AbstractRemoteFileStreamingMessageSource.java:253)
	at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.remoteFileToMessage(AbstractRemoteFileStreamingMessageSource.java:239)
	at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.doReceive(AbstractRemoteFileStreamingMessageSource.java:214)
	at org.springframework.integration.endpoint.AbstractFetchLimitingMessageSource.doReceive(AbstractFetchLimitingMessageSource.java:47)
	at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:142)
	at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:212)
	at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:443)
	at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:412)
	at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:348)
	at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
	at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
	at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
	at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$5(AbstractPollingEndpoint.java:341)
	at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
	at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:96)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: jcifs.util.transport.TransportException: java.net.ConnectException: Connection timed out: no further information
	at jcifs.util.transport.Transport.run(Transport.java:769)
	... 1 common frames omitted
Caused by: java.net.ConnectException: Connection timed out: no further information
	at java.base/sun.nio.ch.Net.pollConnect(Native Method)
	at java.base/sun.nio.ch.Net.pollConnectNow(Net.java:672)
	at java.base/sun.nio.ch.NioSocketImpl.timedFinishConnect(NioSocketImpl.java:549)
	at java.base/sun.nio.ch.NioSocketImpl.connect(NioSocketImpl.java:597)
	at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:327)
	at java.base/java.net.Socket.connect(Socket.java:633)
	at jcifs.smb.SmbTransportImpl.negotiate(SmbTransportImpl.java:494)
	at jcifs.smb.SmbTransportImpl.doConnect(SmbTransportImpl.java:706)
	at jcifs.util.transport.Transport.run(Transport.java:742)
	... 1 common frames omitted

2023-02-25T21:41:49.155-05:00  INFO 28924 --- [scheduling-1] o.s.i.s.i.SmbStreamingMessageSource      : Removing the remote file 'FileInfo [isDirectory=false, isLink=false, Size=0, ModifiedTime=Wed Dec 31 19:00:00 EST 1969, Filename=TORDOO_20230213_D_S00049.ZIP, RemoteDirectory=remoteFolder, Permissions=Administrators - Allow Read, Allow Write, Allow Modify, Allow Execute, Allow Delete, Inherited - This folder only
domain1\group1_FC - Allow Read, Allow Write, Allow Modify, Allow Execute, Allow Delete, Inherited - This folder only
domain1\group1_RW - Allow Read, Allow Write, Allow Modify, Allow Execute, Allow Delete, Inherited - This folder only
domain1\group1_RO - Allow Read, Allow Execute, Inherited - This folder only
domain2\group2-RW - Allow Read, Allow Write, Allow Modify, Allow Execute, Allow Delete, Inherited - This folder only
domain2\group2-RO - Allow Read, Allow Execute, Inherited - This folder only
]' from the filter for a subsequent transfer attempt
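The queue behavior described in this comment can be reproduced with a small plain-Java model. This is a simplified, hypothetical sketch, not the real `AbstractRemoteFileStreamingMessageSource`: apart from `toBeReceived`, the names are invented, and the filter is reduced to an accept-once `Set`. The remote directory is listed only when the queue is empty; a failed file is removed from the filter and the exception is rethrown, so calling `receive()` again (as the next poll or a poller-level retry advice would) moves on to the next queued file.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical, simplified model of the default streaming-source behavior.
final class QueueModel {

    private final List<String> remoteDir;     // remote files in listing order
    private final Predicate<String> failsNow; // simulates a transient network error
    private final Set<String> acceptOnceFilter = new HashSet<>();
    private final Deque<String> toBeReceived = new ArrayDeque<>();
    final List<String> processed = new ArrayList<>();

    QueueModel(List<String> remoteDir, Predicate<String> failsNow) {
        this.remoteDir = remoteDir;
        this.failsNow = failsNow;
    }

    // One poll: list the remote directory only when the queue is empty.
    String receive() {
        if (toBeReceived.isEmpty()) {
            for (String f : remoteDir) {
                if (acceptOnceFilter.add(f)) { // not seen by the filter yet
                    toBeReceived.add(f);
                }
            }
        }
        String file = toBeReceived.poll();
        if (file == null) {
            return null; // nothing to receive
        }
        if (failsNow.test(file)) {
            acceptOnceFilter.remove(file); // reset the filter so a later listing finds it again
            throw new IllegalStateException("Failed to connect while reading " + file);
        }
        processed.add(file);
        return file;
    }
}
```

With remote files `a`, `b`, `c` and a single transient failure on `b`, five polls that swallow the exception (as the poller does after logging it) process the files in the order `[a, c, b]`: `b` only comes back once `c` has drained the queue and the directory is listed again.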

@jayChrono
Author

I just tried a MethodInterceptor in the advice() chain and, as expected, the file was skipped, because subsequent poll() calls just retrieve the next remote file from the toBeReceived queue.

@artembilan
Member

OK. Let's see if I understood this correctly.
When we poll the source, we fetch file references into an internal toBeReceived queue.
Then we poll an item from that queue, fail in remoteFileToMessage(), and throw the exception up, essentially skipping this file.
Even with a retry advice on the source polling channel adapter, we just move on to the next entry in toBeReceived.

Apparently we have to treat this remoteFileToMessage() failure as fatal and clear toBeReceived, so that the next doReceive() call renews the state of the component and fetches the files from the remote store again, probably in the same order.
Fortunately, we already reset filters for failed files:

```java
		catch (RuntimeException e) {
			// remove the failed file from the filter so a later listing can pick it up again
			resetFilterIfNecessary(file);
			throw e;
		}
```

If that meets your expectations, we can simply go ahead with the fix I've just suggested.
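The proposed fix can be tried out on the same kind of toy model: on failure, reset the filter for the failed file and for everything still queued, then clear `toBeReceived`, so the next `receive()` lists the remote directory again from the start. This is a hypothetical sketch: the `strictOrder` flag name is borrowed from the commit message further down, and resetting the filter for the discarded entries follows that message's wording ("the filter has been reset for those files") rather than the actual Spring Integration code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical model of the proposed fix; not the real Spring Integration class.
final class StrictOrderModel {

    private final List<String> remoteDir;     // remote files in listing order
    private final Predicate<String> failsNow; // simulates a transient network error
    private final boolean strictOrder;
    private final Set<String> acceptOnceFilter = new HashSet<>();
    private final Deque<String> toBeReceived = new ArrayDeque<>();
    final List<String> processed = new ArrayList<>();

    StrictOrderModel(List<String> remoteDir, Predicate<String> failsNow, boolean strictOrder) {
        this.remoteDir = remoteDir;
        this.failsNow = failsNow;
        this.strictOrder = strictOrder;
    }

    String receive() {
        if (toBeReceived.isEmpty()) { // list only when the queue is empty
            for (String f : remoteDir) {
                if (acceptOnceFilter.add(f)) {
                    toBeReceived.add(f);
                }
            }
        }
        String file = toBeReceived.poll();
        if (file == null) {
            return null;
        }
        if (failsNow.test(file)) {
            acceptOnceFilter.remove(file); // existing behavior: reset the failed file
            if (strictOrder) {
                acceptOnceFilter.removeAll(toBeReceived); // forget the rest of this batch...
                toBeReceived.clear();                     // ...so the next poll re-lists in order
            }
            throw new IllegalStateException("Failed to connect while reading " + file);
        }
        processed.add(file);
        return file;
    }
}
```

With the same scenario (files `a`, `b`, `c`, one transient failure on `b`, five polls), `strictOrder = false` yields `[a, c, b]`, while `strictOrder = true` yields `[a, b, c]`. The trade-off is that a file which keeps failing blocks every file listed after it.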

@jayChrono
Author

Your understanding is correct.

Clearing toBeReceived is one option, similar to simply reinserting the file at the front of the queue. However, the file could then end up in a continuous error loop that never lets the rest of the queue proceed. I would prefer a more flexible, customizable way of handling that, i.e. a RetryTemplate.

@artembilan
Member

But isn't that what a retry advice on the source polling channel adapter would do for us in the end?
I think most of the errors we get in remoteFileToMessage() are really about the connection to the target system.
So no matter which file we deal with, it is still going to fail the same way.

I do see, though, a problem in your stack trace around at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.resetFilterIfNecessary(AbstractRemoteFileStreamingMessageSource.java:253).
This is really a bug and needs to be fixed so that we don't open any connection when we just log such a message for a remote file.
Plus, I've already suggested clearing the toBeReceived queue so that a subsequent attempt starts from a fresh listing.
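The logging bug can be illustrated with a small, hypothetical file-info class. In the stack traces above, `AbstractFileInfo.toString()` calls `isDirectory()` and `getSize()`, which reach `SmbFile.exists()` and `SmbFile.length()` and therefore the network. In the sketch below, the size is still obtained through a remote call, but `toString()` uses only locally known fields, so building a log message never opens a connection. `LazyRemoteFileInfo` and its output format are invented for illustration; per the commit notes below, the actual fix overrides `toString()` in `SmbFileInfo`.

```java
import java.util.function.LongSupplier;

// Hypothetical file-info class; not the real SmbFileInfo.
final class LazyRemoteFileInfo {

    private final String remoteDirectory;
    private final String filename;
    private final LongSupplier remoteSize; // may open a network connection and fail

    LazyRemoteFileInfo(String remoteDirectory, String filename, LongSupplier remoteSize) {
        this.remoteDirectory = remoteDirectory;
        this.filename = filename;
        this.remoteSize = remoteSize;
    }

    // Remote call: made only when a caller really needs the size.
    long getSize() {
        return remoteSize.getAsLong();
    }

    // Uses only locally known fields, so it is safe in log messages even when the share is unreachable.
    @Override
    public String toString() {
        return "FileInfo [RemoteDirectory=" + remoteDirectory + ", Filename=" + filename + "]";
    }
}
```

Concatenating such an object into a "Removing the remote file ..." message performs no remote call, while an explicit `getSize()` still surfaces the connection failure to the caller that actually asked for it.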

@artembilan artembilan added status: discussion-required Needs a discussion with the team and removed status: waiting-for-reporter Needs a feedback from the reporter labels Feb 27, 2023
@artembilan artembilan added this to the 6.1.0-M2 milestone Feb 28, 2023
@artembilan artembilan added in: file backport 5.5.x and removed status: discussion-required Needs a discussion with the team labels Feb 28, 2023
artembilan added a commit to artembilan/spring-integration that referenced this issue Feb 28, 2023
Fixes spring-projects#8562

The `AbstractRemoteFileStreamingMessageSource.doReceive()` takes files first from a `toBeReceived` queue.
When `AbstractRemoteFileStreamingMessageSource.remoteFileToMessage()` fails to fetch the file content
because of an interim connection issue, we reset this file in the filter and rethrow the exception.
The next `receive()` call just goes ahead to the next entry in the `toBeReceived` queue, and the
file that has just failed is retried only on the next list call to the remote directory.
This essentially breaks any in-order processing logic in the target application.

* Introduce `AbstractRemoteFileStreamingMessageSource.strictOrder` option to clear the `toBeReceived` queue
 when we fail in the `remoteFileToMessage()`, so the next `receive()`
 call would re-fetch files from remote dir, because the filter has been reset for those files.
* Fix `AbstractFileInfo.toString()` to not perform remote calls when we just log this file.
For example, we reset the file for connection failure and log the message about it,
but it fails again because we request `size` of the file which may require a remote connection.

**Cherry-pick to `6.0.x` & `5.5.x`**
garyrussell pushed a commit that referenced this issue Feb 28, 2023
* GH-8562: Fix streaming source for remote calls

Fixes #8562

The `AbstractRemoteFileStreamingMessageSource.doReceive()` takes files first from a `toBeReceived` queue.
When `AbstractRemoteFileStreamingMessageSource.remoteFileToMessage()` fails to fetch the file content
because of an interim connection issue, we reset this file in the filter and rethrow the exception.
The next `receive()` call just goes ahead to the next entry in the `toBeReceived` queue, and the
file that has just failed is retried only on the next list call to the remote directory.
This essentially breaks any in-order processing logic in the target application.

* Introduce `AbstractRemoteFileStreamingMessageSource.strictOrder` option to clear the `toBeReceived` queue
 when we fail in the `remoteFileToMessage()`, so the next `receive()`
 call would re-fetch files from remote dir, because the filter has been reset for those files.
* Fix `AbstractFileInfo.toString()` to not perform remote calls when we just log this file.
For example, we reset the file for connection failure and log the message about it,
but it fails again because we request `size` of the file which may require a remote connection.

**Cherry-pick to `6.0.x` & `5.5.x`**

* Revert `AbstractFileInfo` changes
* Override `toString()` in `SmbFileInfo` instead -
exactly the place where connection is used to obtain
file attributes like `size` or `lastModified`