-
Notifications
You must be signed in to change notification settings - Fork 68
/
Copy pathtest_database.py
135 lines (100 loc) · 4.1 KB
/
test_database.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
"""Unit tests for database.py."""
# standard library
import unittest
from unittest.mock import MagicMock
# module under test
from delphi.epidata.acquisition.covidcast.database import Database
# py3tester coverage target: the dotted path of the module these tests
# exercise (the same module `Database` is imported from above)
__test_target__ = 'delphi.epidata.acquisition.covidcast.database'
class UnitTests(unittest.TestCase):
    """Basic unit tests.

    Every test passes a ``MagicMock`` to ``Database.connect`` as
    ``connector_impl``, so no real database is contacted. A ``MagicMock``
    hands back the same child mock on every call, which means
    ``mock_connector.connect.return_value`` is the connection object that
    ``Database`` received, and ``connection.cursor.return_value`` is the
    cursor it obtained from that connection.
    """

    def test_connect_opens_connection(self):
        """Connect to the database."""
        mock_connector = MagicMock()
        database = Database()

        database.connect(connector_impl=mock_connector)

        mock_connector.connect.assert_called()

    def test_disconnect_with_rollback(self):
        """Disconnect from the database and rollback."""
        mock_connector = MagicMock()
        database = Database()
        database.connect(connector_impl=mock_connector)

        # rollback
        database.disconnect(False)

        connection = mock_connector.connect.return_value
        connection.commit.assert_not_called()
        connection.close.assert_called()

    def test_disconnect_with_commit(self):
        """Disconnect from the database and commit."""
        mock_connector = MagicMock()
        database = Database()
        database.connect(connector_impl=mock_connector)

        # commit
        database.disconnect(True)

        connection = mock_connector.connect.return_value
        connection.commit.assert_called()
        connection.close.assert_called()

    def test_update_covidcast_meta_cache_query(self):
        """Query to update the metadata cache looks sensible.

        NOTE: Actual behavior is tested by integration test.
        """
        call_args = ('epidata_json_str',)
        mock_connector = MagicMock()
        database = Database()
        database.connect(connector_impl=mock_connector)

        database.update_covidcast_meta_cache(*call_args)

        cursor = mock_connector.connect.return_value.cursor.return_value
        cursor.execute.assert_called()

        # Inspect the positional arguments of the most recent `execute` call.
        sql, sql_params = cursor.execute.call_args[0]

        # The bound parameter is expected to be the input wrapped in double
        # quotes (presumably JSON-encoded by the method -- confirm against
        # database.py).
        expected_params = ('"epidata_json_str"',)
        self.assertEqual(sql_params, expected_params)

        # Only check that the statement mentions the right pieces; the exact
        # SQL text is deliberately not pinned here.
        sql = sql.lower()
        for fragment in ('update', '`covidcast_meta_cache`', 'timestamp', 'epidata'):
            self.assertIn(fragment, sql)

    def test_compute_coverage_crossref_query(self):
        """Query to update the compute crossref looks sensible.

        NOTE: Actual behavior is tested by integration test.
        """
        mock_connector = MagicMock()
        database = Database()
        database.connect(connector_impl=mock_connector)

        database.compute_coverage_crossref()

        cursor = mock_connector.connect.return_value.cursor.return_value
        cursor.execute.assert_called()

    def test_insert_or_update_batch_exception_reraised(self):
        """Test that an exception is reraised"""
        injected_error = Exception('Test')
        mock_connector = MagicMock()
        database = Database()
        # Same setup as the sibling batch tests, so that the only failure on
        # the code path is the one injected below.
        database.count_all_load_rows = lambda: 0  # simulate an empty load table
        database.connect(connector_impl=mock_connector)
        cursor = mock_connector.connect.return_value.cursor.return_value
        cursor.executemany.side_effect = injected_error

        # A list, like the other batch tests (a set is neither ordered nor
        # sliceable, so it could trigger an unrelated error).
        cc_rows = [MagicMock(geo_id='CA', val=1, se=0, sample_size=0)]

        with self.assertRaises(Exception) as raised:
            database.insert_or_update_batch(cc_rows)

        # `assertRaises(Exception)` alone accepts any failure; require that
        # the error surfacing to the caller is the injected one.
        self.assertIs(raised.exception, injected_error)

    def test_insert_or_update_batch_row_count_returned(self):
        """Test that the row count is returned"""
        mock_connector = MagicMock()
        database = Database()
        database.count_all_load_rows = lambda: 0  # simulate an empty load table
        database.connect(connector_impl=mock_connector)
        cursor = mock_connector.connect.return_value.cursor.return_value
        cursor.rowcount = 3

        cc_rows = [MagicMock(geo_id='CA', val=1, se=0, sample_size=0)]
        result = database.insert_or_update_batch(cc_rows)

        self.assertEqual(result, 3)

    def test_insert_or_update_batch_none_returned(self):
        """Test that None is returned when row count cannot be returned"""
        mock_connector = MagicMock()
        database = Database()
        database.count_all_load_rows = lambda: 0  # simulate an empty load table
        database.connect(connector_impl=mock_connector)
        cursor = mock_connector.connect.return_value.cursor.return_value
        # The test treats -1 as the cursor's "row count unavailable" signal.
        cursor.rowcount = -1

        cc_rows = [MagicMock(geo_id='CA', val=1, se=0, sample_size=0)]
        result = database.insert_or_update_batch(cc_rows)

        self.assertIsNone(result)