Skip to content

Commit 6e48066

Browse files
authored
Remove AllocatedPersistentTask.getState() (#30858)
This commit removes the method AllocatedPersistentTask.getState() that exposes the internal state of an AllocatedPersistentTask and replaces it with a new isCompleted() method. Related to #29608.
1 parent 6577f5b commit 6e48066

File tree

3 files changed

+28
-28
lines changed

3 files changed

+28
-28
lines changed

server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java

+18-22
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.elasticsearch.common.unit.TimeValue;
2727
import org.elasticsearch.tasks.CancellableTask;
2828
import org.elasticsearch.tasks.Task;
29-
import org.elasticsearch.tasks.TaskCancelledException;
3029
import org.elasticsearch.tasks.TaskId;
3130
import org.elasticsearch.tasks.TaskManager;
3231

@@ -38,18 +37,16 @@
3837
* Represents a executor node operation that corresponds to a persistent task
3938
*/
4039
public class AllocatedPersistentTask extends CancellableTask {
41-
private volatile String persistentTaskId;
42-
private volatile long allocationId;
4340

4441
private final AtomicReference<State> state;
45-
@Nullable
46-
private volatile Exception failure;
4742

43+
private volatile String persistentTaskId;
44+
private volatile long allocationId;
45+
private volatile @Nullable Exception failure;
4846
private volatile PersistentTasksService persistentTasksService;
4947
private volatile Logger logger;
5048
private volatile TaskManager taskManager;
5149

52-
5350
public AllocatedPersistentTask(long id, String type, String action, String description, TaskId parentTask,
5451
Map<String, String> headers) {
5552
super(id, type, action, description, parentTask, headers);
@@ -101,24 +98,10 @@ public Exception getFailure() {
10198
return failure;
10299
}
103100

104-
boolean markAsCancelled() {
105-
return state.compareAndSet(AllocatedPersistentTask.State.STARTED, AllocatedPersistentTask.State.PENDING_CANCEL);
106-
}
107-
108-
public State getState() {
109-
return state.get();
110-
}
111-
112101
public long getAllocationId() {
113102
return allocationId;
114103
}
115104

116-
public enum State {
117-
STARTED, // the task is currently running
118-
PENDING_CANCEL, // the task is cancelled on master, cancelling it locally
119-
COMPLETED // the task is done running and trying to notify caller
120-
}
121-
122105
/**
123106
* Waits for this persistent task to have the desired state.
124107
*/
@@ -128,6 +111,14 @@ public void waitForPersistentTaskStatus(Predicate<PersistentTasksCustomMetaData.
128111
persistentTasksService.waitForPersistentTaskStatus(persistentTaskId, predicate, timeout, listener);
129112
}
130113

114+
final boolean isCompleted() {
115+
return state.get() == State.COMPLETED;
116+
}
117+
118+
boolean markAsCancelled() {
119+
return state.compareAndSet(State.STARTED, State.PENDING_CANCEL);
120+
}
121+
131122
public void markAsCompleted() {
132123
completeAndNotifyIfNeeded(null);
133124
}
@@ -138,11 +129,10 @@ public void markAsFailed(Exception e) {
138129
} else {
139130
completeAndNotifyIfNeeded(e);
140131
}
141-
142132
}
143133

144134
private void completeAndNotifyIfNeeded(@Nullable Exception failure) {
145-
State prevState = state.getAndSet(AllocatedPersistentTask.State.COMPLETED);
135+
final State prevState = state.getAndSet(State.COMPLETED);
146136
if (prevState == State.COMPLETED) {
147137
logger.warn("attempt to complete task [{}] with id [{}] in the [{}] state", getAction(), getPersistentTaskId(), prevState);
148138
} else {
@@ -173,4 +163,10 @@ public void onFailure(Exception e) {
173163
}
174164
}
175165
}
166+
167+
public enum State {
168+
STARTED, // the task is currently running
169+
PENDING_CANCEL, // the task is cancelled on master, cancelling it locally
170+
COMPLETED // the task is done running and trying to notify caller
171+
}
176172
}

server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ public void clusterChanged(ClusterChangedEvent event) {
123123

124124
for (Long id : notVisitedTasks) {
125125
AllocatedPersistentTask task = runningTasks.get(id);
126-
if (task.getState() == AllocatedPersistentTask.State.COMPLETED) {
126+
if (task.isCompleted()) {
127127
// Result was sent to the caller and the caller acknowledged acceptance of the result
128128
logger.trace("Found completed persistent task [{}] with id [{}] and allocation id [{}] - removing",
129129
task.getAction(), task.getPersistentTaskId(), task.getAllocationId());

server/src/test/java/org/elasticsearch/persistent/PersistentTasksNodeServiceTests.java

+9-5
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import java.util.concurrent.atomic.AtomicReference;
5353

5454
import static org.hamcrest.Matchers.empty;
55+
import static org.hamcrest.Matchers.is;
5556
import static org.hamcrest.Matchers.sameInstance;
5657
import static org.hamcrest.core.IsEqual.equalTo;
5758
import static org.mockito.Matchers.any;
@@ -73,7 +74,6 @@ public void setUp() throws Exception {
7374
threadPool = new TestThreadPool(getClass().getName());
7475
}
7576

76-
7777
@Override
7878
@After
7979
public void tearDown() throws Exception {
@@ -95,7 +95,7 @@ private ClusterState createInitialClusterState(int nonLocalNodesCount, Settings
9595
return state.build();
9696
}
9797

98-
public void testStartTask() throws Exception {
98+
public void testStartTask() {
9999
PersistentTasksService persistentTasksService = mock(PersistentTasksService.class);
100100
@SuppressWarnings("unchecked") PersistentTasksExecutor<TestParams> action = mock(PersistentTasksExecutor.class);
101101
when(action.getExecutor()).thenReturn(ThreadPool.Names.SAME);
@@ -131,8 +131,8 @@ public void testStartTask() throws Exception {
131131

132132
if (added == false) {
133133
logger.info("No local node action was added");
134-
135134
}
135+
136136
MetaData.Builder metaData = MetaData.builder(state.metaData());
137137
metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks.build());
138138
ClusterState newClusterState = ClusterState.builder(state).metaData(metaData).build();
@@ -149,6 +149,7 @@ public void testStartTask() throws Exception {
149149

150150
// Make sure action wasn't called again
151151
assertThat(executor.executions.size(), equalTo(1));
152+
assertThat(executor.get(0).task.isCompleted(), is(false));
152153

153154
// Start another task on this node
154155
state = newClusterState;
@@ -157,10 +158,15 @@ public void testStartTask() throws Exception {
157158

158159
// Make sure action was called this time
159160
assertThat(executor.size(), equalTo(2));
161+
assertThat(executor.get(1).task.isCompleted(), is(false));
160162

161163
// Finish both tasks
162164
executor.get(0).task.markAsFailed(new RuntimeException());
163165
executor.get(1).task.markAsCompleted();
166+
167+
assertThat(executor.get(0).task.isCompleted(), is(true));
168+
assertThat(executor.get(1).task.isCompleted(), is(true));
169+
164170
String failedTaskId = executor.get(0).task.getPersistentTaskId();
165171
String finishedTaskId = executor.get(1).task.getPersistentTaskId();
166172
executor.clear();
@@ -186,7 +192,6 @@ public void testStartTask() throws Exception {
186192
// Make sure action was only allocated on this node once
187193
assertThat(executor.size(), equalTo(1));
188194
}
189-
190195
}
191196

192197
public void testParamsStatusAndNodeTaskAreDelegated() throws Exception {
@@ -300,7 +305,6 @@ public void sendCompletionNotification(String taskId, long allocationId, Excepti
300305

301306
// Check the the task is now removed from task manager
302307
assertThat(taskManager.getTasks().values(), empty());
303-
304308
}
305309

306310
private <Params extends PersistentTaskParams> ClusterState addTask(ClusterState state, String action, Params params,

0 commit comments

Comments
 (0)