Skip to content

Commit 7d3d12f

Browse files
authored
Initialize startup CcrRepositories (#36730) (#36919)
Currently, the CcrRepositoryManger only listens for settings updates and installs new repositories. It does not install the repositories that are in the initial settings. This commit, modifies the manager to install the initial repositories. Additionally, it modifies the ccr integration test to configure the remote leader node at startup, instead of using a settings update.
1 parent e487f14 commit 7d3d12f

File tree

3 files changed

+68
-33
lines changed

3 files changed

+68
-33
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,6 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
117117
private final boolean enabled;
118118
private final Settings settings;
119119
private final CcrLicenseChecker ccrLicenseChecker;
120-
121-
private final SetOnce<CcrRepositoryManager> repositoryManager = new SetOnce<>();
122120
private final SetOnce<CcrRestoreSourceService> restoreSourceService = new SetOnce<>();
123121
private Client client;
124122

@@ -165,13 +163,12 @@ public Collection<Object> createComponents(
165163
return emptyList();
166164
}
167165

168-
this.repositoryManager.set(new CcrRepositoryManager(settings, clusterService, (NodeClient) client));
169166
CcrRestoreSourceService restoreSourceService = new CcrRestoreSourceService(settings);
170167
this.restoreSourceService.set(restoreSourceService);
171168
return Arrays.asList(
172169
ccrLicenseChecker,
173170
restoreSourceService,
174-
repositoryManager.get(),
171+
new CcrRepositoryManager(settings, clusterService, (NodeClient) client),
175172
new AutoFollowCoordinator(client, clusterService, ccrLicenseChecker, threadPool::relativeTimeInMillis)
176173
);
177174
}

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrRepositoryManager.java

Lines changed: 54 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.elasticsearch.action.support.PlainActionFuture;
1010
import org.elasticsearch.client.node.NodeClient;
1111
import org.elasticsearch.cluster.service.ClusterService;
12+
import org.elasticsearch.common.component.AbstractLifecycleComponent;
1213
import org.elasticsearch.common.settings.Settings;
1314
import org.elasticsearch.transport.RemoteClusterAware;
1415
import org.elasticsearch.xpack.ccr.action.repositories.DeleteInternalCcrRepositoryAction;
@@ -17,31 +18,70 @@
1718
import org.elasticsearch.xpack.ccr.action.repositories.PutInternalCcrRepositoryRequest;
1819
import org.elasticsearch.xpack.ccr.repository.CcrRepository;
1920

21+
import java.io.IOException;
2022
import java.util.List;
23+
import java.util.Set;
2124

