Skip to content

Commit d999942

Browse files
authored
Refactor AllocatedPersistentTask#init(), move rollup logic out of ctor (#46288)
This makes the AllocatedPersistentTask#init() method protected so that implementing classes can perform their initialization logic there, instead of the constructor. Rollup's task is adjusted to use this init method. It also slightly refactors the methods to se a static logger in the AllocatedTask instead of passing it in via an argument. This is simpler, logged messages come from the task instead of the service, and is easier for tests
1 parent 9d3467a commit d999942

File tree

4 files changed

+94
-40
lines changed

4 files changed

+94
-40
lines changed

server/src/main/java/org/elasticsearch/persistent/AllocatedPersistentTask.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.elasticsearch.persistent;
2020

21+
import org.apache.logging.log4j.LogManager;
2122
import org.apache.logging.log4j.Logger;
2223
import org.apache.logging.log4j.message.ParameterizedMessage;
2324
import org.elasticsearch.action.ActionListener;
@@ -37,13 +38,13 @@
3738
*/
3839
public class AllocatedPersistentTask extends CancellableTask {
3940

41+
private static final Logger logger = LogManager.getLogger(AllocatedPersistentTask.class);
4042
private final AtomicReference<State> state;
4143

4244
private volatile String persistentTaskId;
4345
private volatile long allocationId;
4446
private volatile @Nullable Exception failure;
4547
private volatile PersistentTasksService persistentTasksService;
46-
private volatile Logger logger;
4748
private volatile TaskManager taskManager;
4849

4950
public AllocatedPersistentTask(long id, String type, String action, String description, TaskId parentTask,
@@ -85,10 +86,9 @@ public String getPersistentTaskId() {
8586
return persistentTaskId;
8687
}
8788

88-
void init(PersistentTasksService persistentTasksService, TaskManager taskManager, Logger logger, String persistentTaskId, long
89-
allocationId) {
89+
protected void init(PersistentTasksService persistentTasksService, TaskManager taskManager,
90+
String persistentTaskId, long allocationId) {
9091
this.persistentTasksService = persistentTasksService;
91-
this.logger = logger;
9292
this.taskManager = taskManager;
9393
this.persistentTaskId = persistentTaskId;
9494
this.allocationId = allocationId;

server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ public Task createTask(long id, String type, String action, TaskId parentTaskId,
183183

184184
boolean processed = false;
185185
try {
186-
task.init(persistentTasksService, taskManager, logger, taskInProgress.getId(), taskInProgress.getAllocationId());
186+
task.init(persistentTasksService, taskManager, taskInProgress.getId(), taskInProgress.getAllocationId());
187187
logger.trace("Persistent task [{}] with id [{}] and allocation id [{}] was created", task.getAction(),
188188
task.getPersistentTaskId(), task.getAllocationId());
189189
try {

x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/job/RollupJobTask.java

Lines changed: 33 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@
2222
import org.elasticsearch.persistent.PersistentTaskState;
2323
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
2424
import org.elasticsearch.persistent.PersistentTasksExecutor;
25+
import org.elasticsearch.persistent.PersistentTasksService;
2526
import org.elasticsearch.tasks.TaskId;
27+
import org.elasticsearch.tasks.TaskManager;
2628
import org.elasticsearch.threadpool.ThreadPool;
2729
import org.elasticsearch.xpack.core.ClientHelper;
2830
import org.elasticsearch.xpack.core.indexing.IndexerState;
@@ -150,44 +152,59 @@ protected void onAbort() {
150152
private final RollupJob job;
151153
private final SchedulerEngine schedulerEngine;
152154
private final ThreadPool threadPool;
153-
private final RollupIndexer indexer;
155+
private final Client client;
156+
private final IndexerState initialIndexerState;
157+
private final Map<String, Object> initialPosition;
158+
private RollupIndexer indexer;
154159

155160
RollupJobTask(long id, String type, String action, TaskId parentTask, RollupJob job, RollupJobStatus state,
156161
Client client, SchedulerEngine schedulerEngine, ThreadPool threadPool, Map<String, String> headers) {
157162
super(id, type, action, RollupField.NAME + "_" + job.getConfig().getId(), parentTask, headers);
158163
this.job = job;
159164
this.schedulerEngine = schedulerEngine;
160165
this.threadPool = threadPool;
166+
this.client = client;
167+
if (state == null) {
168+
this.initialIndexerState = null;
169+
this.initialPosition = null;
170+
} else {
171+
this.initialIndexerState = state.getIndexerState();
172+
this.initialPosition = state.getPosition();
173+
}
174+
175+
}
161176

162-
// If status is not null, we are resuming rather than starting fresh.
163-
Map<String, Object> initialPosition = null;
164-
IndexerState initialState = IndexerState.STOPPED;
165-
if (state != null) {
166-
final IndexerState existingState = state.getIndexerState();
167-
logger.debug("We have existing state, setting state to [" + existingState + "] " +
168-
"and current position to [" + state.getPosition() + "] for job [" + job.getConfig().getId() + "]");
169-
if (existingState.equals(IndexerState.INDEXING)) {
177+
@Override
178+
protected void init(PersistentTasksService persistentTasksService, TaskManager taskManager,
179+
String persistentTaskId, long allocationId) {
180+
super.init(persistentTasksService, taskManager, persistentTaskId, allocationId);
181+
182+
// If initial position is not null, we are resuming rather than starting fresh.
183+
IndexerState indexerState = IndexerState.STOPPED;
184+
if (initialPosition != null) {
185+
logger.debug("We have existing state, setting state to [" + initialIndexerState + "] " +
186+
"and current position to [" + initialPosition + "] for job [" + job.getConfig().getId() + "]");
187+
if (initialIndexerState.equals(IndexerState.INDEXING)) {
170188
/*
171189
* If we were indexing, we have to reset back to STARTED otherwise the indexer will be "stuck" thinking
172190
* it is indexing but without the actual indexing thread running.
173191
*/
174-
initialState = IndexerState.STARTED;
192+
indexerState = IndexerState.STARTED;
175193

176-
} else if (existingState.equals(IndexerState.ABORTING) || existingState.equals(IndexerState.STOPPING)) {
194+
} else if (initialIndexerState.equals(IndexerState.ABORTING) || initialIndexerState.equals(IndexerState.STOPPING)) {
177195
// It shouldn't be possible to persist ABORTING, but if for some reason it does,
178196
// play it safe and restore the job as STOPPED. An admin will have to clean it up,
179197
// but it won't be running, and won't delete itself either. Safest option.
180198
// If we were STOPPING, that means it persisted but was killed before finally stopped... so ok
181199
// to restore as STOPPED
182-
initialState = IndexerState.STOPPED;
200+
indexerState = IndexerState.STOPPED;
183201
} else {
184-
initialState = existingState;
202+
indexerState = initialIndexerState;
185203
}
186-
initialPosition = state.getPosition();
187204

188205
}
189-
this.indexer = new ClientRollupPageManager(job, initialState, initialPosition,
190-
new ParentTaskAssigningClient(client, new TaskId(getPersistentTaskId())));
206+
this.indexer = new ClientRollupPageManager(job, indexerState, initialPosition,
207+
new ParentTaskAssigningClient(client, getParentTaskId()));
191208
}
192209

193210
@Override

0 commit comments

Comments
 (0)