Skip to content

Commit 4a15e2b

Browse files
authored
Make Ccr recovery file chunk size configurable (#38370)
This commit adds a byte setting `ccr.indices.recovery.chunk_size`. This setting configs the size of file chunk requested while recovering from remote.
1 parent c2a8fe1 commit 4a15e2b

File tree

4 files changed

+33
-5
lines changed

4 files changed

+33
-5
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,14 @@ public final class CcrSettings {
4949
Setting.byteSizeSetting("ccr.indices.recovery.max_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB),
5050
Setting.Property.Dynamic, Setting.Property.NodeScope);
5151

52+
/**
53+
* File chunk size to send during recovery
54+
*/
55+
public static final Setting<ByteSizeValue> RECOVERY_CHUNK_SIZE =
56+
Setting.byteSizeSetting("ccr.indices.recovery.chunk_size", new ByteSizeValue(1, ByteSizeUnit.MB),
57+
new ByteSizeValue(1, ByteSizeUnit.KB), new ByteSizeValue(1, ByteSizeUnit.GB), Setting.Property.Dynamic,
58+
Setting.Property.NodeScope);
59+
5260
/**
5361
* The leader must open resources for a ccr recovery. If there is no activity for this interval of time,
5462
* the leader will close the restore session.
@@ -77,22 +85,30 @@ static List<Setting<?>> getSettings() {
7785
INDICES_RECOVERY_ACTION_TIMEOUT_SETTING,
7886
INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING,
7987
CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT,
88+
RECOVERY_CHUNK_SIZE,
8089
CCR_WAIT_FOR_METADATA_TIMEOUT);
8190
}
8291

8392
private final CombinedRateLimiter ccrRateLimiter;
8493
private volatile TimeValue recoveryActivityTimeout;
8594
private volatile TimeValue recoveryActionTimeout;
95+
private volatile ByteSizeValue chunkSize;
8696

8797
public CcrSettings(Settings settings, ClusterSettings clusterSettings) {
8898
this.recoveryActivityTimeout = INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING.get(settings);
8999
this.recoveryActionTimeout = INDICES_RECOVERY_ACTION_TIMEOUT_SETTING.get(settings);
90100
this.ccrRateLimiter = new CombinedRateLimiter(RECOVERY_MAX_BYTES_PER_SECOND.get(settings));
101+
this.chunkSize = RECOVERY_MAX_BYTES_PER_SECOND.get(settings);
91102
clusterSettings.addSettingsUpdateConsumer(RECOVERY_MAX_BYTES_PER_SECOND, this::setMaxBytesPerSec);
103+
clusterSettings.addSettingsUpdateConsumer(RECOVERY_CHUNK_SIZE, this::setChunkSize);
92104
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, this::setRecoveryActivityTimeout);
93105
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTION_TIMEOUT_SETTING, this::setRecoveryActionTimeout);
94106
}
95107

108+
private void setChunkSize(ByteSizeValue chunkSize) {
109+
this.chunkSize = chunkSize;
110+
}
111+
96112
private void setMaxBytesPerSec(ByteSizeValue maxBytesPerSec) {
97113
ccrRateLimiter.setMBPerSec(maxBytesPerSec);
98114
}
@@ -105,6 +121,10 @@ private void setRecoveryActionTimeout(TimeValue recoveryActionTimeout) {
105121
this.recoveryActionTimeout = recoveryActionTimeout;
106122
}
107123

124+
public ByteSizeValue getChunkSize() {
125+
return chunkSize;
126+
}
127+
108128
public CombinedRateLimiter getRateLimiter() {
109129
return ccrRateLimiter;
110130
}

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -329,8 +329,6 @@ private RestoreSession openSession(String repositoryName, Client remoteClient, S
329329

330330
private static class RestoreSession extends FileRestoreContext implements Closeable {
331331

332-
private static final int BUFFER_SIZE = 1 << 16;
333-
334332
private final Client remoteClient;
335333
private final String sessionUUID;
336334
private final DiscoveryNode node;
@@ -342,7 +340,7 @@ private static class RestoreSession extends FileRestoreContext implements Closea
342340
RestoreSession(String repositoryName, Client remoteClient, String sessionUUID, DiscoveryNode node, IndexShard indexShard,
343341
RecoveryState recoveryState, Store.MetadataSnapshot sourceMetaData, long mappingVersion,
344342
CcrSettings ccrSettings, LongConsumer throttleListener) {
345-
super(repositoryName, indexShard, SNAPSHOT_ID, recoveryState, BUFFER_SIZE);
343+
super(repositoryName, indexShard, SNAPSHOT_ID, recoveryState, Math.toIntExact(ccrSettings.getChunkSize().getBytes()));
346344
this.remoteClient = remoteClient;
347345
this.sessionUUID = sessionUUID;
348346
this.node = node;

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRepositoryIT.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,8 +169,12 @@ public void testThatRepositoryRecoversEmptyIndexBasedOnLeaderSettings() throws I
169169
assertNotEquals(leaderMetadata.getIndexUUID(), followerMetadata.getIndexUUID());
170170
}
171171

172-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/38100")
173172
public void testDocsAreRecovered() throws Exception {
173+
ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest();
174+
String chunkSize = randomFrom("4KB", "128KB", "1MB");
175+
settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_CHUNK_SIZE.getKey(), chunkSize));
176+
assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
177+
174178
String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster";
175179
String leaderIndex = "index1";
176180
String followerIndex = "index2";
@@ -243,6 +247,11 @@ public void testDocsAreRecovered() throws Exception {
243247

244248
isRunning.set(false);
245249
thread.join();
250+
251+
settingsRequest = new ClusterUpdateSettingsRequest();
252+
ByteSizeValue defaultValue = CcrSettings.RECOVERY_CHUNK_SIZE.getDefault(Settings.EMPTY);
253+
settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_CHUNK_SIZE.getKey(), defaultValue));
254+
assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
246255
}
247256

248257
public void testRateLimitingIsEmployed() throws Exception {

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/repository/CcrRestoreSourceServiceTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ public void setUp() throws Exception {
4040
Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build();
4141
taskQueue = new DeterministicTaskQueue(settings, random());
4242
Set<Setting<?>> registeredSettings = Sets.newHashSet(CcrSettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING,
43-
CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND, CcrSettings.INDICES_RECOVERY_ACTION_TIMEOUT_SETTING);
43+
CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND, CcrSettings.INDICES_RECOVERY_ACTION_TIMEOUT_SETTING,
44+
CcrSettings.RECOVERY_CHUNK_SIZE);
4445
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, registeredSettings);
4546
restoreSourceService = new CcrRestoreSourceService(taskQueue.getThreadPool(), new CcrSettings(Settings.EMPTY, clusterSettings));
4647
}

0 commit comments

Comments
 (0)