17
17
package org .springframework .http .codec .multipart ;
18
18
19
19
import java .io .IOException ;
20
- import java .io .UncheckedIOException ;
21
20
import java .nio .channels .Channels ;
22
21
import java .nio .channels .FileChannel ;
23
22
import java .nio .channels .ReadableByteChannel ;
31
30
import java .util .Map ;
32
31
import java .util .Optional ;
33
32
import java .util .concurrent .atomic .AtomicInteger ;
33
+ import java .util .concurrent .atomic .AtomicReference ;
34
34
import java .util .function .Consumer ;
35
35
36
36
import org .synchronoss .cloud .nio .multipart .DefaultPartBodyStreamStorageFactory ;
46
46
import reactor .core .publisher .FluxSink ;
47
47
import reactor .core .publisher .Mono ;
48
48
import reactor .core .publisher .SignalType ;
49
+ import reactor .core .scheduler .Schedulers ;
49
50
50
51
import org .springframework .core .ResolvableType ;
51
52
import org .springframework .core .codec .DecodingException ;
@@ -88,7 +89,7 @@ public class SynchronossPartHttpMessageReader extends LoggingCodecSupport implem
88
89
89
90
private int maxParts = -1 ;
90
91
91
- private Path fileStorageDirectory = createTempDirectory ();
92
+ private final AtomicReference < Path > fileStorageDirectory = new AtomicReference <> ();
92
93
93
94
94
95
/**
@@ -163,7 +164,7 @@ public void setFileStorageDirectory(Path fileStorageDirectory) throws IOExceptio
163
164
if (!Files .exists (fileStorageDirectory )) {
164
165
Files .createDirectory (fileStorageDirectory );
165
166
}
166
- this .fileStorageDirectory = fileStorageDirectory ;
167
+ this .fileStorageDirectory . set ( fileStorageDirectory ) ;
167
168
}
168
169
169
170
@@ -189,29 +190,46 @@ public boolean canRead(ResolvableType elementType, @Nullable MediaType mediaType
189
190
190
191
@ Override
191
192
public Flux <Part > read (ResolvableType elementType , ReactiveHttpInputMessage message , Map <String , Object > hints ) {
192
- return Flux .create (new SynchronossPartGenerator (message , this .fileStorageDirectory ))
193
- .doOnNext (part -> {
194
- if (!Hints .isLoggingSuppressed (hints )) {
195
- LogFormatUtils .traceDebug (logger , traceOn -> Hints .getLogPrefix (hints ) + "Parsed " +
196
- (isEnableLoggingRequestDetails () ?
197
- LogFormatUtils .formatValue (part , !traceOn ) :
198
- "parts '" + part .name () + "' (content masked)" ));
199
- }
200
- });
193
+ return getFileStorageDirectory ().flatMapMany (directory ->
194
+ Flux .create (new SynchronossPartGenerator (message , directory ))
195
+ .doOnNext (part -> {
196
+ if (!Hints .isLoggingSuppressed (hints )) {
197
+ LogFormatUtils .traceDebug (logger , traceOn -> Hints .getLogPrefix (hints ) + "Parsed " +
198
+ (isEnableLoggingRequestDetails () ?
199
+ LogFormatUtils .formatValue (part , !traceOn ) :
200
+ "parts '" + part .name () + "' (content masked)" ));
201
+ }
202
+ }));
201
203
}
202
204
203
205
@ Override
204
206
public Mono <Part > readMono (ResolvableType elementType , ReactiveHttpInputMessage message , Map <String , Object > hints ) {
205
207
return Mono .error (new UnsupportedOperationException ("Cannot read multipart request body into single Part" ));
206
208
}
207
209
208
- private static Path createTempDirectory () {
209
- try {
210
- return Files .createTempDirectory (FILE_STORAGE_DIRECTORY_PREFIX );
211
- }
212
- catch (IOException ex ) {
213
- throw new UncheckedIOException (ex );
214
- }
210
+ private Mono <Path > getFileStorageDirectory () {
211
+ return Mono .defer (() -> {
212
+ Path directory = this .fileStorageDirectory .get ();
213
+ if (directory != null ) {
214
+ return Mono .just (directory );
215
+ }
216
+ else {
217
+ return Mono .fromCallable (() -> {
218
+ Path tempDirectory = Files .createTempDirectory (FILE_STORAGE_DIRECTORY_PREFIX );
219
+ if (this .fileStorageDirectory .compareAndSet (null , tempDirectory )) {
220
+ return tempDirectory ;
221
+ }
222
+ else {
223
+ try {
224
+ Files .delete (tempDirectory );
225
+ }
226
+ catch (IOException ignored ) {
227
+ }
228
+ return this .fileStorageDirectory .get ();
229
+ }
230
+ }).subscribeOn (Schedulers .boundedElastic ());
231
+ }
232
+ });
215
233
}
216
234
217
235
0 commit comments