diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProvider.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProvider.java index 4ebb72a220412..20f1c490d1014 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProvider.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProvider.java @@ -9,6 +9,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequest; @@ -116,6 +117,10 @@ private static Function createAlignTimestampFunction(TransformConfig if (Boolean.FALSE.equals(transformConfig.getSettings().getAlignCheckpoints())) { return identity(); } + // In case of transforms created before aligning timestamp optimization was introduced we assume the default was "false". + if (transformConfig.getVersion().before(Version.V_7_15_0)) { + return identity(); + } if (transformConfig.getPivotConfig() == null) { return identity(); } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProviderTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProviderTests.java index ccdaacd200118..4c10834d82cc1 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProviderTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/checkpoint/TimeBasedCheckpointProviderTests.java @@ -34,6 +34,7 @@ import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.transform.transforms.SettingsConfig; import org.elasticsearch.xpack.core.transform.transforms.TimeSyncConfig; @@ -94,6 +95,7 @@ public void testSourceHasChanged_NotChanged() throws InterruptedException { 0, false, TransformCheckpoint.EMPTY, + VersionUtils.randomVersionBetween(random(), Version.V_7_15_0, Version.CURRENT), TIMESTAMP_FIELD, TimeValue.timeValueMinutes(10), TimeValue.ZERO, @@ -101,11 +103,26 @@ public void testSourceHasChanged_NotChanged() throws InterruptedException { ); } + public void testSourceHasChanged_NotChanged_DoNotAlignCheckpointsBecauseOfVersion() throws InterruptedException { + testSourceHasChanged( + 0, + false, + TransformCheckpoint.EMPTY, + Version.V_7_14_0, + TIMESTAMP_FIELD, + TimeValue.timeValueMinutes(10), + TimeValue.ZERO, + // Checkpoint alignment doesn't work here because the transform was created without alignment. + tuple(0L, 123456789L) + ); + } + public void testSourceHasChanged_Changed() throws InterruptedException { testSourceHasChanged( 1, true, TransformCheckpoint.EMPTY, + Version.CURRENT, TIMESTAMP_FIELD, TimeValue.timeValueMinutes(10), TimeValue.ZERO, @@ -118,6 +135,7 @@ public void testSourceHasChanged_UnfinishedCheckpoint() throws InterruptedExcept 0, false, new TransformCheckpoint("", 100000000L, 7, emptyMap(), null), + Version.CURRENT, TIMESTAMP_FIELD, TimeValue.timeValueMinutes(10), TimeValue.ZERO, @@ -130,6 +148,7 @@ public void testSourceHasChanged_SubsequentCheckpoint() throws InterruptedExcept 0, false, new TransformCheckpoint("", 100000000L, 7, emptyMap(), 120000000L), + Version.CURRENT, TIMESTAMP_FIELD, TimeValue.timeValueMinutes(10), TimeValue.ZERO, @@ -142,6 +161,7 @@ public void testSourceHasChanged_WithDelay() throws InterruptedException { 0, false, new TransformCheckpoint("", 100000000L, 7, emptyMap(), 120000000L), + Version.CURRENT, TIMESTAMP_FIELD, TimeValue.timeValueMinutes(10), TimeValue.timeValueMinutes(5), @@ -153,6 +173,7 @@ private void testSourceHasChanged( long totalHits, boolean expectedHasChangedValue, TransformCheckpoint lastCheckpoint, + Version transformVersion, String dateHistogramField, TimeValue dateHistogramInterval, TimeValue delay, @@ -162,6 +183,7 @@ private void testSourceHasChanged( String transformId = getTestName(); TransformConfig transformConfig = newTransformConfigWithDateHistogram( transformId, + transformVersion, dateHistogramField, dateHistogramInterval, delay @@ -249,6 +271,7 @@ private void testCreateNextCheckpoint( TransformConfig transformConfig = newTransformConfigWithDateHistogram( transformId, + Version.CURRENT, dateHistogramField, dateHistogramInterval, delay @@ -280,6 +303,7 @@ private TimeBasedCheckpointProvider newCheckpointProvider(TransformConfig transf private static TransformConfig newTransformConfigWithDateHistogram( String transformId, + Version transformVersion, String dateHistogramField, TimeValue dateHistogramInterval, TimeValue delay @@ -316,7 +340,7 @@ public SingleGroupSource get() { } else { // Leave align_checkpoints setting unset. This will be interpreted as "true". } - return new TransformConfig.Builder(TransformConfigTests.randomTransformConfig(transformId)).setSettings( + return new TransformConfig.Builder(TransformConfigTests.randomTransformConfig(transformId, transformVersion)).setSettings( settingsConfigBuilder.build() ).setPivotConfig(pivotConfigWithDateHistogramSource).setSyncConfig(new TimeSyncConfig(TIMESTAMP_FIELD, delay)).build(); }