Skip to content

Commit d04002b

Browse files
committed
spring-projectsGH-8562: Fix streaming source for remote calls
Fixes spring-projects#8562 The `AbstractRemoteFileStreamingMessageSource.doReceive()` takes files first from a `toBeReceived` queue. When `AbstractRemoteFileStreamingMessageSource.remoteFileToMessage()` fails to fetch the file content because of interim connection issue, we reset this file from a filter and rethrow an exception. The next `receive()` call will just go ahead to the next entry in the `toBeReceived` queue, but the file we have just failed for will be retried only on the next list call to the remove directory. This essentially breaks a possible in-order target application logic. * Introduce `AbstractRemoteFileStreamingMessageSource.strictOrder` option to clear the `toBeReceived` queue when we fail in the `remoteFileToMessage()`, so the next `receive()` call would re-fetch files from remote dir, because the filter has been reset for those files. * Fix `AbstractFileInfo.toString()` to not perform remote calls when we just log this file. For example, we reset the file for connection failure and log the message about it, but it fails again because we request `size` of the file which may require a remote connection. **Cherry-pick to `6.0.x` & `5.5.x`**
1 parent a09d6dc commit d04002b

File tree

3 files changed

+43
-15
lines changed

3 files changed

+43
-15
lines changed

Diff for: spring-integration-file/src/main/java/org/springframework/integration/file/remote/AbstractFileInfo.java

+5-7
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2021 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,8 +16,6 @@
1616

1717
package org.springframework.integration.file.remote;
1818

19-
import java.util.Date;
20-
2119
import org.springframework.integration.json.SimpleJsonSerializer;
2220

2321
/**
@@ -27,6 +25,7 @@
2725
* @param <F> The target protocol file type.
2826
*
2927
* @author Gary Russell
28+
* @author Artem Bilan
3029
*
3130
* @since 2.1
3231
*/
@@ -57,10 +56,9 @@ public String toJson() {
5756

5857
@Override
5958
public String toString() {
60-
return "FileInfo [isDirectory=" + isDirectory() + ", isLink=" + isLink()
61-
+ ", Size=" + getSize() + ", ModifiedTime="
62-
+ new Date(getModified()) + ", Filename=" + getFilename()
63-
+ ", RemoteDirectory=" + getRemoteDirectory() + ", Permissions=" + getPermissions() + "]";
59+
return "FileInfo [isLink=" + isLink()
60+
+ ", Filename=" + getFilename()
61+
+ ", RemoteDirectory=" + getRemoteDirectory() + "]";
6462
}
6563

6664
}

Diff for: spring-integration-file/src/main/java/org/springframework/integration/file/remote/AbstractRemoteFileStreamingMessageSource.java

+33-7
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2022 the original author or authors.
2+
* Copyright 2016-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -86,6 +86,8 @@ public abstract class AbstractRemoteFileStreamingMessageSource<F>
8686
*/
8787
private FileListFilter<F> filter;
8888

89+
private boolean strictOrder;
90+
8991
protected AbstractRemoteFileStreamingMessageSource(RemoteFileTemplate<? extends F> template,
9092
@Nullable Comparator<? extends F> comparator) {
9193

@@ -133,7 +135,8 @@ protected final void doSetFilter(FileListFilter<F> filterToSet) {
133135
}
134136

135137
/**
136-
* Set to false to add the {@link FileHeaders#REMOTE_FILE_INFO} header to the raw {@link FileInfo}.
138+
* Set to {@code false} to add the {@link FileHeaders#REMOTE_FILE_INFO}
139+
* header to the raw {@link FileInfo}.
137140
* Default is true meaning that common file information properties are provided
138141
* in that header as JSON.
139142
* @param fileInfoJson false to set the raw object.
@@ -143,6 +146,18 @@ public void setFileInfoJson(boolean fileInfoJson) {
143146
this.fileInfoJson = fileInfoJson;
144147
}
145148

149+
/**
150+
* The flag indicating if the local cache has to be fully clear on failure
151+
* to preserve a processing order of remote files on the next {@link #receive()} attempt.
152+
* By default, only the failed file will be re-fetched from remote directory,
153+
* but only when local cache is already empty, essential out of order.
154+
* @param strictOrder if cached files has to be cleared on failure.
155+
* @since 5.5.17
156+
*/
157+
public void setStrictOrder(boolean strictOrder) {
158+
this.strictOrder = strictOrder;
159+
}
160+
146161
protected RemoteFileTemplate<? extends F> getRemoteFileTemplate() {
147162
return this.remoteFileTemplate;
148163
}
@@ -235,9 +250,19 @@ private Object remoteFileToMessage(AbstractFileInfo<F> file) {
235250
throw new UncheckedIOException("IOException when retrieving " + remotePath, e);
236251
}
237252
}
238-
catch (RuntimeException e) {
239-
resetFilterIfNecessary(file);
240-
throw e;
253+
catch (RuntimeException ex) {
254+
if (this.strictOrder) {
255+
// If we could not fetch the file content, then it is fatal.
256+
// Clear local queue to be refreshed on the next 'receive()' call.
257+
List<AbstractFileInfo<F>> filesToReset = new ArrayList<>();
258+
filesToReset.add(file);
259+
this.toBeReceived.drainTo(filesToReset);
260+
filesToReset.forEach(this::resetFilterIfNecessary);
261+
}
262+
else {
263+
resetFilterIfNecessary(file);
264+
}
265+
throw ex;
241266
}
242267
}
243268

@@ -250,8 +275,9 @@ protected AbstractFileInfo<F> poll() {
250275

251276
private void resetFilterIfNecessary(AbstractFileInfo<F> file) {
252277
if (this.filter instanceof ResettableFileListFilter) {
253-
this.logger.info(LogMessage.format("Removing the remote file '%s' from"
254-
+ "the filterfor a subsequent transfer attempt", file));
278+
this.logger.info(
279+
LogMessage.format("Removing the remote file '%s' from the filter for a subsequent transfer attempt",
280+
file.getFilename()));
255281
((ResettableFileListFilter<F>) this.filter).remove(file.getFileInfo());
256282
}
257283
}

Diff for: spring-integration-file/src/test/java/org/springframework/integration/file/remote/StreamingInboundTests.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2022 the original author or authors.
2+
* Copyright 2016-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -243,6 +243,10 @@ public void testFilterReversedOnBadFetch() {
243243
.isThrownBy(streamer::receive);
244244
assertThat(TestUtils.getPropertyValue(streamer, "toBeReceived", BlockingQueue.class)).hasSize(1);
245245
assertThat(streamer.metadataMap).hasSize(0);
246+
streamer.setStrictOrder(true);
247+
assertThatExceptionOfType(UncheckedIOException.class)
248+
.isThrownBy(streamer::receive);
249+
assertThat(TestUtils.getPropertyValue(streamer, "toBeReceived", BlockingQueue.class)).hasSize(0);
246250
}
247251

248252
public static class Streamer extends AbstractRemoteFileStreamingMessageSource<String> {

0 commit comments

Comments
 (0)