2
2
import unittest
3
3
4
4
from cr8 .run_crate import get_crate , _extract_version
5
- from parameterized import parameterized
6
5
from crate .client import connect
7
6
import random
8
7
from random import sample
9
8
10
9
from crate .qa .tests import NodeProvider , insert_data , UpgradePath
11
10
12
- UPGRADE_PATHS = [( UpgradePath ('4.2.x' , '4.3.x' ),), ( UpgradePath ('4.3.x' , 'latest-nightly' ), )]
13
- UPGRADE_PATHS_FROM_43 = [( UpgradePath ('4.3.x' , 'latest-nightly' ), )]
11
+ UPGRADE_PATHS = [UpgradePath ('4.2.x' , '4.3.x' ), UpgradePath ('4.3.x' , 'latest-nightly' )]
12
+ UPGRADE_PATHS_FROM_43 = [UpgradePath ('4.3.x' , 'latest-nightly' )]
14
13
15
14
16
15
class RecoveryTest (NodeProvider , unittest .TestCase ):
@@ -83,12 +82,23 @@ def _upgrade_cluster(self, cluster, version: str, nodes: int) -> None:
83
82
new_node = self .upgrade_node (node , version )
84
83
cluster [i ] = new_node
85
84
86
- @parameterized .expand (UPGRADE_PATHS )
87
- def test_recovery_with_concurrent_indexing (self , path ):
85
+ def _run_upgrade_paths (self , test , paths ):
86
+ for p in paths :
87
+ try :
88
+ self .setUp ()
89
+ test (p )
90
+ finally :
91
+ self .tearDown ()
92
+
93
+ def test_recovery_with_concurrent_indexing (self ):
88
94
"""
89
95
This test creates a new table and insert data at every stage of the
90
96
rolling upgrade.
91
97
"""
98
+
99
+ self ._run_upgrade_paths (self ._test_recovery_with_concurrent_indexing , UPGRADE_PATHS )
100
+
101
+ def _test_recovery_with_concurrent_indexing (self , path ):
92
102
cluster = self ._new_cluster (path .from_version , self .NUMBER_OF_NODES )
93
103
cluster .start ()
94
104
@@ -143,8 +153,10 @@ def test_recovery_with_concurrent_indexing(self, path):
143
153
for node_id in node_ids :
144
154
self .assert_busy (lambda : self ._assert_num_docs_by_node_id (conn , 'doc' , 'test' , node_id [0 ], 105 ))
145
155
146
- @parameterized .expand (UPGRADE_PATHS )
147
- def test_relocation_with_concurrent_indexing (self , path ):
156
+ def test_relocation_with_concurrent_indexing (self ):
157
+ self ._run_upgrade_paths (self ._test_relocation_with_concurrent_indexing , UPGRADE_PATHS )
158
+
159
+ def _test_relocation_with_concurrent_indexing (self , path ):
148
160
cluster = self ._new_cluster (path .from_version , self .NUMBER_OF_NODES )
149
161
cluster .start ()
150
162
@@ -216,13 +228,15 @@ def _assert_shard_state(self, conn, schema, table_name, node_id, state):
216
228
self .assertTrue (current_state )
217
229
self .assertEqual (current_state [0 ], state )
218
230
219
- @parameterized .expand (UPGRADE_PATHS )
220
- def test_recovery (self , path ):
231
+ def test_recovery (self ):
221
232
"""
222
233
This test creates a new table, insert data and asserts the state at every stage of the
223
234
rolling upgrade.
224
235
"""
225
236
237
+ self ._run_upgrade_paths (self ._test_recovery , UPGRADE_PATHS )
238
+
239
+ def _test_recovery (self , path ):
226
240
cluster = self ._new_cluster (path .from_version , self .NUMBER_OF_NODES )
227
241
cluster .start ()
228
242
@@ -253,13 +267,15 @@ def test_recovery(self, path):
253
267
254
268
self .assert_busy (lambda : self ._assert_is_green (conn , 'doc' , 'test' ))
255
269
256
- @parameterized .expand (UPGRADE_PATHS )
257
- def test_recovery_closed_index (self , path ):
270
+ def test_recovery_closed_index (self ):
258
271
"""
259
272
This test creates a table in the non upgraded cluster and closes it. It then
260
273
checks that the table is effectively closed and potentially replicated.
261
274
"""
262
275
276
+ self ._run_upgrade_paths (self ._test_recovery_closed_index , UPGRADE_PATHS )
277
+
278
+ def _test_recovery_closed_index (self , path ):
263
279
cluster = self ._new_cluster (path .from_version , self .NUMBER_OF_NODES )
264
280
cluster .start ()
265
281
@@ -284,8 +300,10 @@ def test_recovery_closed_index(self, path):
284
300
285
301
self ._assert_is_closed (conn , 'doc' , 'test' )
286
302
287
- @parameterized .expand (UPGRADE_PATHS )
288
- def test_closed_index_during_rolling_upgrade (self , path ):
303
+ def test_closed_index_during_rolling_upgrade (self ):
304
+ self ._run_upgrade_paths (self ._test_closed_index_during_rolling_upgrade , UPGRADE_PATHS )
305
+
306
+ def _test_closed_index_during_rolling_upgrade (self , path ):
289
307
"""
290
308
This test creates and closes a new table at every stage of the rolling
291
309
upgrade. It then checks that the table is effectively closed and
@@ -334,12 +352,15 @@ def test_closed_index_during_rolling_upgrade(self, path):
334
352
335
353
self ._assert_is_closed (conn , 'doc' , 'upgraded_cluster' )
336
354
337
- @parameterized .expand (UPGRADE_PATHS )
338
- def test_update_docs (self , path ):
355
+ def test_update_docs (self ):
339
356
"""
340
357
This test creates a new table, insert data and updates data at every state at every stage of the
341
358
rolling upgrade.
342
359
"""
360
+
361
+ self ._run_upgrade_paths (self ._test_update_docs , UPGRADE_PATHS )
362
+
363
+ def _test_update_docs (self , path ):
343
364
cluster = self ._new_cluster (path .from_version , self .NUMBER_OF_NODES )
344
365
cluster .start ()
345
366
with connect (cluster .node ().http_url , error_trace = True ) as conn :
@@ -384,8 +405,7 @@ def test_update_docs(self, path):
384
405
for result in res :
385
406
self .assertEqual (result ['rowcount' ], 1 )
386
407
387
- @parameterized .expand (UPGRADE_PATHS_FROM_43 )
388
- def test_operation_based_recovery (self , path ):
408
+ def test_operation_based_recovery (self ):
389
409
"""
390
410
Tests that we should perform an operation-based recovery if there were
391
411
some but not too many uncommitted documents (i.e., less than 10% of
@@ -394,6 +414,9 @@ def test_operation_based_recovery(self, path):
394
414
based peer recoveries.
395
415
"""
396
416
417
+ self ._run_upgrade_paths (self ._test_operation_based_recovery , UPGRADE_PATHS_FROM_43 )
418
+
419
+ def _test_operation_based_recovery (self , path ):
397
420
cluster = self ._new_cluster (path .from_version , self .NUMBER_OF_NODES )
398
421
cluster .start ()
399
422
@@ -435,13 +458,15 @@ def test_operation_based_recovery(self, path):
435
458
436
459
self ._assert_ensure_checkpoints_are_synced (conn , 'doc' , 'test' )
437
460
438
- @parameterized .expand (UPGRADE_PATHS_FROM_43 )
439
- def test_turnoff_translog_retention_after_upgraded (self , path ):
461
+ def test_turnoff_translog_retention_after_upgraded (self ):
440
462
"""
441
463
Verifies that once all shard copies on the new version, we should turn
442
464
off the translog retention for indices with soft-deletes.
443
465
"""
444
466
467
+ self ._run_upgrade_paths (self ._test_turnoff_translog_retention_after_upgraded , UPGRADE_PATHS_FROM_43 )
468
+
469
+ def _test_turnoff_translog_retention_after_upgraded (self , path ):
445
470
cluster = self ._new_cluster (path .from_version , self .NUMBER_OF_NODES )
446
471
cluster .start ()
447
472
0 commit comments