Skip to content

Commit 9088d81

Browse files
committed
Generalize remote license checker (#32971)
Machine learning has baked a remote license checker for use in checking license compatibility of a remote license. This remote license checker has general usage for any feature that relies on a remote cluster. For example, cross-cluster replication will pull changes from a remote cluster and require that the local and remote clusters have platinum licenses. This commit generalizes the remote cluster license check for use in cross-cluster replication.
1 parent 712862c commit 9088d81

File tree

7 files changed

+736
-415
lines changed

7 files changed

+736
-415
lines changed
Lines changed: 281 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,281 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.license;
8+
9+
import org.elasticsearch.ElasticsearchException;
10+
import org.elasticsearch.action.ActionListener;
11+
import org.elasticsearch.action.support.ContextPreservingActionListener;
12+
import org.elasticsearch.client.Client;
13+
import org.elasticsearch.common.util.concurrent.ThreadContext;
14+
import org.elasticsearch.protocol.xpack.XPackInfoRequest;
15+
import org.elasticsearch.protocol.xpack.XPackInfoResponse;
16+
import org.elasticsearch.protocol.xpack.license.LicenseStatus;
17+
import org.elasticsearch.transport.RemoteClusterAware;
18+
import org.elasticsearch.xpack.core.action.XPackInfoAction;
19+
20+
import java.util.EnumSet;
21+
import java.util.Iterator;
22+
import java.util.List;
23+
import java.util.Locale;
24+
import java.util.concurrent.atomic.AtomicReference;
25+
import java.util.function.Predicate;
26+
import java.util.stream.Collectors;
27+
28+
/**
29+
* Checks remote clusters for license compatibility with a specified license predicate.
30+
*/
31+
public final class RemoteClusterLicenseChecker {
32+
33+
/**
34+
* Encapsulates the license info of a remote cluster.
35+
*/
36+
public static final class RemoteClusterLicenseInfo {
37+
38+
private final String clusterAlias;
39+
40+
/**
41+
* The alias of the remote cluster.
42+
*
43+
* @return the cluster alias
44+
*/
45+
public String clusterAlias() {
46+
return clusterAlias;
47+
}
48+
49+
private final XPackInfoResponse.LicenseInfo licenseInfo;
50+
51+
/**
52+
* The license info of the remote cluster.
53+
*
54+
* @return the license info
55+
*/
56+
public XPackInfoResponse.LicenseInfo licenseInfo() {
57+
return licenseInfo;
58+
}
59+
60+
RemoteClusterLicenseInfo(final String clusterAlias, final XPackInfoResponse.LicenseInfo licenseInfo) {
61+
this.clusterAlias = clusterAlias;
62+
this.licenseInfo = licenseInfo;
63+
}
64+
65+
}
66+
67+
/**
68+
* Encapsulates a remote cluster license check. The check is either successful if the license of the remote cluster is compatible with
69+
* the predicate used to check license compatibility, or the check is a failure.
70+
*/
71+
public static final class LicenseCheck {
72+
73+
private final RemoteClusterLicenseInfo remoteClusterLicenseInfo;
74+
75+
/**
76+
* The remote cluster license info. This method should only be invoked if this instance represents a failing license check.
77+
*
78+
* @return the remote cluster license info
79+
*/
80+
public RemoteClusterLicenseInfo remoteClusterLicenseInfo() {
81+
assert isSuccess() == false;
82+
return remoteClusterLicenseInfo;
83+
}
84+
85+
private static final LicenseCheck SUCCESS = new LicenseCheck(null);
86+
87+
/**
88+
* A successful license check.
89+
*
90+
* @return a successful license check instance
91+
*/
92+
public static LicenseCheck success() {
93+
return SUCCESS;
94+
}
95+
96+
/**
97+
* Test if this instance represents a successful license check.
98+
*
99+
* @return true if this instance represents a successful license check, otherwise false
100+
*/
101+
public boolean isSuccess() {
102+
return this == SUCCESS;
103+
}
104+
105+
/**
106+
* Creates a failing license check encapsulating the specified remote cluster license info.
107+
*
108+
* @param remoteClusterLicenseInfo the remote cluster license info
109+
* @return a failing license check
110+
*/
111+
public static LicenseCheck failure(final RemoteClusterLicenseInfo remoteClusterLicenseInfo) {
112+
return new LicenseCheck(remoteClusterLicenseInfo);
113+
}
114+
115+
private LicenseCheck(final RemoteClusterLicenseInfo remoteClusterLicenseInfo) {
116+
this.remoteClusterLicenseInfo = remoteClusterLicenseInfo;
117+
}
118+
119+
}
120+
121+
private final Client client;
122+
private final Predicate<XPackInfoResponse.LicenseInfo> predicate;
123+
124+
/**
125+
* Constructs a remote cluster license checker with the specified license predicate for checking license compatibility. The predicate
126+
* does not need to check for the active license state as this is handled by the remote cluster license checker.
127+
*
128+
* @param client the client
129+
* @param predicate the license predicate
130+
*/
131+
public RemoteClusterLicenseChecker(final Client client, final Predicate<XPackInfoResponse.LicenseInfo> predicate) {
132+
this.client = client;
133+
this.predicate = predicate;
134+
}
135+
136+
public static boolean isLicensePlatinumOrTrial(final XPackInfoResponse.LicenseInfo licenseInfo) {
137+
final License.OperationMode mode = License.OperationMode.resolve(licenseInfo.getMode());
138+
return mode == License.OperationMode.PLATINUM || mode == License.OperationMode.TRIAL;
139+
}
140+
141+
/**
142+
* Checks the specified clusters for license compatibility. The specified callback will be invoked once if all clusters are
143+
* license-compatible, otherwise the specified callback will be invoked once on the first cluster that is not license-compatible.
144+
*
145+
* @param clusterAliases the cluster aliases to check
146+
* @param listener a callback
147+
*/
148+
public void checkRemoteClusterLicenses(final List<String> clusterAliases, final ActionListener<LicenseCheck> listener) {
149+
final Iterator<String> clusterAliasesIterator = clusterAliases.iterator();
150+
if (clusterAliasesIterator.hasNext() == false) {
151+
listener.onResponse(LicenseCheck.success());
152+
return;
153+
}
154+
155+
final AtomicReference<String> clusterAlias = new AtomicReference<>();
156+
157+
final ActionListener<XPackInfoResponse> infoListener = new ActionListener<XPackInfoResponse>() {
158+
159+
@Override
160+
public void onResponse(final XPackInfoResponse xPackInfoResponse) {
161+
final XPackInfoResponse.LicenseInfo licenseInfo = xPackInfoResponse.getLicenseInfo();
162+
if ((licenseInfo.getStatus() == LicenseStatus.ACTIVE) == false || predicate.test(licenseInfo) == false) {
163+
listener.onResponse(LicenseCheck.failure(new RemoteClusterLicenseInfo(clusterAlias.get(), licenseInfo)));
164+
return;
165+
}
166+
167+
if (clusterAliasesIterator.hasNext()) {
168+
clusterAlias.set(clusterAliasesIterator.next());
169+
// recurse to the next cluster
170+
remoteClusterLicense(clusterAlias.get(), this);
171+
} else {
172+
listener.onResponse(LicenseCheck.success());
173+
}
174+
}
175+
176+
@Override
177+
public void onFailure(final Exception e) {
178+
final String message = "could not determine the license type for cluster [" + clusterAlias.get() + "]";
179+
listener.onFailure(new ElasticsearchException(message, e));
180+
}
181+
182+
};
183+
184+
// check the license on the first cluster, and then we recursively check licenses on the remaining clusters
185+
clusterAlias.set(clusterAliasesIterator.next());
186+
remoteClusterLicense(clusterAlias.get(), infoListener);
187+
}
188+
189+
private void remoteClusterLicense(final String clusterAlias, final ActionListener<XPackInfoResponse> listener) {
190+
final ThreadContext threadContext = client.threadPool().getThreadContext();
191+
final ContextPreservingActionListener<XPackInfoResponse> contextPreservingActionListener =
192+
new ContextPreservingActionListener<>(threadContext.newRestorableContext(false), listener);
193+
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
194+
// we stash any context here since this is an internal execution and should not leak any existing context information
195+
threadContext.markAsSystemContext();
196+
197+
final XPackInfoRequest request = new XPackInfoRequest();
198+
request.setCategories(EnumSet.of(XPackInfoRequest.Category.LICENSE));
199+
try {
200+
client.getRemoteClusterClient(clusterAlias).execute(XPackInfoAction.INSTANCE, request, contextPreservingActionListener);
201+
} catch (final Exception e) {
202+
contextPreservingActionListener.onFailure(e);
203+
}
204+
}
205+
}
206+
207+
/**
208+
* Predicate to test if the index name represents the name of a remote index.
209+
*
210+
* @param index the index name
211+
* @return true if the collection of indices contains a remote index, otherwise false
212+
*/
213+
public static boolean isRemoteIndex(final String index) {
214+
return index.indexOf(RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR) != -1;
215+
}
216+
217+
/**
218+
* Predicate to test if the collection of index names contains any that represent the name of a remote index.
219+
*
220+
* @param indices the collection of index names
221+
* @return true if the collection of index names contains a name that represents a remote index, otherwise false
222+
*/
223+
public static boolean containsRemoteIndex(final List<String> indices) {
224+
return indices.stream().anyMatch(RemoteClusterLicenseChecker::isRemoteIndex);
225+
}
226+
227+
/**
228+
* Filters the collection of index names for names that represent a remote index. Remote index names are of the form
229+
* {@code cluster_name:index_name}.
230+
*
231+
* @param indices the collection of index names
232+
* @return list of index names that represent remote index names
233+
*/
234+
public static List<String> remoteIndices(final List<String> indices) {
235+
return indices.stream().filter(RemoteClusterLicenseChecker::isRemoteIndex).collect(Collectors.toList());
236+
}
237+
238+
/**
239+
* Extract the list of remote cluster aliases from the list of index names. Remote index names are of the form
240+
* {@code cluster_alias:index_name} and the cluster_alias is extracted for each index name that represents a remote index.
241+
*
242+
* @param indices the collection of index names
243+
* @return the remote cluster names
244+
*/
245+
public static List<String> remoteClusterAliases(final List<String> indices) {
246+
return indices.stream()
247+
.filter(RemoteClusterLicenseChecker::isRemoteIndex)
248+
.map(index -> index.substring(0, index.indexOf(RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR)))
249+
.distinct()
250+
.collect(Collectors.toList());
251+
}
252+
253+
/**
254+
* Constructs an error message for license incompatibility.
255+
*
256+
* @param feature the name of the feature that initiated the remote cluster license check.
257+
* @param remoteClusterLicenseInfo the remote cluster license info of the cluster that failed the license check
258+
* @return an error message representing license incompatibility
259+
*/
260+
public static String buildErrorMessage(
261+
final String feature,
262+
final RemoteClusterLicenseInfo remoteClusterLicenseInfo,
263+
final Predicate<XPackInfoResponse.LicenseInfo> predicate) {
264+
final StringBuilder error = new StringBuilder();
265+
if (remoteClusterLicenseInfo.licenseInfo().getStatus() != LicenseStatus.ACTIVE) {
266+
error.append(String.format(Locale.ROOT, "the license on cluster [%s] is not active", remoteClusterLicenseInfo.clusterAlias()));
267+
} else {
268+
assert predicate.test(remoteClusterLicenseInfo.licenseInfo()) == false : "license must be incompatible to build error message";
269+
final String message = String.format(
270+
Locale.ROOT,
271+
"the license mode [%s] on cluster [%s] does not enable [%s]",
272+
License.OperationMode.resolve(remoteClusterLicenseInfo.licenseInfo().getMode()),
273+
remoteClusterLicenseInfo.clusterAlias(),
274+
feature);
275+
error.append(message);
276+
}
277+
278+
return error.toString();
279+
}
280+
281+
}

0 commit comments

Comments
 (0)