Skip to content

Commit 7063a40

Browse files
authored
[7.x] [ML][Data Frame] Adding bwc tests for pivot transform (#43506) (#43929)
* [ML][Data Frame] Adding bwc tests for pivot transform (#43506)
* [ML][Data Frame] Adding bwc tests for pivot transform
* adding continuous transforms
* adding continuous dataframes to bwc
* adding continuous data frame tests
* Adding rolling upgrade tests for continuous df
* Fixing test
* Adjusting indices used in BWC, and handling NPE for seq_no_stats
* updating and muting specific bwc test
* Adjusting bwc tests for backport
1 parent 553f783 commit 7063a40

File tree

6 files changed

+523
-7
lines changed

6 files changed

+523
-7
lines changed

x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/checkpoint/DataFrameTransformsCheckpointService.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
1616
import org.elasticsearch.action.admin.indices.stats.ShardStats;
1717
import org.elasticsearch.client.Client;
18+
import org.elasticsearch.common.Strings;
19+
import org.elasticsearch.index.seqno.SeqNoStats;
1820
import org.elasticsearch.xpack.core.ClientHelper;
1921
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpoint;
2022
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformCheckpointStats;
@@ -222,6 +224,16 @@ static Map<String, long[]> extractIndexCheckPoints(ShardStats[] shards, Set<Stri
222224
for (ShardStats shard : shards) {
223225
String indexName = shard.getShardRouting().getIndexName();
224226
if (userIndices.contains(indexName)) {
227+
SeqNoStats seqNoStats = shard.getSeqNoStats();
228+
// SeqNoStats could be `null`. This indicates that an `AlreadyClosed` exception was thrown somewhere down the stack
229+
// Indicates that the index COULD be closed, or at least that the shard is not fully recovered yet.
230+
if (seqNoStats == null) {
231+
logger.warn("failure gathering checkpoint information for index [{}] as seq_no_stats were null. Shard Stats [{}]",
232+
indexName,
233+
Strings.toString(shard));
234+
throw new CheckpointException(
235+
"Unable to gather checkpoint information for index [" + indexName + "]. seq_no_stats are missing.");
236+
}
225237
if (checkpointsByIndex.containsKey(indexName)) {
226238
// we have already seen this index, just check/add shards
227239
TreeMap<Integer, Long> checkpoints = checkpointsByIndex.get(indexName);

x-pack/qa/rolling-upgrade/build.gradle

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,12 @@ for (Version version : bwcVersions.wireCompatible) {
168168
oldClusterTestRunner.configure {
169169
systemProperty 'tests.rest.suite', 'old_cluster'
170170
systemProperty 'tests.upgrade_from_version', version.toString().replace('-SNAPSHOT', '')
171+
// Dataframe transforms were not added until 7.2.0
172+
if (version.before('7.2.0')) {
173+
systemProperty 'tests.rest.blacklist', [
174+
'old_cluster/80_data_frame_jobs_crud/Test put batch data frame transforms on old cluster'
175+
].join(',')
176+
}
171177
}
172178

173179
Closure configureUpgradeCluster = {String name, Task lastRunner, int stopNode, Closure getOtherUnicastHostAddresses ->
@@ -227,12 +233,18 @@ for (Version version : bwcVersions.wireCompatible) {
227233
systemProperty 'tests.first_round', 'true'
228234
systemProperty 'tests.upgrade_from_version', version.toString().replace('-SNAPSHOT', '')
229235
// We only need to run these tests once so we may as well do it when we're two thirds upgraded
230-
systemProperty 'tests.rest.blacklist', [
231-
'mixed_cluster/10_basic/Start scroll in mixed cluster on upgraded node that we will continue after upgrade',
232-
'mixed_cluster/30_ml_jobs_crud/Create a job in the mixed cluster and write some data',
233-
'mixed_cluster/40_ml_datafeed_crud/Put job and datafeed without aggs in mixed cluster',
234-
'mixed_cluster/40_ml_datafeed_crud/Put job and datafeed with aggs in mixed cluster'
235-
].join(',')
236+
def toBlackList = [
237+
'mixed_cluster/10_basic/Start scroll in mixed cluster on upgraded node that we will continue after upgrade',
238+
'mixed_cluster/30_ml_jobs_crud/Create a job in the mixed cluster and write some data',
239+
'mixed_cluster/40_ml_datafeed_crud/Put job and datafeed without aggs in mixed cluster',
240+
'mixed_cluster/40_ml_datafeed_crud/Put job and datafeed with aggs in mixed cluster',
241+
'mixed_cluster/80_data_frame_jobs_crud/Test put batch data frame transforms on mixed cluster'
242+
]
243+
// Dataframe transforms were not added until 7.2.0
244+
if (version.before('7.2.0')) {
245+
toBlackList << 'mixed_cluster/80_data_frame_jobs_crud/Test GET, start, and stop old cluster batch transforms'
246+
}
247+
systemProperty 'tests.rest.blacklist', toBlackList.join(',')
236248
finalizedBy "${baseName}#oldClusterTestCluster#node1.stop"
237249
}
238250

@@ -248,6 +260,14 @@ for (Version version : bwcVersions.wireCompatible) {
248260
systemProperty 'tests.first_round', 'false'
249261
systemProperty 'tests.upgrade_from_version', version.toString().replace('-SNAPSHOT', '')
250262
finalizedBy "${baseName}#oldClusterTestCluster#node2.stop"
263+
// Dataframe transforms were not added until 7.2.0
264+
if (version.before('7.2.0')) {
265+
systemProperty 'tests.rest.blacklist', [
266+
'mixed_cluster/80_data_frame_jobs_crud/Test put batch data frame transforms on mixed cluster',
267+
'mixed_cluster/80_data_frame_jobs_crud/Test GET, start, and stop old cluster batch transforms'
268+
269+
].join(',')
270+
}
251271
}
252272

253273
Task upgradedClusterTest = tasks.create(name: "${baseName}#upgradedClusterTest", type: RestIntegTestTask)
@@ -272,12 +292,20 @@ for (Version version : bwcVersions.wireCompatible) {
272292
// otherwise we could check the index created version
273293
String versionStr = project.extensions.findByName("${baseName}#oldClusterTestCluster").properties.get('bwcVersion')
274294
String[] versionParts = versionStr.split('\\.')
295+
def toBlackList = []
275296
if (versionParts[0].equals("5")) {
276297
Integer minor = Integer.parseInt(versionParts[1])
277298
if (minor >= 2) {
278-
systemProperty 'tests.rest.blacklist', '/20_security/Verify default password migration results in upgraded cluster'
299+
toBlackList << '/20_security/Verify default password migration results in upgraded cluster'
279300
}
280301
}
302+
// Dataframe transforms were not added until 7.2.0
303+
if (version.before('7.2.0')) {
304+
toBlackList << 'upgraded_cluster/80_data_frame_jobs_crud/Get start, stop, and delete old and mixed cluster batch data frame transforms'
305+
}
306+
if (!toBlackList.empty) {
307+
systemProperty 'tests.rest.blacklist', toBlackList.join(',')
308+
}
281309
}
282310

283311
Task versionBwcTest = tasks.create(name: "${baseName}#bwcTest") {
Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
---
2+
"Test put batch data frame transforms on mixed cluster":
3+
- do:
4+
cluster.health:
5+
index: "dataframe-transform-airline-data"
6+
wait_for_status: green
7+
timeout: 70s
8+
9+
- do:
10+
data_frame.put_data_frame_transform:
11+
transform_id: "mixed-simple-transform"
12+
body: >
13+
{
14+
"source": { "index": "dataframe-transform-airline-data" },
15+
"dest": { "index": "mixed-simple-transform-idx" },
16+
"pivot": {
17+
"group_by": { "airline": {"terms": {"field": "airline"}}},
18+
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
19+
}
20+
}
21+
- match: { acknowledged: true }
22+
23+
- do:
24+
data_frame.start_data_frame_transform:
25+
transform_id: "mixed-simple-transform"
26+
- match: { acknowledged: true }
27+
- do:
28+
data_frame.get_data_frame_transform_stats:
29+
transform_id: "mixed-simple-transform"
30+
- match: { count: 1 }
31+
- match: { transforms.0.id: "mixed-simple-transform" }
32+
- match: { transforms.0.state.task_state: "/started|stopped/" }
33+
34+
- do:
35+
data_frame.stop_data_frame_transform:
36+
transform_id: "mixed-simple-transform"
37+
wait_for_completion: true
38+
- match: { acknowledged: true }
39+
40+
- do:
41+
data_frame.get_data_frame_transform_stats:
42+
transform_id: "mixed-simple-transform"
43+
- match: { count: 1 }
44+
- match: { transforms.0.id: "mixed-simple-transform" }
45+
- match: { transforms.0.state.indexer_state: "stopped" }
46+
- match: { transforms.0.state.task_state: "stopped" }
47+
48+
- do:
49+
data_frame.put_data_frame_transform:
50+
transform_id: "mixed-complex-transform"
51+
body: >
52+
{
53+
"source": {
54+
"index": "dataframe-transform-airline-data",
55+
"query": {
56+
"bool": {
57+
"filter": {"term": {"airline": "ElasticAir"}}
58+
}
59+
}
60+
},
61+
"dest": {
62+
"index": "mixed-complex-transform-idx"
63+
},
64+
"pivot": {
65+
"group_by": {
66+
"airline": {"terms": {"field": "airline"}},
67+
"day": {"date_histogram": {"field": "timestamp", "calendar_interval": "1d"}},
68+
"every_50": {"histogram": {"field": "responsetime", "interval": 50}}
69+
},
70+
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
71+
}
72+
}
73+
- match: { acknowledged: true }
74+
75+
- do:
76+
data_frame.get_data_frame_transform:
77+
transform_id: "mixed-complex-transform"
78+
- match: { count: 1 }
79+
- match: { transforms.0.id: "mixed-complex-transform" }
80+
81+
- do:
82+
data_frame.start_data_frame_transform:
83+
transform_id: "mixed-complex-transform"
84+
- match: { acknowledged: true }
85+
- do:
86+
data_frame.get_data_frame_transform_stats:
87+
transform_id: "mixed-complex-transform"
88+
- match: { count: 1 }
89+
- match: { transforms.0.id: "mixed-complex-transform" }
90+
- match: { transforms.0.state.task_state: "/started|stopped/" }
91+
92+
- do:
93+
data_frame.stop_data_frame_transform:
94+
transform_id: "mixed-complex-transform"
95+
wait_for_completion: true
96+
- match: { acknowledged: true }
97+
98+
- do:
99+
data_frame.get_data_frame_transform_stats:
100+
transform_id: "mixed-complex-transform"
101+
- match: { count: 1 }
102+
- match: { transforms.0.id: "mixed-complex-transform" }
103+
- match: { transforms.0.state.indexer_state: "stopped" }
104+
- match: { transforms.0.state.task_state: "stopped" }
105+
106+
---
107+
"Test GET, start, and stop old cluster batch transforms":
108+
- do:
109+
cluster.health:
110+
index: "dataframe-transform-airline-data"
111+
wait_for_status: green
112+
timeout: 70s
113+
114+
- do:
115+
data_frame.get_data_frame_transform:
116+
transform_id: "old-simple-transform"
117+
- match: { count: 1 }
118+
- match: { transforms.0.id: "old-simple-transform" }
119+
- match: { transforms.0.source.index.0: "dataframe-transform-airline-data" }
120+
- match: { transforms.0.dest.index: "old-simple-transform-idx" }
121+
- match: { transforms.0.pivot.group_by.airline.terms.field: "airline" }
122+
- match: { transforms.0.pivot.aggregations.avg_response.avg.field: "responsetime" }
123+
124+
- do:
125+
data_frame.start_data_frame_transform:
126+
transform_id: "old-simple-transform"
127+
- match: { acknowledged: true }
128+
- do:
129+
data_frame.get_data_frame_transform_stats:
130+
transform_id: "old-simple-transform"
131+
- match: { count: 1 }
132+
- match: { transforms.0.id: "old-simple-transform" }
133+
- match: { transforms.0.state.task_state: "/started|stopped/" }
134+
135+
- do:
136+
data_frame.stop_data_frame_transform:
137+
transform_id: "old-simple-transform"
138+
wait_for_completion: true
139+
- match: { acknowledged: true }
140+
- do:
141+
data_frame.get_data_frame_transform_stats:
142+
transform_id: "old-simple-transform"
143+
- match: { count: 1 }
144+
- match: { transforms.0.id: "old-simple-transform" }
145+
- match: { transforms.0.state.indexer_state: "stopped" }
146+
- match: { transforms.0.state.task_state: "stopped" }
147+
148+
- do:
149+
data_frame.get_data_frame_transform:
150+
transform_id: "old-complex-transform"
151+
- match: { count: 1 }
152+
- match: { transforms.0.id: "old-complex-transform" }
153+
- match: { transforms.0.source.index.0: "dataframe-transform-airline-data" }
154+
- match: { transforms.0.dest.index: "old-complex-transform-idx" }
155+
- match: { transforms.0.pivot.group_by.airline.terms.field: "airline" }
156+
- match: { transforms.0.pivot.group_by.day.date_histogram.field: "timestamp" }
157+
- match: { transforms.0.pivot.group_by.every_50.histogram.field: "responsetime" }
158+
- match: { transforms.0.pivot.aggregations.avg_response.avg.field: "responsetime" }
159+
160+
- do:
161+
data_frame.start_data_frame_transform:
162+
transform_id: "old-complex-transform"
163+
- match: { acknowledged: true }
164+
- do:
165+
data_frame.get_data_frame_transform_stats:
166+
transform_id: "old-complex-transform"
167+
- match: { count: 1 }
168+
- match: { transforms.0.id: "old-complex-transform" }
169+
- match: { transforms.0.state.task_state: "/started|stopped/" }
170+
171+
- do:
172+
data_frame.stop_data_frame_transform:
173+
transform_id: "old-complex-transform"
174+
wait_for_completion: true
175+
- match: { acknowledged: true }
176+
- do:
177+
data_frame.get_data_frame_transform_stats:
178+
transform_id: "old-complex-transform"
179+
- match: { count: 1 }
180+
- match: { transforms.0.id: "old-complex-transform" }
181+
- match: { transforms.0.state.indexer_state: "stopped" }
182+
- match: { transforms.0.state.task_state: "stopped" }

0 commit comments

Comments
 (0)