Skip to content

Commit 6bfea09

Browse files
imotovmartijnvg
authored andcommitted
Persistent Tasks: switch from long task ids to string task ids (elastic#1035)
This commit switches from long persistent task ids to caller-supplied string persistent task ids.
1 parent 0a1f255 commit 6bfea09

19 files changed

+287
-251
lines changed

server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
* Represents a executor node operation that corresponds to a persistent task
3737
*/
3838
public class AllocatedPersistentTask extends CancellableTask {
39-
private long persistentTaskId;
39+
private String persistentTaskId;
4040
private long allocationId;
4141

4242
private final AtomicReference<State> state;
@@ -81,11 +81,11 @@ public void updatePersistentStatus(Task.Status status, ActionListener<Persistent
8181
persistentTasksService.updateStatus(persistentTaskId, allocationId, status, listener);
8282
}
8383

84-
public long getPersistentTaskId() {
84+
public String getPersistentTaskId() {
8585
return persistentTaskId;
8686
}
8787

88-
void init(PersistentTasksService persistentTasksService, TaskManager taskManager, Logger logger, long persistentTaskId, long
88+
void init(PersistentTasksService persistentTasksService, TaskManager taskManager, Logger logger, String persistentTaskId, long
8989
allocationId) {
9090
this.persistentTasksService = persistentTasksService;
9191
this.logger = logger;

server/src/main/java/org/elasticsearch/persistent/CompletionPersistentTaskAction.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@
4242
import java.io.IOException;
4343
import java.util.Objects;
4444

45+
import static org.elasticsearch.action.ValidateActions.addValidationError;
46+
4547
/**
4648
* Action that is used by executor node to indicate that the persistent action finished or failed on the node and needs to be
4749
* removed from the cluster state in case of successful completion or restarted on some other node in case of failure.
@@ -69,44 +71,48 @@ public PersistentTaskResponse newResponse() {
6971

7072
public static class Request extends MasterNodeRequest<Request> {
7173

72-
private long taskId;
74+
private String taskId;
7375

7476
private Exception exception;
7577

7678
public Request() {
7779

7880
}
7981

80-
public Request(long taskId, Exception exception) {
82+
public Request(String taskId, Exception exception) {
8183
this.taskId = taskId;
8284
this.exception = exception;
8385
}
8486

8587
@Override
8688
public void readFrom(StreamInput in) throws IOException {
8789
super.readFrom(in);
88-
taskId = in.readLong();
90+
taskId = in.readString();
8991
exception = in.readException();
9092
}
9193

9294
@Override
9395
public void writeTo(StreamOutput out) throws IOException {
9496
super.writeTo(out);
95-
out.writeLong(taskId);
97+
out.writeString(taskId);
9698
out.writeException(exception);
9799
}
98100

99101
@Override
100102
public ActionRequestValidationException validate() {
101-
return null;
103+
ActionRequestValidationException validationException = null;
104+
if (taskId == null) {
105+
validationException = addValidationError("task id is missing", validationException);
106+
}
107+
return validationException;
102108
}
103109

104110
@Override
105111
public boolean equals(Object o) {
106112
if (this == o) return true;
107113
if (o == null || getClass() != o.getClass()) return false;
108114
Request request = (Request) o;
109-
return taskId == request.taskId &&
115+
return Objects.equals(taskId, request.taskId) &&
110116
Objects.equals(exception, request.exception);
111117
}
112118

server/src/main/java/org/elasticsearch/persistent/CreatePersistentTaskAction.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ public PersistentTaskResponse newResponse() {
6666

6767
public static class Request extends MasterNodeRequest<Request> {
6868

69+
private String taskId;
70+
6971
private String action;
7072

7173
private PersistentTaskRequest request;
@@ -74,28 +76,34 @@ public Request() {
7476

7577
}
7678

77-
public Request(String action, PersistentTaskRequest request) {
79+
public Request(String taskId, String action, PersistentTaskRequest request) {
80+
this.taskId = taskId;
7881
this.action = action;
7982
this.request = request;
8083
}
8184

8285
@Override
8386
public void readFrom(StreamInput in) throws IOException {
8487
super.readFrom(in);
88+
taskId = in.readString();
8589
action = in.readString();
8690
request = in.readNamedWriteable(PersistentTaskRequest.class);
8791
}
8892

8993
@Override
9094
public void writeTo(StreamOutput out) throws IOException {
9195
super.writeTo(out);
96+
out.writeString(taskId);
9297
out.writeString(action);
9398
out.writeNamedWriteable(request);
9499
}
95100

96101
@Override
97102
public ActionRequestValidationException validate() {
98103
ActionRequestValidationException validationException = null;
104+
if (this.taskId == null) {
105+
validationException = addValidationError("task id must be specified", validationException);
106+
}
99107
if (this.action == null) {
100108
validationException = addValidationError("action must be specified", validationException);
101109
}
@@ -110,13 +118,13 @@ public boolean equals(Object o) {
110118
if (this == o) return true;
111119
if (o == null || getClass() != o.getClass()) return false;
112120
Request request1 = (Request) o;
113-
return Objects.equals(action, request1.action) &&
121+
return Objects.equals(taskId, request1.taskId) && Objects.equals(action, request1.action) &&
114122
Objects.equals(request, request1.request);
115123
}
116124

117125
@Override
118126
public int hashCode() {
119-
return Objects.hash(action, request);
127+
return Objects.hash(taskId, action, request);
120128
}
121129

122130
public String getAction() {
@@ -127,6 +135,14 @@ public void setAction(String action) {
127135
this.action = action;
128136
}
129137

138+
public String getTaskId() {
139+
return taskId;
140+
}
141+
142+
public void setTaskId(String taskId) {
143+
this.taskId = taskId;
144+
}
145+
130146
public PersistentTaskRequest getRequest() {
131147
return request;
132148
}
@@ -144,6 +160,11 @@ protected RequestBuilder(ElasticsearchClient client, CreatePersistentTaskAction
144160
super(client, action, new Request());
145161
}
146162

163+
public RequestBuilder setTaskId(String taskId) {
164+
request.setTaskId(taskId);
165+
return this;
166+
}
167+
147168
public RequestBuilder setAction(String action) {
148169
request.setAction(action);
149170
return this;
@@ -194,7 +215,7 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state)
194215
@Override
195216
protected final void masterOperation(final Request request, ClusterState state,
196217
final ActionListener<PersistentTaskResponse> listener) {
197-
persistentTasksClusterService.createPersistentTask(request.action, request.request,
218+
persistentTasksClusterService.createPersistentTask(request.taskId, request.action, request.request,
198219
new ActionListener<PersistentTask<?>>() {
199220

200221
@Override

server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.elasticsearch.persistent;
2121

2222
import org.apache.logging.log4j.Logger;
23+
import org.elasticsearch.ResourceAlreadyExistsException;
2324
import org.elasticsearch.ResourceNotFoundException;
2425
import org.elasticsearch.action.ActionListener;
2526
import org.elasticsearch.cluster.ClusterChangedEvent;
@@ -60,15 +61,19 @@ public PersistentTasksClusterService(Settings settings, PersistentTasksExecutorR
6061
* @param request request
6162
* @param listener the listener that will be called when task is started
6263
*/
63-
public <Request extends PersistentTaskRequest> void createPersistentTask(String action, Request request,
64+
public <Request extends PersistentTaskRequest> void createPersistentTask(String taskId, String action, Request request,
6465
ActionListener<PersistentTask<?>> listener) {
6566
clusterService.submitStateUpdateTask("create persistent task", new ClusterStateUpdateTask() {
6667
@Override
6768
public ClusterState execute(ClusterState currentState) throws Exception {
69+
PersistentTasksCustomMetaData.Builder builder = builder(currentState);
70+
if (builder.hasTask(taskId)) {
71+
throw new ResourceAlreadyExistsException("task with id {" + taskId + "} already exist");
72+
}
6873
validate(action, clusterService.state(), request);
6974
final Assignment assignment;
7075
assignment = getAssignement(action, currentState, request);
71-
return update(currentState, builder(currentState).addTask(action, request, assignment));
76+
return update(currentState, builder.addTask(taskId, action, request, assignment));
7277
}
7378

7479
@Override
@@ -81,7 +86,7 @@ public void onFailure(String source, Exception e) {
8186
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
8287
PersistentTasksCustomMetaData tasks = newState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
8388
if (tasks != null) {
84-
listener.onResponse(tasks.getTask(tasks.getCurrentId()));
89+
listener.onResponse(tasks.getTask(taskId));
8590
} else {
8691
listener.onResponse(null);
8792
}
@@ -97,7 +102,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
97102
* @param failure the reason for restarting the task or null if the task completed successfully
98103
* @param listener the listener that will be called when task is removed
99104
*/
100-
public void completePersistentTask(long id, Exception failure, ActionListener<PersistentTask<?>> listener) {
105+
public void completePersistentTask(String id, Exception failure, ActionListener<PersistentTask<?>> listener) {
101106
final String source;
102107
if (failure != null) {
103108
logger.warn("persistent task " + id + " failed", failure);
@@ -138,7 +143,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
138143
* @param id the id of a persistent task
139144
* @param listener the listener that will be called when task is removed
140145
*/
141-
public void removePersistentTask(long id, ActionListener<PersistentTask<?>> listener) {
146+
public void removePersistentTask(String id, ActionListener<PersistentTask<?>> listener) {
142147
clusterService.submitStateUpdateTask("remove persistent task", new ClusterStateUpdateTask() {
143148
@Override
144149
public ClusterState execute(ClusterState currentState) throws Exception {
@@ -166,12 +171,12 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
166171
/**
167172
* Update task status
168173
*
169-
* @param id the id of a persistent task
170-
* @param allocationId the expected allocation id of the persistent task
171-
* @param status new status
172-
* @param listener the listener that will be called when task is removed
174+
* @param id the id of a persistent task
175+
* @param allocationId the expected allocation id of the persistent task
176+
* @param status new status
177+
* @param listener the listener that will be called when task is removed
173178
*/
174-
public void updatePersistentTaskStatus(long id, long allocationId, Task.Status status, ActionListener<PersistentTask<?>> listener) {
179+
public void updatePersistentTaskStatus(String id, long allocationId, Task.Status status, ActionListener<PersistentTask<?>> listener) {
175180
clusterService.submitStateUpdateTask("update task status", new ClusterStateUpdateTask() {
176181
@Override
177182
public ClusterState execute(ClusterState currentState) throws Exception {

0 commit comments

Comments
 (0)