Skip to content

Commit d26002a

Browse files
authored
[Transform] Do not align checkpoints for transforms created before 7.15. (#81729)
1 parent 171335d commit d26002a

File tree

2 files changed

+30
-1
lines changed

2 files changed

+30
-1
lines changed

x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProvider.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import org.apache.logging.log4j.LogManager;
1111
import org.apache.logging.log4j.Logger;
12+
import org.elasticsearch.Version;
1213
import org.elasticsearch.action.ActionListener;
1314
import org.elasticsearch.action.search.SearchAction;
1415
import org.elasticsearch.action.search.SearchRequest;
@@ -116,6 +117,10 @@ private static Function<Long, Long> createAlignTimestampFunction(TransformConfig
116117
if (Boolean.FALSE.equals(transformConfig.getSettings().getAlignCheckpoints())) {
117118
return identity();
118119
}
120+
// In case of transforms created before aligning timestamp optimization was introduced we assume the default was "false".
121+
if (transformConfig.getVersion().before(Version.V_7_15_0)) {
122+
return identity();
123+
}
119124
if (transformConfig.getPivotConfig() == null) {
120125
return identity();
121126
}

x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProviderTests.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.elasticsearch.search.SearchHits;
3535
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
3636
import org.elasticsearch.test.ESTestCase;
37+
import org.elasticsearch.test.VersionUtils;
3738
import org.elasticsearch.threadpool.ThreadPool;
3839
import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig;
3940
import org.elasticsearch.xpack.core.transform.transforms.TimeSyncConfig;
@@ -94,18 +95,34 @@ public void testSourceHasChanged_NotChanged() throws InterruptedException {
9495
0,
9596
false,
9697
TransformCheckpoint.EMPTY,
98+
VersionUtils.randomVersionBetween(random(), Version.V_7_15_0, Version.CURRENT),
9799
TIMESTAMP_FIELD,
98100
TimeValue.timeValueMinutes(10),
99101
TimeValue.ZERO,
100102
tuple(0L, 123000000L)
101103
);
102104
}
103105

106+
public void testSourceHasChanged_NotChanged_DoNotAlignCheckpointsBecauseOfVersion() throws InterruptedException {
107+
testSourceHasChanged(
108+
0,
109+
false,
110+
TransformCheckpoint.EMPTY,
111+
Version.V_7_14_0,
112+
TIMESTAMP_FIELD,
113+
TimeValue.timeValueMinutes(10),
114+
TimeValue.ZERO,
115+
// Checkpoint alignment doesn't work here because the transform was created without alignment.
116+
tuple(0L, 123456789L)
117+
);
118+
}
119+
104120
public void testSourceHasChanged_Changed() throws InterruptedException {
105121
testSourceHasChanged(
106122
1,
107123
true,
108124
TransformCheckpoint.EMPTY,
125+
Version.CURRENT,
109126
TIMESTAMP_FIELD,
110127
TimeValue.timeValueMinutes(10),
111128
TimeValue.ZERO,
@@ -118,6 +135,7 @@ public void testSourceHasChanged_UnfinishedCheckpoint() throws InterruptedExcept
118135
0,
119136
false,
120137
new TransformCheckpoint("", 100000000L, 7, emptyMap(), null),
138+
Version.CURRENT,
121139
TIMESTAMP_FIELD,
122140
TimeValue.timeValueMinutes(10),
123141
TimeValue.ZERO,
@@ -130,6 +148,7 @@ public void testSourceHasChanged_SubsequentCheckpoint() throws InterruptedExcept
130148
0,
131149
false,
132150
new TransformCheckpoint("", 100000000L, 7, emptyMap(), 120000000L),
151+
Version.CURRENT,
133152
TIMESTAMP_FIELD,
134153
TimeValue.timeValueMinutes(10),
135154
TimeValue.ZERO,
@@ -142,6 +161,7 @@ public void testSourceHasChanged_WithDelay() throws InterruptedException {
142161
0,
143162
false,
144163
new TransformCheckpoint("", 100000000L, 7, emptyMap(), 120000000L),
164+
Version.CURRENT,
145165
TIMESTAMP_FIELD,
146166
TimeValue.timeValueMinutes(10),
147167
TimeValue.timeValueMinutes(5),
@@ -153,6 +173,7 @@ private void testSourceHasChanged(
153173
long totalHits,
154174
boolean expectedHasChangedValue,
155175
TransformCheckpoint lastCheckpoint,
176+
Version transformVersion,
156177
String dateHistogramField,
157178
TimeValue dateHistogramInterval,
158179
TimeValue delay,
@@ -162,6 +183,7 @@ private void testSourceHasChanged(
162183
String transformId = getTestName();
163184
TransformConfig transformConfig = newTransformConfigWithDateHistogram(
164185
transformId,
186+
transformVersion,
165187
dateHistogramField,
166188
dateHistogramInterval,
167189
delay
@@ -249,6 +271,7 @@ private void testCreateNextCheckpoint(
249271

250272
TransformConfig transformConfig = newTransformConfigWithDateHistogram(
251273
transformId,
274+
Version.CURRENT,
252275
dateHistogramField,
253276
dateHistogramInterval,
254277
delay
@@ -280,6 +303,7 @@ private TimeBasedCheckpointProvider newCheckpointProvider(TransformConfig transf
280303

281304
private static TransformConfig newTransformConfigWithDateHistogram(
282305
String transformId,
306+
Version transformVersion,
283307
String dateHistogramField,
284308
TimeValue dateHistogramInterval,
285309
TimeValue delay
@@ -316,7 +340,7 @@ public SingleGroupSource get() {
316340
} else {
317341
// Leave align_checkpoints setting unset. This will be interpreted as "true".
318342
}
319-
return new TransformConfig.Builder(TransformConfigTests.randomTransformConfig(transformId)).setSettings(
343+
return new TransformConfig.Builder(TransformConfigTests.randomTransformConfig(transformId, transformVersion)).setSettings(
320344
settingsConfigBuilder.build()
321345
).setPivotConfig(pivotConfigWithDateHistogramSource).setSyncConfig(new TimeSyncConfig(TIMESTAMP_FIELD, delay)).build();
322346
}

0 commit comments

Comments
 (0)