@@ -105,10 +105,10 @@ private void handleIncomingMessages(Function<Mono<JSONRPCMessage>, Mono<JSONRPCM
105
105
.flatMap (message -> Mono .just (message )
106
106
.transform (inboundMessageHandler )
107
107
.contextWrite (ctx -> ctx .put ("observation" , "myObservation" )))
108
- .doOnComplete (() -> {
108
+ .doOnTerminate (() -> {
109
+ // The outbound processing will dispose its scheduler upon completion
109
110
this .outboundSink .tryEmitComplete ();
110
111
this .inboundScheduler .dispose ();
111
- this .outboundScheduler .dispose ();
112
112
})
113
113
.subscribe ();
114
114
}
@@ -208,13 +208,13 @@ else if (isClosing) {
208
208
})
209
209
.doOnComplete (() -> {
210
210
isClosing = true ;
211
- outboundSink . tryEmitComplete ();
211
+ outboundScheduler . dispose ();
212
212
})
213
213
.doOnError (e -> {
214
214
if (!isClosing ) {
215
215
logger .error ("Error in outbound processing" , e );
216
216
isClosing = true ;
217
- outboundSink . tryEmitComplete ();
217
+ outboundScheduler . dispose ();
218
218
}
219
219
})
220
220
.map (msg -> (JSONRPCMessage ) msg );
@@ -224,26 +224,15 @@ else if (isClosing) {
224
224
225
225
@ Override
226
226
public Mono <Void > closeGracefully () {
227
-
228
- return Mono .fromRunnable (() -> {
227
+ return Mono .<Void >defer (() -> {
229
228
isClosing = true ;
230
229
logger .debug ("Initiating graceful shutdown" );
231
- }). then ( Mono . defer (() -> {
232
- // First complete the sinks to stop processing
230
+ // Completing the inbound causes the outbound to be completed as well, so
231
+ // we only close the inbound.
233
232
inboundSink .tryEmitComplete ();
234
- outboundSink .tryEmitComplete ();
235
- return Mono .delay (Duration .ofMillis (100 ));
236
- })).then (Mono .fromRunnable (() -> {
237
- try {
238
- // Dispose schedulers with longer timeout
239
- inboundScheduler .dispose ();
240
- outboundScheduler .dispose ();
241
- logger .info ("Graceful shutdown completed" );
242
- }
243
- catch (Exception e ) {
244
- logger .error ("Error during graceful shutdown" , e );
245
- }
246
- })).then ().subscribeOn (Schedulers .boundedElastic ());
233
+ logger .info ("Graceful shutdown complete" );
234
+ return Mono .empty ();
235
+ }).subscribeOn (Schedulers .boundedElastic ());
247
236
}
248
237
249
238
@ Override
0 commit comments