Skip to content

Commit ce9f7f4

Browse files
authored
GH-8562: Fix streaming source for remote calls (#8564)
* GH-8562: Fix streaming source for remote calls Fixes #8562 The `AbstractRemoteFileStreamingMessageSource.doReceive()` takes files first from a `toBeReceived` queue. When `AbstractRemoteFileStreamingMessageSource.remoteFileToMessage()` fails to fetch the file content because of interim connection issue, we reset this file from a filter and rethrow an exception. The next `receive()` call will just go ahead to the next entry in the `toBeReceived` queue, but the file we have just failed for will be retried only on the next list call to the remove directory. This essentially breaks a possible in-order target application logic. * Introduce `AbstractRemoteFileStreamingMessageSource.strictOrder` option to clear the `toBeReceived` queue when we fail in the `remoteFileToMessage()`, so the next `receive()` call would re-fetch files from remote dir, because the filter has been reset for those files. * Fix `AbstractFileInfo.toString()` to not perform remote calls when we just log this file. For example, we reset the file for connection failure and log the message about it, but it fails again because we request `size` of the file which may require a remote connection. **Cherry-pick to `6.0.x` & `5.5.x`** * * Revert `AbstractFileInfo` changes * Override `toString()` in `SmbFileInfo` instead - exactly the place where connection is used to obtain file attributes like `size` or `lastModified`
1 parent a09d6dc commit ce9f7f4

File tree

3 files changed

+49
-15
lines changed

3 files changed

+49
-15
lines changed

spring-integration-file/src/main/java/org/springframework/integration/file/remote/AbstractRemoteFileStreamingMessageSource.java

+33-7
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2022 the original author or authors.
2+
* Copyright 2016-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -86,6 +86,8 @@ public abstract class AbstractRemoteFileStreamingMessageSource<F>
8686
*/
8787
private FileListFilter<F> filter;
8888

89+
private boolean strictOrder;
90+
8991
protected AbstractRemoteFileStreamingMessageSource(RemoteFileTemplate<? extends F> template,
9092
@Nullable Comparator<? extends F> comparator) {
9193

@@ -133,7 +135,8 @@ protected final void doSetFilter(FileListFilter<F> filterToSet) {
133135
}
134136

135137
/**
136-
* Set to false to add the {@link FileHeaders#REMOTE_FILE_INFO} header to the raw {@link FileInfo}.
138+
* Set to {@code false} to add the {@link FileHeaders#REMOTE_FILE_INFO}
139+
* header to the raw {@link FileInfo}.
137140
* Default is true meaning that common file information properties are provided
138141
* in that header as JSON.
139142
* @param fileInfoJson false to set the raw object.
@@ -143,6 +146,18 @@ public void setFileInfoJson(boolean fileInfoJson) {
143146
this.fileInfoJson = fileInfoJson;
144147
}
145148

149+
/**
150+
* The flag indicating if the local cache has to be fully clear on failure
151+
* to preserve a processing order of remote files on the next {@link #receive()} attempt.
152+
* By default, only the failed file will be re-fetched from remote directory,
153+
* but only when local cache is already empty, essential out of order.
154+
* @param strictOrder if cached files has to be cleared on failure.
155+
* @since 5.5.17
156+
*/
157+
public void setStrictOrder(boolean strictOrder) {
158+
this.strictOrder = strictOrder;
159+
}
160+
146161
protected RemoteFileTemplate<? extends F> getRemoteFileTemplate() {
147162
return this.remoteFileTemplate;
148163
}
@@ -235,9 +250,19 @@ private Object remoteFileToMessage(AbstractFileInfo<F> file) {
235250
throw new UncheckedIOException("IOException when retrieving " + remotePath, e);
236251
}
237252
}
238-
catch (RuntimeException e) {
239-
resetFilterIfNecessary(file);
240-
throw e;
253+
catch (RuntimeException ex) {
254+
if (this.strictOrder) {
255+
// If we could not fetch the file content, then it is fatal.
256+
// Clear local queue to be refreshed on the next 'receive()' call.
257+
List<AbstractFileInfo<F>> filesToReset = new ArrayList<>();
258+
filesToReset.add(file);
259+
this.toBeReceived.drainTo(filesToReset);
260+
filesToReset.forEach(this::resetFilterIfNecessary);
261+
}
262+
else {
263+
resetFilterIfNecessary(file);
264+
}
265+
throw ex;
241266
}
242267
}
243268

@@ -250,8 +275,9 @@ protected AbstractFileInfo<F> poll() {
250275

251276
private void resetFilterIfNecessary(AbstractFileInfo<F> file) {
252277
if (this.filter instanceof ResettableFileListFilter) {
253-
this.logger.info(LogMessage.format("Removing the remote file '%s' from"
254-
+ "the filterfor a subsequent transfer attempt", file));
278+
this.logger.info(
279+
LogMessage.format("Removing the remote file '%s' from the filter for a subsequent transfer attempt",
280+
file.getFilename()));
255281
((ResettableFileListFilter<F>) this.filter).remove(file.getFileInfo());
256282
}
257283
}

spring-integration-file/src/test/java/org/springframework/integration/file/remote/StreamingInboundTests.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2022 the original author or authors.
2+
* Copyright 2016-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -243,6 +243,10 @@ public void testFilterReversedOnBadFetch() {
243243
.isThrownBy(streamer::receive);
244244
assertThat(TestUtils.getPropertyValue(streamer, "toBeReceived", BlockingQueue.class)).hasSize(1);
245245
assertThat(streamer.metadataMap).hasSize(0);
246+
streamer.setStrictOrder(true);
247+
assertThatExceptionOfType(UncheckedIOException.class)
248+
.isThrownBy(streamer::receive);
249+
assertThat(TestUtils.getPropertyValue(streamer, "toBeReceived", BlockingQueue.class)).hasSize(0);
246250
}
247251

248252
public static class Streamer extends AbstractRemoteFileStreamingMessageSource<String> {

spring-integration-smb/src/main/java/org/springframework/integration/smb/session/SmbFileInfo.java

+11-7
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022 the original author or authors.
2+
* Copyright 2022-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -28,8 +28,7 @@
2828
import org.springframework.util.Assert;
2929

3030
/**
31-
* A {@link org.springframework.integration.file.remote.FileInfo} implementation for
32-
* SMB.
31+
* An {@link AbstractFileInfo} implementation for SMB protocol.
3332
*
3433
* @author Gregory Bragg
3534
* @author Artem Bilan
@@ -156,13 +155,18 @@ public String getPermissions() {
156155
return sb.toString();
157156
}
158157

159-
private static String aceToAllowFlag(ACE ace) {
160-
return ace.isAllow() ? "Allow " : "Deny ";
161-
}
162-
163158
@Override
164159
public SmbFile getFileInfo() {
165160
return this.smbFile;
166161
}
167162

163+
@Override
164+
public String toString() {
165+
return "SmbFileInfo{smbFile=" + this.smbFile + '}';
166+
}
167+
168+
private static String aceToAllowFlag(ACE ace) {
169+
return ace.isAllow() ? "Allow " : "Deny ";
170+
}
171+
168172
}

0 commit comments

Comments
 (0)