"""
===============
=== Purpose ===
===============
Stores FluSurv-NET data (flu hospitalization rates) from CDC.
Note that the flusurv age groups are, in general, not the same as the ILINet
(fluview) age groups. However, the following groups are equivalent:
- flusurv age_0 == fluview age_0 (0-4 years)
- flusurv age_3 == fluview age_4 (50-64 years)
- flusurv age_4 == fluview age_5 (65+ years)
See also:
- flusurv.py
=======================
=== Data Dictionary ===
=======================
`flusurv` is the table where US flu hospitalization rates are stored.
+--------------+-------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+--------------+-------------+------+-----+---------+----------------+
| id | int(11) | NO | PRI | NULL | auto_increment |
| release_date | date | NO | MUL | NULL | |
| issue | int(11) | NO | MUL | NULL | |
| epiweek | int(11) | NO | MUL | NULL | |
| location | varchar(32) | NO | MUL | NULL | |
| lag | int(11) | NO | MUL | NULL | |
| rate_age_0 | double | YES | | NULL | |
| rate_age_1 | double | YES | | NULL | |
| rate_age_2 | double | YES | | NULL | |
| rate_age_3 | double | YES | | NULL | |
| rate_age_4 | double | YES | | NULL | |
| rate_overall | double | YES | | NULL | |
| rate_age_5 | double | YES | | NULL | |
| rate_age_6 | double | YES | | NULL | |
| rate_age_7 | double | YES | | NULL | |
+--------------+-------------+------+-----+---------+----------------+
id: unique identifier for each record
release_date: the date when this record was first published by the CDC
issue: the epiweek of publication (e.g. issue 201453 includes epiweeks up to
and including 2014w53, but not 2015w01 or following)
epiweek: the epiweek during which the data was collected
location: the name of the catchment (e.g. 'network_all', 'CA', 'NY_albany')
lag: number of weeks between `epiweek` and `issue` (see the example below)
rate_age_0: hospitalization rate for ages 0-4
rate_age_1: hospitalization rate for ages 5-17
rate_age_2: hospitalization rate for ages 18-49
rate_age_3: hospitalization rate for ages 50-64
rate_age_4: hospitalization rate for ages 65+
rate_overall: overall hospitalization rate
rate_age_5: hospitalization rate for ages 65-74
rate_age_6: hospitalization rate for ages 75-84
rate_age_7: hospitalization rate for ages 85+
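For example, a record collected during 2014w40 and published in the 2014w52
issue has a lag of 12 weeks. A hypothetical illustration, assuming the
`delta_epiweeks` helper imported below counts whole epiweeks:
  >>> from delphi.utils.epiweek import delta_epiweeks
  >>> delta_epiweeks(201440, 201452)  # weeks from `epiweek` to `issue`
  12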
=================
=== Changelog ===
=================
2017-05-22
* update for new data source
2017-05-17
* infer field `issue` from current date
2017-02-03
+ initial version
"""
# standard library
import argparse

# third party
import mysql.connector

# first party
from delphi.epidata.acquisition.flusurv import flusurv
import delphi.operations.secrets as secrets
from delphi.utils.epidate import EpiDate
from delphi.utils.epiweek import delta_epiweeks


def get_rows(cur):
"""Return the number of rows in the `flusurv` table."""
# count all rows
cur.execute("SELECT count(1) `num` FROM `flusurv`")
for (num,) in cur:
        return num


def update(issue, location_name, test_mode=False):
"""Fetch and store the currently avialble weekly FluSurv dataset."""
# fetch data
location_code = flusurv.location_codes[location_name]
print("fetching data for", location_name, location_code)
data = flusurv.get_data(location_code)
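    # NOTE: the SQL below assumes each data[epiweek] is a list of nine rates
    # in column order (age_0..age_4, overall, age_5..age_7); this is inferred
    # from the INSERT statement rather than documented here.
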
# metadata
epiweeks = sorted(data.keys())
location = location_name
    release_date = str(EpiDate.today())

# connect to the database
u, p = secrets.db.epi
cnx = mysql.connector.connect(host=secrets.db.host, user=u, password=p, database="epidata")
cur = cnx.cursor()
rows1 = get_rows(cur)
    print(f"rows before: {int(rows1)}")

# SQL for insert/update
sql = """
INSERT INTO `flusurv` (
`release_date`, `issue`, `epiweek`, `location`, `lag`, `rate_age_0`,
`rate_age_1`, `rate_age_2`, `rate_age_3`, `rate_age_4`, `rate_overall`,
`rate_age_5`, `rate_age_6`, `rate_age_7`
)
VALUES (
%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
)
ON DUPLICATE KEY UPDATE
`release_date` = least(`release_date`, %s),
`rate_age_0` = coalesce(%s, `rate_age_0`),
`rate_age_1` = coalesce(%s, `rate_age_1`),
`rate_age_2` = coalesce(%s, `rate_age_2`),
`rate_age_3` = coalesce(%s, `rate_age_3`),
`rate_age_4` = coalesce(%s, `rate_age_4`),
`rate_overall` = coalesce(%s, `rate_overall`),
`rate_age_5` = coalesce(%s, `rate_age_5`),
`rate_age_6` = coalesce(%s, `rate_age_6`),
`rate_age_7` = coalesce(%s, `rate_age_7`)
"""
# insert/update each row of data (one per epiweek)
for epiweek in epiweeks:
lag = delta_epiweeks(epiweek, issue)
if lag > 52:
# Ignore values older than one year, as (1) they are assumed not to
# change, and (2) it would adversely affect database performance if all
# values (including duplicates) were stored on each run.
continue
args_meta = [release_date, issue, epiweek, location, lag]
args_insert = data[epiweek]
args_update = [release_date] + data[epiweek]
        cur.execute(sql, tuple(args_meta + args_insert + args_update))

# commit and disconnect
rows2 = get_rows(cur)
print(f"rows after: {int(rows2)} (+{int(rows2 - rows1)})")
cur.close()
if test_mode:
print("test mode: not committing database changes")
else:
cnx.commit()
    cnx.close()


def main():
# args and usage
parser = argparse.ArgumentParser()
# fmt: off
parser.add_argument(
"location",
help='location for which data should be scraped (e.g. "CA" or "all")'
)
parser.add_argument(
"--test",
"-t",
default=False,
action="store_true",
help="do not commit database changes"
)
# fmt: on
    args = parser.parse_args()

# scrape current issue from the main page
issue = flusurv.get_current_issue()
    print(f"current issue: {int(issue)}")

# fetch flusurv data
if args.location == "all":
# all locations
for location in flusurv.location_codes.keys():
update(issue, location, args.test)
else:
# single location
        update(issue, args.location, args.test)


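# Example invocations (illustrative; adjust the entry point to your setup):
#   python3 flusurv_update.py CA          # update a single location
#   python3 flusurv_update.py all --test  # dry run across all locations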
if __name__ == "__main__":
main()