Skip to content

Commit 1a5553d

Browse files
MINOR: Cleanup Runnables in SnapshotsService (#35796)
* Simplify complex `Runnable` by moving to `AbstractRunnable`
1 parent 60db06e commit 1a5553d

File tree

1 file changed

+123
-111
lines changed

1 file changed

+123
-111
lines changed

server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java

Lines changed: 123 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import org.elasticsearch.common.inject.Inject;
5959
import org.elasticsearch.common.settings.Settings;
6060
import org.elasticsearch.common.unit.TimeValue;
61+
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
6162
import org.elasticsearch.index.Index;
6263
import org.elasticsearch.index.shard.ShardId;
6364
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
@@ -280,9 +281,7 @@ public void onFailure(String source, Exception e) {
280281
@Override
281282
public void clusterStateProcessed(String source, ClusterState oldState, final ClusterState newState) {
282283
if (newSnapshot != null) {
283-
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() ->
284-
beginSnapshot(newState, newSnapshot, request.partial(), listener)
285-
);
284+
beginSnapshot(newState, newSnapshot, request.partial(), listener);
286285
}
287286
}
288287

@@ -349,125 +348,133 @@ private void beginSnapshot(final ClusterState clusterState,
349348
final SnapshotsInProgress.Entry snapshot,
350349
final boolean partial,
351350
final CreateSnapshotListener userCreateSnapshotListener) {
352-
boolean snapshotCreated = false;
353-
try {
354-
Repository repository = repositoriesService.repository(snapshot.snapshot().getRepository());
355-
356-
MetaData metaData = clusterState.metaData();
357-
if (!snapshot.includeGlobalState()) {
358-
// Remove global state from the cluster state
359-
MetaData.Builder builder = MetaData.builder();
360-
for (IndexId index : snapshot.indices()) {
361-
builder.put(metaData.index(index.getName()), false);
362-
}
363-
metaData = builder.build();
364-
}
351+
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() {
365352

366-
repository.initializeSnapshot(snapshot.snapshot().getSnapshotId(), snapshot.indices(), metaData);
367-
snapshotCreated = true;
353+
boolean snapshotCreated;
368354

369-
logger.info("snapshot [{}] started", snapshot.snapshot());
370-
if (snapshot.indices().isEmpty()) {
371-
// No indices in this snapshot - we are done
372-
userCreateSnapshotListener.onResponse();
373-
endSnapshot(snapshot);
374-
return;
375-
}
376-
clusterService.submitStateUpdateTask("update_snapshot [" + snapshot.snapshot() + "]", new ClusterStateUpdateTask() {
355+
@Override
356+
protected void doRun() {
357+
Repository repository = repositoriesService.repository(snapshot.snapshot().getRepository());
358+
359+
MetaData metaData = clusterState.metaData();
360+
if (!snapshot.includeGlobalState()) {
361+
// Remove global state from the cluster state
362+
MetaData.Builder builder = MetaData.builder();
363+
for (IndexId index : snapshot.indices()) {
364+
builder.put(metaData.index(index.getName()), false);
365+
}
366+
metaData = builder.build();
367+
}
377368

378-
SnapshotsInProgress.Entry endSnapshot;
379-
String failure = null;
369+
repository.initializeSnapshot(snapshot.snapshot().getSnapshotId(), snapshot.indices(), metaData);
370+
snapshotCreated = true;
380371

381-
@Override
382-
public ClusterState execute(ClusterState currentState) {
383-
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
384-
List<SnapshotsInProgress.Entry> entries = new ArrayList<>();
385-
for (SnapshotsInProgress.Entry entry : snapshots.entries()) {
386-
if (entry.snapshot().equals(snapshot.snapshot()) == false) {
387-
entries.add(entry);
388-
continue;
389-
}
372+
logger.info("snapshot [{}] started", snapshot.snapshot());
373+
if (snapshot.indices().isEmpty()) {
374+
// No indices in this snapshot - we are done
375+
userCreateSnapshotListener.onResponse();
376+
endSnapshot(snapshot);
377+
return;
378+
}
379+
clusterService.submitStateUpdateTask("update_snapshot [" + snapshot.snapshot() + "]", new ClusterStateUpdateTask() {
380+
381+
SnapshotsInProgress.Entry endSnapshot;
382+
String failure;
383+
384+
@Override
385+
public ClusterState execute(ClusterState currentState) {
386+
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
387+
List<SnapshotsInProgress.Entry> entries = new ArrayList<>();
388+
for (SnapshotsInProgress.Entry entry : snapshots.entries()) {
389+
if (entry.snapshot().equals(snapshot.snapshot()) == false) {
390+
entries.add(entry);
391+
continue;
392+
}
390393

391-
if (entry.state() != State.ABORTED) {
392-
// Replace the snapshot that was just intialized
393-
ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards = shards(currentState, entry.indices());
394-
if (!partial) {
395-
Tuple<Set<String>, Set<String>> indicesWithMissingShards = indicesWithMissingShards(shards, currentState.metaData());
396-
Set<String> missing = indicesWithMissingShards.v1();
397-
Set<String> closed = indicesWithMissingShards.v2();
398-
if (missing.isEmpty() == false || closed.isEmpty() == false) {
399-
endSnapshot = new SnapshotsInProgress.Entry(entry, State.FAILED, shards);
400-
entries.add(endSnapshot);
401-
402-
final StringBuilder failureMessage = new StringBuilder();
403-
if (missing.isEmpty() == false) {
404-
failureMessage.append("Indices don't have primary shards ");
405-
failureMessage.append(missing);
406-
}
407-
if (closed.isEmpty() == false) {
408-
if (failureMessage.length() > 0) {
409-
failureMessage.append("; ");
394+
if (entry.state() != State.ABORTED) {
395+
// Replace the snapshot that was just intialized
396+
ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards = shards(currentState, entry.indices());
397+
if (!partial) {
398+
Tuple<Set<String>, Set<String>> indicesWithMissingShards = indicesWithMissingShards(shards, currentState.metaData());
399+
Set<String> missing = indicesWithMissingShards.v1();
400+
Set<String> closed = indicesWithMissingShards.v2();
401+
if (missing.isEmpty() == false || closed.isEmpty() == false) {
402+
endSnapshot = new SnapshotsInProgress.Entry(entry, State.FAILED, shards);
403+
entries.add(endSnapshot);
404+
405+
final StringBuilder failureMessage = new StringBuilder();
406+
if (missing.isEmpty() == false) {
407+
failureMessage.append("Indices don't have primary shards ");
408+
failureMessage.append(missing);
409+
}
410+
if (closed.isEmpty() == false) {
411+
if (failureMessage.length() > 0) {
412+
failureMessage.append("; ");
413+
}
414+
failureMessage.append("Indices are closed ");
415+
failureMessage.append(closed);
410416
}
411-
failureMessage.append("Indices are closed ");
412-
failureMessage.append(closed);
417+
failure = failureMessage.toString();
418+
continue;
413419
}
414-
failure = failureMessage.toString();
415-
continue;
416420
}
421+
SnapshotsInProgress.Entry updatedSnapshot = new SnapshotsInProgress.Entry(entry, State.STARTED, shards);
422+
entries.add(updatedSnapshot);
423+
if (completed(shards.values())) {
424+
endSnapshot = updatedSnapshot;
425+
}
426+
} else {
427+
assert entry.state() == State.ABORTED : "expecting snapshot to be aborted during initialization";
428+
failure = "snapshot was aborted during initialization";
429+
endSnapshot = entry;
430+
entries.add(endSnapshot);
417431
}
418-
SnapshotsInProgress.Entry updatedSnapshot = new SnapshotsInProgress.Entry(entry, State.STARTED, shards);
419-
entries.add(updatedSnapshot);
420-
if (completed(shards.values())) {
421-
endSnapshot = updatedSnapshot;
422-
}
423-
} else {
424-
assert entry.state() == State.ABORTED : "expecting snapshot to be aborted during initialization";
425-
failure = "snapshot was aborted during initialization";
426-
endSnapshot = entry;
427-
entries.add(endSnapshot);
428432
}
433+
return ClusterState.builder(currentState)
434+
.putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(Collections.unmodifiableList(entries)))
435+
.build();
429436
}
430-
return ClusterState.builder(currentState)
431-
.putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(Collections.unmodifiableList(entries)))
432-
.build();
433-
}
434437

435-
@Override
436-
public void onFailure(String source, Exception e) {
437-
logger.warn(() -> new ParameterizedMessage("[{}] failed to create snapshot", snapshot.snapshot().getSnapshotId()), e);
438-
removeSnapshotFromClusterState(snapshot.snapshot(), null, e, new CleanupAfterErrorListener(snapshot, true, userCreateSnapshotListener, e));
439-
}
440-
441-
@Override
442-
public void onNoLongerMaster(String source) {
443-
// We are not longer a master - we shouldn't try to do any cleanup
444-
// The new master will take care of it
445-
logger.warn("[{}] failed to create snapshot - no longer a master", snapshot.snapshot().getSnapshotId());
446-
userCreateSnapshotListener.onFailure(
447-
new SnapshotException(snapshot.snapshot(), "master changed during snapshot initialization"));
448-
}
438+
@Override
439+
public void onFailure(String source, Exception e) {
440+
logger.warn(() -> new ParameterizedMessage("[{}] failed to create snapshot", snapshot.snapshot().getSnapshotId()), e);
441+
removeSnapshotFromClusterState(snapshot.snapshot(), null, e, new CleanupAfterErrorListener(snapshot, true, userCreateSnapshotListener, e));
442+
}
449443

450-
@Override
451-
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
452-
// The userCreateSnapshotListener.onResponse() notifies caller that the snapshot was accepted
453-
// for processing. If client wants to wait for the snapshot completion, it can register snapshot
454-
// completion listener in this method. For the snapshot completion to work properly, the snapshot
455-
// should still exist when listener is registered.
456-
userCreateSnapshotListener.onResponse();
444+
@Override
445+
public void onNoLongerMaster(String source) {
446+
// We are not longer a master - we shouldn't try to do any cleanup
447+
// The new master will take care of it
448+
logger.warn("[{}] failed to create snapshot - no longer a master", snapshot.snapshot().getSnapshotId());
449+
userCreateSnapshotListener.onFailure(
450+
new SnapshotException(snapshot.snapshot(), "master changed during snapshot initialization"));
451+
}
457452

458-
// Now that snapshot completion listener is registered we can end the snapshot if needed
459-
// We should end snapshot only if 1) we didn't accept it for processing (which happens when there
460-
// is nothing to do) and 2) there was a snapshot in metadata that we should end. Otherwise we should
461-
// go ahead and continue working on this snapshot rather then end here.
462-
if (endSnapshot != null) {
463-
endSnapshot(endSnapshot, failure);
453+
@Override
454+
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
455+
// The userCreateSnapshotListener.onResponse() notifies caller that the snapshot was accepted
456+
// for processing. If client wants to wait for the snapshot completion, it can register snapshot
457+
// completion listener in this method. For the snapshot completion to work properly, the snapshot
458+
// should still exist when listener is registered.
459+
userCreateSnapshotListener.onResponse();
460+
461+
// Now that snapshot completion listener is registered we can end the snapshot if needed
462+
// We should end snapshot only if 1) we didn't accept it for processing (which happens when there
463+
// is nothing to do) and 2) there was a snapshot in metadata that we should end. Otherwise we should
464+
// go ahead and continue working on this snapshot rather then end here.
465+
if (endSnapshot != null) {
466+
endSnapshot(endSnapshot, failure);
467+
}
464468
}
465-
}
466-
});
467-
} catch (Exception e) {
468-
logger.warn(() -> new ParameterizedMessage("failed to create snapshot [{}]", snapshot.snapshot().getSnapshotId()), e);
469-
removeSnapshotFromClusterState(snapshot.snapshot(), null, e, new CleanupAfterErrorListener(snapshot, snapshotCreated, userCreateSnapshotListener, e));
470-
}
469+
});
470+
}
471+
472+
@Override
473+
public void onFailure(Exception e) {
474+
logger.warn(() -> new ParameterizedMessage("failed to create snapshot [{}]", snapshot.snapshot().getSnapshotId()), e);
475+
removeSnapshotFromClusterState(snapshot.snapshot(), null, e, new CleanupAfterErrorListener(snapshot, snapshotCreated, userCreateSnapshotListener, e));
476+
}
477+
});
471478
}
472479

473480
private class CleanupAfterErrorListener implements ActionListener<SnapshotInfo> {
@@ -958,9 +965,10 @@ void endSnapshot(final SnapshotsInProgress.Entry entry) {
958965
* @param failure failure reason or null if snapshot was successful
959966
*/
960967
private void endSnapshot(final SnapshotsInProgress.Entry entry, final String failure) {
961-
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
962-
final Snapshot snapshot = entry.snapshot();
963-
try {
968+
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() {
969+
@Override
970+
protected void doRun() {
971+
final Snapshot snapshot = entry.snapshot();
964972
final Repository repository = repositoriesService.repository(snapshot.getRepository());
965973
logger.trace("[{}] finalizing snapshot in repository, state: [{}], failure[{}]", snapshot, entry.state(), failure);
966974
ArrayList<SnapshotShardFailure> shardFailures = new ArrayList<>();
@@ -982,7 +990,11 @@ private void endSnapshot(final SnapshotsInProgress.Entry entry, final String fai
982990
entry.includeGlobalState());
983991
removeSnapshotFromClusterState(snapshot, snapshotInfo, null);
984992
logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state());
985-
} catch (Exception e) {
993+
}
994+
995+
@Override
996+
public void onFailure(final Exception e) {
997+
Snapshot snapshot = entry.snapshot();
986998
logger.warn(() -> new ParameterizedMessage("[{}] failed to finalize snapshot", snapshot), e);
987999
removeSnapshotFromClusterState(snapshot, null, e);
9881000
}

0 commit comments

Comments
 (0)