Skip to content

Commit 74956ad

Browse files
committed
Implement CCR licensing (#33002)
This commit implements licensing for CCR. CCR will require a platinum license, and administrative endpoints will be disabled when a license is non-compliant.
1 parent 5df0ba5 commit 74956ad

File tree

17 files changed

+604
-88
lines changed

17 files changed

+604
-88
lines changed

x-pack/plugin/ccr/build.gradle

+5-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,11 @@ task internalClusterTest(type: RandomizedTestingTask,
3333
}
3434

3535
internalClusterTest.mustRunAfter test
36-
check.dependsOn(internalClusterTest, 'qa:multi-cluster:followClusterTest', 'qa:multi-cluster-with-security:followClusterTest')
36+
check.dependsOn(
37+
internalClusterTest,
38+
'qa:multi-cluster:followClusterTest',
39+
'qa:multi-cluster-with-incompatible-license:followClusterTest',
40+
'qa:multi-cluster-with-security:followClusterTest')
3741

3842
dependencies {
3943
compileOnly "org.elasticsearch:elasticsearch:${version}"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import org.elasticsearch.gradle.test.RestIntegTestTask
2+
3+
apply plugin: 'elasticsearch.standalone-test'
4+
5+
dependencies {
6+
testCompile project(path: xpackModule('core'), configuration: 'shadow')
7+
testCompile project(path: xpackModule('ccr'), configuration: 'runtime')
8+
}
9+
10+
task leaderClusterTest(type: RestIntegTestTask) {
11+
mustRunAfter(precommit)
12+
}
13+
14+
leaderClusterTestCluster {
15+
numNodes = 1
16+
clusterName = 'leader-cluster'
17+
}
18+
19+
leaderClusterTestRunner {
20+
systemProperty 'tests.is_leader_cluster', 'true'
21+
}
22+
23+
task followClusterTest(type: RestIntegTestTask) {}
24+
25+
followClusterTestCluster {
26+
dependsOn leaderClusterTestRunner
27+
numNodes = 1
28+
clusterName = 'follow-cluster'
29+
setting 'xpack.license.self_generated.type', 'trial'
30+
setting 'search.remote.leader_cluster.seeds', "\"${-> leaderClusterTest.nodes.get(0).transportUri()}\""
31+
}
32+
33+
followClusterTestRunner {
34+
systemProperty 'tests.is_leader_cluster', 'false'
35+
systemProperty 'tests.leader_host', "${-> leaderClusterTest.nodes.get(0).httpUri()}"
36+
finalizedBy 'leaderClusterTestCluster#stop'
37+
}
38+
39+
test.enabled = false
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.ccr;
8+
9+
import org.elasticsearch.client.Request;
10+
import org.elasticsearch.client.ResponseException;
11+
import org.elasticsearch.common.Booleans;
12+
import org.elasticsearch.test.rest.ESRestTestCase;
13+
14+
import java.util.Locale;
15+
16+
import static org.hamcrest.Matchers.containsString;
17+
import static org.hamcrest.Matchers.hasToString;
18+
19+
public class CcrMultiClusterLicenseIT extends ESRestTestCase {
20+
21+
private final boolean runningAgainstLeaderCluster = Booleans.parseBoolean(System.getProperty("tests.is_leader_cluster"));
22+
23+
@Override
24+
protected boolean preserveClusterUponCompletion() {
25+
return true;
26+
}
27+
28+
public void testFollowIndex() {
29+
if (runningAgainstLeaderCluster == false) {
30+
final Request request = new Request("POST", "/follower/_ccr/follow");
31+
request.setJsonEntity("{\"leader_index\": \"leader_cluster:leader\"}");
32+
assertLicenseIncompatible(request);
33+
}
34+
}
35+
36+
public void testCreateAndFollowIndex() {
37+
if (runningAgainstLeaderCluster == false) {
38+
final Request request = new Request("POST", "/follower/_ccr/create_and_follow");
39+
request.setJsonEntity("{\"leader_index\": \"leader_cluster:leader\"}");
40+
assertLicenseIncompatible(request);
41+
}
42+
}
43+
44+
private static void assertLicenseIncompatible(final Request request) {
45+
final ResponseException e = expectThrows(ResponseException.class, () -> client().performRequest(request));
46+
final String expected = String.format(
47+
Locale.ROOT,
48+
"can not fetch remote index [%s] metadata as the remote cluster [%s] is not licensed for [ccr]; " +
49+
"the license mode [BASIC] on cluster [%s] does not enable [ccr]",
50+
"leader_cluster:leader",
51+
"leader_cluster",
52+
"leader_cluster");
53+
assertThat(e, hasToString(containsString(expected)));
54+
}
55+
56+
}

x-pack/plugin/ccr/qa/multi-cluster/build.gradle

+2
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ task leaderClusterTest(type: RestIntegTestTask) {
1414
leaderClusterTestCluster {
1515
numNodes = 1
1616
clusterName = 'leader-cluster'
17+
setting 'xpack.license.self_generated.type', 'trial'
1718
}
1819

1920
leaderClusterTestRunner {
@@ -26,6 +27,7 @@ followClusterTestCluster {
2627
dependsOn leaderClusterTestRunner
2728
numNodes = 1
2829
clusterName = 'follow-cluster'
30+
setting 'xpack.license.self_generated.type', 'trial'
2931
setting 'search.remote.leader_cluster.seeds', "\"${-> leaderClusterTest.nodes.get(0).transportUri()}\""
3032
}
3133

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java

+33
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import org.elasticsearch.common.settings.Settings;
2121
import org.elasticsearch.common.settings.SettingsFilter;
2222
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
23+
import org.elasticsearch.env.Environment;
24+
import org.elasticsearch.env.NodeEnvironment;
2325
import org.elasticsearch.index.IndexSettings;
2426
import org.elasticsearch.index.engine.EngineFactory;
2527
import org.elasticsearch.license.XPackLicenseState;
@@ -31,10 +33,12 @@
3133
import org.elasticsearch.plugins.Plugin;
3234
import org.elasticsearch.rest.RestController;
3335
import org.elasticsearch.rest.RestHandler;
36+
import org.elasticsearch.script.ScriptService;
3437
import org.elasticsearch.tasks.Task;
3538
import org.elasticsearch.threadpool.ExecutorBuilder;
3639
import org.elasticsearch.threadpool.FixedExecutorBuilder;
3740
import org.elasticsearch.threadpool.ThreadPool;
41+
import org.elasticsearch.watcher.ResourceWatcherService;
3842
import org.elasticsearch.xpack.ccr.action.CcrStatsAction;
3943
import org.elasticsearch.xpack.ccr.action.CreateAndFollowIndexAction;
4044
import org.elasticsearch.xpack.ccr.action.FollowIndexAction;
@@ -55,8 +59,10 @@
5559
import org.elasticsearch.xpack.core.XPackPlugin;
5660

5761
import java.util.Arrays;
62+
import java.util.Collection;
5863
import java.util.Collections;
5964
import java.util.List;
65+
import java.util.Objects;
6066
import java.util.Optional;
6167
import java.util.function.Supplier;
6268

@@ -73,6 +79,7 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
7379

7480
private final boolean enabled;
7581
private final Settings settings;
82+
private final CcrLicenseChecker ccrLicenseChecker;
7683

7784
private final boolean tribeNode;
7885
private final boolean tribeNodeClient;
@@ -82,11 +89,37 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
8289
*
8390
* @param settings the settings
8491
*/
92+
@SuppressWarnings("unused") // constructed reflectively by the plugin infrastructure
8593
public Ccr(final Settings settings) {
94+
this(settings, new CcrLicenseChecker());
95+
}
96+
97+
/**
98+
* Construct an instance of the CCR container with the specified settings and license checker.
99+
*
100+
* @param settings the settings
101+
* @param ccrLicenseChecker the CCR license checker
102+
*/
103+
Ccr(final Settings settings, final CcrLicenseChecker ccrLicenseChecker) {
86104
this.settings = settings;
87105
this.enabled = CCR_ENABLED_SETTING.get(settings);
88106
this.tribeNode = XPackClientActionPlugin.isTribeNode(settings);
89107
this.tribeNodeClient = XPackClientActionPlugin.isTribeClientNode(settings);
108+
this.ccrLicenseChecker = Objects.requireNonNull(ccrLicenseChecker);
109+
}
110+
111+
@Override
112+
public Collection<Object> createComponents(
113+
final Client client,
114+
final ClusterService clusterService,
115+
final ThreadPool threadPool,
116+
final ResourceWatcherService resourceWatcherService,
117+
final ScriptService scriptService,
118+
final NamedXContentRegistry xContentRegistry,
119+
final Environment environment,
120+
final NodeEnvironment nodeEnvironment,
121+
final NamedWriteableRegistry namedWriteableRegistry) {
122+
return Collections.singleton(ccrLicenseChecker);
90123
}
91124

92125
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.ccr;
8+
9+
import org.elasticsearch.ElasticsearchStatusException;
10+
import org.elasticsearch.action.ActionListener;
11+
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
12+
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
13+
import org.elasticsearch.client.Client;
14+
import org.elasticsearch.cluster.ClusterState;
15+
import org.elasticsearch.cluster.metadata.IndexMetaData;
16+
import org.elasticsearch.license.RemoteClusterLicenseChecker;
17+
import org.elasticsearch.license.XPackLicenseState;
18+
import org.elasticsearch.rest.RestStatus;
19+
import org.elasticsearch.xpack.core.XPackPlugin;
20+
21+
import java.util.Collections;
22+
import java.util.Locale;
23+
import java.util.Objects;
24+
import java.util.function.BooleanSupplier;
25+
import java.util.function.Consumer;
26+
27+
/**
28+
* Encapsulates licensing checking for CCR.
29+
*/
30+
public final class CcrLicenseChecker {
31+
32+
private final BooleanSupplier isCcrAllowed;
33+
34+
/**
35+
* Constructs a CCR license checker with the default rule based on the license state for checking if CCR is allowed.
36+
*/
37+
CcrLicenseChecker() {
38+
this(XPackPlugin.getSharedLicenseState()::isCcrAllowed);
39+
}
40+
41+
/**
42+
* Constructs a CCR license checker with the specified boolean supplier.
43+
*
44+
* @param isCcrAllowed a boolean supplier that should return true if CCR is allowed and false otherwise
45+
*/
46+
CcrLicenseChecker(final BooleanSupplier isCcrAllowed) {
47+
this.isCcrAllowed = Objects.requireNonNull(isCcrAllowed);
48+
}
49+
50+
/**
51+
* Returns whether or not CCR is allowed.
52+
*
53+
* @return true if CCR is allowed, otherwise false
54+
*/
55+
public boolean isCcrAllowed() {
56+
return isCcrAllowed.getAsBoolean();
57+
}
58+
59+
/**
60+
* Fetches the leader index metadata from the remote cluster. Before fetching the index metadata, the remote cluster is checked for
61+
* license compatibility with CCR. If the remote cluster is not licensed for CCR, the {@link ActionListener#onFailure(Exception)} method
62+
* of the specified listener is invoked. Otherwise, the specified consumer is invoked with the leader index metadata fetched from the
63+
* remote cluster.
64+
*
65+
* @param client the client
66+
* @param clusterAlias the remote cluster alias
67+
* @param leaderIndex the name of the leader index
68+
* @param listener the listener
69+
* @param leaderIndexMetadataConsumer the leader index metadata consumer
70+
* @param <T> the type of response the listener is waiting for
71+
*/
72+
public <T> void checkRemoteClusterLicenseAndFetchLeaderIndexMetadata(
73+
final Client client,
74+
final String clusterAlias,
75+
final String leaderIndex,
76+
final ActionListener<T> listener,
77+
final Consumer<IndexMetaData> leaderIndexMetadataConsumer) {
78+
// we have to check the license on the remote cluster
79+
new RemoteClusterLicenseChecker(client, XPackLicenseState::isCcrAllowedForOperationMode).checkRemoteClusterLicenses(
80+
Collections.singletonList(clusterAlias),
81+
new ActionListener<RemoteClusterLicenseChecker.LicenseCheck>() {
82+
83+
@Override
84+
public void onResponse(final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) {
85+
if (licenseCheck.isSuccess()) {
86+
final Client remoteClient = client.getRemoteClusterClient(clusterAlias);
87+
final ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
88+
clusterStateRequest.clear();
89+
clusterStateRequest.metaData(true);
90+
clusterStateRequest.indices(leaderIndex);
91+
final ActionListener<ClusterStateResponse> clusterStateListener = ActionListener.wrap(
92+
r -> {
93+
final ClusterState remoteClusterState = r.getState();
94+
final IndexMetaData leaderIndexMetadata =
95+
remoteClusterState.getMetaData().index(leaderIndex);
96+
leaderIndexMetadataConsumer.accept(leaderIndexMetadata);
97+
},
98+
listener::onFailure);
99+
// following an index in remote cluster, so use remote client to fetch leader index metadata
100+
remoteClient.admin().cluster().state(clusterStateRequest, clusterStateListener);
101+
} else {
102+
listener.onFailure(incompatibleRemoteLicense(leaderIndex, licenseCheck));
103+
}
104+
}
105+
106+
@Override
107+
public void onFailure(final Exception e) {
108+
listener.onFailure(unknownRemoteLicense(leaderIndex, clusterAlias, e));
109+
}
110+
111+
});
112+
}
113+
114+
private static ElasticsearchStatusException incompatibleRemoteLicense(
115+
final String leaderIndex, final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) {
116+
final String clusterAlias = licenseCheck.remoteClusterLicenseInfo().clusterAlias();
117+
final String message = String.format(
118+
Locale.ROOT,
119+
"can not fetch remote index [%s:%s] metadata as the remote cluster [%s] is not licensed for [ccr]; %s",
120+
clusterAlias,
121+
leaderIndex,
122+
clusterAlias,
123+
RemoteClusterLicenseChecker.buildErrorMessage(
124+
"ccr",
125+
licenseCheck.remoteClusterLicenseInfo(),
126+
RemoteClusterLicenseChecker::isLicensePlatinumOrTrial));
127+
return new ElasticsearchStatusException(message, RestStatus.BAD_REQUEST);
128+
}
129+
130+
private static ElasticsearchStatusException unknownRemoteLicense(
131+
final String leaderIndex, final String clusterAlias, final Exception cause) {
132+
final String message = String.format(
133+
Locale.ROOT,
134+
"can not fetch remote index [%s:%s] metadata as the license state of the remote cluster [%s] could not be determined",
135+
clusterAlias,
136+
leaderIndex,
137+
clusterAlias);
138+
return new ElasticsearchStatusException(message, RestStatus.BAD_REQUEST, cause);
139+
}
140+
141+
}

0 commit comments

Comments
 (0)