Skip to content

Commit e792ed2

Browse files
author
Hendrik Muhs
committed
[Transform] do not fail checkpoint creation due to global checkpoint mismatch (#48423)
Take the max if global checkpoints mismatch instead of throwing an exception. It turned out global checkpoints can mismatch by design fixes #48379
1 parent 39aac22 commit e792ed2

File tree

2 files changed

+18
-18
lines changed

2 files changed

+18
-18
lines changed

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/DefaultCheckpointProvider.java

+7-9
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@
2020
import org.elasticsearch.client.Client;
2121
import org.elasticsearch.common.util.set.Sets;
2222
import org.elasticsearch.xpack.core.ClientHelper;
23-
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition;
2423
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpoint;
2524
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointStats;
2625
import org.elasticsearch.xpack.core.transform.transforms.TransformCheckpointingInfo;
2726
import org.elasticsearch.xpack.core.transform.transforms.TransformConfig;
27+
import org.elasticsearch.xpack.core.transform.transforms.TransformIndexerPosition;
2828
import org.elasticsearch.xpack.core.transform.transforms.TransformProgress;
2929
import org.elasticsearch.xpack.transform.notifications.TransformAuditor;
3030
import org.elasticsearch.xpack.transform.persistence.TransformConfigManager;
@@ -188,14 +188,12 @@ static Map<String, long[]> extractIndexCheckPoints(ShardStats[] shards, Set<Stri
188188
if (checkpointsByIndex.containsKey(indexName)) {
189189
// we have already seen this index, just check/add shards
190190
TreeMap<Integer, Long> checkpoints = checkpointsByIndex.get(indexName);
191-
if (checkpoints.containsKey(shard.getShardRouting().getId())) {
192-
// there is already a checkpoint entry for this index/shard combination, check if they match
193-
if (checkpoints.get(shard.getShardRouting().getId()) != globalCheckpoint) {
194-
throw new CheckpointException("Global checkpoints mismatch for index [" + indexName + "] between shards of id ["
195-
+ shard.getShardRouting().getId() + "]");
196-
}
197-
} else {
198-
// 1st time we see this shard for this index, add the entry for the shard
191+
// 1st time we see this shard for this index, add the entry for the shard
192+
// or there is already a checkpoint entry for this index/shard combination
193+
// but with a higher global checkpoint. This is by design(not a problem) and
194+
// we take the higher value
195+
if (checkpoints.containsKey(shard.getShardRouting().getId()) == false
196+
|| checkpoints.get(shard.getShardRouting().getId()) < globalCheckpoint) {
199197
checkpoints.put(shard.getShardRouting().getId(), globalCheckpoint);
200198
}
201199
} else {

x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TransformsCheckpointServiceTests.java

+11-9
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,6 @@
4343
import java.util.Map.Entry;
4444
import java.util.Set;
4545

46-
import static org.hamcrest.Matchers.containsString;
47-
4846
public class TransformsCheckpointServiceTests extends ESTestCase {
4947

5048
public void testExtractIndexCheckpoints() {
@@ -104,11 +102,15 @@ public void testExtractIndexCheckpointsInconsistentGlobalCheckpoints() {
104102

105103
ShardStats[] shardStatsArray = createRandomShardStats(expectedCheckpoints, indices, randomBoolean(), true, false);
106104

107-
// fail
108-
CheckpointException e = expectThrows(CheckpointException.class,
109-
() -> DefaultCheckpointProvider.extractIndexCheckPoints(shardStatsArray, indices));
105+
Map<String, long[]> checkpoints = DefaultCheckpointProvider.extractIndexCheckPoints(shardStatsArray, indices);
106+
107+
assertEquals(expectedCheckpoints.size(), checkpoints.size());
108+
assertEquals(expectedCheckpoints.keySet(), checkpoints.keySet());
110109

111-
assertThat(e.getMessage(), containsString("Global checkpoints mismatch"));
110+
// global checkpoints should be max() of all global checkpoints
111+
for (Entry<String, long[]> entry : expectedCheckpoints.entrySet()) {
112+
assertArrayEquals(entry.getValue(), checkpoints.get(entry.getKey()));
113+
}
112114
}
113115

114116
/**
@@ -176,8 +178,8 @@ private static ShardStats[] createRandomShardStats(Map<String, long[]> expectedC
176178
}
177179

178180
// SeqNoStats asserts that checkpoints are logical
179-
long localCheckpoint = randomLongBetween(0L, 100000000L);
180-
long globalCheckpoint = randomBoolean() ? localCheckpoint : randomLongBetween(0L, 100000000L);
181+
long localCheckpoint = randomLongBetween(100L, 100000000L);
182+
long globalCheckpoint = randomBoolean() ? localCheckpoint : randomLongBetween(100L, 100000000L);
181183
long maxSeqNo = Math.max(localCheckpoint, globalCheckpoint);
182184

183185
SeqNoStats validSeqNoStats = null;
@@ -221,7 +223,7 @@ private static ShardStats[] createRandomShardStats(Map<String, long[]> expectedC
221223
if (inconsistentReplica == replica) {
222224
// overwrite
223225
SeqNoStats invalidSeqNoStats =
224-
new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint + randomLongBetween(10L, 100L));
226+
new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint - randomLongBetween(10L, 100L));
225227
shardStats.add(
226228
new ShardStats(shardRouting,
227229
new ShardPath(false, path, path, shardId), stats, null, invalidSeqNoStats, null));

0 commit comments

Comments
 (0)