@@ -336,14 +336,65 @@ public void testNotificationUsesExecutor() {
336
336
};
337
337
final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners (shardId , executor , logger );
338
338
globalCheckpointListeners .globalCheckpointUpdated (NO_OPS_PERFORMED );
339
+ final long globalCheckpoint = randomLongBetween (NO_OPS_PERFORMED , Long .MAX_VALUE );
340
+ final AtomicInteger notified = new AtomicInteger ();
339
341
final int numberOfListeners = randomIntBetween (0 , 16 );
340
342
for (int i = 0 ; i < numberOfListeners ; i ++) {
341
- globalCheckpointListeners .add (NO_OPS_PERFORMED , (g , e ) -> {});
343
+ globalCheckpointListeners .add (NO_OPS_PERFORMED , (g , e ) -> {
344
+ notified .incrementAndGet ();
345
+ assertThat (g , equalTo (globalCheckpoint ));
346
+ assertNull (e );
347
+ });
342
348
}
343
- globalCheckpointListeners .globalCheckpointUpdated (randomLongBetween (NO_OPS_PERFORMED , Long .MAX_VALUE ));
349
+ globalCheckpointListeners .globalCheckpointUpdated (globalCheckpoint );
350
+ assertThat (notified .get (), equalTo (numberOfListeners ));
344
351
assertThat (count .get (), equalTo (numberOfListeners == 0 ? 0 : 1 ));
345
352
}
346
353
354
+ public void testNotificationOnClosedUsesExecutor () throws IOException {
355
+ final AtomicInteger count = new AtomicInteger ();
356
+ final Executor executor = command -> {
357
+ count .incrementAndGet ();
358
+ command .run ();
359
+ };
360
+ final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners (shardId , executor , logger );
361
+ globalCheckpointListeners .close ();
362
+ final AtomicInteger notified = new AtomicInteger ();
363
+ final int numberOfListeners = randomIntBetween (0 , 16 );
364
+ for (int i = 0 ; i < numberOfListeners ; i ++) {
365
+ globalCheckpointListeners .add (NO_OPS_PERFORMED , (g , e ) -> {
366
+ notified .incrementAndGet ();
367
+ assertThat (g , equalTo (UNASSIGNED_SEQ_NO ));
368
+ assertNotNull (e );
369
+ assertThat (e .getShardId (), equalTo (shardId ));
370
+ });
371
+ }
372
+ assertThat (notified .get (), equalTo (numberOfListeners ));
373
+ assertThat (count .get (), equalTo (numberOfListeners ));
374
+ }
375
+
376
+ public void testListenersReadyToBeNotifiedUsesExecutor () {
377
+ final AtomicInteger count = new AtomicInteger ();
378
+ final Executor executor = command -> {
379
+ count .incrementAndGet ();
380
+ command .run ();
381
+ };
382
+ final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners (shardId , executor , logger );
383
+ final long globalCheckpoint = randomNonNegativeLong ();
384
+ globalCheckpointListeners .globalCheckpointUpdated (globalCheckpoint );
385
+ final AtomicInteger notified = new AtomicInteger ();
386
+ final int numberOfListeners = randomIntBetween (0 , 16 );
387
+ for (int i = 0 ; i < numberOfListeners ; i ++) {
388
+ globalCheckpointListeners .add (randomLongBetween (0 , globalCheckpoint ), (g , e ) -> {
389
+ notified .incrementAndGet ();
390
+ assertThat (g , equalTo (globalCheckpoint ));
391
+ assertNull (e );
392
+ });
393
+ }
394
+ assertThat (notified .get (), equalTo (numberOfListeners ));
395
+ assertThat (count .get (), equalTo (numberOfListeners ));
396
+ }
397
+
347
398
public void testConcurrency () throws BrokenBarrierException , InterruptedException {
348
399
final ExecutorService executor = Executors .newFixedThreadPool (randomIntBetween (1 , 8 ));
349
400
final GlobalCheckpointListeners globalCheckpointListeners = new GlobalCheckpointListeners (shardId , executor , logger );
0 commit comments