22-
class CcrRepositoryManager extends RemoteClusterAware {
25+
class CcrRepositoryManager extends AbstractLifecycleComponent {
2326

2427
private final NodeClient client;
28+
private final RemoteSettingsUpdateListener updateListener;
2529

2630
CcrRepositoryManager(Settings settings, ClusterService clusterService, NodeClient client) {
2731
super(settings);
2832
this.client = client;
29-
listenForUpdates(clusterService.getClusterSettings());
33+
updateListener = new RemoteSettingsUpdateListener(settings);
34+
updateListener.listenForUpdates(clusterService.getClusterSettings());
3035
}
3136

3237
@Override
33-
protected void updateRemoteCluster(String clusterAlias, List<String> addresses, String proxyAddress) {
34-
String repositoryName = CcrRepository.NAME_PREFIX + clusterAlias;
35-
if (addresses.isEmpty()) {
36-
DeleteInternalCcrRepositoryRequest request = new DeleteInternalCcrRepositoryRequest(repositoryName);
37-
PlainActionFuture<DeleteInternalCcrRepositoryAction.DeleteInternalCcrRepositoryResponse> f = PlainActionFuture.newFuture();
38-
client.executeLocally(DeleteInternalCcrRepositoryAction.INSTANCE, request, f);
39-
assert f.isDone() : "Should be completed as it is executed synchronously";
40-
} else {
41-
PutInternalCcrRepositoryRequest request = new PutInternalCcrRepositoryRequest(repositoryName, CcrRepository.TYPE);
42-
PlainActionFuture<PutInternalCcrRepositoryAction.PutInternalCcrRepositoryResponse> f = PlainActionFuture.newFuture();
43-
client.executeLocally(PutInternalCcrRepositoryAction.INSTANCE, request, f);
44-
assert f.isDone() : "Should be completed as it is executed synchronously";
38+
protected void doStart() {
39+
updateListener.init();
40+
}
41+
42+
@Override
43+
protected void doStop() {
44+
}
45+
46+
@Override
47+
protected void doClose() throws IOException {
48+
}
49+
50+
private void putRepository(String repositoryName) {
51+
PutInternalCcrRepositoryRequest request = new PutInternalCcrRepositoryRequest(repositoryName, CcrRepository.TYPE);
52+
PlainActionFuture<PutInternalCcrRepositoryAction.PutInternalCcrRepositoryResponse> f = PlainActionFuture.newFuture();
53+
client.executeLocally(PutInternalCcrRepositoryAction.INSTANCE, request, f);
54+
assert f.isDone() : "Should be completed as it is executed synchronously";
55+
}
56+
57+
private void deleteRepository(String repositoryName) {
58+
DeleteInternalCcrRepositoryRequest request = new DeleteInternalCcrRepositoryRequest(repositoryName);
59+
PlainActionFuture<DeleteInternalCcrRepositoryAction.DeleteInternalCcrRepositoryResponse> f = PlainActionFuture.newFuture();
60+
client.executeLocally(DeleteInternalCcrRepositoryAction.INSTANCE, request, f);
61+
assert f.isDone() : "Should be completed as it is executed synchronously";
62+
}
63+
64+
private class RemoteSettingsUpdateListener extends RemoteClusterAware {
65+
66+
private RemoteSettingsUpdateListener(Settings settings) {
67+
super(settings);
68+
}
69+
70+
void init() {
71+
Set<String> clusterAliases = buildRemoteClustersDynamicConfig(settings).keySet();
72+
for (String clusterAlias : clusterAliases) {
73+
putRepository(CcrRepository.NAME_PREFIX + clusterAlias);
74+
}
75+
}
76+
77+
@Override
78+
protected void updateRemoteCluster(String clusterAlias, List<String> addresses, String proxy) {
79+
String repositoryName = CcrRepository.NAME_PREFIX + clusterAlias;
80+
if (addresses.isEmpty()) {
81+
deleteRepository(repositoryName);
82+
} else {
83+
putRepository(repositoryName);
84+
}
4585
}
4686
}
4787
}

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/CcrIntegTestCase.java

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksAction;
1313
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
1414
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
15-
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
1615
import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
1716
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
1817
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
@@ -122,35 +121,31 @@ public final void startClusters() throws Exception {
122121
}
123122

124123
stopClusters();
125-
NodeConfigurationSource nodeConfigurationSource = createNodeConfigurationSource();
126124
Collection<Class<? extends Plugin>> mockPlugins = Arrays.asList(ESIntegTestCase.TestSeedPlugin.class,
127125
TestZenDiscovery.TestPlugin.class, getTestTransportPlugin());
128126

129127
InternalTestCluster leaderCluster = new InternalTestCluster(randomLong(), createTempDir(), true, true, numberOfNodesPerCluster(),
130-
numberOfNodesPerCluster(), UUIDs.randomBase64UUID(random()), nodeConfigurationSource, 0, false, "leader", mockPlugins,
131-
Function.identity());
132-
InternalTestCluster followerCluster = new InternalTestCluster(randomLong(), createTempDir(), true, true, numberOfNodesPerCluster(),
133-
numberOfNodesPerCluster(), UUIDs.randomBase64UUID(random()), nodeConfigurationSource, 0, false, "follower", mockPlugins,
134-
Function.identity());
135-
clusterGroup = new ClusterGroup(leaderCluster, followerCluster);
136-
128+
numberOfNodesPerCluster(), UUIDs.randomBase64UUID(random()), createNodeConfigurationSource(null), 0, false, "leader",
129+
mockPlugins, Function.identity());
137130
leaderCluster.beforeTest(random(), 0.0D);
138131
leaderCluster.ensureAtLeastNumDataNodes(numberOfNodesPerCluster());
139132
assertBusy(() -> {
140133
ClusterService clusterService = leaderCluster.getInstance(ClusterService.class);
141134
assertNotNull(clusterService.state().metaData().custom(LicensesMetaData.TYPE));
142135
});
136+
137+
String address = leaderCluster.getDataNodeInstance(TransportService.class).boundAddress().publishAddress().toString();
138+
InternalTestCluster followerCluster = new InternalTestCluster(randomLong(), createTempDir(), true, true, numberOfNodesPerCluster(),
139+
numberOfNodesPerCluster(), UUIDs.randomBase64UUID(random()), createNodeConfigurationSource(address), 0, false, "follower",
140+
mockPlugins, Function.identity());
141+
clusterGroup = new ClusterGroup(leaderCluster, followerCluster);
142+
143143
followerCluster.beforeTest(random(), 0.0D);
144144
followerCluster.ensureAtLeastNumDataNodes(numberOfNodesPerCluster());
145145
assertBusy(() -> {
146146
ClusterService clusterService = followerCluster.getInstance(ClusterService.class);
147147
assertNotNull(clusterService.state().metaData().custom(LicensesMetaData.TYPE));
148148
});
149-
150-
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
151-
String address = leaderCluster.getDataNodeInstance(TransportService.class).boundAddress().publishAddress().toString();
152-
updateSettingsRequest.persistentSettings(Settings.builder().put("cluster.remote.leader_cluster.seeds", address));
153-
assertAcked(followerClient().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
154149
}
155150

156151
/**
@@ -188,7 +183,7 @@ public void afterTest() throws Exception {
188183
}
189184
}
190185

191-
private NodeConfigurationSource createNodeConfigurationSource() {
186+
private NodeConfigurationSource createNodeConfigurationSource(String leaderSeedAddress) {
192187
Settings.Builder builder = Settings.builder();
193188
builder.put(NodeEnvironment.MAX_LOCAL_STORAGE_NODES_SETTING.getKey(), Integer.MAX_VALUE);
194189
// Default the watermarks to absurdly low to prevent the tests
@@ -209,6 +204,9 @@ private NodeConfigurationSource createNodeConfigurationSource() {
209204
builder.put(XPackSettings.LOGSTASH_ENABLED.getKey(), false);
210205
builder.put(LicenseService.SELF_GENERATED_LICENSE_TYPE.getKey(), "trial");
211206
builder.put(NetworkModule.HTTP_ENABLED.getKey(), false);
207+
if (leaderSeedAddress != null) {
208+
builder.put("cluster.remote.leader_cluster.seeds", leaderSeedAddress);
209+
}
212210
return new NodeConfigurationSource() {
213211
@Override
214212
public Settings nodeSettings(int nodeOrdinal) {

0 commit comments

Comments
 (0)