Skip to content

Commit f336c74

Browse files
committed
Revert "Refactor AllocatedPersistentTask#init(), move rollup logic out of ctor (#46288)"
This reverts commit d999942.
1 parent cde5011 commit f336c74

File tree

4 files changed

+40
-94
lines changed

4 files changed

+40
-94
lines changed

server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
*/
1919
package org.elasticsearch.persistent;
2020

21-
import org.apache.logging.log4j.LogManager;
2221
import org.apache.logging.log4j.Logger;
2322
import org.apache.logging.log4j.message.ParameterizedMessage;
2423
import org.elasticsearch.action.ActionListener;
@@ -38,13 +37,13 @@
3837
*/
3938
public class AllocatedPersistentTask extends CancellableTask {
4039

41-
private static final Logger logger = LogManager.getLogger(AllocatedPersistentTask.class);
4240
private final AtomicReference<State> state;
4341

4442
private volatile String persistentTaskId;
4543
private volatile long allocationId;
4644
private volatile @Nullable Exception failure;
4745
private volatile PersistentTasksService persistentTasksService;
46+
private volatile Logger logger;
4847
private volatile TaskManager taskManager;
4948

5049
public AllocatedPersistentTask(long id, String type, String action, String description, TaskId parentTask,
@@ -86,9 +85,10 @@ public String getPersistentTaskId() {
8685
return persistentTaskId;
8786
}
8887

89-
protected void init(PersistentTasksService persistentTasksService, TaskManager taskManager,
90-
String persistentTaskId, long allocationId) {
88+
void init(PersistentTasksService persistentTasksService, TaskManager taskManager, Logger logger, String persistentTaskId, long
89+
allocationId) {
9190
this.persistentTasksService = persistentTasksService;
91+
this.logger = logger;
9292
this.taskManager = taskManager;
9393
this.persistentTaskId = persistentTaskId;
9494
this.allocationId = allocationId;

server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ public Task createTask(long id, String type, String action, TaskId parentTaskId,
183183

184184
boolean processed = false;
185185
try {
186-
task.init(persistentTasksService, taskManager, taskInProgress.getId(), taskInProgress.getAllocationId());
186+
task.init(persistentTasksService, taskManager, logger, taskInProgress.getId(), taskInProgress.getAllocationId());
187187
logger.trace("Persistent task [{}] with id [{}] and allocation id [{}] was created", task.getAction(),
188188
task.getPersistentTaskId(), task.getAllocationId());
189189
try {

x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java

+16-33
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,7 @@
2222
import org.elasticsearch.persistent.PersistentTaskState;
2323
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
2424
import org.elasticsearch.persistent.PersistentTasksExecutor;
25-
import org.elasticsearch.persistent.PersistentTasksService;
2625
import org.elasticsearch.tasks.TaskId;
27-
import org.elasticsearch.tasks.TaskManager;
2826
import org.elasticsearch.threadpool.ThreadPool;
2927
import org.elasticsearch.xpack.core.ClientHelper;
3028
import org.elasticsearch.xpack.core.indexing.IndexerState;
@@ -152,59 +150,44 @@ protected void onAbort() {
152150
private final RollupJob job;
153151
private final SchedulerEngine schedulerEngine;
154152
private final ThreadPool threadPool;
155-
private final Client client;
156-
private final IndexerState initialIndexerState;
157-
private final Map<String, Object> initialPosition;
158-
private RollupIndexer indexer;
153+
private final RollupIndexer indexer;
159154

160155
RollupJobTask(long id, String type, String action, TaskId parentTask, RollupJob job, RollupJobStatus state,
161156
Client client, SchedulerEngine schedulerEngine, ThreadPool threadPool, Map<String, String> headers) {
162157
super(id, type, action, RollupField.NAME + "_" + job.getConfig().getId(), parentTask, headers);
163158
this.job = job;
164159
this.schedulerEngine = schedulerEngine;
165160
this.threadPool = threadPool;
166-
this.client = client;
167-
if (state == null) {
168-
this.initialIndexerState = null;
169-
this.initialPosition = null;
170-
} else {
171-
this.initialIndexerState = state.getIndexerState();
172-
this.initialPosition = state.getPosition();
173-
}
174-
175-
}
176161

177-
@Override
178-
protected void init(PersistentTasksService persistentTasksService, TaskManager taskManager,
179-
String persistentTaskId, long allocationId) {
180-
super.init(persistentTasksService, taskManager, persistentTaskId, allocationId);
181-
182-
// If initial position is not null, we are resuming rather than starting fresh.
183-
IndexerState indexerState = IndexerState.STOPPED;
184-
if (initialPosition != null) {
185-
logger.debug("We have existing state, setting state to [" + initialIndexerState + "] " +
186-
"and current position to [" + initialPosition + "] for job [" + job.getConfig().getId() + "]");
187-
if (initialIndexerState.equals(IndexerState.INDEXING)) {
162+
// If status is not null, we are resuming rather than starting fresh.
163+
Map<String, Object> initialPosition = null;
164+
IndexerState initialState = IndexerState.STOPPED;
165+
if (state != null) {
166+
final IndexerState existingState = state.getIndexerState();
167+
logger.debug("We have existing state, setting state to [" + existingState + "] " +
168+
"and current position to [" + state.getPosition() + "] for job [" + job.getConfig().getId() + "]");
169+
if (existingState.equals(IndexerState.INDEXING)) {
188170
/*
189171
* If we were indexing, we have to reset back to STARTED otherwise the indexer will be "stuck" thinking
190172
* it is indexing but without the actual indexing thread running.
191173
*/
192-
indexerState = IndexerState.STARTED;
174+
initialState = IndexerState.STARTED;
193175

194-
} else if (initialIndexerState.equals(IndexerState.ABORTING) || initialIndexerState.equals(IndexerState.STOPPING)) {
176+
} else if (existingState.equals(IndexerState.ABORTING) || existingState.equals(IndexerState.STOPPING)) {
195177
// It shouldn't be possible to persist ABORTING, but if for some reason it does,
196178
// play it safe and restore the job as STOPPED. An admin will have to clean it up,
197179
// but it won't be running, and won't delete itself either. Safest option.
198180
// If we were STOPPING, that means it persisted but was killed before finally stopped... so ok
199181
// to restore as STOPPED
200-
indexerState = IndexerState.STOPPED;
182+
initialState = IndexerState.STOPPED;
201183
} else {
202-
indexerState = initialIndexerState;
184+
initialState = existingState;
203185
}
186+
initialPosition = state.getPosition();
204187

205188
}
206-
this.indexer = new ClientRollupPageManager(job, indexerState, initialPosition,
207-
new ParentTaskAssigningClient(client, getParentTaskId()));
189+
this.indexer = new ClientRollupPageManager(job, initialState, initialPosition,
190+
new ParentTaskAssigningClient(client, new TaskId(getPersistentTaskId())));
208191
}
209192

210193
@Override

0 commit comments

Comments
 (0)