15
15
16
16
package software .amazon .awssdk .transfer .s3 .internal ;
17
17
18
- import java .util .Optional ;
19
18
import java .util .concurrent .CompletableFuture ;
20
- import java .util .concurrent .atomic .AtomicBoolean ;
21
19
import java .util .concurrent .atomic .AtomicInteger ;
22
20
import java .util .function .Function ;
23
21
import org .reactivestreams .Subscriber ;
24
22
import org .reactivestreams .Subscription ;
25
23
import software .amazon .awssdk .annotations .SdkInternalApi ;
26
24
import software .amazon .awssdk .utils .Logger ;
27
25
import software .amazon .awssdk .utils .Validate ;
28
- import software .amazon .awssdk .utils .async .DemandIgnoringSubscription ;
29
- import software .amazon .awssdk .utils .async .StoringSubscriber ;
30
26
31
27
/**
32
28
* An implementation of {@link Subscriber} that execute the provided function for every event and limits the number of concurrent
@@ -41,20 +37,16 @@ public class AsyncBufferingSubscriber<T> implements Subscriber<T> {
41
37
private final Function <T , CompletableFuture <?>> consumer ;
42
38
private final int maxConcurrentExecutions ;
43
39
private final AtomicInteger numRequestsInFlight ;
44
- private final AtomicBoolean isDelivering = new AtomicBoolean (false );
45
- private volatile boolean isStreamingDone ;
40
+ private volatile boolean upstreamDone ;
46
41
private Subscription subscription ;
47
42
48
- private final StoringSubscriber <T > storingSubscriber ;
49
-
50
43
public AsyncBufferingSubscriber (Function <T , CompletableFuture <?>> consumer ,
51
44
CompletableFuture <Void > returnFuture ,
52
45
int maxConcurrentExecutions ) {
53
46
this .returnFuture = returnFuture ;
54
47
this .consumer = consumer ;
55
48
this .maxConcurrentExecutions = maxConcurrentExecutions ;
56
49
this .numRequestsInFlight = new AtomicInteger (0 );
57
- this .storingSubscriber = new StoringSubscriber <>(Integer .MAX_VALUE );
58
50
}
59
51
60
52
@ Override
@@ -65,89 +57,41 @@ public void onSubscribe(Subscription subscription) {
65
57
subscription .cancel ();
66
58
return ;
67
59
}
68
- storingSubscriber .onSubscribe (new DemandIgnoringSubscription (subscription ));
69
60
this .subscription = subscription ;
70
61
subscription .request (maxConcurrentExecutions );
71
62
}
72
63
73
64
@ Override
74
65
public void onNext (T item ) {
75
- storingSubscriber .onNext (item );
76
- flushBufferIfNeeded ();
77
- }
78
-
79
- private void flushBufferIfNeeded () {
80
- if (isDelivering .compareAndSet (false , true )) {
81
- try {
82
- Optional <StoringSubscriber .Event <T >> next = storingSubscriber .peek ();
83
- while (numRequestsInFlight .get () < maxConcurrentExecutions ) {
84
- if (!next .isPresent ()) {
85
- subscription .request (1 );
86
- break ;
87
- }
88
-
89
- switch (next .get ().type ()) {
90
- case ON_COMPLETE :
91
- handleCompleteEvent ();
92
- break ;
93
- case ON_ERROR :
94
- handleError (next .get ().runtimeError ());
95
- break ;
96
- case ON_NEXT :
97
- handleOnNext (next .get ().value ());
98
- break ;
99
- default :
100
- handleError (new IllegalStateException ("Unknown stored type: " + next .get ().type ()));
101
- break ;
102
- }
103
-
104
- next = storingSubscriber .peek ();
105
- }
106
- } finally {
107
- isDelivering .set (false );
108
- }
109
- }
110
- }
111
-
112
- private void handleOnNext (T item ) {
113
- storingSubscriber .poll ();
114
-
115
- int numberOfRequestInFlight = numRequestsInFlight .incrementAndGet ();
116
- log .debug (() -> "Delivering next item, numRequestInFlight=" + numberOfRequestInFlight );
117
-
66
+ numRequestsInFlight .incrementAndGet ();
118
67
consumer .apply (item ).whenComplete ((r , t ) -> {
119
- numRequestsInFlight .decrementAndGet ();
120
- if (! isStreamingDone ) {
68
+ checkForCompletion ( numRequestsInFlight .decrementAndGet () );
69
+ synchronized ( this ) {
121
70
subscription .request (1 );
122
- } else {
123
- flushBufferIfNeeded ();
124
71
}
125
72
});
126
73
}
127
74
128
- private void handleCompleteEvent () {
129
- if (numRequestsInFlight .get () == 0 ) {
130
- returnFuture .complete (null );
131
- storingSubscriber .poll ();
132
- }
133
- }
134
-
135
75
@ Override
136
76
public void onError (Throwable t ) {
137
- handleError (t );
138
- storingSubscriber .onError (t );
139
- }
140
-
141
- private void handleError (Throwable t ) {
77
+ // Need to complete future exceptionally first to prevent
78
+ // accidental successful completion by a concurrent checkForCompletion.
142
79
returnFuture .completeExceptionally (t );
143
- storingSubscriber . poll () ;
80
+ upstreamDone = true ;
144
81
}
145
82
146
83
@ Override
147
84
public void onComplete () {
148
- isStreamingDone = true ;
149
- storingSubscriber .onComplete ();
150
- flushBufferIfNeeded ();
85
+ upstreamDone = true ;
86
+ checkForCompletion (numRequestsInFlight .get ());
87
+ }
88
+
89
+ private void checkForCompletion (int requestsInFlight ) {
90
+ if (upstreamDone && requestsInFlight == 0 ) {
91
+ // This could get invoked multiple times, but it doesn't matter
92
+ // because future.complete is idempotent.
93
+ returnFuture .complete (null );
94
+ }
151
95
}
152
96
153
97
/**
0 commit comments