+ import time
import unittest

from parameterized import parameterized
from crate.client import connect
import random
from random import sample

- from crate.qa.tests import NodeProvider, insert_data, UpgradePath, assert_busy
+ from crate.qa.tests import NodeProvider, insert_data, UpgradePath

- NUMBER_OF_NODES = random.randint(3, 5)
- UPGRADE_42_TO_43 = ('4.2.x to 4.3.x', UpgradePath('4.2.x', '4.3.x'), NUMBER_OF_NODES,)
- UPGRADE_43_TO_LATEST = ('4.3.x to latest-nightly', UpgradePath('4.3.x', 'latest-nightly'), NUMBER_OF_NODES,)
+ UPGRADE_PATHS = [(UpgradePath('4.2.x', '4.3.x'),), (UpgradePath('4.3.x', 'latest-nightly'),)]
+ UPGRADE_PATHS_FROM_43 = [(UpgradePath('4.3.x', 'latest-nightly'),)]
+
+
+ def assert_busy(assertion, timeout=60, f=2.0):
+     waited = 0
+     duration = 0.1
+     assertion_error = None
+     while waited < timeout:
+         try:
+             assertion()
+             return
+         except AssertionError as e:
+             assertion_error = e
+         time.sleep(duration)
+         waited += duration
+         duration *= f
+     raise assertion_error


class RecoveryTest(NodeProvider, unittest.TestCase):
+     NUMBER_OF_NODES = 3
    """
    In depth testing of the recovery mechanism during a rolling restart.
    Based on org.elasticsearch.upgrades.RecoveryIT.java
@@ -61,13 +78,13 @@ def _upgrade_cluster(self, cluster, version, nodes):
            new_node = self.upgrade_node(node, version)
            cluster[i] = new_node

-     @parameterized.expand([UPGRADE_42_TO_43, UPGRADE_43_TO_LATEST])
-     def test_recovery_with_concurrent_indexing(self, name, path, nodes):
+     @parameterized.expand(UPGRADE_PATHS)
+     def test_recovery_with_concurrent_indexing(self, path):
        """
        This test creates a new table and insert data at every stage of the
        rolling upgrade.
        """
-         cluster = self._new_cluster(path.from_version, nodes)
+         cluster = self._new_cluster(path.from_version, self.NUMBER_OF_NODES)
        cluster.start()

        with connect(cluster.node().http_url, error_trace=True) as conn:
@@ -85,7 +102,7 @@ def test_recovery_with_concurrent_indexing(self, name, path, nodes):
            c.execute('''alter table doc.test set ("routing.allocation.enable"='primaries')''')

            # upgrade to mixed cluster
-             self._upgrade_cluster(cluster, path.to_version, random.randint(1, nodes - 1))
+             self._upgrade_cluster(cluster, path.to_version, random.randint(1, self.NUMBER_OF_NODES - 1))
            c.execute('''alter table doc.test set ("routing.allocation.enable"='all')''')
            # insert data into a mixed cluster
            insert_data(conn, 'doc', 'test', 50)
@@ -96,15 +113,15 @@ def test_recovery_with_concurrent_indexing(self, name, path, nodes):
            # check counts for each node individually
            c.execute('select id from sys.nodes')
            node_ids = c.fetchall()
-             self.assertEqual(len(node_ids), nodes)
+             self.assertEqual(len(node_ids), self.NUMBER_OF_NODES)

            assert_busy(lambda: self._assert_is_green(conn, 'doc', 'test'))
            for node_id in node_ids:
                assert_busy(lambda: self._assert_num_docs_by_node_id(conn, 'doc', 'test', node_id[0], 60))

            c.execute('''alter table doc.test set ("routing.allocation.enable"='primaries')''')
            # upgrade the full cluster
-             self._upgrade_cluster(cluster, path.to_version, nodes)
+             self._upgrade_cluster(cluster, path.to_version, self.NUMBER_OF_NODES)
            c.execute('''alter table doc.test set ("routing.allocation.enable"='all')''')

            insert_data(conn, 'doc', 'test', 45)
@@ -115,14 +132,14 @@ def test_recovery_with_concurrent_indexing(self, name, path, nodes):

            c.execute('select id from sys.nodes')
            node_ids = c.fetchall()
-             self.assertEqual(len(node_ids), nodes)
+             self.assertEqual(len(node_ids), self.NUMBER_OF_NODES)

            for node_id in node_ids:
                assert_busy(lambda: self._assert_num_docs_by_node_id(conn, 'doc', 'test', node_id[0], 105))

-     @parameterized.expand([UPGRADE_42_TO_43, UPGRADE_43_TO_LATEST])
-     def test_relocation_with_concurrent_indexing(self, name, path, nodes):
-         cluster = self._new_cluster(path.from_version, nodes)
+     @parameterized.expand(UPGRADE_PATHS)
+     def test_relocation_with_concurrent_indexing(self, path):
+         cluster = self._new_cluster(path.from_version, self.NUMBER_OF_NODES)
        cluster.start()

        with connect(cluster.node().http_url, error_trace=True) as conn:
@@ -141,7 +158,7 @@ def test_relocation_with_concurrent_indexing(self, name, path, nodes):
            c.execute('''alter table doc.test set("routing.allocation.enable"='none')''')

            # upgrade to mixed cluster
-             self._upgrade_cluster(cluster, path.to_version, random.randint(1, nodes - 1))
+             self._upgrade_cluster(cluster, path.to_version, random.randint(1, self.NUMBER_OF_NODES - 1))

            c.execute('''select id from sys.nodes order by version['number'] desc limit 1''')
            new_node_id = c.fetchone()[0]
@@ -169,7 +186,7 @@ def test_relocation_with_concurrent_indexing(self, name, path, nodes):
            self._assert_num_docs_by_node_id(conn, 'doc', 'test', new_node_id, 60)

            # upgrade fully to the new version
-             self._upgrade_cluster(cluster, path.to_version, nodes)
+             self._upgrade_cluster(cluster, path.to_version, self.NUMBER_OF_NODES)

            c.execute('''alter table doc.test set("number_of_replicas"=2)''')
            c.execute('''alter table doc.test reset("routing.allocation.include._id")''')
@@ -180,7 +197,7 @@ def test_relocation_with_concurrent_indexing(self, name, path, nodes):
            c.execute('refresh table doc.test')
            c.execute('select id from sys.nodes')
            node_ids = c.fetchall()
-             self.assertEqual(len(node_ids), nodes)
+             self.assertEqual(len(node_ids), self.NUMBER_OF_NODES)

            for node_id in node_ids:
                self._assert_num_docs_by_node_id(conn, 'doc', 'test', node_id[0], 105)
@@ -193,14 +210,14 @@ def _assert_shard_state(self, conn, schema, table_name, node_id, state):
        self.assertTrue(current_state)
        self.assertEqual(current_state[0], state)

-     @parameterized.expand([UPGRADE_42_TO_43, UPGRADE_43_TO_LATEST])
-     def test_recovery(self, name, path, nodes):
+     @parameterized.expand(UPGRADE_PATHS)
+     def test_recovery(self, path):
        """
        This test creates a new table, insert data and asserts the state at every stage of the
        rolling upgrade.
        """

-         cluster = self._new_cluster(path.from_version, nodes)
+         cluster = self._new_cluster(path.from_version, self.NUMBER_OF_NODES)
        cluster.start()

        with connect(cluster.node().http_url, error_trace=True) as conn:
@@ -218,26 +235,26 @@ def test_recovery(self, name, path, nodes):
                c.execute("refresh table doc.test")

            # upgrade to mixed cluster
-             self._upgrade_cluster(cluster, path.to_version, random.randint(1, nodes - 1))
+             self._upgrade_cluster(cluster, path.to_version, random.randint(1, self.NUMBER_OF_NODES - 1))

            assert_busy(lambda: self._assert_is_green(conn, 'doc', 'test'))

            # upgrade fully to the new version
-             self._upgrade_cluster(cluster, path.to_version, nodes)
+             self._upgrade_cluster(cluster, path.to_version, self.NUMBER_OF_NODES)

            if random.choice([True, False]):
                c.execute("refresh table doc.test")

            assert_busy(lambda: self._assert_is_green(conn, 'doc', 'test'))

-     @parameterized.expand([UPGRADE_42_TO_43, UPGRADE_43_TO_LATEST])
-     def test_recovery_closed_index(self, name, path, nodes):
+     @parameterized.expand(UPGRADE_PATHS)
+     def test_recovery_closed_index(self, path):
        """
        This test creates a table in the non upgraded cluster and closes it. It then
        checks that the table is effectively closed and potentially replicated.
        """

-         cluster = self._new_cluster(path.from_version, nodes)
+         cluster = self._new_cluster(path.from_version, self.NUMBER_OF_NODES)
        cluster.start()

        with connect(cluster.node().http_url, error_trace=True) as conn:
@@ -252,24 +269,24 @@ def test_recovery_closed_index(self, name, path, nodes):
            c.execute('alter table doc.test close')

            # upgrade to mixed cluster
-             self._upgrade_cluster(cluster, path.to_version, random.randint(1, nodes - 1))
+             self._upgrade_cluster(cluster, path.to_version, random.randint(1, self.NUMBER_OF_NODES - 1))

            self._assert_is_closed(conn, 'doc', 'test')

            # upgrade fully to the new version
-             self._upgrade_cluster(cluster, path.to_version, nodes)
+             self._upgrade_cluster(cluster, path.to_version, self.NUMBER_OF_NODES)

            self._assert_is_closed(conn, 'doc', 'test')

-     @parameterized.expand([UPGRADE_42_TO_43, UPGRADE_43_TO_LATEST])
-     def test_closed_index_during_rolling_upgrade(self, name, path, nodes):
+     @parameterized.expand(UPGRADE_PATHS)
+     def test_closed_index_during_rolling_upgrade(self, path):
        """
        This test creates and closes a new table at every stage of the rolling
        upgrade. It then checks that the table is effectively closed and
        replicated.
        """

-         cluster = self._new_cluster(path.from_version, nodes)
+         cluster = self._new_cluster(path.from_version, self.NUMBER_OF_NODES)
        cluster.start()

        with connect(cluster.node().http_url, error_trace=True) as conn:
@@ -283,7 +300,7 @@ def test_closed_index_during_rolling_upgrade(self, name, path, nodes):
            self._assert_is_closed(conn, 'doc', 'old_cluster')

            # upgrade to mixed cluster
-             self._upgrade_cluster(cluster, path.to_version, random.randint(1, nodes - 1))
+             self._upgrade_cluster(cluster, path.to_version, random.randint(1, self.NUMBER_OF_NODES - 1))

            self._assert_is_closed(conn, 'doc', 'old_cluster')
@@ -297,7 +314,7 @@ def test_closed_index_during_rolling_upgrade(self, name, path, nodes):
            self._assert_is_closed(conn, 'doc', 'mixed_cluster')

            # upgrade fully to the new version
-             self._upgrade_cluster(cluster, path.to_version, nodes)
+             self._upgrade_cluster(cluster, path.to_version, self.NUMBER_OF_NODES)

            self._assert_is_closed(conn, 'doc', 'old_cluster')
            self._assert_is_closed(conn, 'doc', 'mixed_cluster')
@@ -311,13 +328,13 @@ def test_closed_index_during_rolling_upgrade(self, name, path, nodes):

            self._assert_is_closed(conn, 'doc', 'upgraded_cluster')

-     @parameterized.expand([UPGRADE_42_TO_43, UPGRADE_43_TO_LATEST])
-     def test_update_docs(self, name, path, nodes):
+     @parameterized.expand(UPGRADE_PATHS)
+     def test_update_docs(self, path):
        """
        This test creates a new table, insert data and updates data at every state at every stage of the
        rolling upgrade.
        """
-         cluster = self._new_cluster(path.from_version, nodes)
+         cluster = self._new_cluster(path.from_version, self.NUMBER_OF_NODES)
        cluster.start()
        with connect(cluster.node().http_url, error_trace=True) as conn:
            c = conn.cursor()
@@ -332,7 +349,7 @@ def test_update_docs(self, name, path, nodes):
            c.execute('refresh table doc.test')

            # upgrade to mixed cluster
-             self._upgrade_cluster(cluster, path.to_version, random.randint(1, nodes - 1))
+             self._upgrade_cluster(cluster, path.to_version, random.randint(1, self.NUMBER_OF_NODES - 1))

            if random.choice([True, False]):
                assert_busy(lambda: self._assert_is_green(conn, 'doc', 'test'))
@@ -351,7 +368,7 @@ def test_update_docs(self, name, path, nodes):
            c.execute('refresh table doc.test')

            # upgrade fully to the new version
-             self._upgrade_cluster(cluster, path.to_version, nodes)
+             self._upgrade_cluster(cluster, path.to_version, self.NUMBER_OF_NODES)

            updates = [(i, str(random.randint)) for i in range(0, 100)]
            res = c.executemany(
@@ -361,8 +378,8 @@ def test_update_docs(self, name, path, nodes):
            for result in res:
                self.assertEqual(result['rowcount'], 1)

-     @parameterized.expand([UPGRADE_43_TO_LATEST])
-     def test_operation_based_recovery(self, name, path, nodes):
+     @parameterized.expand(UPGRADE_PATHS_FROM_43)
+     def test_operation_based_recovery(self, path):
        """
        Tests that we should perform an operation-based recovery if there were
        some but not too many uncommitted documents (i.e., less than 10% of
@@ -371,7 +388,7 @@ def test_operation_based_recovery(self, name, path, nodes):
        based peer recoveries.
        """

-         cluster = self._new_cluster(path.from_version, nodes)
+         cluster = self._new_cluster(path.from_version, self.NUMBER_OF_NODES)
        cluster.start()

        with connect(cluster.node().http_url, error_trace=True) as conn:
@@ -392,7 +409,7 @@ def test_operation_based_recovery(self, name, path, nodes):
            insert_data(conn, 'doc', 'test', num_docs)

            # upgrade to mixed cluster
-             self._upgrade_cluster(cluster, path.to_version, random.randint(1, nodes - 1))
+             self._upgrade_cluster(cluster, path.to_version, random.randint(1, self.NUMBER_OF_NODES - 1))

            assert_busy(lambda: self._assert_is_green(conn, 'doc', 'test'))
@@ -402,7 +419,7 @@ def test_operation_based_recovery(self, name, path, nodes):
            self._assert_ensure_checkpoints_are_synced(conn, 'doc', 'test')

            # upgrade fully to the new version
-             self._upgrade_cluster(cluster, path.to_version, nodes)
+             self._upgrade_cluster(cluster, path.to_version, self.NUMBER_OF_NODES)

            assert_busy(lambda: self._assert_is_green(conn, 'doc', 'test'))
@@ -412,14 +429,14 @@ def test_operation_based_recovery(self, name, path, nodes):

            self._assert_ensure_checkpoints_are_synced(conn, 'doc', 'test')

-     @parameterized.expand([UPGRADE_43_TO_LATEST])
-     def test_turnoff_translog_retention_after_upgraded(self, name, path, nodes):
+     @parameterized.expand(UPGRADE_PATHS_FROM_43)
+     def test_turnoff_translog_retention_after_upgraded(self, path):
        """
        Verifies that once all shard copies on the new version, we should turn
        off the translog retention for indices with soft-deletes.
        """

-         cluster = self._new_cluster(path.from_version, nodes)
+         cluster = self._new_cluster(path.from_version, self.NUMBER_OF_NODES)
        cluster.start()

        with connect(cluster.node().http_url, error_trace=True) as conn:
@@ -440,7 +457,7 @@ def test_turnoff_translog_retention_after_upgraded(self, name, path, nodes):
            insert_data(conn, 'doc', 'test', num_docs)

            # update the cluster to the new version
-             self._upgrade_cluster(cluster, path.to_version, nodes)
+             self._upgrade_cluster(cluster, path.to_version, self.NUMBER_OF_NODES)

            assert_busy(lambda: self._assert_is_green(conn, 'doc', 'test'))
            c.execute('refresh table doc.test')
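
Note on the new parameterization (a minimal, self-contained sketch, not part of the commit): parameterized.expand generates one test case per tuple in UPGRADE_PATHS, and the single element of each tuple is passed to the test as path, which is why the explicit name and nodes parameters disappear from the test signatures. The UpgradePath namedtuple below is a hypothetical stand-in for crate.qa.tests.UpgradePath, used only to keep the example runnable.

import unittest
from collections import namedtuple

from parameterized import parameterized

# Hypothetical stand-in for crate.qa.tests.UpgradePath, only to make the sketch self-contained.
UpgradePath = namedtuple('UpgradePath', ['from_version', 'to_version'])

UPGRADE_PATHS = [(UpgradePath('4.2.x', '4.3.x'),), (UpgradePath('4.3.x', 'latest-nightly'),)]


class UpgradePathDemoTest(unittest.TestCase):

    @parameterized.expand(UPGRADE_PATHS)
    def test_path_is_unpacked(self, path):
        # One test method is generated per tuple; the tuple's single element arrives as `path`.
        self.assertIn(path.from_version, ('4.2.x', '4.3.x'))
        self.assertTrue(path.to_version)


if __name__ == '__main__':
    unittest.main()

Running this with a test runner yields one generated test case per upgrade path.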