Skip to content

Commit bfcb11b

Browse files
authored
6.x Backport of #38278: Move ML Optimistic Concurrency Control to Seq No
This commit moves the usage of internal versioning for CAS operations to use sequence numbers and primary terms Relates to #36148 Relates to #10708
1 parent 48f3c11 commit bfcb11b

File tree

9 files changed

+90
-47
lines changed

9 files changed

+90
-47
lines changed

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -629,15 +629,14 @@ public void onFailure(String source, Exception e) {
629629
}
630630

631631
@Override
632-
public void clusterStateProcessed(String source, ClusterState oldState,
633-
ClusterState newState) {
632+
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
634633
listener.onResponse(new AcknowledgedResponse(true));
635634
}
636635
});
637636
} else {
638637
JobUpdate update = new JobUpdate.Builder(jobId).setClearFinishTime(true).build();
639638

640-
jobConfigProvider.updateJob(jobId, update, null, ActionListener.wrap(
639+
jobConfigProvider.updateJob(jobId, update, null, clusterService.state().nodes().getMinNodeVersion(), ActionListener.wrap(
641640
job -> listener.onResponse(new AcknowledgedResponse(true)),
642641
e -> {
643642
logger.error("[" + jobId + "] Failed to clear finished_time", e);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateDatafeedAction.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ private void updateDatafeedInIndex(UpdateDatafeedAction.Request request, Cluster
103103

104104
CheckedConsumer<Boolean, Exception> updateConsumer = ok -> {
105105
datafeedConfigProvider.updateDatefeedConfig(request.getUpdate().getId(), request.getUpdate(), headers,
106-
jobConfigProvider::validateDatafeedJob,
106+
jobConfigProvider::validateDatafeedJob, clusterService.state().nodes().getMinNodeVersion(),
107107
ActionListener.wrap(
108108
updatedConfig -> listener.onResponse(new PutDatafeedAction.Response(updatedConfig)),
109109
listener::onFailure

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportUpdateFilterAction.java

+22-7
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package org.elasticsearch.xpack.ml.action;
77

88
import org.elasticsearch.ResourceNotFoundException;
9+
import org.elasticsearch.Version;
910
import org.elasticsearch.action.ActionListener;
1011
import org.elasticsearch.action.get.GetAction;
1112
import org.elasticsearch.action.get.GetRequest;
@@ -18,6 +19,7 @@
1819
import org.elasticsearch.action.support.WriteRequest;
1920
import org.elasticsearch.client.Client;
2021
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
22+
import org.elasticsearch.cluster.service.ClusterService;
2123
import org.elasticsearch.common.bytes.BytesReference;
2224
import org.elasticsearch.common.inject.Inject;
2325
import org.elasticsearch.common.settings.Settings;
@@ -53,15 +55,17 @@ public class TransportUpdateFilterAction extends HandledTransportAction<UpdateFi
5355

5456
private final Client client;
5557
private final JobManager jobManager;
58+
private final ClusterService clusterService;
5659

5760
@Inject
5861
public TransportUpdateFilterAction(Settings settings, ThreadPool threadPool, TransportService transportService,
5962
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, Client client,
60-
JobManager jobManager) {
63+
JobManager jobManager, ClusterService clusterService) {
6164
super(settings, UpdateFilterAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver,
6265
UpdateFilterAction.Request::new);
6366
this.client = client;
6467
this.jobManager = jobManager;
68+
this.clusterService = clusterService;
6569
}
6670

6771
@Override
@@ -97,13 +101,20 @@ private void updateFilter(FilterWithVersion filterWithVersion, UpdateFilterActio
97101
}
98102

99103
MlFilter updatedFilter = MlFilter.builder(filter.getId()).setDescription(description).setItems(items).build();
100-
indexUpdatedFilter(updatedFilter, filterWithVersion.version, request, listener);
104+
indexUpdatedFilter(
105+
updatedFilter, filterWithVersion.version, filterWithVersion.seqNo, filterWithVersion.primaryTerm, request, listener);
101106
}
102107

103-
private void indexUpdatedFilter(MlFilter filter, long version, UpdateFilterAction.Request request,
108+
private void indexUpdatedFilter(MlFilter filter, final long version, final long seqNo, final long primaryTerm,
109+
UpdateFilterAction.Request request,
104110
ActionListener<PutFilterAction.Response> listener) {
105111
IndexRequest indexRequest = new IndexRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE, filter.documentId());
106-
indexRequest.version(version);
112+
if (clusterService.state().nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0)) {
113+
indexRequest.setIfSeqNo(seqNo);
114+
indexRequest.setIfPrimaryTerm(primaryTerm);
115+
} else {
116+
indexRequest.version(version);
117+
}
107118
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
108119

109120
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
@@ -148,7 +159,7 @@ public void onResponse(GetResponse getDocResponse) {
148159
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
149160
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) {
150161
MlFilter filter = MlFilter.LENIENT_PARSER.apply(parser, null).build();
151-
listener.onResponse(new FilterWithVersion(filter, getDocResponse.getVersion()));
162+
listener.onResponse(new FilterWithVersion(filter, getDocResponse));
152163
}
153164
} else {
154165
this.onFailure(new ResourceNotFoundException(Messages.getMessage(Messages.FILTER_NOT_FOUND, filterId)));
@@ -169,10 +180,14 @@ private static class FilterWithVersion {
169180

170181
private final MlFilter filter;
171182
private final long version;
183+
private final long seqNo;
184+
private final long primaryTerm;
172185

173-
private FilterWithVersion(MlFilter filter, long version) {
186+
private FilterWithVersion(MlFilter filter, GetResponse getDocResponse) {
174187
this.filter = filter;
175-
this.version = version;
188+
this.version = getDocResponse.getVersion();
189+
this.seqNo = getDocResponse.getSeqNo();
190+
this.primaryTerm = getDocResponse.getPrimaryTerm();
176191
}
177192
}
178193
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/persistence/DatafeedConfigProvider.java

+20-8
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import org.apache.logging.log4j.LogManager;
99
import org.apache.logging.log4j.Logger;
1010
import org.elasticsearch.ElasticsearchParseException;
11+
import org.elasticsearch.Version;
1112
import org.elasticsearch.action.ActionListener;
1213
import org.elasticsearch.action.DocWriteRequest;
1314
import org.elasticsearch.action.DocWriteResponse;
@@ -19,6 +20,7 @@
1920
import org.elasticsearch.action.get.GetResponse;
2021
import org.elasticsearch.action.index.IndexAction;
2122
import org.elasticsearch.action.index.IndexRequest;
23+
import org.elasticsearch.action.index.IndexRequestBuilder;
2224
import org.elasticsearch.action.index.IndexResponse;
2325
import org.elasticsearch.action.search.SearchRequest;
2426
import org.elasticsearch.action.search.SearchResponse;
@@ -268,10 +270,12 @@ public void onFailure(Exception e) {
268270
* @param headers Datafeed headers applied with the update
269271
* @param validator BiConsumer that accepts the updated config and can perform
270272
* extra validations. {@code validator} must call the passed listener
273+
* @param minClusterNodeVersion minimum version of nodes in cluster
271274
* @param updatedConfigListener Updated datafeed config listener
272275
*/
273276
public void updateDatefeedConfig(String datafeedId, DatafeedUpdate update, Map<String, String> headers,
274277
BiConsumer<DatafeedConfig, ActionListener<Boolean>> validator,
278+
Version minClusterNodeVersion,
275279
ActionListener<DatafeedConfig> updatedConfigListener) {
276280
GetRequest getRequest = new GetRequest(AnomalyDetectorsIndex.configIndexName(),
277281
ElasticsearchMappings.DOC_TYPE, DatafeedConfig.documentId(datafeedId));
@@ -283,7 +287,9 @@ public void onResponse(GetResponse getResponse) {
283287
updatedConfigListener.onFailure(ExceptionsHelper.missingDatafeedException(datafeedId));
284288
return;
285289
}
286-
long version = getResponse.getVersion();
290+
final long version = getResponse.getVersion();
291+
final long seqNo = getResponse.getSeqNo();
292+
final long primaryTerm = getResponse.getPrimaryTerm();
287293
BytesReference source = getResponse.getSourceAsBytesRef();
288294
DatafeedConfig.Builder configBuilder;
289295
try {
@@ -304,7 +310,7 @@ public void onResponse(GetResponse getResponse) {
304310

305311
ActionListener<Boolean> validatedListener = ActionListener.wrap(
306312
ok -> {
307-
indexUpdatedConfig(updatedConfig, version, ActionListener.wrap(
313+
indexUpdatedConfig(updatedConfig, version, seqNo, primaryTerm, minClusterNodeVersion, ActionListener.wrap(
308314
indexResponse -> {
309315
assert indexResponse.getResult() == DocWriteResponse.Result.UPDATED;
310316
updatedConfigListener.onResponse(updatedConfig);
@@ -328,17 +334,23 @@ public void onFailure(Exception e) {
328334
});
329335
}
330336

331-
private void indexUpdatedConfig(DatafeedConfig updatedConfig, long version, ActionListener<IndexResponse> listener) {
337+
private void indexUpdatedConfig(DatafeedConfig updatedConfig, long version, long seqNo, long primaryTerm,
338+
Version minClusterNodeVersion, ActionListener<IndexResponse> listener) {
332339
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
333340
XContentBuilder updatedSource = updatedConfig.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS));
334-
IndexRequest indexRequest = client.prepareIndex(AnomalyDetectorsIndex.configIndexName(),
341+
IndexRequestBuilder indexRequest = client.prepareIndex(AnomalyDetectorsIndex.configIndexName(),
335342
ElasticsearchMappings.DOC_TYPE, DatafeedConfig.documentId(updatedConfig.getId()))
336343
.setSource(updatedSource)
337-
.setVersion(version)
338-
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
339-
.request();
344+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
345+
346+
if (minClusterNodeVersion.onOrAfter(Version.V_6_7_0)) {
347+
indexRequest.setIfSeqNo(seqNo);
348+
indexRequest.setIfPrimaryTerm(primaryTerm);
349+
} else {
350+
indexRequest.setVersion(version);
351+
}
340352

341-
executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest, listener);
353+
executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest.request(), listener);
342354

343355
} catch (IOException e) {
344356
listener.onFailure(

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -522,7 +522,7 @@ private void postJobUpdate(ClusterState clusterState, UpdateJobAction.Request re
522522

523523
private void updateJobIndex(UpdateJobAction.Request request, ActionListener<Job> updatedJobListener) {
524524
jobConfigProvider.updateJobWithValidation(request.getJobId(), request.getJobUpdate(), maxModelMemoryLimit,
525-
this::validate, updatedJobListener);
525+
this::validate, clusterService.state().nodes().getMinNodeVersion(), updatedJobListener);
526526
}
527527

528528
private void updateJobClusterState(UpdateJobAction.Request request, ActionListener<PutJobAction.Response> actionListener) {
@@ -846,7 +846,8 @@ public ClusterState execute(ClusterState currentState) {
846846
.setEstablishedModelMemory(response)
847847
.build();
848848

849-
jobConfigProvider.updateJob(request.getJobId(), update, maxModelMemoryLimit, ActionListener.wrap(
849+
jobConfigProvider.updateJob(request.getJobId(), update, maxModelMemoryLimit,
850+
clusterService.state().nodes().getMinNodeVersion(), ActionListener.wrap(
850851
job -> {
851852
auditor.info(request.getJobId(),
852853
Messages.getMessage(Messages.JOB_AUDIT_REVERTED, modelSnapshot.getDescription()));

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobConfigProvider.java

+28-13
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.apache.lucene.search.join.ScoreMode;
1111
import org.elasticsearch.ElasticsearchException;
1212
import org.elasticsearch.ElasticsearchParseException;
13+
import org.elasticsearch.Version;
1314
import org.elasticsearch.action.ActionListener;
1415
import org.elasticsearch.action.DocWriteRequest;
1516
import org.elasticsearch.action.DocWriteResponse;
@@ -24,6 +25,7 @@
2425
import org.elasticsearch.action.get.MultiGetResponse;
2526
import org.elasticsearch.action.index.IndexAction;
2627
import org.elasticsearch.action.index.IndexRequest;
28+
import org.elasticsearch.action.index.IndexRequestBuilder;
2729
import org.elasticsearch.action.index.IndexResponse;
2830
import org.elasticsearch.action.search.SearchRequest;
2931
import org.elasticsearch.action.search.SearchResponse;
@@ -282,9 +284,12 @@ public void onFailure(Exception e) {
282284
* @param maxModelMemoryLimit The maximum model memory allowed. This can be {@code null}
283285
* if the job's {@link org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits}
284286
* are not changed.
287+
* @param minClusterNodeVersion the minimum version of nodes in the cluster
285288
* @param updatedJobListener Updated job listener
286289
*/
287-
public void updateJob(String jobId, JobUpdate update, ByteSizeValue maxModelMemoryLimit, ActionListener<Job> updatedJobListener) {
290+
public void updateJob(String jobId, JobUpdate update, ByteSizeValue maxModelMemoryLimit,
291+
Version minClusterNodeVersion,
292+
ActionListener<Job> updatedJobListener) {
288293
GetRequest getRequest = new GetRequest(AnomalyDetectorsIndex.configIndexName(),
289294
ElasticsearchMappings.DOC_TYPE, Job.documentId(jobId));
290295

@@ -296,7 +301,9 @@ public void onResponse(GetResponse getResponse) {
296301
return;
297302
}
298303

299-
long version = getResponse.getVersion();
304+
final long version = getResponse.getVersion();
305+
final long seqNo = getResponse.getSeqNo();
306+
final long primaryTerm = getResponse.getPrimaryTerm();
300307
BytesReference source = getResponse.getSourceAsBytesRef();
301308
Job.Builder jobBuilder;
302309
try {
@@ -316,7 +323,7 @@ public void onResponse(GetResponse getResponse) {
316323
return;
317324
}
318325

319-
indexUpdatedJob(updatedJob, version, updatedJobListener);
326+
indexUpdatedJob(updatedJob, version, seqNo, primaryTerm, minClusterNodeVersion, updatedJobListener);
320327
}
321328

322329
@Override
@@ -341,17 +348,18 @@ public interface UpdateValidator {
341348
}
342349

343350
/**
344-
* Similar to {@link #updateJob(String, JobUpdate, ByteSizeValue, ActionListener)} but
351+
* Similar to {@link #updateJob(String, JobUpdate, ByteSizeValue, Version, ActionListener)} but
345352
* with an extra validation step which is called before the updated is applied.
346353
*
347354
* @param jobId The Id of the job to update
348355
* @param update The job update
349356
* @param maxModelMemoryLimit The maximum model memory allowed
350357
* @param validator The job update validator
358+
* @param minClusterNodeVersion the minimum version of a node ifn the cluster
351359
* @param updatedJobListener Updated job listener
352360
*/
353361
public void updateJobWithValidation(String jobId, JobUpdate update, ByteSizeValue maxModelMemoryLimit,
354-
UpdateValidator validator, ActionListener<Job> updatedJobListener) {
362+
UpdateValidator validator, Version minClusterNodeVersion, ActionListener<Job> updatedJobListener) {
355363
GetRequest getRequest = new GetRequest(AnomalyDetectorsIndex.configIndexName(),
356364
ElasticsearchMappings.DOC_TYPE, Job.documentId(jobId));
357365

@@ -363,7 +371,9 @@ public void onResponse(GetResponse getResponse) {
363371
return;
364372
}
365373

366-
long version = getResponse.getVersion();
374+
final long version = getResponse.getVersion();
375+
final long seqNo = getResponse.getSeqNo();
376+
final long primaryTerm = getResponse.getPrimaryTerm();
367377
BytesReference source = getResponse.getSourceAsBytesRef();
368378
Job originalJob;
369379
try {
@@ -385,7 +395,7 @@ public void onResponse(GetResponse getResponse) {
385395
return;
386396
}
387397

388-
indexUpdatedJob(updatedJob, version, updatedJobListener);
398+
indexUpdatedJob(updatedJob, version, seqNo, primaryTerm, minClusterNodeVersion, updatedJobListener);
389399
},
390400
updatedJobListener::onFailure
391401
));
@@ -402,17 +412,22 @@ public void onFailure(Exception e) {
402412
});
403413
}
404414

405-
private void indexUpdatedJob(Job updatedJob, long version, ActionListener<Job> updatedJobListener) {
415+
private void indexUpdatedJob(Job updatedJob, long version, long seqNo, long primaryTerm, Version minClusterNodeVersion,
416+
ActionListener<Job> updatedJobListener) {
406417
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
407418
XContentBuilder updatedSource = updatedJob.toXContent(builder, ToXContent.EMPTY_PARAMS);
408-
IndexRequest indexRequest = client.prepareIndex(AnomalyDetectorsIndex.configIndexName(),
419+
IndexRequestBuilder indexRequest = client.prepareIndex(AnomalyDetectorsIndex.configIndexName(),
409420
ElasticsearchMappings.DOC_TYPE, Job.documentId(updatedJob.getId()))
410421
.setSource(updatedSource)
411-
.setVersion(version)
412-
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
413-
.request();
422+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
423+
if (minClusterNodeVersion.onOrAfter(Version.V_6_7_0)) {
424+
indexRequest.setIfSeqNo(seqNo);
425+
indexRequest.setIfPrimaryTerm(primaryTerm);
426+
} else {
427+
indexRequest.setVersion(version);
428+
}
414429

415-
executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest, ActionListener.wrap(
430+
executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest.request(), ActionListener.wrap(
416431
indexResponse -> {
417432
assert indexResponse.getResult() == DocWriteResponse.Result.UPDATED;
418433
updatedJobListener.onResponse(updatedJob);

0 commit comments

Comments
 (0)