Skip to content

Commit a85a651

Browse files
author
Kirill Chaykin
committed
aws#4801 Cancelling origin body to prevent stuck calling thread
1 parent 5c2868f commit a85a651

File tree

1 file changed

+48
-0
lines changed

1 file changed

+48
-0
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/BlockingInputStreamAsyncRequestBody.java

+48
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,13 @@
2323
import java.util.concurrent.TimeUnit;
2424
import java.util.concurrent.atomic.AtomicBoolean;
2525
import org.reactivestreams.Subscriber;
26+
import org.reactivestreams.Subscription;
2627
import software.amazon.awssdk.annotations.SdkPublicApi;
2728
import software.amazon.awssdk.core.exception.NonRetryableException;
29+
import software.amazon.awssdk.core.internal.async.SplittingPublisher;
2830
import software.amazon.awssdk.core.internal.io.SdkLengthAwareInputStream;
2931
import software.amazon.awssdk.core.internal.util.NoopSubscription;
32+
import software.amazon.awssdk.utils.async.DelegatingSubscriber;
3033
import software.amazon.awssdk.utils.async.InputStreamConsumingPublisher;
3134

3235
/**
@@ -104,6 +107,11 @@ public void subscribe(Subscriber<? super ByteBuffer> s) {
104107
}
105108
}
106109

110+
@Override
111+
public SdkPublisher<AsyncRequestBody> split(AsyncRequestBodySplitConfiguration splitConfiguration) {
112+
return new BlockingSplittingPublisher(this, splitConfiguration);
113+
}
114+
107115
private void waitForSubscriptionIfNeeded() throws InterruptedException {
108116
long timeoutSeconds = subscribeTimeout.getSeconds();
109117
if (!subscribedLatch.await(timeoutSeconds, TimeUnit.SECONDS)) {
@@ -112,4 +120,44 @@ private void waitForSubscriptionIfNeeded() throws InterruptedException {
112120
+ "BEFORE invoking doBlockingWrite if your caller is single-threaded.");
113121
}
114122
}
123+
124+
private class BlockingSplittingPublisher extends SplittingPublisher {
125+
126+
public BlockingSplittingPublisher(AsyncRequestBody asyncRequestBody,
127+
AsyncRequestBodySplitConfiguration splitConfiguration) {
128+
super(asyncRequestBody, splitConfiguration);
129+
}
130+
131+
@Override
132+
public void subscribe(Subscriber<? super AsyncRequestBody> downstreamSubscriber) {
133+
Subscriber<? super AsyncRequestBody> delegatingSubscriber = new DelegatingSubscriber<AsyncRequestBody, AsyncRequestBody>(
134+
downstreamSubscriber) {
135+
@Override
136+
public void onSubscribe(Subscription subscription) {
137+
Subscription delegatingSubscription = new Subscription() {
138+
@Override
139+
public void request(long n) {
140+
subscription.request(n);
141+
}
142+
143+
@Override
144+
public void cancel() {
145+
subscription.cancel();
146+
147+
//Cancel origin body to prevent stuck calling thread
148+
BlockingInputStreamAsyncRequestBody.this.cancel();
149+
}
150+
};
151+
super.onSubscribe(delegatingSubscription);
152+
}
153+
154+
@Override
155+
public void onNext(AsyncRequestBody body) {
156+
subscriber.onNext(body);
157+
}
158+
};
159+
160+
super.subscribe(delegatingSubscriber);
161+
}
162+
}
115163
}

0 commit comments

Comments
 (0)