Skip to content

Commit 2ca0181

Browse files
committed
[ML] Check licence when datafeeds use cross cluster search (#31247)
This change prevents a datafeed using cross cluster search from starting if the remote cluster does not have x-pack installed and a sufficient license. The check is made only when starting a datafeed.
1 parent 43c0ee0 commit 2ca0181

File tree

6 files changed

+493
-59
lines changed

6 files changed

+493
-59
lines changed

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040

4141
import java.io.IOException;
4242
import java.util.ArrayList;
43-
import java.util.Arrays;
4443
import java.util.Collections;
4544
import java.util.List;
4645
import java.util.TimeZone;
@@ -193,11 +192,11 @@ public void testDefaults() {
193192

194193
public void testDefaultQueryDelay() {
195194
DatafeedConfig.Builder feedBuilder1 = new DatafeedConfig.Builder("datafeed1", "job1");
196-
feedBuilder1.setIndices(Arrays.asList("foo"));
195+
feedBuilder1.setIndices(Collections.singletonList("foo"));
197196
DatafeedConfig.Builder feedBuilder2 = new DatafeedConfig.Builder("datafeed2", "job1");
198-
feedBuilder2.setIndices(Arrays.asList("foo"));
197+
feedBuilder2.setIndices(Collections.singletonList("foo"));
199198
DatafeedConfig.Builder feedBuilder3 = new DatafeedConfig.Builder("datafeed3", "job2");
200-
feedBuilder3.setIndices(Arrays.asList("foo"));
199+
feedBuilder3.setIndices(Collections.singletonList("foo"));
201200
DatafeedConfig feed1 = feedBuilder1.build();
202201
DatafeedConfig feed2 = feedBuilder2.build();
203202
DatafeedConfig feed3 = feedBuilder3.build();
@@ -208,19 +207,19 @@ public void testDefaultQueryDelay() {
208207
assertThat(feed1.getQueryDelay(), not(equalTo(feed3.getQueryDelay())));
209208
}
210209

211-
public void testCheckValid_GivenNullIndices() throws IOException {
210+
public void testCheckValid_GivenNullIndices() {
212211
DatafeedConfig.Builder conf = new DatafeedConfig.Builder("datafeed1", "job1");
213212
expectThrows(IllegalArgumentException.class, () -> conf.setIndices(null));
214213
}
215214

216-
public void testCheckValid_GivenEmptyIndices() throws IOException {
215+
public void testCheckValid_GivenEmptyIndices() {
217216
DatafeedConfig.Builder conf = new DatafeedConfig.Builder("datafeed1", "job1");
218217
conf.setIndices(Collections.emptyList());
219218
ElasticsearchException e = ESTestCase.expectThrows(ElasticsearchException.class, conf::build);
220219
assertEquals(Messages.getMessage(Messages.DATAFEED_CONFIG_INVALID_OPTION_VALUE, "indices", "[]"), e.getMessage());
221220
}
222221

223-
public void testCheckValid_GivenIndicesContainsOnlyNulls() throws IOException {
222+
public void testCheckValid_GivenIndicesContainsOnlyNulls() {
224223
List<String> indices = new ArrayList<>();
225224
indices.add(null);
226225
indices.add(null);
@@ -230,7 +229,7 @@ public void testCheckValid_GivenIndicesContainsOnlyNulls() throws IOException {
230229
assertEquals(Messages.getMessage(Messages.DATAFEED_CONFIG_INVALID_OPTION_VALUE, "indices", "[null, null]"), e.getMessage());
231230
}
232231

233-
public void testCheckValid_GivenIndicesContainsOnlyEmptyStrings() throws IOException {
232+
public void testCheckValid_GivenIndicesContainsOnlyEmptyStrings() {
234233
List<String> indices = new ArrayList<>();
235234
indices.add("");
236235
indices.add("");
@@ -240,27 +239,27 @@ public void testCheckValid_GivenIndicesContainsOnlyEmptyStrings() throws IOExcep
240239
assertEquals(Messages.getMessage(Messages.DATAFEED_CONFIG_INVALID_OPTION_VALUE, "indices", "[, ]"), e.getMessage());
241240
}
242241

243-
public void testCheckValid_GivenNegativeQueryDelay() throws IOException {
242+
public void testCheckValid_GivenNegativeQueryDelay() {
244243
DatafeedConfig.Builder conf = new DatafeedConfig.Builder("datafeed1", "job1");
245244
IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class,
246245
() -> conf.setQueryDelay(TimeValue.timeValueMillis(-10)));
247246
assertEquals("query_delay cannot be less than 0. Value = -10", e.getMessage());
248247
}
249248

250-
public void testCheckValid_GivenZeroFrequency() throws IOException {
249+
public void testCheckValid_GivenZeroFrequency() {
251250
DatafeedConfig.Builder conf = new DatafeedConfig.Builder("datafeed1", "job1");
252251
IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class, () -> conf.setFrequency(TimeValue.ZERO));
253252
assertEquals("frequency cannot be less or equal than 0. Value = 0s", e.getMessage());
254253
}
255254

256-
public void testCheckValid_GivenNegativeFrequency() throws IOException {
255+
public void testCheckValid_GivenNegativeFrequency() {
257256
DatafeedConfig.Builder conf = new DatafeedConfig.Builder("datafeed1", "job1");
258257
IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class,
259258
() -> conf.setFrequency(TimeValue.timeValueMinutes(-1)));
260259
assertEquals("frequency cannot be less or equal than 0. Value = -1", e.getMessage());
261260
}
262261

263-
public void testCheckValid_GivenNegativeScrollSize() throws IOException {
262+
public void testCheckValid_GivenNegativeScrollSize() {
264263
DatafeedConfig.Builder conf = new DatafeedConfig.Builder("datafeed1", "job1");
265264
ElasticsearchException e = ESTestCase.expectThrows(ElasticsearchException.class, () -> conf.setScrollSize(-1000));
266265
assertEquals(Messages.getMessage(Messages.DATAFEED_CONFIG_INVALID_OPTION_VALUE, "scroll_size", -1000L), e.getMessage());
@@ -414,7 +413,7 @@ public void testDefaultFrequency_GivenNegative() {
414413

415414
public void testDefaultFrequency_GivenNoAggregations() {
416415
DatafeedConfig.Builder datafeedBuilder = new DatafeedConfig.Builder("feed", "job");
417-
datafeedBuilder.setIndices(Arrays.asList("my_index"));
416+
datafeedBuilder.setIndices(Collections.singletonList("my_index"));
418417
DatafeedConfig datafeed = datafeedBuilder.build();
419418

420419
assertEquals(TimeValue.timeValueMinutes(1), datafeed.defaultFrequency(TimeValue.timeValueSeconds(1)));

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportStartDatafeedAction.java

Lines changed: 87 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,12 @@
4343
import org.elasticsearch.persistent.PersistentTasksExecutor;
4444
import org.elasticsearch.persistent.PersistentTasksService;
4545
import org.elasticsearch.xpack.ml.MachineLearning;
46+
import org.elasticsearch.xpack.ml.datafeed.MlRemoteLicenseChecker;
4647
import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
4748
import org.elasticsearch.xpack.ml.datafeed.DatafeedNodeSelector;
4849
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
4950

51+
import java.util.List;
5052
import java.util.Map;
5153
import java.util.function.Predicate;
5254

@@ -111,40 +113,65 @@ protected void masterOperation(StartDatafeedAction.Request request, ClusterState
111113
ActionListener<StartDatafeedAction.Response> listener) {
112114
StartDatafeedAction.DatafeedParams params = request.getParams();
113115
if (licenseState.isMachineLearningAllowed()) {
114-
ActionListener<PersistentTasksCustomMetaData.PersistentTask<StartDatafeedAction.DatafeedParams>> finalListener =
116+
117+
ActionListener<PersistentTasksCustomMetaData.PersistentTask<StartDatafeedAction.DatafeedParams>> waitForTaskListener =
115118
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<StartDatafeedAction.DatafeedParams>>() {
116-
@Override
117-
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<StartDatafeedAction.DatafeedParams> persistentTask) {
118-
waitForDatafeedStarted(persistentTask.getId(), params, listener);
119-
}
119+
@Override
120+
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<StartDatafeedAction.DatafeedParams>
121+
persistentTask) {
122+
waitForDatafeedStarted(persistentTask.getId(), params, listener);
123+
}
120124

121-
@Override
122-
public void onFailure(Exception e) {
123-
if (e instanceof ResourceAlreadyExistsException) {
124-
logger.debug("datafeed already started", e);
125-
e = new ElasticsearchStatusException("cannot start datafeed [" + params.getDatafeedId() +
126-
"] because it has already been started", RestStatus.CONFLICT);
127-
}
128-
listener.onFailure(e);
129-
}
130-
};
125+
@Override
126+
public void onFailure(Exception e) {
127+
if (e instanceof ResourceAlreadyExistsException) {
128+
logger.debug("datafeed already started", e);
129+
e = new ElasticsearchStatusException("cannot start datafeed [" + params.getDatafeedId() +
130+
"] because it has already been started", RestStatus.CONFLICT);
131+
}
132+
listener.onFailure(e);
133+
}
134+
};
131135

132136
// Verify data extractor factory can be created, then start persistent task
133137
MlMetadata mlMetadata = MlMetadata.getMlMetadata(state);
134138
PersistentTasksCustomMetaData tasks = state.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
135139
validate(params.getDatafeedId(), mlMetadata, tasks);
136140
DatafeedConfig datafeed = mlMetadata.getDatafeed(params.getDatafeedId());
137141
Job job = mlMetadata.getJobs().get(datafeed.getJobId());
138-
DataExtractorFactory.create(client, datafeed, job, ActionListener.wrap(
139-
dataExtractorFactory ->
140-
persistentTasksService.sendStartRequest(MLMetadataField.datafeedTaskId(params.getDatafeedId()),
141-
StartDatafeedAction.TASK_NAME, params, finalListener)
142-
, listener::onFailure));
142+
143+
if (MlRemoteLicenseChecker.containsRemoteIndex(datafeed.getIndices())) {
144+
MlRemoteLicenseChecker remoteLicenseChecker = new MlRemoteLicenseChecker(client);
145+
remoteLicenseChecker.checkRemoteClusterLicenses(MlRemoteLicenseChecker.remoteClusterNames(datafeed.getIndices()),
146+
ActionListener.wrap(
147+
response -> {
148+
if (response.isViolated()) {
149+
listener.onFailure(createUnlicensedError(datafeed.getId(), response));
150+
} else {
151+
createDataExtractor(job, datafeed, params, waitForTaskListener);
152+
}
153+
},
154+
e -> listener.onFailure(createUnknownLicenseError(datafeed.getId(),
155+
MlRemoteLicenseChecker.remoteIndices(datafeed.getIndices()), e))
156+
));
157+
} else {
158+
createDataExtractor(job, datafeed, params, waitForTaskListener);
159+
}
143160
} else {
144161
listener.onFailure(LicenseUtils.newComplianceException(XPackField.MACHINE_LEARNING));
145162
}
146163
}
147164

165+
private void createDataExtractor(Job job, DatafeedConfig datafeed, StartDatafeedAction.DatafeedParams params,
166+
ActionListener<PersistentTasksCustomMetaData.PersistentTask<StartDatafeedAction.DatafeedParams>>
167+
listener) {
168+
DataExtractorFactory.create(client, datafeed, job, ActionListener.wrap(
169+
dataExtractorFactory ->
170+
persistentTasksService.sendStartRequest(MLMetadataField.datafeedTaskId(params.getDatafeedId()),
171+
StartDatafeedAction.TASK_NAME, params, listener)
172+
, listener::onFailure));
173+
}
174+
148175
@Override
149176
protected ClusterBlockException checkBlock(StartDatafeedAction.Request request, ClusterState state) {
150177
// We only delegate here to PersistentTasksService, but if there is a metadata writeblock,
@@ -158,28 +185,29 @@ private void waitForDatafeedStarted(String taskId, StartDatafeedAction.DatafeedP
158185
DatafeedPredicate predicate = new DatafeedPredicate();
159186
persistentTasksService.waitForPersistentTaskCondition(taskId, predicate, params.getTimeout(),
160187
new PersistentTasksService.WaitForPersistentTaskListener<StartDatafeedAction.DatafeedParams>() {
161-
@Override
162-
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<StartDatafeedAction.DatafeedParams> persistentTask) {
163-
if (predicate.exception != null) {
164-
// We want to return to the caller without leaving an unassigned persistent task, to match
165-
// what would have happened if the error had been detected in the "fast fail" validation
166-
cancelDatafeedStart(persistentTask, predicate.exception, listener);
167-
} else {
168-
listener.onResponse(new StartDatafeedAction.Response(true));
169-
}
170-
}
188+
@Override
189+
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<StartDatafeedAction.DatafeedParams>
190+
persistentTask) {
191+
if (predicate.exception != null) {
192+
// We want to return to the caller without leaving an unassigned persistent task, to match
193+
// what would have happened if the error had been detected in the "fast fail" validation
194+
cancelDatafeedStart(persistentTask, predicate.exception, listener);
195+
} else {
196+
listener.onResponse(new StartDatafeedAction.Response(true));
197+
}
198+
}
171199

172-
@Override
173-
public void onFailure(Exception e) {
174-
listener.onFailure(e);
175-
}
200+
@Override
201+
public void onFailure(Exception e) {
202+
listener.onFailure(e);
203+
}
176204

177-
@Override
178-
public void onTimeout(TimeValue timeout) {
179-
listener.onFailure(new ElasticsearchException("Starting datafeed ["
180-
+ params.getDatafeedId() + "] timed out after [" + timeout + "]"));
181-
}
182-
});
205+
@Override
206+
public void onTimeout(TimeValue timeout) {
207+
listener.onFailure(new ElasticsearchException("Starting datafeed ["
208+
+ params.getDatafeedId() + "] timed out after [" + timeout + "]"));
209+
}
210+
});
183211
}
184212

185213
private void cancelDatafeedStart(PersistentTasksCustomMetaData.PersistentTask<StartDatafeedAction.DatafeedParams> persistentTask,
@@ -203,6 +231,25 @@ public void onFailure(Exception e) {
203231
);
204232
}
205233

234+
private ElasticsearchStatusException createUnlicensedError(String datafeedId,
235+
MlRemoteLicenseChecker.LicenseViolation licenseViolation) {
236+
String message = "Cannot start datafeed [" + datafeedId + "] as it is configured to use "
237+
+ "indices on a remote cluster [" + licenseViolation.get().getClusterName()
238+
+ "] that is not licensed for Machine Learning. "
239+
+ MlRemoteLicenseChecker.buildErrorMessage(licenseViolation.get());
240+
241+
return new ElasticsearchStatusException(message, RestStatus.BAD_REQUEST);
242+
}
243+
244+
private ElasticsearchStatusException createUnknownLicenseError(String datafeedId, List<String> remoteIndices,
245+
Exception cause) {
246+
String message = "Cannot start datafeed [" + datafeedId + "] as it is configured to use"
247+
+ " indices on a remote cluster " + remoteIndices
248+
+ " but the license type could not be verified";
249+
250+
return new ElasticsearchStatusException(message, RestStatus.BAD_REQUEST, new Exception(cause.getMessage()));
251+
}
252+
206253
public static class StartDatafeedPersistentTasksExecutor extends PersistentTasksExecutor<StartDatafeedAction.DatafeedParams> {
207254
private final DatafeedManager datafeedManager;
208255
private final IndexNameExpressionResolver resolver;

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedNodeSelector.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ private AssignmentFailure verifyIndicesActive(DatafeedConfig datafeed) {
9191
List<String> indices = datafeed.getIndices();
9292
for (String index : indices) {
9393

94-
if (isRemoteIndex(index)) {
94+
if (MlRemoteLicenseChecker.isRemoteIndex(index)) {
9595
// We cannot verify remote indices
9696
continue;
9797
}
@@ -122,10 +122,6 @@ private AssignmentFailure verifyIndicesActive(DatafeedConfig datafeed) {
122122
return null;
123123
}
124124

125-
private boolean isRemoteIndex(String index) {
126-
return index.indexOf(':') != -1;
127-
}
128-
129125
private static class AssignmentFailure {
130126
private final String reason;
131127
private final boolean isCriticalForTaskCreation;

0 commit comments

Comments
 (0)