Skip to content

Commit 3dd5a5a

Browse files
authored
Initialize startup CcrRepositories (#36730)
Currently, the CcrRepositoryManger only listens for settings updates and installs new repositories. It does not install the repositories that are in the initial settings. This commit, modifies the manager to install the initial repositories. Additionally, it modifies the ccr integration test to configure the remote leader node at startup, instead of using a settings update.
1 parent 7bf822b commit 3dd5a5a

File tree

3 files changed

+66
-31
lines changed

3 files changed

+66
-31
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66

77
package org.elasticsearch.xpack.ccr;
88

9-
import org.apache.lucene.util.SetOnce;
109
import org.elasticsearch.action.ActionRequest;
1110
import org.elasticsearch.action.ActionResponse;
1211
import org.elasticsearch.client.Client;
@@ -111,7 +110,6 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
111110
private final boolean enabled;
112111
private final Settings settings;
113112
private final CcrLicenseChecker ccrLicenseChecker;
114-
private final SetOnce<CcrRepositoryManager> repositoryManager = new SetOnce<>();
115113
private Client client;
116114

117115
/**
@@ -152,10 +150,9 @@ public Collection<Object> createComponents(
152150
return emptyList();
153151
}
154152

155-
this.repositoryManager.set(new CcrRepositoryManager(settings, clusterService, client));
156-
157153
return Arrays.asList(
158154
ccrLicenseChecker,
155+
new CcrRepositoryManager(settings, clusterService, client),
159156
new AutoFollowCoordinator(client, clusterService, ccrLicenseChecker, threadPool::relativeTimeInMillis)
160157
);
161158
}

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java

Lines changed: 54 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.elasticsearch.action.support.PlainActionFuture;
1111
import org.elasticsearch.client.Client;
1212
import org.elasticsearch.cluster.service.ClusterService;
13+
import org.elasticsearch.common.component.AbstractLifecycleComponent;
1314
import org.elasticsearch.common.settings.Settings;
1415
import org.elasticsearch.transport.RemoteClusterAware;
1516
import org.elasticsearch.xpack.ccr.action.repositories.DeleteInternalCcrRepositoryAction;
@@ -18,31 +19,70 @@
1819
import org.elasticsearch.xpack.ccr.action.repositories.PutInternalCcrRepositoryRequest;
1920
import org.elasticsearch.xpack.ccr.repository.CcrRepository;
2021

22+
import java.io.IOException;
2123
import java.util.List;
24+
import java.util.Set;
2225

23-
class CcrRepositoryManager extends RemoteClusterAware {
26+
class CcrRepositoryManager extends AbstractLifecycleComponent {
2427

2528
private final Client client;
29+
private final RemoteSettingsUpdateListener updateListener;
2630

2731
CcrRepositoryManager(Settings settings, ClusterService clusterService, Client client) {
2832
super(settings);
2933
this.client = client;
30-
listenForUpdates(clusterService.getClusterSettings());
34+
updateListener = new RemoteSettingsUpdateListener(settings);
35+
updateListener.listenForUpdates(clusterService.getClusterSettings());
3136
}
3237

3338
@Override
34-
protected void updateRemoteCluster(String clusterAlias, List<String> addresses, String proxyAddress) {
35-
String repositoryName = CcrRepository.NAME_PREFIX + clusterAlias;
36-
if (addresses.isEmpty()) {
37-
DeleteInternalCcrRepositoryRequest request = new DeleteInternalCcrRepositoryRequest(repositoryName);
38-
PlainActionFuture<DeleteInternalCcrRepositoryAction.DeleteInternalCcrRepositoryResponse> f = PlainActionFuture.newFuture();
39-
client.execute(DeleteInternalCcrRepositoryAction.INSTANCE, request, f);
40-
assert f.isDone() : "Should be completed as it is executed synchronously";
41-
} else {
42-
ActionRequest request = new PutInternalCcrRepositoryRequest(repositoryName, CcrRepository.TYPE);
43-
PlainActionFuture<PutInternalCcrRepositoryAction.PutInternalCcrRepositoryResponse> f = PlainActionFuture.newFuture();
44-
client.execute(PutInternalCcrRepositoryAction.INSTANCE, request, f);
45-
assert f.isDone() : "Should be completed as it is executed synchronously";
39+
protected void doStart() {
40+
updateListener.init();
41+
}
42+
43+
@Override
44+
protected void doStop() {
45+
}
46+
47+
@Override
48+
protected void doClose() throws IOException {
49+
}
50+
51+
private void putRepository(String repositoryName) {
52+
ActionRequest request = new PutInternalCcrRepositoryRequest(repositoryName, CcrRepository.TYPE);
53+
PlainActionFuture<PutInternalCcrRepositoryAction.PutInternalCcrRepositoryResponse> f = PlainActionFuture.newFuture();
54+
client.execute(PutInternalCcrRepositoryAction.INSTANCE, request, f);
55+
assert f.isDone() : "Should be completed as it is executed synchronously";
56+
}
57+
58+
private void deleteRepository(String repositoryName) {
59+
DeleteInternalCcrRepositoryRequest request = new DeleteInternalCcrRepositoryRequest(repositoryName);
60+
PlainActionFuture<DeleteInternalCcrRepositoryAction.DeleteInternalCcrRepositoryResponse> f = PlainActionFuture.newFuture();
61+
client.execute(DeleteInternalCcrRepositoryAction.INSTANCE, request, f);
62+
assert f.isDone() : "Should be completed as it is executed synchronously";
63+
}
64+
65+
private class RemoteSettingsUpdateListener extends RemoteClusterAware {
66+
67+
private RemoteSettingsUpdateListener(Settings settings) {
68+
super(settings);
69+
}
70+
71+
void init() {
72+
Set<String> clusterAliases = buildRemoteClustersDynamicConfig(settings).keySet();
73+
for (String clusterAlias : clusterAliases) {
74+
putRepository(CcrRepository.NAME_PREFIX + clusterAlias);
75+
}
76+
}
77+
78+
@Override
79+
protected void updateRemoteCluster(String clusterAlias, List<String> addresses, String proxy) {
80+
String repositoryName = CcrRepository.NAME_PREFIX + clusterAlias;
81+
if (addresses.isEmpty()) {
82+
deleteRepository(repositoryName);
83+
} else {
84+
putRepository(repositoryName);
85+
}
4686
}
4787
}
4888
}

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
1313
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
1414
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
15-
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
1615
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
1716
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
1817
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
@@ -117,27 +116,23 @@ public final void startClusters() throws Exception {
117116
}
118117

119118
stopClusters();
120-
NodeConfigurationSource nodeConfigurationSource = createNodeConfigurationSource();
121119
Collection<Class<? extends Plugin>> mockPlugins = Arrays.asList(ESIntegTestCase.TestSeedPlugin.class,
122120
TestZenDiscovery.TestPlugin.class, MockHttpTransport.TestPlugin.class, getTestTransportPlugin());
123121

124122
InternalTestCluster leaderCluster = new InternalTestCluster(randomLong(), createTempDir(), true, true, numberOfNodesPerCluster(),
125-
numberOfNodesPerCluster(), UUIDs.randomBase64UUID(random()), nodeConfigurationSource, 0, "leader", mockPlugins,
123+
numberOfNodesPerCluster(), UUIDs.randomBase64UUID(random()), createNodeConfigurationSource(null), 0, "leader", mockPlugins,
126124
Function.identity());
125+
leaderCluster.beforeTest(random(), 0.0D);
126+
leaderCluster.ensureAtLeastNumDataNodes(numberOfNodesPerCluster());
127+
128+
String address = leaderCluster.getDataNodeInstance(TransportService.class).boundAddress().publishAddress().toString();
127129
InternalTestCluster followerCluster = new InternalTestCluster(randomLong(), createTempDir(), true, true, numberOfNodesPerCluster(),
128-
numberOfNodesPerCluster(), UUIDs.randomBase64UUID(random()), nodeConfigurationSource, 0, "follower", mockPlugins,
129-
Function.identity());
130+
numberOfNodesPerCluster(), UUIDs.randomBase64UUID(random()), createNodeConfigurationSource(address), 0, "follower",
131+
mockPlugins, Function.identity());
130132
clusterGroup = new ClusterGroup(leaderCluster, followerCluster);
131133

132-
leaderCluster.beforeTest(random(), 0.0D);
133-
leaderCluster.ensureAtLeastNumDataNodes(numberOfNodesPerCluster());
134134
followerCluster.beforeTest(random(), 0.0D);
135135
followerCluster.ensureAtLeastNumDataNodes(numberOfNodesPerCluster());
136-
137-
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
138-
String address = leaderCluster.getDataNodeInstance(TransportService.class).boundAddress().publishAddress().toString();
139-
updateSettingsRequest.persistentSettings(Settings.builder().put("cluster.remote.leader_cluster.seeds", address));
140-
assertAcked(followerClient().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
141136
}
142137

143138
/**
@@ -175,7 +170,7 @@ public void afterTest() throws Exception {
175170
}
176171
}
177172

178-
private NodeConfigurationSource createNodeConfigurationSource() {
173+
private NodeConfigurationSource createNodeConfigurationSource(String leaderSeedAddress) {
179174
Settings.Builder builder = Settings.builder();
180175
builder.put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), Integer.MAX_VALUE);
181176
// Default the watermarks to absurdly low to prevent the tests
@@ -195,6 +190,9 @@ private NodeConfigurationSource createNodeConfigurationSource() {
195190
builder.put(XPackSettings.MACHINE_LEARNING_ENABLED.getKey(), false);
196191
builder.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false);
197192
builder.put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial");
193+
if (leaderSeedAddress != null) {
194+
builder.put("cluster.remote.leader_cluster.seeds", leaderSeedAddress);
195+
}
198196
return new NodeConfigurationSource() {
199197
@Override
200198
public Settings nodeSettings(int nodeOrdinal) {

0 commit comments

Comments
 (0)