diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java
index f022ced4fc342..12d95f2b81619 100644
--- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java
+++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java
@@ -30,7 +30,6 @@
 import org.elasticsearch.common.transport.NetworkExceptionHelper;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
-import org.elasticsearch.common.util.concurrent.CountDown;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
@@ -56,7 +55,6 @@
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.LongConsumer;
@@ -69,7 +67,7 @@ public class ShardFollowTasksExecutor extends PersistentTasksExecutor {
-            // TODO: check if both indices have the same history uuid
-            if (leaderGlobalCheckPoint == followGlobalCheckPoint) {
-                logger.debug("{} no write operations to fetch", followerShard);
-                retry(leaderClient, followerClient, task, params, followGlobalCheckPoint, imdVersionChecker);
-            } else {
-                assert followGlobalCheckPoint < leaderGlobalCheckPoint : "followGlobalCheckPoint [" + followGlobalCheckPoint +
-                    "] is not below leaderGlobalCheckPoint [" + leaderGlobalCheckPoint + "]";
-                logger.debug("{} fetching write operations, leaderGlobalCheckPoint={}, followGlobalCheckPoint={}", followerShard,
-                    leaderGlobalCheckPoint, followGlobalCheckPoint);
-                Executor ccrExecutor = threadPool.executor(Ccr.CCR_THREAD_POOL_NAME);
-                Consumer<Exception> handler = e -> {
-                    if (e == null) {
-                        task.updateProcessedGlobalCheckpoint(leaderGlobalCheckPoint);
-                        prepare(leaderClient, followerClient, task, params, leaderGlobalCheckPoint, imdVersionChecker);
-                    } else {
-                        task.markAsFailed(e);
-                    }
-                };
-                ChunksCoordinator coordinator = new ChunksCoordinator(followerClient, leaderClient, ccrExecutor, imdVersionChecker,
-                    params.getMaxChunkSize(), params.getNumConcurrentChunks(), params.getProcessorMaxTranslogBytes(), leaderShard,
-                    followerShard, handler);
-                coordinator.createChucks(followGlobalCheckPoint, leaderGlobalCheckPoint);
-                coordinator.start();
-            }
+            logger.debug("{} fetching write operations, leaderGlobalCheckPoint={}, followGlobalCheckPoint={}", followerShard,
+                leaderGlobalCheckPoint, followGlobalCheckPoint);
+            ChunksCoordinator coordinator = new ChunksCoordinator(followerClient, leaderClient, threadPool, imdVersionChecker,
+                params.getMaxChunkSize(), params.getNumConcurrentChunks(), params.getProcessorMaxTranslogBytes(), leaderShard,
+                followerShard, task::markAsFailed, task::isRunning, task::updateProcessedGlobalCheckpoint);
+            coordinator.start(followGlobalCheckPoint, leaderGlobalCheckPoint);
         }, task::markAsFailed);
     }
 
-    private void retry(Client leaderClient, Client followerClient, ShardFollowNodeTask task, ShardFollowTask params,
-                       long followGlobalCheckPoint,
-                       IndexMetadataVersionChecker imdVersionChecker) {
-        threadPool.schedule(RETRY_TIMEOUT, Ccr.CCR_THREAD_POOL_NAME, new AbstractRunnable() {
-            @Override
-            public void onFailure(Exception e) {
-                task.markAsFailed(e);
-            }
-
-            @Override
-            protected void doRun() throws Exception {
-                prepare(leaderClient, followerClient, task, params, followGlobalCheckPoint, imdVersionChecker);
-            }
-        });
-    }
-
-    private void fetchGlobalCheckpoint(Client client, ShardId shardId, LongConsumer handler, Consumer<Exception> errorHandler) {
+    private static void fetchGlobalCheckpoint(Client client, ShardId shardId, LongConsumer handler, Consumer<Exception> errorHandler) {
         client.admin().indices().stats(new IndicesStatsRequest().indices(shardId.getIndexName()), ActionListener.wrap(r -> {
             IndexStats indexStats = r.getIndex(shardId.getIndexName());
             Optional<ShardStats> filteredShardStats = Arrays.stream(indexStats.getShards())
@@ -201,37 +161,53 @@ private void fetchGlobalCheckpoint(Client client, ShardId shardId, LongConsumer
 
     static class ChunksCoordinator {
 
         private static final Logger LOGGER = Loggers.getLogger(ChunksCoordinator.class);
-
+
         private final Client followerClient;
         private final Client leaderClient;
+        private final ThreadPool threadPool;
         private final Executor ccrExecutor;
         private final IndexMetadataVersionChecker imdVersionChecker;
         private final long batchSize;
-        private final int concurrentProcessors;
+        private final int maxConcurrentWorker;
         private final long processorMaxTranslogBytes;
         private final ShardId leaderShard;
         private final ShardId followerShard;
-        private final Consumer<Exception> handler;
-
-        private final CountDown countDown;
-        private final Queue<long[]> chunks = new ConcurrentLinkedQueue<>();
-        private final AtomicReference<Exception> failureHolder = new AtomicReference<>();
-
-        ChunksCoordinator(Client followerClient, Client leaderClient, Executor ccrExecutor, IndexMetadataVersionChecker imdVersionChecker,
-                          long batchSize, int concurrentProcessors, long processorMaxTranslogBytes, ShardId leaderShard,
-                          ShardId followerShard, Consumer<Exception> handler) {
+        private final Consumer<Exception> failureHandler;
+        private final Supplier<Boolean> stateSupplier;
+        private final LongConsumer processedGlobalCheckpointUpdater;
+
+        private final AtomicInteger activeWorkers;
+        private final AtomicLong lastProcessedGlobalCheckpoint;
+        private final Queue<long[]> chunkWorkerQueue = new ConcurrentLinkedQueue<>();
+
+        ChunksCoordinator(Client followerClient,
+                          Client leaderClient,
+                          ThreadPool threadPool,
+                          IndexMetadataVersionChecker imdVersionChecker,
+                          long batchSize,
+                          int maxConcurrentWorker,
+                          long processorMaxTranslogBytes,
+                          ShardId leaderShard,
+                          ShardId followerShard,
+                          Consumer<Exception> failureHandler,
+                          Supplier<Boolean> runningSuppler,
+                          LongConsumer processedGlobalCheckpointUpdater) {
             this.followerClient = followerClient;
             this.leaderClient = leaderClient;
-            this.ccrExecutor = ccrExecutor;
+            this.threadPool = threadPool;
             this.imdVersionChecker = imdVersionChecker;
+            this.ccrExecutor = threadPool.executor(Ccr.CCR_THREAD_POOL_NAME);
             this.batchSize = batchSize;
-            this.concurrentProcessors = concurrentProcessors;
+            this.maxConcurrentWorker = maxConcurrentWorker;
            this.processorMaxTranslogBytes = processorMaxTranslogBytes;
             this.leaderShard = leaderShard;
             this.followerShard = followerShard;
-            this.handler = handler;
-            this.countDown = new CountDown(concurrentProcessors);
+            this.failureHandler = failureHandler;
+            this.stateSupplier = runningSuppler;
+            this.processedGlobalCheckpointUpdater = processedGlobalCheckpointUpdater;
+            this.activeWorkers = new AtomicInteger();
+            this.lastProcessedGlobalCheckpoint = new AtomicLong();
         }
 
         /**
@@ -244,69 +220,113 @@ void createChucks(final long from, final long to) {
             LOGGER.debug("{} Creating chunks for operation range [{}] to [{}]", leaderShard, from, to);
             for (long i = from; i < to; i += batchSize) {
                 long v2 = i + batchSize <= to ? i + batchSize - 1 : to;
-                chunks.add(new long[]{i, v2});
+                chunkWorkerQueue.add(new long[]{i, v2});
             }
         }
 
-        void start() {
+        void updateChunksQueue(long previousGlobalcheckpoint) {
+            schedule(CHECK_LEADER_GLOBAL_CHECKPOINT_INTERVAL, () -> {
+                if (stateSupplier.get() == false) {
+                    chunkWorkerQueue.clear();
+                    return;
+                }
+
+                fetchGlobalCheckpoint(leaderClient, leaderShard, currentGlobalCheckPoint -> {
+                    if (currentGlobalCheckPoint != previousGlobalcheckpoint) {
+                        assert previousGlobalcheckpoint < currentGlobalCheckPoint : "followGlobalCheckPoint [" + previousGlobalcheckpoint +
+                            "] is not below leaderGlobalCheckPoint [" + currentGlobalCheckPoint + "]";
+                        createChucks(previousGlobalcheckpoint, currentGlobalCheckPoint);
+                        initiateChunkWorkers();
+                        updateChunksQueue(currentGlobalCheckPoint);
+                    } else {
+                        LOGGER.debug("{} no write operations to fetch", followerShard);
+                        updateChunksQueue(previousGlobalcheckpoint);
+                    }
+                }, failureHandler);
+            });
+        }
+
+        void start(long followerGlobalCheckpoint, long leaderGlobalCheckPoint) {
+            createChucks(followerGlobalCheckpoint, leaderGlobalCheckPoint);
             LOGGER.debug("{} Start coordination of [{}] chunks with [{}] concurrent processors",
-                leaderShard, chunks.size(), concurrentProcessors);
-            for (int i = 0; i < concurrentProcessors; i++) {
+                leaderShard, chunkWorkerQueue.size(), maxConcurrentWorker);
+            initiateChunkWorkers();
+            updateChunksQueue(leaderGlobalCheckPoint);
+        }
+
+        void initiateChunkWorkers() {
+            int workersToStart = maxConcurrentWorker - activeWorkers.get();
+            if (workersToStart == 0) {
+                LOGGER.debug("{} No new chunk workers were started", followerShard);
+                return;
+            }
+
+            LOGGER.debug("{} Starting [{}] new chunk workers", followerShard, workersToStart);
+            for (int i = 0; i < workersToStart; i++) {
                 ccrExecutor.execute(new AbstractRunnable() {
                     @Override
                     public void onFailure(Exception e) {
                         assert e != null;
-                        LOGGER.error(() -> new ParameterizedMessage("{} Failure starting processor", leaderShard), e);
-                        postProcessChuck(e);
+                        LOGGER.error(() -> new ParameterizedMessage("{} Failure starting processor", followerShard), e);
+                        failureHandler.accept(e);
                     }
-
+
                     @Override
                     protected void doRun() throws Exception {
                         processNextChunk();
                     }
                 });
+                activeWorkers.incrementAndGet();
             }
         }
 
         void processNextChunk() {
-            long[] chunk = chunks.poll();
+            long[] chunk = chunkWorkerQueue.poll();
             if (chunk == null) {
-                postProcessChuck(null);
+                int activeWorkers = this.activeWorkers.decrementAndGet();
+                LOGGER.debug("{} No more chunks to process, active workers [{}]", leaderShard, activeWorkers);
                 return;
             }
             LOGGER.debug("{} Processing chunk [{}/{}]", leaderShard, chunk[0], chunk[1]);
             Consumer<Exception> processorHandler = e -> {
                 if (e == null) {
                     LOGGER.debug("{} Successfully processed chunk [{}/{}]", leaderShard, chunk[0], chunk[1]);
+                    if (lastProcessedGlobalCheckpoint.updateAndGet(x -> x < chunk[1] ? chunk[1] : x) == chunk[1]) {
+                        processedGlobalCheckpointUpdater.accept(chunk[1]);
+                    }
                     processNextChunk();
                 } else {
                     LOGGER.error(() -> new ParameterizedMessage("{} Failure processing chunk [{}/{}]", leaderShard,
                         chunk[0], chunk[1]), e);
-                    postProcessChuck(e);
+                    failureHandler.accept(e);
                 }
             };
-            ChunkProcessor processor = new ChunkProcessor(leaderClient, followerClient, chunks, ccrExecutor, imdVersionChecker,
+            ChunkWorker worker = new ChunkWorker(leaderClient, followerClient, chunkWorkerQueue, ccrExecutor, imdVersionChecker,
                 leaderShard, followerShard, processorHandler);
-            processor.start(chunk[0], chunk[1], processorMaxTranslogBytes);
+            worker.start(chunk[0], chunk[1], processorMaxTranslogBytes);
         }
 
-        void postProcessChuck(Exception e) {
-            if (failureHolder.compareAndSet(null, e) == false) {
-                Exception firstFailure = failureHolder.get();
-                firstFailure.addSuppressed(e);
-            }
-            if (countDown.countDown()) {
-                handler.accept(failureHolder.get());
-            }
+        void schedule(TimeValue delay, Runnable runnable) {
+            threadPool.schedule(delay, Ccr.CCR_THREAD_POOL_NAME, new AbstractRunnable() {
+                @Override
+                public void onFailure(Exception e) {
+                    failureHandler.accept(e);
+                }
+
+                @Override
+                protected void doRun() throws Exception {
+                    runnable.run();
+                }
+            });
         }
 
-        Queue<long[]> getChunks() {
-            return chunks;
+        Queue<long[]> getChunkWorkerQueue() {
+            return chunkWorkerQueue;
         }
     }
 
-    static class ChunkProcessor {
+    static class ChunkWorker {
 
         private final Client leaderClient;
         private final Client followerClient;
@@ -319,9 +339,9 @@ static class ChunkProcessor {
         private final Consumer<Exception> handler;
         final AtomicInteger retryCounter = new AtomicInteger(0);
 
-        ChunkProcessor(Client leaderClient, Client followerClient, Queue<long[]> chunks, Executor ccrExecutor,
-                       BiConsumer<Long, Consumer<Exception>> indexVersionChecker,
-                       ShardId leaderShard, ShardId followerShard, Consumer<Exception> handler) {
+        ChunkWorker(Client leaderClient, Client followerClient, Queue<long[]> chunks, Executor ccrExecutor,
+                    BiConsumer<Long, Consumer<Exception>> indexVersionChecker, ShardId leaderShard, ShardId followerShard,
+                    Consumer<Exception> handler) {
             this.leaderClient = leaderClient;
             this.followerClient = followerClient;
             this.chunks = chunks;
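Note on the ShardFollowTasksExecutor change above: the removed retry()/prepare() recursion is replaced by ChunksCoordinator#updateChunksQueue, which re-schedules itself on CHECK_LEADER_GLOBAL_CHECKPOINT_INTERVAL, re-reads the leader global checkpoint, enqueues chunk ranges for any new operations, and clears the queue once the task is no longer running. The following is a minimal, self-contained sketch of that polling pattern using a plain ScheduledExecutorService instead of Elasticsearch's ThreadPool; CheckpointPollerSketch, fetchLeaderCheckpoint, isRunning and the one-second interval are illustrative stand-ins, not the PR's API.

// Simplified sketch of the polling loop introduced in ChunksCoordinator#updateChunksQueue.
// Everything here is hypothetical scaffolding around the pattern, not the PR's classes.
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

final class CheckpointPollerSketch {

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final Queue<long[]> chunkQueue = new ConcurrentLinkedQueue<>();
    private final LongSupplier fetchLeaderCheckpoint; // stand-in for the leader shard stats call
    private final BooleanSupplier isRunning;          // stand-in for task::isRunning
    private final long batchSize;

    CheckpointPollerSketch(LongSupplier fetchLeaderCheckpoint, BooleanSupplier isRunning, long batchSize) {
        this.fetchLeaderCheckpoint = fetchLeaderCheckpoint;
        this.isRunning = isRunning;
        this.batchSize = batchSize;
    }

    // Mirrors updateChunksQueue: reschedule, compare checkpoints, enqueue new chunk ranges, repeat.
    void poll(long previousCheckpoint) {
        scheduler.schedule(() -> {
            if (isRunning.getAsBoolean() == false) {
                chunkQueue.clear(); // task was stopped: drop queued work and stop polling
                return;
            }
            long current = fetchLeaderCheckpoint.getAsLong();
            if (current != previousCheckpoint) {
                // Same splitting rule as createChucks: batchSize-sized ranges with an inclusive upper bound.
                for (long from = previousCheckpoint; from < current; from += batchSize) {
                    long to = from + batchSize <= current ? from + batchSize - 1 : current;
                    chunkQueue.add(new long[]{from, to});
                }
                poll(current);
            } else {
                poll(previousCheckpoint); // nothing new to fetch, check again later
            }
        }, 1, TimeUnit.SECONDS); // interval is arbitrary here; the PR uses CHECK_LEADER_GLOBAL_CHECKPOINT_INTERVAL
    }
}

In the PR itself, failures from the scheduled runnable and from the checkpoint fetch are routed to the follow task through failureHandler (task::markAsFailed) rather than being rethrown.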
diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ChunksCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ChunksCoordinatorTests.java
index 9af0d93e9e2bc..e91c345f372bd 100644
--- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ChunksCoordinatorTests.java
+++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ChunksCoordinatorTests.java
@@ -12,7 +12,9 @@
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.test.ESTestCase;
-import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor.ChunkProcessor;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.xpack.ccr.Ccr;
+import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor.ChunkWorker;
 import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor.ChunksCoordinator;
 import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor.IndexMetadataVersionChecker;
 import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction;
@@ -26,10 +28,12 @@
 import java.util.Queue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
+import java.util.function.LongConsumer;
 
 import static org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor.PROCESSOR_RETRY_LIMIT;
 import static org.hamcrest.CoreMatchers.notNullValue;
@@ -50,32 +54,34 @@ public class ChunksCoordinatorTests extends ESTestCase {
 
     public void testCreateChunks() {
         Client client = mock(Client.class);
-        Executor ccrExecutor = Runnable::run;
+        ThreadPool threadPool = createMockThreadPool(false);
+
         ShardId leaderShardId = new ShardId("index1", "index1", 0);
         ShardId followShardId = new ShardId("index2", "index1", 0);
         IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker(leaderShardId.getIndex(),
             followShardId.getIndex(), client, client);
-        ChunksCoordinator coordinator = new ChunksCoordinator(client, client, ccrExecutor, checker, 1024, 1,
-            Long.MAX_VALUE, leaderShardId, followShardId, e -> {});
+        ChunksCoordinator coordinator = new ChunksCoordinator(client, client, threadPool, checker, 1024, 1,
+            Long.MAX_VALUE,
+            leaderShardId, followShardId, e -> {}, () -> true, value -> {});
         coordinator.createChucks(0, 1023);
-        List<long[]> result = new ArrayList<>(coordinator.getChunks());
+        List<long[]> result = new ArrayList<>(coordinator.getChunkWorkerQueue());
         assertThat(result.size(), equalTo(1));
         assertThat(result.get(0)[0], equalTo(0L));
         assertThat(result.get(0)[1], equalTo(1023L));
 
-        coordinator.getChunks().clear();
+        coordinator.getChunkWorkerQueue().clear();
         coordinator.createChucks(0, 2047);
-        result = new ArrayList<>(coordinator.getChunks());
+        result = new ArrayList<>(coordinator.getChunkWorkerQueue());
         assertThat(result.size(), equalTo(2));
         assertThat(result.get(0)[0], equalTo(0L));
         assertThat(result.get(0)[1], equalTo(1023L));
         assertThat(result.get(1)[0], equalTo(1024L));
         assertThat(result.get(1)[1], equalTo(2047L));
 
-        coordinator.getChunks().clear();
+        coordinator.getChunkWorkerQueue().clear();
         coordinator.createChucks(0, 4095);
-        result = new ArrayList<>(coordinator.getChunks());
+        result = new ArrayList<>(coordinator.getChunkWorkerQueue());
         assertThat(result.size(), equalTo(4));
         assertThat(result.get(0)[0], equalTo(0L));
         assertThat(result.get(0)[1], equalTo(1023L));
@@ -86,9 +92,9 @@ public void testCreateChunks() {
         assertThat(result.get(3)[0], equalTo(3072L));
         assertThat(result.get(3)[1], equalTo(4095L));
 
-        coordinator.getChunks().clear();
+        coordinator.getChunkWorkerQueue().clear();
         coordinator.createChucks(4096, 8196);
-        result = new ArrayList<>(coordinator.getChunks());
+        result = new ArrayList<>(coordinator.getChunkWorkerQueue());
         assertThat(result.size(), equalTo(5));
         assertThat(result.get(0)[0], equalTo(4096L));
         assertThat(result.get(0)[1], equalTo(5119L));
@@ -104,10 +110,9 @@ public void testCreateChunks() {
 
     public void testCoordinator() throws Exception {
         Client client = createClientMock();
-
+        ThreadPool threadPool = createMockThreadPool(false);
         mockShardChangesApiCall(client);
         mockBulkShardOperationsApiCall(client);
-        Executor ccrExecutor = Runnable::run;
 
         ShardId leaderShardId = new ShardId("index1", "index1", 0);
         ShardId followShardId = new ShardId("index2", "index1", 0);
@@ -116,21 +121,19 @@ public void testCoordinator() throws Exception {
         int batchSize = randomIntBetween(1, 1000);
         IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker(leaderShardId.getIndex(),
             followShardId.getIndex(), client, client);
-        ChunksCoordinator coordinator = new ChunksCoordinator(client, client, ccrExecutor, checker, batchSize,
-            concurrentProcessors, Long.MAX_VALUE, leaderShardId, followShardId, handler);
+        ChunksCoordinator coordinator = new ChunksCoordinator(client, client, threadPool, checker, batchSize,
+            concurrentProcessors, Long.MAX_VALUE, leaderShardId, followShardId, handler,
+            () -> true, value -> {});
 
         int numberOfOps = randomIntBetween(batchSize, batchSize * 20);
         long from = randomInt(1000);
         long to = from + numberOfOps - 1;
-        coordinator.createChucks(from, to);
 
         int expectedNumberOfChunks = numberOfOps / batchSize;
         if (numberOfOps % batchSize > 0) {
             expectedNumberOfChunks++;
         }
-        assertThat(coordinator.getChunks().size(), equalTo(expectedNumberOfChunks));
-
-        coordinator.start();
-        assertThat(coordinator.getChunks().size(), equalTo(0));
+        coordinator.start(from, to);
+        assertThat(coordinator.getChunkWorkerQueue().size(), equalTo(0));
 
         verify(client, times(expectedNumberOfChunks)).execute(same(ShardChangesAction.INSTANCE),
             any(ShardChangesAction.Request.class), any());
         verify(client, times(expectedNumberOfChunks)).execute(same(BulkShardOperationsAction.INSTANCE),
@@ -140,6 +143,7 @@ public void testCoordinator() throws Exception {
     public void testCoordinator_failure() throws Exception {
         Exception expectedException = new RuntimeException("throw me");
         Client client = createClientMock();
+        ThreadPool threadPool = createMockThreadPool(false);
         boolean shardChangesActionApiCallFailed;
         if (randomBoolean()) {
             shardChangesActionApiCallFailed = true;
@@ -161,13 +165,11 @@ public void testCoordinator_failure() throws Exception {
         };
         IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker(leaderShardId.getIndex(),
             followShardId.getIndex(), client, client);
-        ChunksCoordinator coordinator = new ChunksCoordinator(client, client, ccrExecutor, checker, 10, 1, Long.MAX_VALUE,
-            leaderShardId, followShardId, handler);
-        coordinator.createChucks(0, 19);
-        assertThat(coordinator.getChunks().size(), equalTo(2));
-
-        coordinator.start();
-        assertThat(coordinator.getChunks().size(), equalTo(1));
+        ChunksCoordinator coordinator = new ChunksCoordinator(client, client, threadPool, checker,10, 1, Long.MAX_VALUE,
+            leaderShardId, followShardId, handler, () -> true, value -> {});
+        coordinator.start(0, 19);
+
+        assertThat(coordinator.getChunkWorkerQueue().size(), equalTo(1));
 
         verify(client, times(1)).execute(same(ShardChangesAction.INSTANCE), any(ShardChangesAction.Request.class), any());
         verify(client, times(shardChangesActionApiCallFailed ? 0 : 1)).execute(same(BulkShardOperationsAction.INSTANCE),
@@ -176,9 +178,9 @@ public void testCoordinator_failure() throws Exception {
 
     public void testCoordinator_concurrent() throws Exception {
         Client client = createClientMock();
+        ThreadPool threadPool = createMockThreadPool(true);
         mockShardChangesApiCall(client);
         mockBulkShardOperationsApiCall(client);
-        Executor ccrExecutor = command -> new Thread(command).start();
 
         ShardId leaderShardId = new ShardId("index1", "index1", 0);
         ShardId followShardId = new ShardId("index2", "index1", 0);
@@ -193,21 +195,23 @@ public void testCoordinator_concurrent() throws Exception {
             latch.countDown();
         };
         IndexMetadataVersionChecker checker = new IndexMetadataVersionChecker(leaderShardId.getIndex(),
-                followShardId.getIndex(), client, client);
-        ChunksCoordinator coordinator = new ChunksCoordinator(client, client, ccrExecutor, checker, 1000, 4, Long.MAX_VALUE,
-            leaderShardId, followShardId, handler);
-        coordinator.createChucks(0, 999999);
-        assertThat(coordinator.getChunks().size(), equalTo(1000));
-
-        coordinator.start();
+            followShardId.getIndex(), client, client);
+        LongConsumer processedGlobalCheckpointHandler = value -> {
+            if (value == 999999) {
+                latch.countDown();
+            }
+        };
+        ChunksCoordinator coordinator = new ChunksCoordinator(client, client, threadPool, checker, 1000, 4, Long.MAX_VALUE,
+            leaderShardId, followShardId, handler, () -> true, processedGlobalCheckpointHandler);
+        coordinator.start(0, 999999);
         latch.await();
-        assertThat(coordinator.getChunks().size(), equalTo(0));
+        assertThat(coordinator.getChunkWorkerQueue().size(), equalTo(0));
 
         verify(client, times(1000)).execute(same(ShardChangesAction.INSTANCE), any(ShardChangesAction.Request.class), any());
         verify(client, times(1000)).execute(same(BulkShardOperationsAction.INSTANCE), any(BulkShardOperationsRequest.class), any());
         assertThat(calledOnceChecker.get(), is(false));
     }
 
-    public void testChunkProcessor() {
+    public void testChunkWorker() {
         Client client = createClientMock();
         Queue<long[]> chunks = new LinkedList<>();
         mockShardChangesApiCall(client);
@@ -221,14 +225,14 @@ public void testChunkProcessor() {
         boolean[] invoked = new boolean[1];
         Exception[] exception = new Exception[1];
         Consumer<Exception> handler = e -> {invoked[0] = true;exception[0] = e;};
-        ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, checker, leaderShardId,
+        ChunkWorker chunkWorker = new ChunkWorker(client, client, chunks, ccrExecutor, checker, leaderShardId,
             followShardId, handler);
-        chunkProcessor.start(0, 10, Long.MAX_VALUE);
+        chunkWorker.start(0, 10, Long.MAX_VALUE);
         assertThat(invoked[0], is(true));
         assertThat(exception[0], nullValue());
     }
 
-    public void testChunkProcessorRetry() {
+    public void testChunkWorkerRetry() {
         Client client = createClientMock();
         Queue<long[]> chunks = new LinkedList<>();
         mockBulkShardOperationsApiCall(client);
@@ -244,15 +248,15 @@ public void testChunkProcessorRetry() {
         boolean[] invoked = new boolean[1];
         Exception[] exception = new Exception[1];
         Consumer<Exception> handler = e -> {invoked[0] = true;exception[0] = e;};
-        ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, checker, leaderShardId,
+        ChunkWorker chunkWorker = new ChunkWorker(client, client, chunks, ccrExecutor, checker, leaderShardId,
            followShardId, handler);
-        chunkProcessor.start(0, 10, Long.MAX_VALUE);
+        chunkWorker.start(0, 10, Long.MAX_VALUE);
         assertThat(invoked[0], is(true));
         assertThat(exception[0], nullValue());
-        assertThat(chunkProcessor.retryCounter.get(), equalTo(testRetryLimit + 1));
+        assertThat(chunkWorker.retryCounter.get(), equalTo(testRetryLimit + 1));
     }
 
-    public void testChunkProcessorRetryTooManyTimes() {
+    public void testChunkWorkerRetryTooManyTimes() {
         Client client = createClientMock();
         Queue<long[]> chunks = new LinkedList<>();
         mockBulkShardOperationsApiCall(client);
@@ -268,17 +272,17 @@ public void testChunkProcessorRetryTooManyTimes() {
         boolean[] invoked = new boolean[1];
         Exception[] exception = new Exception[1];
         Consumer<Exception> handler = e -> {invoked[0] = true;exception[0] = e;};
-        ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, checker, leaderShardId,
+        ChunkWorker chunkWorker = new ChunkWorker(client, client, chunks, ccrExecutor, checker, leaderShardId,
             followShardId, handler);
-        chunkProcessor.start(0, 10, Long.MAX_VALUE);
+        chunkWorker.start(0, 10, Long.MAX_VALUE);
         assertThat(invoked[0], is(true));
         assertThat(exception[0], notNullValue());
         assertThat(exception[0].getMessage(), equalTo("retrying failed [17] times, aborting..."));
         assertThat(exception[0].getCause().getMessage(), equalTo("connection exception"));
-        assertThat(chunkProcessor.retryCounter.get(), equalTo(testRetryLimit));
+        assertThat(chunkWorker.retryCounter.get(), equalTo(testRetryLimit));
     }
 
-    public void testChunkProcessorNoneRetryableError() {
+    public void testChunkWorkerNoneRetryableError() {
         Client client = createClientMock();
         Queue<long[]> chunks = new LinkedList<>();
         mockBulkShardOperationsApiCall(client);
@@ -293,16 +297,16 @@ public void testChunkProcessorNoneRetryableError() {
         boolean[] invoked = new boolean[1];
         Exception[] exception = new Exception[1];
         Consumer<Exception> handler = e -> {invoked[0] = true;exception[0] = e;};
-        ChunkProcessor chunkProcessor = new ChunkProcessor(client, client, chunks, ccrExecutor, checker, leaderShardId,
+        ChunkWorker chunkWorker = new ChunkWorker(client, client, chunks, ccrExecutor, checker, leaderShardId,
            followShardId, handler);
-        chunkProcessor.start(0, 10, Long.MAX_VALUE);
+        chunkWorker.start(0, 10, Long.MAX_VALUE);
         assertThat(invoked[0], is(true));
         assertThat(exception[0], notNullValue());
         assertThat(exception[0].getMessage(), equalTo("unexpected"));
-        assertThat(chunkProcessor.retryCounter.get(), equalTo(0));
+        assertThat(chunkWorker.retryCounter.get(), equalTo(0));
     }
 
-    public void testChunkProcessorExceedMaxTranslogsBytes() {
+    public void testChunkWorkerExceedMaxTranslogsBytes() {
         long from = 0;
         long to = 20;
         long actualTo = 10;
@@ -333,9 +337,9 @@ public void testChunkProcessorExceedMaxTranslogsBytes() {
         Exception[] exception = new Exception[1];
         Consumer<Exception> handler = e -> {invoked[0] = true;exception[0] = e;};
         BiConsumer<Long, Consumer<Exception>> versionChecker = (indexVersiuon, consumer) -> consumer.accept(null);
-        ChunkProcessor chunkProcessor =
-            new ChunkProcessor(client, client, chunks, ccrExecutor, versionChecker, leaderShardId, followShardId, handler);
-        chunkProcessor.start(from, to, Long.MAX_VALUE);
+        ChunkWorker chunkWorker = new ChunkWorker(client, client, chunks, ccrExecutor, versionChecker, leaderShardId,
+            followShardId, handler);
+        chunkWorker.start(from, to, Long.MAX_VALUE);
         assertThat(invoked[0], is(true));
         assertThat(exception[0], nullValue());
         assertThat(chunks.size(), equalTo(1));
@@ -403,5 +407,21 @@ private void mockBulkShardOperationsApiCall(Client client) {
             return null;
         }).when(client).execute(same(BulkShardOperationsAction.INSTANCE), any(BulkShardOperationsRequest.class), any());
     }
+
+    private ThreadPool createMockThreadPool(boolean fork) {
+        ThreadPool threadPool = mock(ThreadPool.class);
+        ExecutorService executor = mock(ExecutorService.class);
+        doAnswer(invocation -> {
+            Runnable runnable = (Runnable) invocation.getArguments()[0];
+            if (fork) {
+                new Thread(runnable).start();
+            } else {
+                runnable.run();
+            }
+            return null;
+        }).when(executor).execute(any());
+        when(threadPool.executor(Ccr.CCR_THREAD_POOL_NAME)).thenReturn(executor);
+        return threadPool;
+    }
 }
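Note on the checkpoint reporting introduced in processNextChunk: lastProcessedGlobalCheckpoint.updateAndGet(x -> x < chunk[1] ? chunk[1] : x) only ever moves the value forward, and a worker publishes through processedGlobalCheckpointUpdater only when its own chunk end became the new maximum, so chunk completions that finish out of order can never report a lower checkpoint; this is also what the value == 999999 check in testCoordinator_concurrent relies on. A minimal sketch of the same pattern follows; the -1 starting value and the println publisher are illustrative, while the PR starts its AtomicLong at 0 and publishes via task::updateProcessedGlobalCheckpoint.

// Minimal sketch of the monotonic checkpoint update used by processNextChunk; the class and
// publisher below are illustrative, not part of the PR.
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

final class MonotonicCheckpointSketch {

    private final AtomicLong lastProcessed = new AtomicLong(-1L);
    private final LongConsumer publisher;

    MonotonicCheckpointSketch(LongConsumer publisher) {
        this.publisher = publisher;
    }

    void onChunkCompleted(long chunkEnd) {
        // Advance only if this chunk end is ahead of the current value.
        long updated = lastProcessed.updateAndGet(current -> current < chunkEnd ? chunkEnd : current);
        if (updated == chunkEnd) {
            publisher.accept(chunkEnd); // this completion moved the processed checkpoint forward
        }
    }

    public static void main(String[] args) {
        MonotonicCheckpointSketch sketch =
            new MonotonicCheckpointSketch(cp -> System.out.println("processed up to " + cp));
        sketch.onChunkCompleted(2047); // prints "processed up to 2047"
        sketch.onChunkCompleted(1023); // late, lower chunk: checkpoint stays at 2047, nothing is published
    }
}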