Skip to content

Commit 1479a9e

Browse files
committed
Fix synchronization in ShardFollowNodeTask (#60490)
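Previously, the leader mapping, settings, and aliases versions tracked by a shard follow-task were updated without proper synchronization and could go backward. With this change, every update is performed while holding the task's monitor and only ever advances the stored version (via Math.max), and the initial settings and aliases updates in start() are chained so that reads are coordinated only after all three versions have been fetched.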
1 parent a479a2a commit 1479a9e

File tree

2 files changed

+140
-22
lines changed

2 files changed

+140
-22
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -151,33 +151,31 @@ void start(
151151
// updates follower mapping, this gets us the leader mapping version and makes sure that leader and follower mapping are identical
152152
updateMapping(0L, leaderMappingVersion -> {
153153
synchronized (ShardFollowNodeTask.this) {
154-
currentMappingVersion = leaderMappingVersion;
154+
currentMappingVersion = Math.max(currentMappingVersion, leaderMappingVersion);
155155
}
156156
updateSettings(leaderSettingsVersion -> {
157157
synchronized (ShardFollowNodeTask.this) {
158-
currentSettingsVersion = leaderSettingsVersion;
158+
currentSettingsVersion = Math.max(currentSettingsVersion, leaderSettingsVersion);
159159
}
160-
});
161-
updateAliases(leaderAliasesVersion -> {
162-
synchronized (ShardFollowNodeTask.this) {
163-
currentAliasesVersion = leaderAliasesVersion;
164-
}
165-
});
166-
synchronized (ShardFollowNodeTask.this) {
167-
LOGGER.info(
168-
"{} following leader shard {}, " +
160+
updateAliases(leaderAliasesVersion -> {
161+
synchronized (ShardFollowNodeTask.this) {
162+
currentAliasesVersion = Math.max(currentAliasesVersion, leaderAliasesVersion);
163+
LOGGER.info(
164+
"{} following leader shard {}, " +
169165
"follower global checkpoint=[{}], " +
170166
"mapping version=[{}], " +
171167
"settings version=[{}], " +
172168
"aliases version=[{}]",
173-
params.getFollowShardId(),
174-
params.getLeaderShardId(),
175-
followerGlobalCheckpoint,
176-
currentMappingVersion,
177-
currentSettingsVersion,
178-
currentAliasesVersion);
179-
}
180-
coordinateReads();
169+
params.getFollowShardId(),
170+
params.getLeaderShardId(),
171+
followerGlobalCheckpoint,
172+
currentMappingVersion,
173+
currentSettingsVersion,
174+
currentAliasesVersion);
175+
}
176+
coordinateReads();
177+
});
178+
});
181179
});
182180
}
183181

@@ -446,7 +444,9 @@ private synchronized void maybeUpdateMapping(long minimumRequiredMappingVersion,
446444
LOGGER.trace("{} updating mapping, mapping version [{}] is lower than minimum required mapping version [{}]",
447445
params.getFollowShardId(), currentMappingVersion, minimumRequiredMappingVersion);
448446
updateMapping(minimumRequiredMappingVersion, mappingVersion -> {
449-
currentMappingVersion = mappingVersion;
447+
synchronized (ShardFollowNodeTask.this) {
448+
currentMappingVersion = Math.max(currentMappingVersion, mappingVersion);
449+
}
450450
task.run();
451451
});
452452
}
@@ -461,7 +461,9 @@ private synchronized void maybeUpdateSettings(final Long minimumRequiredSettings
461461
LOGGER.trace("{} updating settings, settings version [{}] is lower than minimum required settings version [{}]",
462462
params.getFollowShardId(), currentSettingsVersion, minimumRequiredSettingsVersion);
463463
updateSettings(settingsVersion -> {
464-
currentSettingsVersion = settingsVersion;
464+
synchronized (ShardFollowNodeTask.this) {
465+
currentSettingsVersion = Math.max(currentSettingsVersion, settingsVersion);
466+
}
465467
task.run();
466468
});
467469
}
@@ -482,7 +484,9 @@ private synchronized void maybeUpdateAliases(final Long minimumRequiredAliasesVe
482484
currentAliasesVersion,
483485
minimumRequiredAliasesVersion);
484486
updateAliases(aliasesVersion -> {
485-
currentAliasesVersion = aliasesVersion;
487+
synchronized (ShardFollowNodeTask.this) {
488+
currentAliasesVersion = Math.max(currentAliasesVersion, aliasesVersion);
489+
}
486490
task.run();
487491
});
488492
}

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import org.elasticsearch.index.translog.Translog;
2121
import org.elasticsearch.test.ESTestCase;
2222
import org.elasticsearch.threadpool.Scheduler;
23+
import org.elasticsearch.threadpool.TestThreadPool;
24+
import org.elasticsearch.threadpool.ThreadPool;
2325
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
2426
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
2527

@@ -33,6 +35,7 @@
3335
import java.util.Map;
3436
import java.util.Queue;
3537
import java.util.concurrent.CountDownLatch;
38+
import java.util.concurrent.Phaser;
3639
import java.util.concurrent.ScheduledFuture;
3740
import java.util.concurrent.ScheduledThreadPoolExecutor;
3841
import java.util.concurrent.TimeUnit;
@@ -1122,6 +1125,117 @@ public void testRetentionLeaseRenewal() throws InterruptedException {
11221125
}
11231126
}
11241127

1128+
public void testUpdateMappingSettingsAndAliasesConcurrently() throws Exception {
1129+
final ShardFollowTask followTask = new ShardFollowTask(
1130+
"test",
1131+
new ShardId("leader_index", "", 0),
1132+
new ShardId("follow_index", "", 0),
1133+
Integer.MAX_VALUE,
1134+
Integer.MAX_VALUE,
1135+
Integer.MAX_VALUE,
1136+
Integer.MAX_VALUE,
1137+
new ByteSizeValue(Long.MAX_VALUE),
1138+
new ByteSizeValue(Long.MAX_VALUE),
1139+
Integer.MAX_VALUE,
1140+
new ByteSizeValue(Long.MAX_VALUE),
1141+
TimeValue.ZERO,
1142+
TimeValue.ZERO,
1143+
Collections.emptyMap()
1144+
);
1145+
final ThreadPool threadPool = new TestThreadPool(getTestClass().getSimpleName());
1146+
final AtomicLong leaderMappingVersion = new AtomicLong(0L);
1147+
final AtomicLong followerMappingVersion = new AtomicLong(0L);
1148+
final AtomicLong leaderSettingsVersion = new AtomicLong(0L);
1149+
final AtomicLong followerSettingsVersion = new AtomicLong(0L);
1150+
final AtomicLong leaderAliasesVersion = new AtomicLong(0L);
1151+
final AtomicLong followerAliasesVersion = new AtomicLong(0L);
1152+
1153+
final Phaser updates = new Phaser(1);
1154+
final ShardFollowNodeTask shardFollowNodeTask = new ShardFollowNodeTask(
1155+
1L, "type", ShardFollowTask.NAME, "description", null, Collections.emptyMap(), followTask, scheduler, System::nanoTime) {
1156+
@Override
1157+
protected void innerUpdateMapping(long minRequiredMappingVersion, LongConsumer handler, Consumer<Exception> errorHandler) {
1158+
updates.register();
1159+
final long fetchedVersion = randomLongBetween(minRequiredMappingVersion, leaderMappingVersion.get());
1160+
followerMappingVersion.updateAndGet(curr -> Math.max(curr, fetchedVersion));
1161+
threadPool.generic().execute(() -> {
1162+
handler.accept(fetchedVersion);
1163+
updates.arriveAndDeregister();
1164+
});
1165+
}
1166+
1167+
@Override
1168+
protected void innerUpdateSettings(LongConsumer handler, Consumer<Exception> errorHandler) {
1169+
updates.register();
1170+
final long fetchedVersion = randomLongBetween(0L, leaderSettingsVersion.get());
1171+
followerSettingsVersion.updateAndGet(curr -> Math.max(curr, fetchedVersion));
1172+
threadPool.generic().execute(() -> {
1173+
handler.accept(fetchedVersion);
1174+
updates.arriveAndDeregister();
1175+
});
1176+
}
1177+
1178+
@Override
1179+
protected void innerUpdateAliases(LongConsumer handler, Consumer<Exception> errorHandler) {
1180+
updates.register();
1181+
final long fetchedVersion = randomLongBetween(0L, leaderAliasesVersion.get());
1182+
followerAliasesVersion.updateAndGet(curr -> Math.max(curr, fetchedVersion));
1183+
threadPool.generic().execute(() -> {
1184+
handler.accept(fetchedVersion);
1185+
updates.arriveAndDeregister();
1186+
});
1187+
}
1188+
1189+
@Override
1190+
protected void innerSendBulkShardOperationsRequest(String followerHistoryUUID,
1191+
List<Translog.Operation> operations,
1192+
long leaderMaxSeqNoOfUpdatesOrDeletes,
1193+
Consumer<BulkShardOperationsResponse> handler,
1194+
Consumer<Exception> errorHandler) {
1195+
1196+
}
1197+
1198+
@Override
1199+
protected void innerSendShardChangesRequest(long from, int maxOperationCount,
1200+
Consumer<ShardChangesAction.Response> handler,
1201+
Consumer<Exception> errorHandler) {
1202+
1203+
}
1204+
1205+
@Override
1206+
protected Scheduler.Cancellable scheduleBackgroundRetentionLeaseRenewal(LongSupplier followerGlobalCheckpoint) {
1207+
return null;
1208+
}
1209+
1210+
@Override
1211+
synchronized void coordinateReads() {
1212+
1213+
}
1214+
};
1215+
int responses = between(10, 5000);
1216+
for (int i = 0; i < responses; i++) {
1217+
ShardChangesAction.Response response = new ShardChangesAction.Response(
1218+
leaderMappingVersion.addAndGet(between(0, Integer.MAX_VALUE)),
1219+
leaderSettingsVersion.addAndGet(between(0, Integer.MAX_VALUE)),
1220+
leaderAliasesVersion.addAndGet(between(0, Integer.MAX_VALUE)),
1221+
SequenceNumbers.NO_OPS_PERFORMED,
1222+
SequenceNumbers.NO_OPS_PERFORMED,
1223+
-1,
1224+
new Translog.Operation[0],
1225+
randomLong()
1226+
);
1227+
shardFollowNodeTask.handleReadResponse(0, -1, response);
1228+
}
1229+
try {
1230+
updates.arriveAndAwaitAdvance();
1231+
final ShardFollowNodeTaskStatus status = shardFollowNodeTask.getStatus();
1232+
assertThat(status.followerMappingVersion(), equalTo(followerMappingVersion.get()));
1233+
assertThat(status.followerSettingsVersion(), equalTo(followerSettingsVersion.get()));
1234+
assertThat(status.followerAliasesVersion(), equalTo(followerAliasesVersion.get()));
1235+
} finally {
1236+
terminate(threadPool);
1237+
}
1238+
}
11251239

11261240
static final class ShardFollowTaskParams {
11271241
private String remoteCluster = null;

0 commit comments

Comments
 (0)