@@ -178,7 +178,7 @@ static class ChunksCoordinator {
     private final LongConsumer processedGlobalCheckpointUpdater;

     private final AtomicInteger activeWorkers;
-    private final AtomicLong lastPolledGlobalCheckpoint;
+    private final AtomicLong lastProcessedGlobalCheckpoint;
     private final Queue<long[]> chunks = new ConcurrentLinkedQueue<>();

     ChunksCoordinator(Client followerClient,
@@ -207,7 +207,7 @@ static class ChunksCoordinator {
         this.stateSupplier = runningSuppler;
         this.processedGlobalCheckpointUpdater = processedGlobalCheckpointUpdater;
         this.activeWorkers = new AtomicInteger();
-        this.lastPolledGlobalCheckpoint = new AtomicLong();
+        this.lastProcessedGlobalCheckpoint = new AtomicLong();
     }

     void createChucks(long from, long to) {
@@ -218,34 +218,34 @@ void createChucks(long from, long to) {
         }
     }

-    void updateChunksQueue() {
+    void updateChunksQueue(long previousGlobalcheckpoint) {
         schedule(CHECK_LEADER_GLOBAL_CHECKPOINT_INTERVAL, () -> {
             if (stateSupplier.get() == false) {
+                chunks.clear();
                 return;
             }

-            fetchGlobalCheckpoint(leaderClient, leaderShard, leaderGlobalCheckPoint -> {
-                long followerGlobalCheckpoint = lastPolledGlobalCheckpoint.get();
-                if (leaderGlobalCheckPoint != followerGlobalCheckpoint) {
-                    assert followerGlobalCheckpoint < leaderGlobalCheckPoint : "followGlobalCheckPoint [" + followerGlobalCheckpoint +
-                        "] is not below leaderGlobalCheckPoint [" + leaderGlobalCheckPoint + "]";
-                    createChucks(lastPolledGlobalCheckpoint.get(), leaderGlobalCheckPoint);
+            fetchGlobalCheckpoint(leaderClient, leaderShard, currentGlobalCheckPoint -> {
+                if (currentGlobalCheckPoint != previousGlobalcheckpoint) {
+                    assert previousGlobalcheckpoint < currentGlobalCheckPoint : "followGlobalCheckPoint [" + previousGlobalcheckpoint +
+                        "] is not below leaderGlobalCheckPoint [" + currentGlobalCheckPoint + "]";
+                    createChucks(previousGlobalcheckpoint, currentGlobalCheckPoint);
                     initiateChunkWorkers();
+                    updateChunksQueue(currentGlobalCheckPoint);
                 } else {
                     LOGGER.debug("{} no write operations to fetch", followerShard);
+                    updateChunksQueue(previousGlobalcheckpoint);
                 }
-                updateChunksQueue();
             }, failureHandler);
         });
     }

     void start(long followerGlobalCheckpoint, long leaderGlobalCheckPoint) {
         createChucks(followerGlobalCheckpoint, leaderGlobalCheckPoint);
-        lastPolledGlobalCheckpoint.set(leaderGlobalCheckPoint);
         LOGGER.debug("{} Start coordination of [{}] chunks with [{}] concurrent processors",
             leaderShard, chunks.size(), maxConcurrentWorker);
         initiateChunkWorkers();
-        updateChunksQueue();
+        updateChunksQueue(leaderGlobalCheckPoint);
     }

     void initiateChunkWorkers() {
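The reworked updateChunksQueue threads the last-seen global checkpoint through as an argument to the next scheduled iteration instead of re-reading shared state, and clears the pending chunk queue once when the coordinator is told to stop. A minimal standalone sketch of that self-rescheduling pattern, assuming a plain ScheduledExecutorService in place of the PR's schedule helper (PollLoop, fetchCheckpoint, and the one-second interval are illustrative stand-ins, not the PR's actual names):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import java.util.function.LongSupplier;

    class PollLoop {
        private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        private final LongSupplier fetchCheckpoint; // stand-in for fetchGlobalCheckpoint
        private volatile boolean running = true;

        PollLoop(LongSupplier fetchCheckpoint) {
            this.fetchCheckpoint = fetchCheckpoint;
        }

        void stop() {
            running = false;
        }

        // Each scheduled iteration receives the checkpoint it last saw as an
        // argument and hands the updated value to the next iteration, so the
        // loop carries its own state instead of re-reading a shared field.
        void poll(long previousCheckpoint) {
            scheduler.schedule(() -> {
                if (running == false) {
                    return;
                }
                long current = fetchCheckpoint.getAsLong();
                if (current != previousCheckpoint) {
                    // ... enqueue chunks for the range (previousCheckpoint, current] ...
                    poll(current);            // next poll starts from the new checkpoint
                } else {
                    poll(previousCheckpoint); // nothing new; keep the old value
                }
            }, 1, TimeUnit.SECONDS);
        }
    }

Clearing the queue in the stopped branch also lets workers simply drain until the queue is empty, which is presumably why the per-chunk state check is removed from processNextChunk in the next hunk.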
@@ -275,10 +275,6 @@ protected void doRun() throws Exception {
     }

     void processNextChunk() {
-        if (stateSupplier.get() == false) {
-            return;
-        }
-
         long[] chunk = chunks.poll();
         if (chunk == null) {
             int activeWorkers = this.activeWorkers.decrementAndGet();
@@ -289,7 +285,7 @@ void processNextChunk() {
         Consumer<Exception> processorHandler = e -> {
             if (e == null) {
                 LOGGER.debug("{} Successfully processed chunk [{}/{}]", leaderShard, chunk[0], chunk[1]);
-                if (lastPolledGlobalCheckpoint.updateAndGet(x -> x < chunk[1] ? chunk[1] : x) == chunk[1]) {
+                if (lastProcessedGlobalCheckpoint.updateAndGet(x -> x < chunk[1] ? chunk[1] : x) == chunk[1]) {
                     processedGlobalCheckpointUpdater.accept(chunk[1]);
                 }
                 processNextChunk();
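Chunk workers can finish out of order, so the processed checkpoint is only ever advanced monotonically: updateAndGet applies a max function, and comparing the result against chunk[1] tells the completing worker whether it was the one that moved the checkpoint forward and should notify processedGlobalCheckpointUpdater. The idiom in isolation (MonotonicCheckpoint and advanceTo are illustrative names, not from the PR):

    import java.util.concurrent.atomic.AtomicLong;

    class MonotonicCheckpoint {
        private final AtomicLong value = new AtomicLong();

        // Atomically raises the stored checkpoint to 'candidate' if it is higher.
        // Returns true when the stored value equals 'candidate' afterwards, i.e.
        // no later chunk has already advanced past it; a stale (smaller)
        // candidate leaves the value untouched and returns false.
        boolean advanceTo(long candidate) {
            return value.updateAndGet(x -> x < candidate ? candidate : x) == candidate;
        }
    }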