Commit a26dec6

Cleanup
1 parent 51dae76 commit a26dec6

3 files changed (+64, -46 lines)

src/crate/qa/tests.py (+1)

@@ -28,6 +28,7 @@
     '/usr/lib/java-1.8.0',
 ) + tuple(glob('/Library/Java/JavaVirtualMachines/jdk*1.8*/Contents/Home'))
 
+
 class UpgradePath(NamedTuple):
     from_version: str
     to_version: str

tests/bwc/test_assertions.py (whitespace-only changes)

tests/bwc/test_recovery.py (+63, -46)
@@ -1,18 +1,35 @@
+import time
 import unittest
 
 from parameterized import parameterized
 from crate.client import connect
 import random
 from random import sample
 
-from crate.qa.tests import NodeProvider, insert_data, UpgradePath, assert_busy
+from crate.qa.tests import NodeProvider, insert_data, UpgradePath
 
-NUMBER_OF_NODES = random.randint(3, 5)
-UPGRADE_42_TO_43 = ('4.2.x to 4.3.x', UpgradePath('4.2.x', '4.3.x'), NUMBER_OF_NODES,)
-UPGRADE_43_TO_LATEST = ('4.3.x to latest-nightly', UpgradePath('4.3.x', 'latest-nightly'), NUMBER_OF_NODES,)
+UPGRADE_PATHS = [(UpgradePath('4.2.x', '4.3.x'),), (UpgradePath('4.3.x', 'latest-nightly'),)]
+UPGRADE_PATHS_FROM_43 = [(UpgradePath('4.3.x', 'latest-nightly'),)]
+
+
+def assert_busy(assertion, timeout=60, f=2.0):
+    waited = 0
+    duration = 0.1
+    assertion_error = None
+    while waited < timeout:
+        try:
+            assertion()
+            return
+        except AssertionError as e:
+            assertion_error = e
+            time.sleep(duration)
+            waited += duration
+            duration *= f
+    raise assertion_error
 
 
 class RecoveryTest(NodeProvider, unittest.TestCase):
+    NUMBER_OF_NODES = 3
     """
     In depth testing of the recovery mechanism during a rolling restart.
     Based on org.elasticsearch.upgrades.RecoveryIT.java
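
The `assert_busy` helper is now defined locally instead of being imported from
crate.qa.tests. It retries an assertion with exponentially growing sleeps
(0.1 s, then scaled by f=2.0: 0.2 s, 0.4 s, ...) until the assertion passes or
roughly `timeout` seconds of waiting have accumulated, then re-raises the last
AssertionError. A minimal usage sketch, reusing the `assert_busy` defined
above with a hypothetical waiting condition that is not part of this diff:

    import time

    start = time.monotonic()

    def cluster_is_green():
        # Hypothetical condition that only starts to hold after ~1 s.
        assert time.monotonic() - start > 1.0, 'not green yet'

    # Retries with sleeps of 0.1, 0.2, 0.4, ... seconds and re-raises the
    # last AssertionError if ~60 s of waiting accumulate without success.
    assert_busy(cluster_is_green)
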
@@ -61,13 +78,13 @@ def _upgrade_cluster(self, cluster, version, nodes):
             new_node = self.upgrade_node(node, version)
             cluster[i] = new_node
 
-    @parameterized.expand([UPGRADE_42_TO_43, UPGRADE_43_TO_LATEST])
-    def test_recovery_with_concurrent_indexing(self, name, path, nodes):
+    @parameterized.expand(UPGRADE_PATHS)
+    def test_recovery_with_concurrent_indexing(self, path):
         """
         This test creates a new table and inserts data at every stage of the
         rolling upgrade.
         """
-        cluster = self._new_cluster(path.from_version, nodes)
+        cluster = self._new_cluster(path.from_version, self.NUMBER_OF_NODES)
         cluster.start()
 
         with connect(cluster.node().http_url, error_trace=True) as conn:
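
Two cleanups meet in this hunk: the parameter tuples no longer carry a display
name or a node count, and the randomized module-level
NUMBER_OF_NODES = random.randint(3, 5) becomes a fixed class attribute
(NUMBER_OF_NODES = 3), which makes test runs reproducible.
parameterized.expand unpacks each tuple into the test's positional arguments,
so a 1-tuple holding an UpgradePath yields exactly the `path` parameter. A
standalone sketch of that mechanism (toy test class, not from this repo):

    import unittest
    from parameterized import parameterized

    class Demo(unittest.TestCase):
        # Each 1-tuple becomes the positional args after `self`;
        # parameterized derives a distinct test name per case.
        @parameterized.expand([('4.2.x',), ('4.3.x',)])
        def test_upgrade(self, from_version):
            self.assertTrue(from_version.endswith('.x'))
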
@@ -85,7 +102,7 @@ def test_recovery_with_concurrent_indexing(self, name, path, nodes):
             c.execute('''alter table doc.test set ("routing.allocation.enable"='primaries')''')
 
             # upgrade to mixed cluster
-            self._upgrade_cluster(cluster, path.to_version, random.randint(1, nodes - 1))
+            self._upgrade_cluster(cluster, path.to_version, random.randint(1, self.NUMBER_OF_NODES - 1))
             c.execute('''alter table doc.test set ("routing.allocation.enable"='all')''')
             # insert data into a mixed cluster
             insert_data(conn, 'doc', 'test', 50)
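
The 'primaries' -> upgrade -> 'all' sequence here is the usual rolling-restart
allocation dance: restricting "routing.allocation.enable" to primaries keeps
replicas from being reshuffled while nodes restart, and switching back to
'all' lets replicas recover onto the upgraded nodes. The tests repeat this at
each stage; a sketch of the pattern as a helper (hypothetical, not part of
the diff):

    def upgrade_with_allocation_paused(cursor, upgrade):
        # Keep only primaries allocated while nodes restart ...
        cursor.execute('''alter table doc.test set ("routing.allocation.enable"='primaries')''')
        upgrade()  # e.g. lambda: self._upgrade_cluster(cluster, version, n)
        # ... then allow replica allocation again.
        cursor.execute('''alter table doc.test set ("routing.allocation.enable"='all')''')
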
@@ -96,15 +113,15 @@ def test_recovery_with_concurrent_indexing(self, name, path, nodes):
             # check counts for each node individually
             c.execute('select id from sys.nodes')
             node_ids = c.fetchall()
-            self.assertEqual(len(node_ids), nodes)
+            self.assertEqual(len(node_ids), self.NUMBER_OF_NODES)
 
             assert_busy(lambda: self._assert_is_green(conn, 'doc', 'test'))
             for node_id in node_ids:
                 assert_busy(lambda: self._assert_num_docs_by_node_id(conn, 'doc', 'test', node_id[0], 60))
 
             c.execute('''alter table doc.test set ("routing.allocation.enable"='primaries')''')
             # upgrade the full cluster
-            self._upgrade_cluster(cluster, path.to_version, nodes)
+            self._upgrade_cluster(cluster, path.to_version, self.NUMBER_OF_NODES)
             c.execute('''alter table doc.test set ("routing.allocation.enable"='all')''')
 
             insert_data(conn, 'doc', 'test', 45)
@@ -115,14 +132,14 @@ def test_recovery_with_concurrent_indexing(self, name, path, nodes):
 
             c.execute('select id from sys.nodes')
             node_ids = c.fetchall()
-            self.assertEqual(len(node_ids), nodes)
+            self.assertEqual(len(node_ids), self.NUMBER_OF_NODES)
 
             for node_id in node_ids:
                 assert_busy(lambda: self._assert_num_docs_by_node_id(conn, 'doc', 'test', node_id[0], 105))
 
-    @parameterized.expand([UPGRADE_42_TO_43, UPGRADE_43_TO_LATEST])
-    def test_relocation_with_concurrent_indexing(self, name, path, nodes):
-        cluster = self._new_cluster(path.from_version, nodes)
+    @parameterized.expand(UPGRADE_PATHS)
+    def test_relocation_with_concurrent_indexing(self, path):
+        cluster = self._new_cluster(path.from_version, self.NUMBER_OF_NODES)
         cluster.start()
 
         with connect(cluster.node().http_url, error_trace=True) as conn:
@@ -141,7 +158,7 @@ def test_relocation_with_concurrent_indexing(self, name, path, nodes):
             c.execute('''alter table doc.test set("routing.allocation.enable"='none')''')
 
             # upgrade to mixed cluster
-            self._upgrade_cluster(cluster, path.to_version, random.randint(1, nodes - 1))
+            self._upgrade_cluster(cluster, path.to_version, random.randint(1, self.NUMBER_OF_NODES - 1))
 
             c.execute('''select id from sys.nodes order by version['number'] desc limit 1''')
             new_node_id = c.fetchone()[0]
@@ -169,7 +186,7 @@ def test_relocation_with_concurrent_indexing(self, name, path, nodes):
                 self._assert_num_docs_by_node_id(conn, 'doc', 'test', new_node_id, 60)
 
             # upgrade fully to the new version
-            self._upgrade_cluster(cluster, path.to_version, nodes)
+            self._upgrade_cluster(cluster, path.to_version, self.NUMBER_OF_NODES)
 
             c.execute('''alter table doc.test set("number_of_replicas"=2)''')
             c.execute('''alter table doc.test reset("routing.allocation.include._id")''')
@@ -180,7 +197,7 @@ def test_relocation_with_concurrent_indexing(self, name, path, nodes):
             c.execute('refresh table doc.test')
             c.execute('select id from sys.nodes')
             node_ids = c.fetchall()
-            self.assertEqual(len(node_ids), nodes)
+            self.assertEqual(len(node_ids), self.NUMBER_OF_NODES)
 
             for node_id in node_ids:
                 self._assert_num_docs_by_node_id(conn, 'doc', 'test', node_id[0], 105)
@@ -193,14 +210,14 @@ def _assert_shard_state(self, conn, schema, table_name, node_id, state):
         self.assertTrue(current_state)
         self.assertEqual(current_state[0], state)
 
-    @parameterized.expand([UPGRADE_42_TO_43, UPGRADE_43_TO_LATEST])
-    def test_recovery(self, name, path, nodes):
+    @parameterized.expand(UPGRADE_PATHS)
+    def test_recovery(self, path):
         """
         This test creates a new table, inserts data and asserts the state at every stage of the
         rolling upgrade.
         """
 
-        cluster = self._new_cluster(path.from_version, nodes)
+        cluster = self._new_cluster(path.from_version, self.NUMBER_OF_NODES)
         cluster.start()
 
         with connect(cluster.node().http_url, error_trace=True) as conn:
@@ -218,26 +235,26 @@ def test_recovery(self, name, path, nodes):
                 c.execute("refresh table doc.test")
 
             # upgrade to mixed cluster
-            self._upgrade_cluster(cluster, path.to_version, random.randint(1, nodes - 1))
+            self._upgrade_cluster(cluster, path.to_version, random.randint(1, self.NUMBER_OF_NODES - 1))
 
             assert_busy(lambda: self._assert_is_green(conn, 'doc', 'test'))
 
             # upgrade fully to the new version
-            self._upgrade_cluster(cluster, path.to_version, nodes)
+            self._upgrade_cluster(cluster, path.to_version, self.NUMBER_OF_NODES)
 
             if random.choice([True, False]):
                 c.execute("refresh table doc.test")
 
             assert_busy(lambda: self._assert_is_green(conn, 'doc', 'test'))
 
-    @parameterized.expand([UPGRADE_42_TO_43, UPGRADE_43_TO_LATEST])
-    def test_recovery_closed_index(self, name, path, nodes):
+    @parameterized.expand(UPGRADE_PATHS)
+    def test_recovery_closed_index(self, path):
         """
         This test creates a table in the non-upgraded cluster and closes it. It then
         checks that the table is effectively closed and potentially replicated.
         """
 
-        cluster = self._new_cluster(path.from_version, nodes)
+        cluster = self._new_cluster(path.from_version, self.NUMBER_OF_NODES)
         cluster.start()
 
         with connect(cluster.node().http_url, error_trace=True) as conn:
@@ -252,24 +269,24 @@ def test_recovery_closed_index(self, name, path, nodes):
             c.execute('alter table doc.test close')
 
             # upgrade to mixed cluster
-            self._upgrade_cluster(cluster, path.to_version, random.randint(1, nodes - 1))
+            self._upgrade_cluster(cluster, path.to_version, random.randint(1, self.NUMBER_OF_NODES - 1))
 
             self._assert_is_closed(conn, 'doc', 'test')
 
             # upgrade fully to the new version
-            self._upgrade_cluster(cluster, path.to_version, nodes)
+            self._upgrade_cluster(cluster, path.to_version, self.NUMBER_OF_NODES)
 
             self._assert_is_closed(conn, 'doc', 'test')
 
-    @parameterized.expand([UPGRADE_42_TO_43, UPGRADE_43_TO_LATEST])
-    def test_closed_index_during_rolling_upgrade(self, name, path, nodes):
+    @parameterized.expand(UPGRADE_PATHS)
+    def test_closed_index_during_rolling_upgrade(self, path):
         """
         This test creates and closes a new table at every stage of the rolling
         upgrade. It then checks that the table is effectively closed and
         replicated.
         """
 
-        cluster = self._new_cluster(path.from_version, nodes)
+        cluster = self._new_cluster(path.from_version, self.NUMBER_OF_NODES)
         cluster.start()
 
         with connect(cluster.node().http_url, error_trace=True) as conn:
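
`_assert_is_closed` itself is defined outside this diff. As a hedged sketch, a
closed-table check in CrateDB can read the `closed` flag from
information_schema.tables (the column name is an assumption based on CrateDB
4.x, not taken from this commit):

    def assert_is_closed(test, conn, schema, table_name):
        c = conn.cursor()
        # 'closed' column assumed; verify against the real helper.
        c.execute(
            'select closed from information_schema.tables '
            'where table_schema = ? and table_name = ?',
            (schema, table_name))
        test.assertTrue(c.fetchone()[0])
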
@@ -283,7 +300,7 @@ def test_closed_index_during_rolling_upgrade(self, name, path, nodes):
             self._assert_is_closed(conn, 'doc', 'old_cluster')
 
             # upgrade to mixed cluster
-            self._upgrade_cluster(cluster, path.to_version, random.randint(1, nodes - 1))
+            self._upgrade_cluster(cluster, path.to_version, random.randint(1, self.NUMBER_OF_NODES - 1))
 
             self._assert_is_closed(conn, 'doc', 'old_cluster')
 
@@ -297,7 +314,7 @@ def test_closed_index_during_rolling_upgrade(self, name, path, nodes):
             self._assert_is_closed(conn, 'doc', 'mixed_cluster')
 
             # upgrade fully to the new version
-            self._upgrade_cluster(cluster, path.to_version, nodes)
+            self._upgrade_cluster(cluster, path.to_version, self.NUMBER_OF_NODES)
 
             self._assert_is_closed(conn, 'doc', 'old_cluster')
             self._assert_is_closed(conn, 'doc', 'mixed_cluster')
@@ -311,13 +328,13 @@ def test_closed_index_during_rolling_upgrade(self, name, path, nodes):
 
             self._assert_is_closed(conn, 'doc', 'upgraded_cluster')
 
-    @parameterized.expand([UPGRADE_42_TO_43, UPGRADE_43_TO_LATEST])
-    def test_update_docs(self, name, path, nodes):
+    @parameterized.expand(UPGRADE_PATHS)
+    def test_update_docs(self, path):
         """
         This test creates a new table, inserts data and updates it at every stage of the
         rolling upgrade.
         """
-        cluster = self._new_cluster(path.from_version, nodes)
+        cluster = self._new_cluster(path.from_version, self.NUMBER_OF_NODES)
         cluster.start()
         with connect(cluster.node().http_url, error_trace=True) as conn:
             c = conn.cursor()
@@ -332,7 +349,7 @@ def test_update_docs(self, name, path, nodes):
             c.execute('refresh table doc.test')
 
             # upgrade to mixed cluster
-            self._upgrade_cluster(cluster, path.to_version, random.randint(1, nodes - 1))
+            self._upgrade_cluster(cluster, path.to_version, random.randint(1, self.NUMBER_OF_NODES - 1))
 
             if random.choice([True, False]):
                 assert_busy(lambda: self._assert_is_green(conn, 'doc', 'test'))
@@ -351,7 +368,7 @@ def test_update_docs(self, name, path, nodes):
             c.execute('refresh table doc.test')
 
             # upgrade fully to the new version
-            self._upgrade_cluster(cluster, path.to_version, nodes)
+            self._upgrade_cluster(cluster, path.to_version, self.NUMBER_OF_NODES)
 
             updates = [(i, str(random.randint)) for i in range(0, 100)]
             res = c.executemany(
@@ -361,8 +378,8 @@ def test_update_docs(self, name, path, nodes):
             for result in res:
                 self.assertEqual(result['rowcount'], 1)
 
-    @parameterized.expand([UPGRADE_43_TO_LATEST])
-    def test_operation_based_recovery(self, name, path, nodes):
+    @parameterized.expand(UPGRADE_PATHS_FROM_43)
+    def test_operation_based_recovery(self, path):
         """
         Tests that we should perform an operation-based recovery if there were
         some but not too many uncommitted documents (i.e., less than 10% of
@@ -371,7 +388,7 @@ def test_operation_based_recovery(self, name, path, nodes):
         based peer recoveries.
         """
 
-        cluster = self._new_cluster(path.from_version, nodes)
+        cluster = self._new_cluster(path.from_version, self.NUMBER_OF_NODES)
         cluster.start()
 
         with connect(cluster.node().http_url, error_trace=True) as conn:
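
The "less than 10% uncommitted" condition in the docstring is what steers the
engine toward an operation-based (sequence-number) recovery: with only a
small uncommitted tail, a recovering replica can replay just those missing
operations instead of copying whole segment files. A sketch of document
counts that satisfy the condition (variable names hypothetical):

    import random

    # Committed baseline plus a small uncommitted tail: at most ~10% of
    # the total stays unflushed, keeping peer recovery operation-based.
    num_docs = random.randint(100, 200)
    uncommitted_docs = random.randint(0, num_docs // 10)
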
@@ -392,7 +409,7 @@ def test_operation_based_recovery(self, name, path, nodes):
             insert_data(conn, 'doc', 'test', num_docs)
 
             # upgrade to mixed cluster
-            self._upgrade_cluster(cluster, path.to_version, random.randint(1, nodes - 1))
+            self._upgrade_cluster(cluster, path.to_version, random.randint(1, self.NUMBER_OF_NODES - 1))
 
             assert_busy(lambda: self._assert_is_green(conn, 'doc', 'test'))
 
@@ -402,7 +419,7 @@ def test_operation_based_recovery(self, name, path, nodes):
             self._assert_ensure_checkpoints_are_synced(conn, 'doc', 'test')
 
             # upgrade fully to the new version
-            self._upgrade_cluster(cluster, path.to_version, nodes)
+            self._upgrade_cluster(cluster, path.to_version, self.NUMBER_OF_NODES)
 
             assert_busy(lambda: self._assert_is_green(conn, 'doc', 'test'))
 
@@ -412,14 +429,14 @@ def test_operation_based_recovery(self, name, path, nodes):
 
             self._assert_ensure_checkpoints_are_synced(conn, 'doc', 'test')
 
-    @parameterized.expand([UPGRADE_43_TO_LATEST])
-    def test_turnoff_translog_retention_after_upgraded(self, name, path, nodes):
+    @parameterized.expand(UPGRADE_PATHS_FROM_43)
+    def test_turnoff_translog_retention_after_upgraded(self, path):
         """
         Verifies that once all shard copies are on the new version, we should turn
         off the translog retention for indices with soft-deletes.
         """
 
-        cluster = self._new_cluster(path.from_version, nodes)
+        cluster = self._new_cluster(path.from_version, self.NUMBER_OF_NODES)
         cluster.start()
 
         with connect(cluster.node().http_url, error_trace=True) as conn:
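
(The added decorator originally read `@parameterized.expand([UPGRADE_PATHS_FROM_43])`;
the extra brackets would wrap the already-listed tuples in another list, so the
unbracketed form matching test_operation_based_recovery is used above.) With
soft-deletes enabled, the history needed for replica recovery comes from
soft-deleted Lucene documents rather than the translog, so translog retention
can be dropped once every shard copy runs the new version. A hedged follow-up
check (the translog_stats column on sys.shards is an assumption based on
recent CrateDB versions, not shown in this diff):

    def translog_uncommitted_sizes(cursor, table_name):
        # After retention is turned off and the table is flushed, the
        # uncommitted translog size per shard should approach zero.
        cursor.execute(
            "select translog_stats['uncommitted_size'] from sys.shards "
            "where table_name = ?", (table_name,))
        return [row[0] for row in cursor.fetchall()]
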
@@ -440,7 +457,7 @@ def test_turnoff_translog_retention_after_upgraded(self, name, path, nodes):
             insert_data(conn, 'doc', 'test', num_docs)
 
             # update the cluster to the new version
-            self._upgrade_cluster(cluster, path.to_version, nodes)
+            self._upgrade_cluster(cluster, path.to_version, self.NUMBER_OF_NODES)
 
             assert_busy(lambda: self._assert_is_green(conn, 'doc', 'test'))
             c.execute('refresh table doc.test')
