Skip to content

Make Ccr recovery file chunk size configurable #38370

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Feb 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ public final class CcrSettings {
Setting.byteSizeSetting("ccr.indices.recovery.max_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB),
Setting.Property.Dynamic, Setting.Property.NodeScope);

/**
* File chunk size to send during recovery.
* Registered under the key {@code ccr.indices.recovery.chunk_size} as a dynamic, node-scoped setting.
* The declared values are 1MB, 1KB and 1GB, which are passed to {@code byteSizeSetting} as
* (presumably) the default, minimum and maximum respectively -- confirm against the
* {@code Setting.byteSizeSetting} overload being used.
*/
public static final Setting<ByteSizeValue> RECOVERY_CHUNK_SIZE =
Setting.byteSizeSetting("ccr.indices.recovery.chunk_size", new ByteSizeValue(1, ByteSizeUnit.MB),
new ByteSizeValue(1, ByteSizeUnit.KB), new ByteSizeValue(1, ByteSizeUnit.GB), Setting.Property.Dynamic,
Setting.Property.NodeScope);

/**
* The leader must open resources for a ccr recovery. If there is no activity for this interval of time,
* the leader will close the restore session.
Expand Down Expand Up @@ -77,22 +85,30 @@ static List<Setting<?>> getSettings() {
INDICES_RECOVERY_ACTION_TIMEOUT_SETTING,
INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING,
CCR_AUTO_FOLLOW_WAIT_FOR_METADATA_TIMEOUT,
RECOVERY_CHUNK_SIZE,
CCR_WAIT_FOR_METADATA_TIMEOUT);
}

// Rate limiter built once in the constructor; its rate is adjusted in place by setMaxBytesPerSec.
private final CombinedRateLimiter ccrRateLimiter;
// The fields below are reassigned by the settings-update consumers registered in the constructor,
// hence volatile so readers observe the most recently applied value.
private volatile TimeValue recoveryActivityTimeout;
private volatile TimeValue recoveryActionTimeout;
private volatile ByteSizeValue chunkSize;

/**
 * Reads the initial values of the CCR recovery settings and registers update consumers so that
 * later changes to the dynamic settings are applied to this instance.
 *
 * @param settings        the settings the initial values are read from
 * @param clusterSettings the registry the settings-update consumers are added to
 */
public CcrSettings(Settings settings, ClusterSettings clusterSettings) {
    this.recoveryActivityTimeout = INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING.get(settings);
    this.recoveryActionTimeout = INDICES_RECOVERY_ACTION_TIMEOUT_SETTING.get(settings);
    this.ccrRateLimiter = new CombinedRateLimiter(RECOVERY_MAX_BYTES_PER_SECOND.get(settings));
    // The chunk size must be seeded from its own setting. Reading RECOVERY_MAX_BYTES_PER_SECOND here
    // would make the initial chunk size follow the rate limit until the first dynamic update.
    this.chunkSize = RECOVERY_CHUNK_SIZE.get(settings);
    clusterSettings.addSettingsUpdateConsumer(RECOVERY_MAX_BYTES_PER_SECOND, this::setMaxBytesPerSec);
    clusterSettings.addSettingsUpdateConsumer(RECOVERY_CHUNK_SIZE, this::setChunkSize);
    clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, this::setRecoveryActivityTimeout);
    clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_ACTION_TIMEOUT_SETTING, this::setRecoveryActionTimeout);
}

/**
* Stores a new recovery chunk size. Registered in the constructor as the update consumer
* for {@code RECOVERY_CHUNK_SIZE}.
*/
private void setChunkSize(ByteSizeValue chunkSize) {
this.chunkSize = chunkSize;
}

/**
* Forwards an updated max-bytes-per-second value to the CCR rate limiter. Registered in the
* constructor as the update consumer for {@code RECOVERY_MAX_BYTES_PER_SECOND}.
*/
private void setMaxBytesPerSec(ByteSizeValue maxBytesPerSec) {
ccrRateLimiter.setMBPerSec(maxBytesPerSec);
}
Expand All @@ -105,6 +121,10 @@ private void setRecoveryActionTimeout(TimeValue recoveryActionTimeout) {
this.recoveryActionTimeout = recoveryActionTimeout;
}

/**
* Returns the currently configured recovery file chunk size.
*/
public ByteSizeValue getChunkSize() {
return chunkSize;
}

/**
* Returns the rate limiter that was created in the constructor from
* {@code RECOVERY_MAX_BYTES_PER_SECOND}.
*/
public CombinedRateLimiter getRateLimiter() {
return ccrRateLimiter;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,6 @@ private RestoreSession openSession(String repositoryName, Client remoteClient, S

private static class RestoreSession extends FileRestoreContext implements Closeable {

private static final int BUFFER_SIZE = 1 << 16;

private final Client remoteClient;
private final String sessionUUID;
private final DiscoveryNode node;
Expand All @@ -342,7 +340,7 @@ private static class RestoreSession extends FileRestoreContext implements Closea
RestoreSession(String repositoryName, Client remoteClient, String sessionUUID, DiscoveryNode node, IndexShard indexShard,
RecoveryState recoveryState, Store.MetadataSnapshot sourceMetaData, long mappingVersion,
CcrSettings ccrSettings, LongConsumer throttleListener) {
super(repositoryName, indexShard, SNAPSHOT_ID, recoveryState, BUFFER_SIZE);
super(repositoryName, indexShard, SNAPSHOT_ID, recoveryState, Math.toIntExact(ccrSettings.getChunkSize().getBytes()));
this.remoteClient = remoteClient;
this.sessionUUID = sessionUUID;
this.node = node;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,12 @@ public void testThatRepositoryRecoversEmptyIndexBasedOnLeaderSettings() throws I
assertNotEquals(leaderMetadata.getIndexUUID(), followerMetadata.getIndexUUID());
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/38100")
public void testDocsAreRecovered() throws Exception {
ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest();
String chunkSize = randomFrom("4KB", "128KB", "1MB");
settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_CHUNK_SIZE.getKey(), chunkSize));
assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());

String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster";
String leaderIndex = "index1";
String followerIndex = "index2";
Expand Down Expand Up @@ -243,6 +247,11 @@ public void testDocsAreRecovered() throws Exception {

isRunning.set(false);
thread.join();

settingsRequest = new ClusterUpdateSettingsRequest();
ByteSizeValue defaultValue = CcrSettings.RECOVERY_CHUNK_SIZE.getDefault(Settings.EMPTY);
settingsRequest.persistentSettings(Settings.builder().put(CcrSettings.RECOVERY_CHUNK_SIZE.getKey(), defaultValue));
assertAcked(followerClient().admin().cluster().updateSettings(settingsRequest).actionGet());
}

public void testRateLimitingIsEmployed() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ public void setUp() throws Exception {
Settings settings = Settings.builder().put(NODE_NAME_SETTING.getKey(), "node").build();
taskQueue = new DeterministicTaskQueue(settings, random());
Set<Setting<?>> registeredSettings = Sets.newHashSet(CcrSettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING,
CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND, CcrSettings.INDICES_RECOVERY_ACTION_TIMEOUT_SETTING);
CcrSettings.RECOVERY_MAX_BYTES_PER_SECOND, CcrSettings.INDICES_RECOVERY_ACTION_TIMEOUT_SETTING,
CcrSettings.RECOVERY_CHUNK_SIZE);
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, registeredSettings);
restoreSourceService = new CcrRestoreSourceService(taskQueue.getThreadPool(), new CcrSettings(Settings.EMPTY, clusterSettings));
}
Expand Down