-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathrun.py
103 lines (85 loc) · 3.4 KB
/
run.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# -*- coding: utf-8 -*-
"""Functions to call when running the module.

This module should contain a function called `run_module`, that is executed
when the module is run with `python -m delphi_changehc`.
"""
# standard packages
import logging
from datetime import datetime, timedelta
from pathlib import Path
# third party
from delphi_utils import read_params
# first party
from .download_ftp_files import download
from .update_sensor import CHCSensorUpdator
def _drop_date_from_filename(file_path):
    """Parse the drop date from the leading "YYYYMMDD" token of a file name.

    The data files are expected to be named like
    "YYYYMMDD_All_Outpatients_By_County.dat.gz", so everything before the
    first underscore of the base name is read as the date.

    Raises:
        ValueError: (from `datetime.strptime`) if that prefix is not a valid
            "%Y%m%d" date.
    """
    return datetime.strptime(Path(file_path).name.split("_")[0], "%Y%m%d")


def _resolve_drop_date(params):
    """Return the drop date as a `datetime`.

    An explicit `params["drop_date"]` ("YYYY-MM-DD") takes precedence.
    Otherwise the date is read from the names of the denominator and
    numerator input files, which must agree.

    Raises:
        AssertionError: if the two input files carry different drop dates.
        ValueError: if a date string does not match its expected format.
    """
    if params["drop_date"] is not None:
        return datetime.strptime(params["drop_date"], "%Y-%m-%d")

    dropdate_denom = _drop_date_from_filename(params["input_denom_file"])
    dropdate_covid = _drop_date_from_filename(params["input_covid_file"])
    # Raised explicitly rather than via an `assert` statement so the check
    # still runs under `python -O`; the exception type is kept as
    # AssertionError so existing callers see the same failure.
    if dropdate_denom != dropdate_covid:
        raise AssertionError("different drop dates for data files")
    return dropdate_denom


def run_module():
    """Run the delphi_changehc module.

    Reads the parameters with `read_params`, downloads recent input files,
    works out the drop date and the range of sensor dates, logs the
    configuration, and then runs a `CHCSensorUpdator` for every combination
    of `params["geos"]` and `params["weekday"]`.

    Raises:
        AssertionError: if no drop date is configured and the two input
            files carry different drop dates in their names.
    """
    params = read_params()

    logging.basicConfig(level=logging.DEBUG)

    ## download recent files from FTP server
    logging.info("downloading recent files through SFTP")
    download(params["cache_dir"], params["ftp_conn"])

    ## get end date from input file
    # the filenames are expected to be in the format:
    # Denominator: "YYYYMMDD_All_Outpatients_By_County.dat.gz"
    # Numerator: "YYYYMMDD_Covid_Outpatients_By_County.dat.gz"
    dropdate_dt = _resolve_drop_date(params)
    dropdate = str(dropdate_dt.date())

    # range of estimates to produce
    n_backfill_days = params["n_backfill_days"]  # produce estimates for n_backfill_days
    n_waiting_days = params["n_waiting_days"]  # most recent n_waiting_days won't be est
    enddate_dt = dropdate_dt - timedelta(days=n_waiting_days)
    startdate_dt = enddate_dt - timedelta(days=n_backfill_days)
    enddate = str(enddate_dt.date())
    startdate = str(startdate_dt.date())

    # now allow manual overrides
    # NOTE: an overridden end_date does not shift the derived start date;
    # each override replaces only its own value.
    if params["end_date"] is not None:
        enddate = params["end_date"]
    if params["start_date"] is not None:
        startdate = params["start_date"]

    logging.info("first sensor date:\t%s", startdate)
    logging.info("last sensor date:\t%s", enddate)
    logging.info("drop date:\t\t%s", dropdate)
    logging.info("n_backfill_days:\t%s", n_backfill_days)
    logging.info("n_waiting_days:\t%s", n_waiting_days)

    ## print out other vars
    logging.info("geos:\t\t\t%s", params["geos"])
    logging.info("outpath:\t\t%s", params["export_dir"])
    logging.info("parallel:\t\t%s", params["parallel"])
    logging.info("weekday:\t\t%s", params["weekday"])
    logging.info("se:\t\t\t%s", params["se"])

    ## start generating
    for geo in params["geos"]:
        # params["weekday"] is iterated, so it is presumably a list of
        # truthy/falsy flags (with / without weekday adjustment) -- confirm
        # against the params template.
        for weekday in params["weekday"]:
            if weekday:
                logging.info("starting %s, weekday adj", geo)
            else:
                logging.info("starting %s, no adj", geo)
            su_inst = CHCSensorUpdator(
                startdate,
                enddate,
                dropdate,
                geo,
                params["parallel"],
                weekday,
                params["se"]
            )
            su_inst.update_sensor(
                params["input_denom_file"],
                params["input_covid_file"],
                params["export_dir"],
                params["static_file_dir"]
            )
        logging.info("finished %s", geo)

    logging.info("finished all")