Skip to content

Commit c5baf47

Browse files
authored
Fix race condition in Feature Migration Status API (#80572)
Prior to this commit, there is a race condition in the Feature Migration Status API where the returned status can be `MIGRATION_NEEDED`, even if a migration is already in progress (and therefore the returned value should have been `IN_PROGRESS`). This commit adds a test for this case which reliably fails without the fix, and fixes the bug. The fix is straightforward: while we already examine the persistent task metadata to determine progress, the part of that metadata that we examined was not updated until the task had been running for a bit. However, if we check for the *existence* of the task metadata, that is guaranteed to be in the cluster state by the time the request to start the migration completes (and is removed immediately after the task finishes - that's why we have separate metadata for the migration results instead of just using the task state).
1 parent 2cb182f commit c5baf47

File tree

2 files changed

+56
-11
lines changed

2 files changed

+56
-11
lines changed

modules/reindex/src/internalClusterTest/java/org/elasticsearch/migration/FeatureMigrationIT.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,43 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
8585
return plugins;
8686
}
8787

88+
public void testStartMigrationAndImmediatelyCheckStatus() throws Exception {
89+
createSystemIndexForDescriptor(INTERNAL_MANAGED);
90+
createSystemIndexForDescriptor(INTERNAL_UNMANAGED);
91+
createSystemIndexForDescriptor(EXTERNAL_MANAGED);
92+
createSystemIndexForDescriptor(EXTERNAL_UNMANAGED);
93+
94+
TestPlugin.preMigrationHook.set((state) -> Collections.emptyMap());
95+
TestPlugin.postMigrationHook.set((state, metadata) -> {});
96+
97+
ensureGreen();
98+
99+
PostFeatureUpgradeRequest migrationRequest = new PostFeatureUpgradeRequest();
100+
GetFeatureUpgradeStatusRequest getStatusRequest = new GetFeatureUpgradeStatusRequest();
101+
102+
// Start the migration and *immediately* request the status. We're trying to detect a race condition with this test, so we need to
103+
// do this as fast as possible, but not before the request to start the migration completes.
104+
PostFeatureUpgradeResponse migrationResponse = client().execute(PostFeatureUpgradeAction.INSTANCE, migrationRequest).get();
105+
GetFeatureUpgradeStatusResponse statusResponse = client().execute(GetFeatureUpgradeStatusAction.INSTANCE, getStatusRequest).get();
106+
107+
// Make sure we actually started the migration
108+
final Set<String> migratingFeatures = migrationResponse.getFeatures()
109+
.stream()
110+
.map(PostFeatureUpgradeResponse.Feature::getFeatureName)
111+
.collect(Collectors.toSet());
112+
assertThat(migratingFeatures, hasItem(FEATURE_NAME));
113+
114+
// We should see that the migration is in progress even though we just started the migration.
115+
assertThat(statusResponse.getUpgradeStatus(), equalTo(GetFeatureUpgradeStatusResponse.UpgradeStatus.IN_PROGRESS));
116+
117+
// Now wait for the migration to finish (otherwise the test infra explodes)
118+
assertBusy(() -> {
119+
GetFeatureUpgradeStatusResponse statusResp = client().execute(GetFeatureUpgradeStatusAction.INSTANCE, getStatusRequest).get();
120+
logger.info(Strings.toString(statusResp));
121+
assertThat(statusResp.getUpgradeStatus(), equalTo(GetFeatureUpgradeStatusResponse.UpgradeStatus.NO_MIGRATION_NEEDED));
122+
});
123+
}
124+
88125
public void testMigrateInternalManagedSystemIndex() throws Exception {
89126
createSystemIndexForDescriptor(INTERNAL_MANAGED);
90127
createSystemIndexForDescriptor(INTERNAL_UNMANAGED);

server/src/main/java/org/elasticsearch/action/admin/cluster/migration/TransportGetFeatureUpgradeStatusAction.java

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,20 @@
2020
import org.elasticsearch.common.inject.Inject;
2121
import org.elasticsearch.indices.SystemIndices;
2222
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
23+
import org.elasticsearch.persistent.PersistentTasksService;
2324
import org.elasticsearch.tasks.Task;
2425
import org.elasticsearch.threadpool.ThreadPool;
2526
import org.elasticsearch.transport.TransportService;
2627
import org.elasticsearch.upgrades.FeatureMigrationResults;
2728
import org.elasticsearch.upgrades.SingleFeatureMigrationResult;
29+
import org.elasticsearch.upgrades.SystemIndexMigrationTaskParams;
2830
import org.elasticsearch.upgrades.SystemIndexMigrationTaskState;
2931

3032
import java.util.Comparator;
3133
import java.util.List;
3234
import java.util.Optional;
3335
import java.util.stream.Collectors;
36+
import java.util.stream.Stream;
3437

3538
import static org.elasticsearch.action.admin.cluster.migration.GetFeatureUpgradeStatusResponse.UpgradeStatus.ERROR;
3639
import static org.elasticsearch.action.admin.cluster.migration.GetFeatureUpgradeStatusResponse.UpgradeStatus.IN_PROGRESS;
@@ -51,6 +54,7 @@ public class TransportGetFeatureUpgradeStatusAction extends TransportMasterNodeA
5154
public static final Version NO_UPGRADE_REQUIRED_VERSION = Version.V_8_0_0;
5255

5356
private final SystemIndices systemIndices;
57+
PersistentTasksService persistentTasksService;
5458

5559
@Inject
5660
public TransportGetFeatureUpgradeStatusAction(
@@ -59,6 +63,7 @@ public TransportGetFeatureUpgradeStatusAction(
5963
ActionFilters actionFilters,
6064
ClusterService clusterService,
6165
IndexNameExpressionResolver indexNameExpressionResolver,
66+
PersistentTasksService persistentTasksService,
6267
SystemIndices systemIndices
6368
) {
6469
super(
@@ -73,6 +78,7 @@ public TransportGetFeatureUpgradeStatusAction(
7378
ThreadPool.Names.SAME
7479
);
7580
this.systemIndices = systemIndices;
81+
this.persistentTasksService = persistentTasksService;
7682
}
7783

7884
@Override
@@ -90,24 +96,26 @@ protected void masterOperation(
9096
.map(feature -> getFeatureUpgradeStatus(state, feature))
9197
.collect(Collectors.toList());
9298

93-
GetFeatureUpgradeStatusResponse.UpgradeStatus status = features.stream()
94-
.map(GetFeatureUpgradeStatusResponse.FeatureUpgradeStatus::getUpgradeStatus)
95-
.reduce(GetFeatureUpgradeStatusResponse.UpgradeStatus::combine)
96-
.orElseGet(() -> {
97-
assert false : "get feature statuses API doesn't have any features";
98-
return NO_MIGRATION_NEEDED;
99-
});
99+
boolean migrationTaskExists = PersistentTasksCustomMetadata.getTaskWithId(state, SYSTEM_INDEX_UPGRADE_TASK_NAME) != null;
100+
GetFeatureUpgradeStatusResponse.UpgradeStatus initalStatus = migrationTaskExists ? IN_PROGRESS : NO_MIGRATION_NEEDED;
101+
102+
GetFeatureUpgradeStatusResponse.UpgradeStatus status = Stream.concat(
103+
Stream.of(initalStatus),
104+
features.stream().map(GetFeatureUpgradeStatusResponse.FeatureUpgradeStatus::getUpgradeStatus)
105+
).reduce(GetFeatureUpgradeStatusResponse.UpgradeStatus::combine).orElseGet(() -> {
106+
assert false : "get feature statuses API doesn't have any features";
107+
return NO_MIGRATION_NEEDED;
108+
});
100109

101110
listener.onResponse(new GetFeatureUpgradeStatusResponse(features, status));
102111
}
103112

104113
static GetFeatureUpgradeStatusResponse.FeatureUpgradeStatus getFeatureUpgradeStatus(ClusterState state, SystemIndices.Feature feature) {
105114
String featureName = feature.getName();
106115

107-
final String currentFeature = Optional.ofNullable(
108-
state.metadata().<PersistentTasksCustomMetadata>custom(PersistentTasksCustomMetadata.TYPE)
109-
)
110-
.map(tasksMetdata -> tasksMetdata.getTask(SYSTEM_INDEX_UPGRADE_TASK_NAME))
116+
PersistentTasksCustomMetadata.PersistentTask<SystemIndexMigrationTaskParams> migrationTask = PersistentTasksCustomMetadata
117+
.getTaskWithId(state, SYSTEM_INDEX_UPGRADE_TASK_NAME);
118+
final String currentFeature = Optional.ofNullable(migrationTask)
111119
.map(task -> task.getState())
112120
.map(taskState -> ((SystemIndexMigrationTaskState) taskState).getCurrentFeature())
113121
.orElse(null);

0 commit comments

Comments
 (0)