Skip to content

Commit 555ff55

Browse files
authored
ESQL: Enterprise license enforcement for CCS (#118102)
ES|QL CCS is an enterprise licensed feature. This PR enforces that no ES|QL CCS query can proceed unless a valid enterprise or trial license is present on the querying cluster. If a valid license is not present a 400 Bad Request error is returned explaining that an enterprise license is needed and showing what license (if any) was found. If a valid license is found, then the license usage timestamp will be updated. Subsequent calls to the `GET /_license/feature_usage` endpoint will show an entry for `esql-ccs` with the last timestamp that it was checked and used. ``` { "features": [ { "family": null, "name": "esql-ccs", "context": null, "last_used": "2024-12-09T19:54:38.767Z", "license_level": "enterprise" } ] } ```
1 parent ce990a5 commit 555ff55

File tree

21 files changed

+935
-472
lines changed

21 files changed

+935
-472
lines changed

docs/changelog/118102.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 118102
2+
summary: "ESQL: Enterprise license enforcement for CCS"
3+
area: ES|QL
4+
type: enhancement
5+
issues: []

x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicensedFeature.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ public boolean isNeedsActive() {
104104
return needsActive;
105105
}
106106

107-
/** Create a momentary feature for hte given license level */
107+
/** Create a momentary feature for the given license level */
108108
public static Momentary momentary(String family, String name, License.OperationMode licenseLevel) {
109109
return new Momentary(family, name, licenseLevel, true);
110110
}

x-pack/plugin/core/src/main/java/org/elasticsearch/license/XPackLicenseState.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ public class XPackLicenseState {
106106
messages.put(XPackField.CCR, XPackLicenseState::ccrAcknowledgementMessages);
107107
messages.put(XPackField.ENTERPRISE_SEARCH, XPackLicenseState::enterpriseSearchAcknowledgementMessages);
108108
messages.put(XPackField.REDACT_PROCESSOR, XPackLicenseState::redactProcessorAcknowledgementMessages);
109+
messages.put(XPackField.ESQL, XPackLicenseState::esqlAcknowledgementMessages);
109110
ACKNOWLEDGMENT_MESSAGES = Collections.unmodifiableMap(messages);
110111
}
111112

@@ -243,6 +244,26 @@ private static String[] enterpriseSearchAcknowledgementMessages(OperationMode cu
243244
return Strings.EMPTY_ARRAY;
244245
}
245246

247+
private static String[] esqlAcknowledgementMessages(OperationMode currentMode, OperationMode newMode) {
248+
/*
249+
* Provide an acknowledgement warning to customers that downgrade from Trial or Enterprise to a lower
250+
* license level (Basic, Standard, Gold or Premium) that they will no longer be able to do CCS in ES|QL.
251+
*/
252+
switch (newMode) {
253+
case BASIC:
254+
case STANDARD:
255+
case GOLD:
256+
case PLATINUM:
257+
switch (currentMode) {
258+
case TRIAL:
259+
case ENTERPRISE:
260+
return new String[] { "ES|QL cross-cluster search will be disabled." };
261+
}
262+
break;
263+
}
264+
return Strings.EMPTY_ARRAY;
265+
}
266+
246267
private static String[] machineLearningAcknowledgementMessages(OperationMode currentMode, OperationMode newMode) {
247268
switch (newMode) {
248269
case BASIC:

x-pack/plugin/core/src/test/java/org/elasticsearch/license/XPackLicenseStateTests.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.xpack.core.XPackField;
1414

1515
import java.util.Arrays;
16+
import java.util.List;
1617
import java.util.Map;
1718
import java.util.Set;
1819
import java.util.concurrent.atomic.AtomicInteger;
@@ -59,6 +60,12 @@ void assertAckMessages(String feature, OperationMode from, OperationMode to, int
5960
assertEquals(expectedMessages, gotMessages.length);
6061
}
6162

63+
void assertAckMessages(String feature, OperationMode from, OperationMode to, Set<String> expectedMessages) {
64+
String[] gotMessages = XPackLicenseState.ACKNOWLEDGMENT_MESSAGES.get(feature).apply(from, to);
65+
Set<String> actualMessages = Arrays.stream(gotMessages).collect(Collectors.toSet());
66+
assertThat(actualMessages, equalTo(expectedMessages));
67+
}
68+
6269
static <T> T randomFrom(T[] values, Predicate<T> filter) {
6370
return randomFrom(Arrays.stream(values).filter(filter).collect(Collectors.toList()));
6471
}
@@ -143,6 +150,16 @@ public void testCcrAckTrialOrPlatinumToNotTrialOrPlatinum() {
143150
assertAckMessages(XPackField.CCR, randomTrialOrPlatinumMode(), randomBasicStandardOrGold(), 1);
144151
}
145152

153+
public void testEsqlAckToTrialOrPlatinum() {
154+
assertAckMessages(XPackField.ESQL, randomMode(), randomFrom(TRIAL, ENTERPRISE), 0);
155+
}
156+
157+
public void testEsqlAckTrialOrEnterpriseToNotTrialOrEnterprise() {
158+
for (OperationMode to : List.of(BASIC, STANDARD, GOLD, PLATINUM)) {
159+
assertAckMessages(XPackField.ESQL, randomFrom(TRIAL, ENTERPRISE), to, Set.of("ES|QL cross-cluster search will be disabled."));
160+
}
161+
}
162+
146163
public void testExpiredLicense() {
147164
// use standard feature which would normally be allowed at all license levels
148165
LicensedFeature feature = LicensedFeature.momentary("family", "enterpriseFeature", STANDARD);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,290 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.action;
9+
10+
import org.elasticsearch.action.ActionType;
11+
import org.elasticsearch.action.support.ActionFilters;
12+
import org.elasticsearch.action.support.TransportAction;
13+
import org.elasticsearch.client.internal.Client;
14+
import org.elasticsearch.client.internal.node.NodeClient;
15+
import org.elasticsearch.common.settings.Settings;
16+
import org.elasticsearch.common.util.CollectionUtils;
17+
import org.elasticsearch.core.Tuple;
18+
import org.elasticsearch.ingest.common.IngestCommonPlugin;
19+
import org.elasticsearch.injection.guice.Inject;
20+
import org.elasticsearch.license.LicenseService;
21+
import org.elasticsearch.license.XPackLicenseState;
22+
import org.elasticsearch.plugins.Plugin;
23+
import org.elasticsearch.protocol.xpack.XPackInfoRequest;
24+
import org.elasticsearch.protocol.xpack.XPackInfoResponse;
25+
import org.elasticsearch.reindex.ReindexPlugin;
26+
import org.elasticsearch.test.AbstractMultiClustersTestCase;
27+
import org.elasticsearch.transport.TransportService;
28+
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
29+
import org.elasticsearch.xpack.core.XPackSettings;
30+
import org.elasticsearch.xpack.core.action.TransportXPackInfoAction;
31+
import org.elasticsearch.xpack.core.action.XPackInfoFeatureAction;
32+
import org.elasticsearch.xpack.core.action.XPackInfoFeatureResponse;
33+
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
34+
import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction;
35+
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
36+
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
37+
import org.elasticsearch.xpack.enrich.EnrichPlugin;
38+
import org.elasticsearch.xpack.esql.EsqlTestUtils;
39+
import org.elasticsearch.xpack.esql.plan.logical.Enrich;
40+
import org.junit.After;
41+
import org.junit.Before;
42+
43+
import java.nio.file.Path;
44+
import java.util.ArrayList;
45+
import java.util.Collection;
46+
import java.util.Collections;
47+
import java.util.List;
48+
import java.util.Map;
49+
import java.util.Set;
50+
import java.util.concurrent.TimeUnit;
51+
52+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
53+
import static org.hamcrest.Matchers.containsString;
54+
55+
public abstract class AbstractEnrichBasedCrossClusterTestCase extends AbstractMultiClustersTestCase {
56+
57+
public static String REMOTE_CLUSTER_1 = "c1";
58+
public static String REMOTE_CLUSTER_2 = "c2";
59+
60+
/**
61+
* subclasses should override if they don't want enrich policies wiped after each test method run
62+
*/
63+
protected boolean tolerateErrorsWhenWipingEnrichPolicies() {
64+
return false;
65+
}
66+
67+
@Override
68+
protected List<String> remoteClusterAlias() {
69+
return List.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2);
70+
}
71+
72+
protected Collection<String> allClusters() {
73+
return CollectionUtils.appendToCopy(remoteClusterAlias(), LOCAL_CLUSTER);
74+
}
75+
76+
@Override
77+
protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
78+
List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins(clusterAlias));
79+
plugins.add(CrossClustersEnrichIT.LocalStateEnrich.class);
80+
plugins.add(IngestCommonPlugin.class);
81+
plugins.add(ReindexPlugin.class);
82+
return plugins;
83+
}
84+
85+
@Override
86+
protected Settings nodeSettings() {
87+
return Settings.builder().put(super.nodeSettings()).put(XPackSettings.SECURITY_ENABLED.getKey(), false).build();
88+
}
89+
90+
static final EnrichPolicy hostPolicy = new EnrichPolicy("match", null, List.of("hosts"), "ip", List.of("ip", "os"));
91+
static final EnrichPolicy vendorPolicy = new EnrichPolicy("match", null, List.of("vendors"), "os", List.of("os", "vendor"));
92+
93+
@Before
94+
public void setupHostsEnrich() {
95+
// the hosts policy are identical on every node
96+
Map<String, String> allHosts = Map.of(
97+
"192.168.1.2",
98+
"Windows",
99+
"192.168.1.3",
100+
"MacOS",
101+
"192.168.1.4",
102+
"Linux",
103+
"192.168.1.5",
104+
"Android",
105+
"192.168.1.6",
106+
"iOS",
107+
"192.168.1.7",
108+
"Windows",
109+
"192.168.1.8",
110+
"MacOS",
111+
"192.168.1.9",
112+
"Linux",
113+
"192.168.1.10",
114+
"Linux",
115+
"192.168.1.11",
116+
"Windows"
117+
);
118+
for (String cluster : allClusters()) {
119+
Client client = client(cluster);
120+
client.admin().indices().prepareCreate("hosts").setMapping("ip", "type=ip", "os", "type=keyword").get();
121+
for (Map.Entry<String, String> h : allHosts.entrySet()) {
122+
client.prepareIndex("hosts").setSource("ip", h.getKey(), "os", h.getValue()).get();
123+
}
124+
client.admin().indices().prepareRefresh("hosts").get();
125+
client.execute(PutEnrichPolicyAction.INSTANCE, new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "hosts", hostPolicy))
126+
.actionGet();
127+
client.execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "hosts"))
128+
.actionGet();
129+
assertAcked(client.admin().indices().prepareDelete("hosts"));
130+
}
131+
}
132+
133+
@Before
134+
public void setupVendorPolicy() {
135+
var localVendors = Map.of("Windows", "Microsoft", "MacOS", "Apple", "iOS", "Apple", "Android", "Samsung", "Linux", "Redhat");
136+
var c1Vendors = Map.of("Windows", "Microsoft", "MacOS", "Apple", "iOS", "Apple", "Android", "Google", "Linux", "Suse");
137+
var c2Vendors = Map.of("Windows", "Microsoft", "MacOS", "Apple", "iOS", "Apple", "Android", "Sony", "Linux", "Ubuntu");
138+
var vendors = Map.of(LOCAL_CLUSTER, localVendors, REMOTE_CLUSTER_1, c1Vendors, REMOTE_CLUSTER_2, c2Vendors);
139+
for (Map.Entry<String, Map<String, String>> e : vendors.entrySet()) {
140+
Client client = client(e.getKey());
141+
client.admin().indices().prepareCreate("vendors").setMapping("os", "type=keyword", "vendor", "type=keyword").get();
142+
for (Map.Entry<String, String> v : e.getValue().entrySet()) {
143+
client.prepareIndex("vendors").setSource("os", v.getKey(), "vendor", v.getValue()).get();
144+
}
145+
client.admin().indices().prepareRefresh("vendors").get();
146+
client.execute(PutEnrichPolicyAction.INSTANCE, new PutEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "vendors", vendorPolicy))
147+
.actionGet();
148+
client.execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, "vendors"))
149+
.actionGet();
150+
assertAcked(client.admin().indices().prepareDelete("vendors"));
151+
}
152+
}
153+
154+
@Before
155+
public void setupEventsIndices() {
156+
record Event(long timestamp, String user, String host) {
157+
158+
}
159+
List<Event> e0 = List.of(
160+
new Event(1, "matthew", "192.168.1.3"),
161+
new Event(2, "simon", "192.168.1.5"),
162+
new Event(3, "park", "192.168.1.2"),
163+
new Event(4, "andrew", "192.168.1.7"),
164+
new Event(5, "simon", "192.168.1.20"),
165+
new Event(6, "kevin", "192.168.1.2"),
166+
new Event(7, "akio", "192.168.1.5"),
167+
new Event(8, "luke", "192.168.1.2"),
168+
new Event(9, "jack", "192.168.1.4")
169+
);
170+
List<Event> e1 = List.of(
171+
new Event(1, "andres", "192.168.1.2"),
172+
new Event(2, "sergio", "192.168.1.6"),
173+
new Event(3, "kylian", "192.168.1.8"),
174+
new Event(4, "andrew", "192.168.1.9"),
175+
new Event(5, "jack", "192.168.1.3"),
176+
new Event(6, "kevin", "192.168.1.4"),
177+
new Event(7, "akio", "192.168.1.7"),
178+
new Event(8, "kevin", "192.168.1.21"),
179+
new Event(9, "andres", "192.168.1.8")
180+
);
181+
List<Event> e2 = List.of(
182+
new Event(1, "park", "192.168.1.25"),
183+
new Event(2, "akio", "192.168.1.5"),
184+
new Event(3, "park", "192.168.1.2"),
185+
new Event(4, "kevin", "192.168.1.3")
186+
);
187+
for (var c : Map.of(LOCAL_CLUSTER, e0, REMOTE_CLUSTER_1, e1, REMOTE_CLUSTER_2, e2).entrySet()) {
188+
Client client = client(c.getKey());
189+
client.admin()
190+
.indices()
191+
.prepareCreate("events")
192+
.setMapping("timestamp", "type=long", "user", "type=keyword", "host", "type=ip")
193+
.get();
194+
for (var e : c.getValue()) {
195+
client.prepareIndex("events").setSource("timestamp", e.timestamp, "user", e.user, "host", e.host).get();
196+
}
197+
client.admin().indices().prepareRefresh("events").get();
198+
}
199+
}
200+
201+
@After
202+
public void wipeEnrichPolicies() {
203+
for (String cluster : allClusters()) {
204+
cluster(cluster).wipe(Set.of());
205+
for (String policy : List.of("hosts", "vendors")) {
206+
if (tolerateErrorsWhenWipingEnrichPolicies()) {
207+
try {
208+
client(cluster).execute(
209+
DeleteEnrichPolicyAction.INSTANCE,
210+
new DeleteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policy)
211+
);
212+
} catch (Exception e) {
213+
assertThat(e.getMessage(), containsString("Cluster is already closed"));
214+
}
215+
216+
} else {
217+
client(cluster).execute(
218+
DeleteEnrichPolicyAction.INSTANCE,
219+
new DeleteEnrichPolicyAction.Request(TEST_REQUEST_TIMEOUT, policy)
220+
);
221+
}
222+
}
223+
}
224+
}
225+
226+
static String enrichHosts(Enrich.Mode mode) {
227+
return EsqlTestUtils.randomEnrichCommand("hosts", mode, hostPolicy.getMatchField(), hostPolicy.getEnrichFields());
228+
}
229+
230+
static String enrichVendors(Enrich.Mode mode) {
231+
return EsqlTestUtils.randomEnrichCommand("vendors", mode, vendorPolicy.getMatchField(), vendorPolicy.getEnrichFields());
232+
}
233+
234+
protected EsqlQueryResponse runQuery(String query, Boolean ccsMetadataInResponse) {
235+
EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest();
236+
request.query(query);
237+
request.pragmas(AbstractEsqlIntegTestCase.randomPragmas());
238+
if (randomBoolean()) {
239+
request.profile(true);
240+
}
241+
if (ccsMetadataInResponse != null) {
242+
request.includeCCSMetadata(ccsMetadataInResponse);
243+
}
244+
return client(LOCAL_CLUSTER).execute(EsqlQueryAction.INSTANCE, request).actionGet(30, TimeUnit.SECONDS);
245+
}
246+
247+
public static Tuple<Boolean, Boolean> randomIncludeCCSMetadata() {
248+
return switch (randomIntBetween(1, 3)) {
249+
case 1 -> new Tuple<>(Boolean.TRUE, Boolean.TRUE);
250+
case 2 -> new Tuple<>(Boolean.FALSE, Boolean.FALSE);
251+
case 3 -> new Tuple<>(null, Boolean.FALSE);
252+
default -> throw new AssertionError("should not get here");
253+
};
254+
}
255+
256+
public static class LocalStateEnrich extends LocalStateCompositeXPackPlugin {
257+
public LocalStateEnrich(final Settings settings, final Path configPath) throws Exception {
258+
super(settings, configPath);
259+
260+
plugins.add(new EnrichPlugin(settings) {
261+
@Override
262+
protected XPackLicenseState getLicenseState() {
263+
return this.getLicenseState();
264+
}
265+
});
266+
}
267+
268+
public static class EnrichTransportXPackInfoAction extends TransportXPackInfoAction {
269+
@Inject
270+
public EnrichTransportXPackInfoAction(
271+
TransportService transportService,
272+
ActionFilters actionFilters,
273+
LicenseService licenseService,
274+
NodeClient client
275+
) {
276+
super(transportService, actionFilters, licenseService, client);
277+
}
278+
279+
@Override
280+
protected List<ActionType<XPackInfoFeatureResponse>> infoActions() {
281+
return Collections.singletonList(XPackInfoFeatureAction.ENRICH);
282+
}
283+
}
284+
285+
@Override
286+
protected Class<? extends TransportAction<XPackInfoRequest, XPackInfoResponse>> getInfoAction() {
287+
return CrossClustersQueriesWithInvalidLicenseIT.LocalStateEnrich.EnrichTransportXPackInfoAction.class;
288+
}
289+
}
290+
}

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncQueryIT.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest;
3636
import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
3737
import org.elasticsearch.xpack.core.async.TransportDeleteAsyncResultAction;
38-
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
3938
import org.junit.Before;
4039

4140
import java.io.IOException;
@@ -78,7 +77,7 @@ protected Map<String, Boolean> skipUnavailableForRemoteClusters() {
7877
@Override
7978
protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
8079
List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins(clusterAlias));
81-
plugins.add(EsqlPlugin.class);
80+
plugins.add(EsqlPluginWithEnterpriseOrTrialLicense.class);
8281
plugins.add(EsqlAsyncActionIT.LocalStateEsqlAsync.class); // allows the async_search DELETE action
8382
plugins.add(InternalExchangePlugin.class);
8483
plugins.add(PauseFieldPlugin.class);

0 commit comments

Comments
 (0)