Skip to content

Commit a08e2d9

Browse files
imotov authored and martijnvg committed
Persistent tasks: require allocation id on task completion (elastic#1107)
Persistent tasks should verify that a completion notification is for the correct version of the task; otherwise, a delayed notification from an old node can accidentally close a newly reassigned task.
1 parent 76cd7b1 commit a08e2d9

8 files changed

+70
-25
lines changed

server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java

+7-5
Original file line number | Diff line number | Diff line change
@@ -132,7 +132,7 @@ public void markAsFailed(Exception e) {
132132
private void completeAndNotifyIfNeeded(@Nullable Exception failure) {
133133
State prevState = state.getAndSet(AllocatedPersistentTask.State.COMPLETED);
134134
if (prevState == State.COMPLETED) {
135-
logger.warn("attempt to complete task {} in the {} state", getPersistentTaskId(), prevState);
135+
logger.warn("attempt to complete task [{}] with id [{}] in the [{}] state", getAction(), getPersistentTaskId(), prevState);
136136
} else {
137137
if (failure != null) {
138138
logger.warn((Supplier<?>) () -> new ParameterizedMessage(
@@ -141,18 +141,20 @@ private void completeAndNotifyIfNeeded(@Nullable Exception failure) {
141141
try {
142142
this.failure = failure;
143143
if (prevState == State.STARTED) {
144-
logger.trace("sending notification for completed task {}", getPersistentTaskId());
145-
persistentTasksService.sendCompletionNotification(getPersistentTaskId(), failure, new
144+
logger.trace("sending notification for completed task [{}] with id [{}]", getAction(), getPersistentTaskId());
145+
persistentTasksService.sendCompletionNotification(getPersistentTaskId(), getAllocationId(), failure, new
146146
ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() {
147147
@Override
148148
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> persistentTask) {
149-
logger.trace("notification for task {} was successful", getId());
149+
logger.trace("notification for task [{}] with id [{}] was successful", getAction(),
150+
getPersistentTaskId());
150151
}
151152

152153
@Override
153154
public void onFailure(Exception e) {
154155
logger.warn((Supplier<?>) () ->
155-
new ParameterizedMessage("notification for task {} failed", getPersistentTaskId()), e);
156+
new ParameterizedMessage("notification for task [{}] with id [{}] failed",
157+
getAction(), getPersistentTaskId()), e);
156158
}
157159
});
158160
}

server/src/main/java/org/elasticsearch/persistent/CompletionPersistentTaskAction.java

+12-3
Original file line number | Diff line number | Diff line change
@@ -75,26 +75,31 @@ public static class Request extends MasterNodeRequest<Request> {
7575

7676
private Exception exception;
7777

78+
private long allocationId = -1;
79+
7880
public Request() {
7981

8082
}
8183

82-
public Request(String taskId, Exception exception) {
84+
public Request(String taskId, long allocationId, Exception exception) {
8385
this.taskId = taskId;
8486
this.exception = exception;
87+
this.allocationId = allocationId;
8588
}
8689

8790
@Override
8891
public void readFrom(StreamInput in) throws IOException {
8992
super.readFrom(in);
9093
taskId = in.readString();
94+
allocationId = in.readLong();
9195
exception = in.readException();
9296
}
9397

9498
@Override
9599
public void writeTo(StreamOutput out) throws IOException {
96100
super.writeTo(out);
97101
out.writeString(taskId);
102+
out.writeLong(allocationId);
98103
out.writeException(exception);
99104
}
100105

@@ -104,6 +109,9 @@ public ActionRequestValidationException validate() {
104109
if (taskId == null) {
105110
validationException = addValidationError("task id is missing", validationException);
106111
}
112+
if (allocationId < 0) {
113+
validationException = addValidationError("allocation id is negative or missing", validationException);
114+
}
107115
return validationException;
108116
}
109117

@@ -113,12 +121,13 @@ public boolean equals(Object o) {
113121
if (o == null || getClass() != o.getClass()) return false;
114122
Request request = (Request) o;
115123
return Objects.equals(taskId, request.taskId) &&
124+
allocationId == request.allocationId &&
116125
Objects.equals(exception, request.exception);
117126
}
118127

119128
@Override
120129
public int hashCode() {
121-
return Objects.hash(taskId, exception);
130+
return Objects.hash(taskId, allocationId, exception);
122131
}
123132
}
124133

@@ -163,7 +172,7 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state)
163172
@Override
164173
protected final void masterOperation(final Request request, ClusterState state,
165174
final ActionListener<PersistentTaskResponse> listener) {
166-
persistentTasksClusterService.completePersistentTask(request.taskId, request.exception,
175+
persistentTasksClusterService.completePersistentTask(request.taskId, request.allocationId, request.exception,
167176
new ActionListener<PersistentTask<?>>() {
168177
@Override
169178
public void onResponse(PersistentTask<?> task) {

server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java

+14-9
Original file line number | Diff line number | Diff line change
@@ -59,7 +59,7 @@ public PersistentTasksClusterService(Settings settings, PersistentTasksExecutorR
5959
* Creates a new persistent task on master node
6060
*
6161
* @param action the action name
62-
* @param params params
62+
* @param params params
6363
* @param listener the listener that will be called when task is started
6464
*/
6565
public <Params extends PersistentTaskParams> void createPersistentTask(String taskId, String action, @Nullable Params params,
@@ -99,11 +99,12 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
9999
/**
100100
* Restarts a record about a running persistent task from cluster state
101101
*
102-
* @param id the id of a persistent task
103-
* @param failure the reason for restarting the task or null if the task completed successfully
104-
* @param listener the listener that will be called when task is removed
102+
* @param id the id of the persistent task
103+
* @param allocationId the allocation id of the persistent task
104+
* @param failure the reason for restarting the task or null if the task completed successfully
105+
* @param listener the listener that will be called when task is removed
105106
*/
106-
public void completePersistentTask(String id, Exception failure, ActionListener<PersistentTask<?>> listener) {
107+
public void completePersistentTask(String id, long allocationId, Exception failure, ActionListener<PersistentTask<?>> listener) {
107108
final String source;
108109
if (failure != null) {
109110
logger.warn("persistent task " + id + " failed", failure);
@@ -115,13 +116,17 @@ public void completePersistentTask(String id, Exception failure, ActionListener<
115116
@Override
116117
public ClusterState execute(ClusterState currentState) throws Exception {
117118
PersistentTasksCustomMetaData.Builder tasksInProgress = builder(currentState);
118-
if (tasksInProgress.hasTask(id)) {
119+
if (tasksInProgress.hasTask(id, allocationId)) {
119120
tasksInProgress.finishTask(id);
120121
return update(currentState, tasksInProgress);
121122
} else {
122-
// we don't send the error message back to the caller becase that would cause an infinite loop of notifications
123-
logger.warn("The task {} wasn't found, status is not updated", id);
124-
return currentState;
123+
if (tasksInProgress.hasTask(id)) {
124+
logger.warn("The task [{}] with id [{}] was found but it has a different allocation id [{}], status is not updated",
125+
PersistentTasksCustomMetaData.getTaskWithId(currentState, id).getTaskName(), id, allocationId);
126+
} else {
127+
logger.warn("The task [{}] wasn't found, status is not updated", id);
128+
}
129+
throw new ResourceNotFoundException("the task with id [" + id + "] and allocation id [" + allocationId + "] not found");
125130
}
126131
}
127132

server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java

+19-4
Original file line number | Diff line number | Diff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.common.io.stream.StreamOutput;
3131
import org.elasticsearch.common.settings.Settings;
3232
import org.elasticsearch.common.xcontent.XContentBuilder;
33+
import org.elasticsearch.gateway.GatewayService;
3334
import org.elasticsearch.tasks.Task;
3435
import org.elasticsearch.tasks.TaskAwareRequest;
3536
import org.elasticsearch.tasks.TaskId;
@@ -70,6 +71,11 @@ public PersistentTasksNodeService(Settings settings,
7071

7172
@Override
7273
public void clusterChanged(ClusterChangedEvent event) {
74+
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
75+
// wait until the gateway has recovered from disk, otherwise if the only master restarts
76+
// we start cancelling all local tasks before cluster has a chance to recover.
77+
return;
78+
}
7379
PersistentTasksCustomMetaData tasks = event.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
7480
PersistentTasksCustomMetaData previousTasks = event.previousState().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
7581

@@ -120,11 +126,14 @@ public void clusterChanged(ClusterChangedEvent event) {
120126
AllocatedPersistentTask task = runningTasks.get(id);
121127
if (task.getState() == AllocatedPersistentTask.State.COMPLETED) {
122128
// Result was sent to the caller and the caller acknowledged acceptance of the result
129+
logger.trace("Found completed persistent task [{}] with id [{}] and allocation id [{}] - removing",
130+
task.getAction(), task.getPersistentTaskId(), task.getAllocationId());
123131
runningTasks.remove(id);
124132
} else {
125133
// task is running locally, but master doesn't know about it - that means that the persistent task was removed
126134
// cancel the task without notifying master
127-
logger.trace("Found unregistered persistent task with id {} - cancelling ", id);
135+
logger.trace("Found unregistered persistent task [{}] with id [{}] and allocation id [{}] - cancelling",
136+
task.getAction(), task.getPersistentTaskId(), task.getAllocationId());
128137
cancelTask(id);
129138
}
130139
}
@@ -160,6 +169,8 @@ public Task createTask(long id, String type, String action, TaskId parentTaskId)
160169
boolean processed = false;
161170
try {
162171
task.init(persistentTasksService, taskManager, logger, taskInProgress.getId(), taskInProgress.getAllocationId());
172+
logger.trace("Persistent task [{}] with id [{}] and allocation id [{}] was created", task.getAction(),
173+
task.getPersistentTaskId(), task.getAllocationId());
163174
try {
164175
runningTasks.put(taskInProgress.getAllocationId(), task);
165176
nodePersistentTasksExecutor.executeTask(taskInProgress.getParams(), task, executor);
@@ -171,6 +182,8 @@ public Task createTask(long id, String type, String action, TaskId parentTaskId)
171182
} finally {
172183
if (processed == false) {
173184
// something went wrong - unregistering task
185+
logger.warn("Persistent task [{}] with id [{}] and allocation id [{}] failed to create", task.getAction(),
186+
task.getPersistentTaskId(), task.getAllocationId());
174187
taskManager.unregister(task);
175188
}
176189
}
@@ -187,14 +200,16 @@ private void cancelTask(Long allocationId) {
187200
persistentTasksService.sendTaskManagerCancellation(task.getId(), new ActionListener<CancelTasksResponse>() {
188201
@Override
189202
public void onResponse(CancelTasksResponse cancelTasksResponse) {
190-
logger.trace("Persistent task with id {} was cancelled", task.getId());
191-
203+
logger.trace("Persistent task [{}] with id [{}] and allocation id [{}] was cancelled", task.getAction(),
204+
task.getPersistentTaskId(), task.getAllocationId());
192205
}
193206

194207
@Override
195208
public void onFailure(Exception e) {
196209
// There is really nothing we can do in case of failure here
197-
logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed to cancel task {}", task.getPersistentTaskId()), e);
210+
logger.warn((Supplier<?>) () ->
211+
new ParameterizedMessage("failed to cancel task [{}] with id [{}] and allocation id [{}]", task.getAction(),
212+
task.getPersistentTaskId(), task.getAllocationId()), e);
198213
}
199214
});
200215
}

server/src/main/java/org/elasticsearch/persistent/PersistentTasksService.java

+3-2
Original file line number | Diff line number | Diff line change
@@ -73,8 +73,9 @@ public <Params extends PersistentTaskParams> void startPersistentTask(String tas
7373
/**
7474
* Notifies the PersistentTasksClusterService about successful (failure == null) completion of a task or its failure
7575
*/
76-
public void sendCompletionNotification(String taskId, Exception failure, ActionListener<PersistentTask<?>> listener) {
77-
CompletionPersistentTaskAction.Request restartRequest = new CompletionPersistentTaskAction.Request(taskId, failure);
76+
public void sendCompletionNotification(String taskId, long allocationId, Exception failure,
77+
ActionListener<PersistentTask<?>> listener) {
78+
CompletionPersistentTaskAction.Request restartRequest = new CompletionPersistentTaskAction.Request(taskId, allocationId, failure);
7879
try {
7980
client.execute(CompletionPersistentTaskAction.INSTANCE, restartRequest,
8081
ActionListener.wrap(o -> listener.onResponse(o.getTask()), listener::onFailure));

server/src/test/java/org/elasticsearch/persistent/PersistentTasksExecutorIT.java

+12
Original file line number | Diff line number | Diff line change
@@ -122,6 +122,18 @@ public void testPersistentActionCompletion() throws Exception {
122122
assertThat(firstRunningTask.getParentTaskId().getId(), equalTo(allocationId));
123123
assertThat(firstRunningTask.getParentTaskId().getNodeId(), equalTo("cluster"));
124124
assertThat(firstRunningTask.getDescription(), equalTo("id=" + taskId));
125+
126+
if (randomBoolean()) {
127+
logger.info("Simulating errant completion notification");
128+
//try sending completion request with incorrect allocation id
129+
PlainActionFuture<PersistentTask<?>> failedCompletionNotificationFuture = new PlainActionFuture<>();
130+
persistentTasksService.sendCompletionNotification(taskId, Long.MAX_VALUE, null, failedCompletionNotificationFuture);
131+
assertThrows(failedCompletionNotificationFuture, ResourceNotFoundException.class);
132+
// Make sure that the task is still running
133+
assertThat(client().admin().cluster().prepareListTasks().setActions(TestPersistentTasksExecutor.NAME + "[c]")
134+
.setDetailed(true).get().getTasks().size(), equalTo(1));
135+
}
136+
125137
stopOrCancelTask(firstRunningTask.getTaskId());
126138
}
127139

server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java

+2-1
Original file line number | Diff line number | Diff line change
@@ -184,7 +184,8 @@ public void sendTaskManagerCancellation(long taskId, ActionListener<CancelTasksR
184184
}
185185

186186
@Override
187-
public void sendCompletionNotification(String taskId, Exception failure, ActionListener<PersistentTask<?>> listener) {
187+
public void sendCompletionNotification(String taskId, long allocationId, Exception failure,
188+
ActionListener<PersistentTask<?>> listener) {
188189
fail("Shouldn't be called during Cluster State cancellation");
189190
}
190191
};

server/src/test/java/org/elasticsearch/persistent/RestartPersistentTaskRequestTests.java

+1-1
Original file line number | Diff line number | Diff line change
@@ -25,7 +25,7 @@ public class RestartPersistentTaskRequestTests extends AbstractStreamableTestCas
2525

2626
@Override
2727
protected Request createTestInstance() {
28-
return new Request(randomAlphaOfLength(10), null);
28+
return new Request(randomAlphaOfLength(10), randomLong(), null);
2929
}
3030

3131
@Override

0 commit comments

Comments
 (0)