22
22
import java .util .concurrent .CountDownLatch ;
23
23
import java .util .concurrent .TimeUnit ;
24
24
import java .util .concurrent .atomic .AtomicBoolean ;
25
+
25
26
import org .reactivestreams .Subscriber ;
27
+ import org .reactivestreams .Subscription ;
26
28
import software .amazon .awssdk .annotations .SdkPublicApi ;
27
29
import software .amazon .awssdk .core .exception .NonRetryableException ;
30
+ import software .amazon .awssdk .core .internal .async .SplittingPublisher ;
28
31
import software .amazon .awssdk .core .internal .io .SdkLengthAwareInputStream ;
29
32
import software .amazon .awssdk .core .internal .util .NoopSubscription ;
33
+ import software .amazon .awssdk .utils .async .DelegatingSubscriber ;
30
34
import software .amazon .awssdk .utils .async .InputStreamConsumingPublisher ;
31
35
32
36
/**
@@ -44,17 +48,17 @@ public final class BlockingInputStreamAsyncRequestBody implements AsyncRequestBo
44
48
private final Duration subscribeTimeout ;
45
49
46
50
BlockingInputStreamAsyncRequestBody (Long contentLength ) {
47
- this (contentLength , Duration .ofSeconds (10 ));
51
+ this (contentLength , Duration .ofSeconds (10 ));
48
52
}
49
53
50
54
BlockingInputStreamAsyncRequestBody (Long contentLength , Duration subscribeTimeout ) {
51
- this .contentLength = contentLength ;
52
- this .subscribeTimeout = subscribeTimeout ;
55
+ this .contentLength = contentLength ;
56
+ this .subscribeTimeout = subscribeTimeout ;
53
57
}
54
58
55
59
@ Override
56
60
public Optional <Long > contentLength () {
57
- return Optional .ofNullable (contentLength );
61
+ return Optional .ofNullable (contentLength );
58
62
}
59
63
60
64
/**
@@ -70,46 +74,92 @@ public Optional<Long> contentLength() {
70
74
* failed).
71
75
*/
72
76
public long writeInputStream (InputStream inputStream ) {
73
- try {
74
- waitForSubscriptionIfNeeded ();
75
- if (contentLength != null ) {
76
- return delegate .doBlockingWrite (new SdkLengthAwareInputStream (inputStream , contentLength ));
77
- }
78
-
79
- return delegate .doBlockingWrite (inputStream );
80
- } catch (InterruptedException e ) {
81
- Thread .currentThread ().interrupt ();
82
- delegate .cancel ();
83
- throw new RuntimeException (e );
84
- }
77
+ try {
78
+ waitForSubscriptionIfNeeded ();
79
+ if (contentLength != null ) {
80
+ return delegate .doBlockingWrite (new SdkLengthAwareInputStream (inputStream , contentLength ));
81
+ }
82
+
83
+ return delegate .doBlockingWrite (inputStream );
84
+ } catch (InterruptedException e ) {
85
+ Thread .currentThread ().interrupt ();
86
+ delegate .cancel ();
87
+ throw new RuntimeException (e );
88
+ }
85
89
}
86
90
87
91
/**
88
92
* Cancel any running write (and mark the stream as failed).
89
93
*/
90
94
public void cancel () {
91
- delegate .cancel ();
95
+ delegate .cancel ();
92
96
}
93
97
94
98
@ Override
95
99
public void subscribe (Subscriber <? super ByteBuffer > s ) {
96
- if (subscribeCalled .compareAndSet (false , true )) {
97
- delegate .subscribe (s );
98
- subscribedLatch .countDown ();
99
- } else {
100
- s .onSubscribe (new NoopSubscription (s ));
101
- s .onError (NonRetryableException .create ("A retry was attempted, but AsyncRequestBody.forBlockingInputStream does not "
102
- + "support retries. Consider using AsyncRequestBody.fromInputStream with an "
103
- + "input stream that supports mark/reset to get retry support." ));
104
- }
100
+ if (subscribeCalled .compareAndSet (false , true )) {
101
+ delegate .subscribe (s );
102
+ subscribedLatch .countDown ();
103
+ } else {
104
+ s .onSubscribe (new NoopSubscription (s ));
105
+ s .onError (NonRetryableException .create (
106
+ "A retry was attempted, but AsyncRequestBody.forBlockingInputStream does not "
107
+ + "support retries. Consider using AsyncRequestBody.fromInputStream with an "
108
+ + "input stream that supports mark/reset to get retry support." ));
109
+ }
110
+ }
111
+
112
+ @ Override
113
+ public SdkPublisher <AsyncRequestBody > split (AsyncRequestBodySplitConfiguration splitConfiguration ) {
114
+ return new BlockingSplittingPublisher (this , splitConfiguration );
105
115
}
106
116
107
117
private void waitForSubscriptionIfNeeded () throws InterruptedException {
108
- long timeoutSeconds = subscribeTimeout .getSeconds ();
109
- if (!subscribedLatch .await (timeoutSeconds , TimeUnit .SECONDS )) {
110
- throw new IllegalStateException ("The service request was not made within " + timeoutSeconds + " seconds of "
111
- + "doBlockingWrite being invoked. Make sure to invoke the service request "
112
- + "BEFORE invoking doBlockingWrite if your caller is single-threaded." );
113
- }
118
+ long timeoutSeconds = subscribeTimeout .getSeconds ();
119
+ if (!subscribedLatch .await (timeoutSeconds , TimeUnit .SECONDS )) {
120
+ throw new IllegalStateException ("The service request was not made within " + timeoutSeconds + " seconds of "
121
+ + "doBlockingWrite being invoked. Make sure to invoke the service request "
122
+ + "BEFORE invoking doBlockingWrite if your caller is single-threaded." );
123
+ }
124
+ }
125
+
126
+ private class BlockingSplittingPublisher extends SplittingPublisher {
127
+
128
+ public BlockingSplittingPublisher (AsyncRequestBody asyncRequestBody ,
129
+ AsyncRequestBodySplitConfiguration splitConfiguration ) {
130
+ super (asyncRequestBody , splitConfiguration );
131
+ }
132
+
133
+ @ Override
134
+ public void subscribe (Subscriber <? super AsyncRequestBody > downstreamSubscriber ) {
135
+ Subscriber <? super AsyncRequestBody > delegatingSubscriber = new DelegatingSubscriber <AsyncRequestBody , AsyncRequestBody >(
136
+ downstreamSubscriber ) {
137
+ @ Override
138
+ public void onSubscribe (Subscription subscription ) {
139
+ Subscription delegatingSubscription = new Subscription () {
140
+ @ Override
141
+ public void request (long n ) {
142
+ subscription .request (n );
143
+ }
144
+
145
+ @ Override
146
+ public void cancel () {
147
+ subscription .cancel ();
148
+
149
+ //Cancel origin body to prevent stuck calling thread
150
+ BlockingInputStreamAsyncRequestBody .this .cancel ();
151
+ }
152
+ };
153
+ super .onSubscribe (delegatingSubscription );
154
+ }
155
+
156
+ @ Override
157
+ public void onNext (AsyncRequestBody body ) {
158
+ subscriber .onNext (body );
159
+ }
160
+ };
161
+
162
+ super .subscribe (delegatingSubscriber );
163
+ }
114
164
}
115
165
}
0 commit comments