diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java index 625429dc0abc6..0e147f66d6ebc 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java @@ -49,6 +49,14 @@ public final class CcrSettings { Setting.byteSizeSetting("ccr.indices.recovery.max_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB), Setting.Property.Dynamic, Setting.Property.NodeScope); + /** + * File chunk size to send during recovery + */ + public static final Setting RECOVERY_CHUNK_SIZE = + Setting.byteSizeSetting("ccr.indices.recovery.chunk_size", new ByteSizeValue(1, ByteSizeUnit.MB), + new ByteSizeValue(1, ByteSizeUnit.KB), new ByteSizeValue(1, ByteSizeUnit.GB), Setting.Property.Dynamic, + Setting.Property.NodeScope); + /** * The leader must open resources for a ccr recovery. If there is no activity for this interval of time, * the leader will close the restore session. @@ -77,22 +85,30 @@ static List> getSettings() { INDICES_RECOVERY_ACTION_TIMEOUT_SETTING, INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT, + RECOVERY_CHUNK_SIZE, CCR_WAIT_FOR_METADATA_TIMEOUT); } private final CombinedRateLimiter ccrRateLimiter; private volatile TimeValue recoveryActivityTimeout; private volatile TimeValue recoveryActionTimeout; + private volatile ByteSizeValue chunkSize; public CcrSettings(Settings settings, ClusterSettings clusterSettings) { this.recoveryActivityTimeout = INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING.get(settings); this.recoveryActionTimeout = INDICES_RECOVERY_ACTION_TIMEOUT_SETTING.get(settings); this.ccrRateLimiter = new CombinedRateLimiter(RECOVERY_MAX_BYTES_PER_SECOND.get(settings)); + this.chunkSize = RECOVERY_MAX_BYTES_PER_SECOND.get(settings); clusterSettings.addSettingsUpdateConsumer(RECOVERY_MAX_BYTES_PER_SECOND, this::setMaxBytesPerSec); + clusterSettings.addSettingsUpdateConsumer(RECOVERY_CHUNK_SIZE, this::setChunkSize); clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, this::setRecoveryActivityTimeout); clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTION_TIMEOUT_SETTING, this::setRecoveryActionTimeout); } + private void setChunkSize(ByteSizeValue chunkSize) { + this.chunkSize = chunkSize; + } + private void setMaxBytesPerSec(ByteSizeValue maxBytesPerSec) { ccrRateLimiter.setMBPerSec(maxBytesPerSec); } @@ -105,6 +121,10 @@ private void setRecoveryActionTimeout(TimeValue recoveryActionTimeout) { this.recoveryActionTimeout = recoveryActionTimeout; } + public ByteSizeValue getChunkSize() { + return chunkSize; + } + public CombinedRateLimiter getRateLimiter() { return ccrRateLimiter; } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index baad95d5a94df..7ceaeb903ec0e 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -329,8 +329,6 @@ private RestoreSession openSession(String repositoryName, Client remoteClient, S private static class RestoreSession extends FileRestoreContext implements Closeable { - private static final int BUFFER_SIZE = 1 << 16; - private final Client remoteClient; private final String sessionUUID; private final DiscoveryNode node; @@ -342,7 +340,7 @@ private static class RestoreSession extends FileRestoreContext implements Closea RestoreSession(String repositoryName, Client remoteClient, String sessionUUID, DiscoveryNode node, IndexShard indexShard, RecoveryState recoveryState, Store.MetadataSnapshot sourceMetaData, long mappingVersion, CcrSettings ccrSettings, LongConsumer throttleListener) { - super(repositoryName, indexShard, SNAPSHOT_ID, recoveryState, BUFFER_SIZE); + super(repositoryName, indexShard, SNAPSHOT_ID, recoveryState, Math.toIntExact(ccrSettings.getChunkSize().getBytes())); this.remoteClient = remoteClient; this.sessionUUID = sessionUUID; this.node = node; diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java index d4d6d13f7a292..9f061b9c33099 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java @@ -169,8 +169,12 @@ public void testThatRepositoryRecoversEmptyIndexBasedOnLeaderSettings() throws I assertNotEquals(leaderMetadata.getIndexUUID(), followerMetadata.getIndexUUID()); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/38100") public void testDocsAreRecovered() throws Exception { + ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest(); + String chunkSize = randomFrom("4KB", "128KB", "1MB"); + settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_CHUNK_SIZE.getKey(), chunkSize)); + assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet()); + String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster"; String leaderIndex = "index1"; String followerIndex = "index2"; @@ -243,6 +247,11 @@ public void testDocsAreRecovered() throws Exception { isRunning.set(false); thread.join(); + + settingsRequest = new ClusterUpdateSettingsRequest(); + ByteSizeValue defaultValue = CcrSettings.RECOVERY_CHUNK_SIZE.getDefault(Settings.EMPTY); + settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_CHUNK_SIZE.getKey(), defaultValue)); + assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet()); } public void testRateLimitingIsEmployed() throws Exception { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java index 3035b96b5bcac..1c3c0da3d3c8a 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java @@ -40,7 +40,8 @@ public void setUp() throws Exception { Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build(); taskQueue = new DeterministicTaskQueue(settings, random()); Set> registeredSettings = Sets.newHashSet(CcrSettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, - CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND, CcrSettings.INDICES_RECOVERY_ACTION_TIMEOUT_SETTING); + CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND, CcrSettings.INDICES_RECOVERY_ACTION_TIMEOUT_SETTING, + CcrSettings.RECOVERY_CHUNK_SIZE); ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, registeredSettings); restoreSourceService = new CcrRestoreSourceService(taskQueue.getThreadPool(), new CcrSettings(Settings.EMPTY, clusterSettings)); }