Skip to content

Commit 043a877

Browse files
authored
[ML] Prevent unnecessary job updates. (elastic/x-pack-elasticsearch#4424)
Original commit: elastic/x-pack-elasticsearch@8e06297
1 parent 14ad96e commit 043a877

File tree

4 files changed

+98
-45
lines changed

4 files changed

+98
-45
lines changed

plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateJobAction.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ public static UpdateJobAction.Request parseRequest(String jobId, XContentParser
5454

5555
/** Indicates an update that was not triggered by a user */
5656
private boolean isInternal;
57+
private boolean waitForAck = true;
5758

5859
public Request(String jobId, JobUpdate update) {
5960
this(jobId, update, false);
@@ -87,6 +88,14 @@ public boolean isInternal() {
8788
return isInternal;
8889
}
8990

91+
public boolean isWaitForAck() {
92+
return waitForAck;
93+
}
94+
95+
public void setWaitForAck(boolean waitForAck) {
96+
this.waitForAck = waitForAck;
97+
}
98+
9099
@Override
91100
public ActionRequestValidationException validate() {
92101
return null;
@@ -102,6 +111,11 @@ public void readFrom(StreamInput in) throws IOException {
102111
} else {
103112
isInternal = false;
104113
}
114+
if (in.getVersion().onOrAfter(Version.V_6_3_0)) {
115+
waitForAck = in.readBoolean();
116+
} else {
117+
waitForAck = true;
118+
}
105119
}
106120

107121
@Override
@@ -112,6 +126,9 @@ public void writeTo(StreamOutput out) throws IOException {
112126
if (out.getVersion().onOrAfter(Version.V_6_2_2)) {
113127
out.writeBoolean(isInternal);
114128
}
129+
if (out.getVersion().onOrAfter(Version.V_6_3_0)) {
130+
out.writeBoolean(waitForAck);
131+
}
115132
}
116133

117134
@Override

plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/UpdateJobActionRequestTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ protected UpdateJobAction.Request createTestInstance() {
1818
// no need to randomize JobUpdate this is already tested in: JobUpdateTests
1919
JobUpdate.Builder jobUpdate = new JobUpdate.Builder(jobId);
2020
jobUpdate.setAnalysisLimits(new AnalysisLimits(100L, 100L));
21-
return new UpdateJobAction.Request(jobId, jobUpdate.build());
21+
UpdateJobAction.Request request = new UpdateJobAction.Request(jobId, jobUpdate.build());
22+
request.setWaitForAck(randomBoolean());
23+
return request;
2224
}
2325

2426
@Override

plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/JobManager.java

Lines changed: 76 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@
1111
import org.elasticsearch.action.support.WriteRequest;
1212
import org.elasticsearch.client.Client;
1313
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
14+
import org.elasticsearch.cluster.ClusterChangedEvent;
1415
import org.elasticsearch.cluster.ClusterState;
16+
import org.elasticsearch.cluster.ClusterStateUpdateTask;
1517
import org.elasticsearch.cluster.metadata.MetaData;
1618
import org.elasticsearch.cluster.service.ClusterService;
1719
import org.elasticsearch.common.CheckedConsumer;
@@ -303,57 +305,88 @@ private void validateAnalysisLimitsUpdate(Job job, AnalysisLimits newLimits, Cha
303305
}
304306

305307
private void internalJobUpdate(UpdateJobAction.Request request, ActionListener<PutJobAction.Response> actionListener) {
306-
clusterService.submitStateUpdateTask("update-job-" + request.getJobId(),
307-
new AckedClusterStateUpdateTask<PutJobAction.Response>(request, actionListener) {
308-
private volatile Job updatedJob;
309-
private volatile boolean processUpdateRequired;
310308

311-
@Override
312-
protected PutJobAction.Response newResponse(boolean acknowledged) {
313-
return new PutJobAction.Response(updatedJob);
314-
}
309+
Job job = getJobOrThrowIfUnknown(request.getJobId());
310+
final Job updatedJob = request.getJobUpdate().mergeWithJob(job, maxModelMemoryLimit);
311+
if (updatedJob.equals(job)) {
312+
// No change will results in a clusterstate update no-op so don't
313+
// submit the request.
314+
actionListener.onResponse(new PutJobAction.Response(updatedJob));
315+
return;
316+
}
315317

316-
@Override
317-
public ClusterState execute(ClusterState currentState) {
318-
Job job = getJobOrThrowIfUnknown(request.getJobId(), currentState);
319-
updatedJob = request.getJobUpdate().mergeWithJob(job, maxModelMemoryLimit);
320-
if (updatedJob.equals(job)) {
321-
// nothing to do
322-
return currentState;
318+
if (request.isWaitForAck()) {
319+
// Use the ack cluster state update
320+
clusterService.submitStateUpdateTask("update-job-" + request.getJobId(),
321+
new AckedClusterStateUpdateTask<PutJobAction.Response>(request, actionListener) {
322+
323+
@Override
324+
protected PutJobAction.Response newResponse(boolean acknowledged) {
325+
return new PutJobAction.Response(updatedJob);
323326
}
324-
// No change is required if the fields that the C++ uses aren't being updated
325-
processUpdateRequired = request.getJobUpdate().isAutodetectProcessUpdate();
326-
return updateClusterState(updatedJob, true, currentState);
327-
}
328327

329-
@Override
330-
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
331-
JobUpdate jobUpdate = request.getJobUpdate();
332-
if (processUpdateRequired && isJobOpen(newState, request.getJobId())) {
333-
updateJobProcessNotifier.submitJobUpdate(UpdateParams.fromJobUpdate(jobUpdate), ActionListener.wrap(
334-
isUpdated -> {
335-
if (isUpdated) {
336-
auditJobUpdatedIfNotInternal(request);
337-
}
338-
}, e -> {
339-
// No need to do anything
340-
}
341-
));
342-
} else {
343-
logger.debug("[{}] No process update required for job update: {}", () -> request.getJobId(), () -> {
344-
try {
345-
XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
346-
jobUpdate.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);
347-
return Strings.toString(jsonBuilder);
348-
} catch (IOException e) {
349-
return "(unprintable due to " + e.getMessage() + ")";
350-
}
351-
});
328+
@Override
329+
public ClusterState execute(ClusterState currentState) {
330+
return updateClusterState(updatedJob, true, currentState);
331+
}
332+
333+
@Override
334+
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
335+
afterClusterStateUpdate(newState, request);
336+
}
337+
});
338+
} else {
339+
clusterService.submitStateUpdateTask("update-job-" + request.getJobId(), new ClusterStateUpdateTask() {
352340

341+
@Override
342+
public ClusterState execute(ClusterState currentState) throws Exception {
343+
return updateClusterState(updatedJob, true, currentState);
344+
}
345+
346+
@Override
347+
public void onFailure(String source, Exception e) {
348+
actionListener.onFailure(e);
349+
}
350+
351+
@Override
352+
public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {
353+
afterClusterStateUpdate(clusterChangedEvent.state(), request);
354+
actionListener.onResponse(new PutJobAction.Response(updatedJob));
355+
356+
}
357+
});
358+
}
359+
}
360+
361+
private void afterClusterStateUpdate(ClusterState newState, UpdateJobAction.Request request) {
362+
JobUpdate jobUpdate = request.getJobUpdate();
363+
364+
// Change is required if the fields that the C++ uses are being updated
365+
boolean processUpdateRequired = jobUpdate.isAutodetectProcessUpdate();
366+
367+
if (processUpdateRequired && isJobOpen(newState, request.getJobId())) {
368+
updateJobProcessNotifier.submitJobUpdate(UpdateParams.fromJobUpdate(jobUpdate), ActionListener.wrap(
369+
isUpdated -> {
370+
if (isUpdated) {
353371
auditJobUpdatedIfNotInternal(request);
354372
}
373+
}, e -> {
374+
// No need to do anything
355375
}
356-
});
376+
));
377+
} else {
378+
logger.debug("[{}] No process update required for job update: {}", () -> request.getJobId(), () -> {
379+
try {
380+
XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
381+
jobUpdate.toXContent(jsonBuilder, ToXContent.EMPTY_PARAMS);
382+
return Strings.toString(jsonBuilder);
383+
} catch (IOException e) {
384+
return "(unprintable due to " + e.getMessage() + ")";
385+
}
386+
});
387+
388+
auditJobUpdatedIfNotInternal(request);
389+
}
357390
}
358391

359392
private void auditJobUpdatedIfNotInternal(UpdateJobAction.Request request) {

plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/output/AutoDetectResultProcessor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -351,11 +351,12 @@ public void onFailure(Exception e) {
351351
});
352352
}
353353

354-
protected void updateEstablishedModelMemoryOnJob(Date latestBucketTimestamp, ModelSizeStats modelSizeStats) {
354+
private void updateEstablishedModelMemoryOnJob(Date latestBucketTimestamp, ModelSizeStats modelSizeStats) {
355355
jobProvider.getEstablishedMemoryUsage(jobId, latestBucketTimestamp, modelSizeStats, establishedModelMemory -> {
356356
JobUpdate update = new JobUpdate.Builder(jobId)
357357
.setEstablishedModelMemory(establishedModelMemory).build();
358358
UpdateJobAction.Request updateRequest = UpdateJobAction.Request.internal(jobId, update);
359+
updateRequest.setWaitForAck(false);
359360

360361
executeAsyncWithOrigin(client, ML_ORIGIN, UpdateJobAction.INSTANCE, updateRequest, new ActionListener<PutJobAction.Response>() {
361362
@Override

0 commit comments

Comments
 (0)