Skip to content

Move ML Optimistic Concurrency Control to Seq No #38278

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ public void onTimeout(TimeValue timeout) {
private void clearJobFinishedTime(String jobId, ActionListener<AcknowledgedResponse> listener) {
JobUpdate update = new JobUpdate.Builder(jobId).setClearFinishTime(true).build();

jobConfigProvider.updateJob(jobId, update, null, ActionListener.wrap(
jobConfigProvider.updateJob(jobId, update, null, clusterService.state().nodes().getMinNodeVersion(), ActionListener.wrap(
job -> listener.onResponse(new AcknowledgedResponse(true)),
e -> {
logger.error("[" + jobId + "] Failed to clear finished_time", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ protected void masterOperation(UpdateDatafeedAction.Request request, ClusterStat

CheckedConsumer<Boolean, Exception> updateConsumer = ok -> {
datafeedConfigProvider.updateDatefeedConfig(request.getUpdate().getId(), request.getUpdate(), headers,
jobConfigProvider::validateDatafeedJob,
jobConfigProvider::validateDatafeedJob, clusterService.state().nodes().getMinNodeVersion(),
ActionListener.wrap(
updatedConfig -> listener.onResponse(new PutDatafeedAction.Response(updatedConfig)),
listener::onFailure
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml.action;

import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetAction;
import org.elasticsearch.action.get.GetRequest;
Expand All @@ -17,6 +18,7 @@
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
Expand Down Expand Up @@ -52,14 +54,16 @@ public class TransportUpdateFilterAction extends HandledTransportAction<UpdateFi

private final Client client;
private final JobManager jobManager;
private final ClusterService clusterService;

@Inject
public TransportUpdateFilterAction(TransportService transportService, ActionFilters actionFilters, Client client,
JobManager jobManager) {
JobManager jobManager, ClusterService clusterService) {
super(UpdateFilterAction.NAME, transportService, actionFilters,
(Supplier<UpdateFilterAction.Request>) UpdateFilterAction.Request::new);
this.client = client;
this.jobManager = jobManager;
this.clusterService = clusterService;
}

@Override
Expand Down Expand Up @@ -95,13 +99,20 @@ private void updateFilter(FilterWithVersion filterWithVersion, UpdateFilterActio
}

MlFilter updatedFilter = MlFilter.builder(filter.getId()).setDescription(description).setItems(items).build();
indexUpdatedFilter(updatedFilter, filterWithVersion.version, request, listener);
indexUpdatedFilter(
updatedFilter, filterWithVersion.version, filterWithVersion.seqNo, filterWithVersion.primaryTerm, request, listener);
}

private void indexUpdatedFilter(MlFilter filter, long version, UpdateFilterAction.Request request,
private void indexUpdatedFilter(MlFilter filter, final long version, final long seqNo, final long primaryTerm,
UpdateFilterAction.Request request,
ActionListener<PutFilterAction.Response> listener) {
IndexRequest indexRequest = new IndexRequest(MlMetaIndex.INDEX_NAME, MlMetaIndex.TYPE, filter.documentId());
indexRequest.version(version);
if (clusterService.state().nodes().getMinNodeVersion().onOrAfter(Version.V_6_7_0)) {
indexRequest.setIfSeqNo(seqNo);
indexRequest.setIfPrimaryTerm(primaryTerm);
} else {
indexRequest.version(version);
}
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
Expand Down Expand Up @@ -146,7 +157,7 @@ public void onResponse(GetResponse getDocResponse) {
XContentParser parser = XContentFactory.xContent(XContentType.JSON)
.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, stream)) {
MlFilter filter = MlFilter.LENIENT_PARSER.apply(parser, null).build();
listener.onResponse(new FilterWithVersion(filter, getDocResponse.getVersion()));
listener.onResponse(new FilterWithVersion(filter, getDocResponse));
}
} else {
this.onFailure(new ResourceNotFoundException(Messages.getMessage(Messages.FILTER_NOT_FOUND, filterId)));
Expand All @@ -167,10 +178,15 @@ private static class FilterWithVersion {

private final MlFilter filter;
private final long version;
private final long seqNo;
private final long primaryTerm;

private FilterWithVersion(MlFilter filter, long version) {
private FilterWithVersion(MlFilter filter, GetResponse getDocResponse) {
this.filter = filter;
this.version = version;
this.version = getDocResponse.getVersion();
this.seqNo = getDocResponse.getSeqNo();
this.primaryTerm = getDocResponse.getPrimaryTerm();

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
Expand All @@ -19,6 +20,7 @@
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
Expand Down Expand Up @@ -262,10 +264,12 @@ public void onFailure(Exception e) {
* @param headers Datafeed headers applied with the update
* @param validator BiConsumer that accepts the updated config and can perform
* extra validations. {@code validator} must call the passed listener
* @param minClusterNodeVersion minimum version of nodes in cluster
* @param updatedConfigListener Updated datafeed config listener
*/
public void updateDatefeedConfig(String datafeedId, DatafeedUpdate update, Map<String, String> headers,
BiConsumer<DatafeedConfig, ActionListener<Boolean>> validator,
Version minClusterNodeVersion,
ActionListener<DatafeedConfig> updatedConfigListener) {
GetRequest getRequest = new GetRequest(AnomalyDetectorsIndex.configIndexName(),
ElasticsearchMappings.DOC_TYPE, DatafeedConfig.documentId(datafeedId));
Expand All @@ -277,7 +281,9 @@ public void onResponse(GetResponse getResponse) {
updatedConfigListener.onFailure(ExceptionsHelper.missingDatafeedException(datafeedId));
return;
}
long version = getResponse.getVersion();
final long version = getResponse.getVersion();
final long seqNo = getResponse.getSeqNo();
final long primaryTerm = getResponse.getPrimaryTerm();
BytesReference source = getResponse.getSourceAsBytesRef();
DatafeedConfig.Builder configBuilder;
try {
Expand All @@ -298,7 +304,7 @@ public void onResponse(GetResponse getResponse) {

ActionListener<Boolean> validatedListener = ActionListener.wrap(
ok -> {
indexUpdatedConfig(updatedConfig, version, ActionListener.wrap(
indexUpdatedConfig(updatedConfig, version, seqNo, primaryTerm, minClusterNodeVersion, ActionListener.wrap(
indexResponse -> {
assert indexResponse.getResult() == DocWriteResponse.Result.UPDATED;
updatedConfigListener.onResponse(updatedConfig);
Expand All @@ -318,17 +324,23 @@ public void onFailure(Exception e) {
});
}

private void indexUpdatedConfig(DatafeedConfig updatedConfig, long version, ActionListener<IndexResponse> listener) {
private void indexUpdatedConfig(DatafeedConfig updatedConfig, long version, long seqNo, long primaryTerm,
Version minClusterNodeVersion, ActionListener<IndexResponse> listener) {
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
XContentBuilder updatedSource = updatedConfig.toXContent(builder, new ToXContent.MapParams(TO_XCONTENT_PARAMS));
IndexRequest indexRequest = client.prepareIndex(AnomalyDetectorsIndex.configIndexName(),
IndexRequestBuilder indexRequest = client.prepareIndex(AnomalyDetectorsIndex.configIndexName(),
ElasticsearchMappings.DOC_TYPE, DatafeedConfig.documentId(updatedConfig.getId()))
.setSource(updatedSource)
.setVersion(version)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.request();
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

if (minClusterNodeVersion.onOrAfter(Version.V_6_7_0)) {
indexRequest.setIfSeqNo(seqNo);
indexRequest.setIfPrimaryTerm(primaryTerm);
} else {
indexRequest.setVersion(version);
}

executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest, listener);
executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest.request(), listener);

} catch (IOException e) {
listener.onFailure(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ public void updateJob(UpdateJobAction.Request request, ActionListener<PutJobActi

Runnable doUpdate = () -> {
jobConfigProvider.updateJobWithValidation(request.getJobId(), request.getJobUpdate(), maxModelMemoryLimit,
this::validate, ActionListener.wrap(
this::validate, clusterService.state().nodes().getMinNodeVersion(), ActionListener.wrap(
updatedJob -> postJobUpdate(request, updatedJob, actionListener),
actionListener::onFailure
));
Expand Down Expand Up @@ -603,8 +603,8 @@ public void revertSnapshot(RevertModelSnapshotAction.Request request, ActionList
.setModelSnapshotId(modelSnapshot.getSnapshotId())
.build();

jobConfigProvider.updateJob(request.getJobId(), update, maxModelMemoryLimit, ActionListener.wrap(
job -> {
jobConfigProvider.updateJob(request.getJobId(), update, maxModelMemoryLimit, clusterService.state().nodes().getMinNodeVersion(),
ActionListener.wrap(job -> {
auditor.info(request.getJobId(),
Messages.getMessage(Messages.JOB_AUDIT_REVERTED, modelSnapshot.getDescription()));
updateHandler.accept(Boolean.TRUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.lucene.search.join.ScoreMode;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
Expand All @@ -21,6 +22,7 @@
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
Expand Down Expand Up @@ -225,9 +227,12 @@ public void onFailure(Exception e) {
* @param maxModelMemoryLimit The maximum model memory allowed. This can be {@code null}
* if the job's {@link org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits}
* are not changed.
* @param minClusterNodeVersion the minimum version of nodes in the cluster
* @param updatedJobListener Updated job listener
*/
public void updateJob(String jobId, JobUpdate update, ByteSizeValue maxModelMemoryLimit, ActionListener<Job> updatedJobListener) {
public void updateJob(String jobId, JobUpdate update, ByteSizeValue maxModelMemoryLimit,
Version minClusterNodeVersion,
ActionListener<Job> updatedJobListener) {
GetRequest getRequest = new GetRequest(AnomalyDetectorsIndex.configIndexName(),
ElasticsearchMappings.DOC_TYPE, Job.documentId(jobId));

Expand All @@ -239,7 +244,9 @@ public void onResponse(GetResponse getResponse) {
return;
}

long version = getResponse.getVersion();
final long version = getResponse.getVersion();
final long seqNo = getResponse.getSeqNo();
final long primaryTerm = getResponse.getPrimaryTerm();
BytesReference source = getResponse.getSourceAsBytesRef();
Job.Builder jobBuilder;
try {
Expand All @@ -259,7 +266,7 @@ public void onResponse(GetResponse getResponse) {
return;
}

indexUpdatedJob(updatedJob, version, updatedJobListener);
indexUpdatedJob(updatedJob, version, seqNo, primaryTerm, minClusterNodeVersion, updatedJobListener);
}

@Override
Expand All @@ -280,17 +287,18 @@ public interface UpdateValidator {
}

/**
* Similar to {@link #updateJob(String, JobUpdate, ByteSizeValue, ActionListener)} but
* Similar to {@link #updateJob(String, JobUpdate, ByteSizeValue, Version, ActionListener)} but
* with an extra validation step which is called before the updated is applied.
*
* @param jobId The Id of the job to update
* @param update The job update
* @param maxModelMemoryLimit The maximum model memory allowed
* @param validator The job update validator
* @param minClusterNodeVersion the minimum version of a node ifn the cluster
* @param updatedJobListener Updated job listener
*/
public void updateJobWithValidation(String jobId, JobUpdate update, ByteSizeValue maxModelMemoryLimit,
UpdateValidator validator, ActionListener<Job> updatedJobListener) {
UpdateValidator validator, Version minClusterNodeVersion, ActionListener<Job> updatedJobListener) {
GetRequest getRequest = new GetRequest(AnomalyDetectorsIndex.configIndexName(),
ElasticsearchMappings.DOC_TYPE, Job.documentId(jobId));

Expand All @@ -302,7 +310,9 @@ public void onResponse(GetResponse getResponse) {
return;
}

long version = getResponse.getVersion();
final long version = getResponse.getVersion();
final long seqNo = getResponse.getSeqNo();
final long primaryTerm = getResponse.getPrimaryTerm();
BytesReference source = getResponse.getSourceAsBytesRef();
Job originalJob;
try {
Expand All @@ -324,7 +334,7 @@ public void onResponse(GetResponse getResponse) {
return;
}

indexUpdatedJob(updatedJob, version, updatedJobListener);
indexUpdatedJob(updatedJob, version, seqNo, primaryTerm, minClusterNodeVersion, updatedJobListener);
},
updatedJobListener::onFailure
));
Expand All @@ -337,17 +347,22 @@ public void onFailure(Exception e) {
});
}

private void indexUpdatedJob(Job updatedJob, long version, ActionListener<Job> updatedJobListener) {
private void indexUpdatedJob(Job updatedJob, long version, long seqNo, long primaryTerm, Version minClusterNodeVersion,
ActionListener<Job> updatedJobListener) {
try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
XContentBuilder updatedSource = updatedJob.toXContent(builder, ToXContent.EMPTY_PARAMS);
IndexRequest indexRequest = client.prepareIndex(AnomalyDetectorsIndex.configIndexName(),
IndexRequestBuilder indexRequest = client.prepareIndex(AnomalyDetectorsIndex.configIndexName(),
ElasticsearchMappings.DOC_TYPE, Job.documentId(updatedJob.getId()))
.setSource(updatedSource)
.setVersion(version)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.request();
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
if (minClusterNodeVersion.onOrAfter(Version.V_6_7_0)) {
indexRequest.setIfSeqNo(seqNo);
indexRequest.setIfPrimaryTerm(primaryTerm);
} else {
indexRequest.setVersion(version);
}

executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest, ActionListener.wrap(
executeAsyncWithOrigin(client, ML_ORIGIN, IndexAction.INSTANCE, indexRequest.request(), ActionListener.wrap(
indexResponse -> {
assert indexResponse.getResult() == DocWriteResponse.Result.UPDATED;
updatedJobListener.onResponse(updatedJob);
Expand Down
Loading