diff --git a/changehc/delphi_changehc/config.py b/changehc/delphi_changehc/config.py index 217e81daa..07f3dde8b 100644 --- a/changehc/delphi_changehc/config.py +++ b/changehc/delphi_changehc/config.py @@ -6,7 +6,6 @@ """ from datetime import datetime, timedelta -import numpy as np class Config: diff --git a/changehc/delphi_changehc/constants.py b/changehc/delphi_changehc/constants.py index 107fd49d3..9a2cb29d4 100644 --- a/changehc/delphi_changehc/constants.py +++ b/changehc/delphi_changehc/constants.py @@ -1,6 +1,6 @@ """Registry for signal names and geo types""" -SMOOTHED = "smoothed_chc" -SMOOTHED_ADJ = "smoothed_adj_chc" +SMOOTHED = "smoothed_cli" +SMOOTHED_ADJ = "smoothed_adj_cli" SIGNALS = [SMOOTHED, SMOOTHED_ADJ] NA = "NA" HRR = "hrr" diff --git a/changehc/delphi_changehc/download_ftp_files.py b/changehc/delphi_changehc/download_ftp_files.py new file mode 100644 index 000000000..ec2382004 --- /dev/null +++ b/changehc/delphi_changehc/download_ftp_files.py @@ -0,0 +1,73 @@ +""" +Downloads files modified in the last 24 hours from the specified ftp server.""" + +# standard +import datetime +import functools +from os import path + +# third party +import paramiko + + +def print_callback(filename, bytes_so_far, bytes_total): + """Log file transfer progress""" + rough_percent_transferred = int(100 * (bytes_so_far / bytes_total)) + if (rough_percent_transferred % 25) == 0: + print(f'{filename} transfer: {rough_percent_transferred}%') + + +def get_files_from_dir(sftp, out_path): + """Download files from sftp server that have been uploaded in last day + Args: + sftp: SFTP Session from Paramiko client + out_path: Path to local directory into which to download the files + """ + + current_time = datetime.datetime.now() + + # go through files in recieving dir + filepaths_to_download = {} + for fileattr in sftp.listdir_attr(): + file_time = datetime.datetime.fromtimestamp(fileattr.st_mtime) + filename = fileattr.filename + if current_time - file_time < datetime.timedelta(days=1) and \ + not path.exists(path.join(out_path, filename)): + filepaths_to_download[filename] = path.join(out_path, filename) + + # make sure we don't download more than 2 files per day + assert len(filepaths_to_download) <= 2, "more files dropped than expected" + + # download! + for infile, outfile in filepaths_to_download.items(): + callback_for_filename = functools.partial(print_callback, infile) + sftp.get(infile, outfile, callback=callback_for_filename) + + +def download(out_path, ftp_conn): + """Downloads files necessary to create CHC signal from ftp server. + Args: + out_path: Path to local directory into which to download the files + ftp_conn: Dict containing login credentials to ftp server + """ + + # open client + try: + client = paramiko.SSHClient() + client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + + client.connect(ftp_conn["host"], username=ftp_conn["user"], + password=ftp_conn["pass"][1:] + ftp_conn["pass"][0], + port=ftp_conn["port"], + allow_agent=False, look_for_keys=False) + sftp = client.open_sftp() + + sftp.chdir('/dailycounts/All_Outpatients_By_County') + get_files_from_dir(sftp, out_path) + + sftp.chdir('/dailycounts/Covid_Outpatients_By_County') + get_files_from_dir(sftp, out_path) + + finally: + if client: + client.close() diff --git a/changehc/delphi_changehc/run.py b/changehc/delphi_changehc/run.py index a85a9cf3b..97dce3f6c 100644 --- a/changehc/delphi_changehc/run.py +++ b/changehc/delphi_changehc/run.py @@ -14,6 +14,7 @@ from delphi_utils import read_params # first party +from .download_ftp_files import download from .update_sensor import CHCSensorUpdator @@ -25,11 +26,15 @@ def run_module(): logging.basicConfig(level=logging.DEBUG) + ## download recent files from FTP server + logging.info("downloading recent files through SFTP") + download(params["cache_dir"], params["ftp_conn"]) + ## get end date from input file # the filenames are expected to be in the format: # Denominator: "YYYYMMDD_All_Outpatients_By_County.dat.gz" # Numerator: "YYYYMMDD_Covid_Outpatients_By_County.dat.gz" - + if params["drop_date"] is None: dropdate_denom = datetime.strptime( Path(params["input_denom_file"]).name.split("_")[0], "%Y%m%d" diff --git a/changehc/delphi_changehc/sensor.py b/changehc/delphi_changehc/sensor.py index bef8869e7..7a625e4fe 100644 --- a/changehc/delphi_changehc/sensor.py +++ b/changehc/delphi_changehc/sensor.py @@ -129,7 +129,7 @@ def fit(y_data, first_sensor_date, geo_id, num_col="num", den_col="den"): # checks - due to the smoother, the first value will be NA assert ( - np.sum(np.isnan(smoothed_total_rates[1:]) == True) == 0 + np.sum(np.isnan(smoothed_total_rates[1:])) == 0 ), "NAs in rate calculation" assert ( np.sum(smoothed_total_rates[1:] <= 0) == 0 diff --git a/changehc/delphi_changehc/update_sensor.py b/changehc/delphi_changehc/update_sensor.py index 2f2efb169..91b44b36e 100644 --- a/changehc/delphi_changehc/update_sensor.py +++ b/changehc/delphi_changehc/update_sensor.py @@ -5,7 +5,6 @@ """ # standard packages import logging -from datetime import timedelta from multiprocessing import Pool, cpu_count import covidcast from delphi_utils import GeoMapper, S3ArchiveDiffer, read_params @@ -18,7 +17,7 @@ from .load_data import load_combined_data from .sensor import CHCSensor from .weekday import Weekday -from .constants import SIGNALS, SMOOTHED, SMOOTHED_ADJ, HRR, NA, FIPS +from .constants import SIGNALS, SMOOTHED, SMOOTHED_ADJ, NA def write_to_csv(output_dict, write_se, out_name, output_path="."): @@ -123,6 +122,8 @@ def public_signal(signal_): class CHCSensorUpdator: + """Contains methods to update sensor and write results to csv + """ def __init__(self, startdate, @@ -181,10 +182,10 @@ def geo_reindex(self, data): # get right geography geo = self.geo gmpr = GeoMapper() - if geo not in {"county", "state", "msa", "hrr"}: + if geo not in {"county", "state", "msa", "hrr"}: logging.error(f"{geo} is invalid, pick one of 'county', 'state', 'msa', 'hrr'") return False - elif geo == "county": + if geo == "county": data_frame = gmpr.fips_to_megacounty(data,Config.MIN_DEN,Config.MAX_BACKFILL_WINDOW,thr_col="den",mega_col=geo) elif geo == "state": data_frame = gmpr.replace_geocode(data, "fips", "state_id", new_col="state") @@ -305,4 +306,3 @@ def update_sensor(self, for exported_file in fails: print(f"Failed to archive '{exported_file}'") ''' - return diff --git a/changehc/params.json.template b/changehc/params.json.template index 68f806fec..d2aae255b 100644 --- a/changehc/params.json.template +++ b/changehc/params.json.template @@ -18,5 +18,11 @@ "aws_access_key_id": "", "aws_secret_access_key": "" }, - "bucket_name": "" -} \ No newline at end of file + "bucket_name": "", + "ftp_conn": { + "host": "", + "user": "", + "pass": "", + "port": 0 + } +} diff --git a/changehc/setup.py b/changehc/setup.py index 3a8521fd7..17302c243 100644 --- a/changehc/setup.py +++ b/changehc/setup.py @@ -11,7 +11,8 @@ "delphi-utils", "covidcast", "boto3", - "moto" + "moto", + "paramiko" ] setup( diff --git a/changehc/tests/params.json.template b/changehc/tests/params.json.template index 3702a8a5e..68c16e4be 100644 --- a/changehc/tests/params.json.template +++ b/changehc/tests/params.json.template @@ -2,8 +2,8 @@ "static_file_dir": "../static", "export_dir": "./receiving", "cache_dir": "./cache", - "input_emr_file": "test_data/SYNICUE_CMB_INPATIENT_11062020.csv.gz", - "input_claims_file": "test_data/SYNEDI_AGG_INPATIENT_11062020_1451CDT.csv.gz", + "input_denom_file": "test_data/20200601_All_Outpatients_By_County.dat", + "input_covid_file": "test_data/20200601_Covid_Outpatients_By_County.dat", "start_date": "2020-02-01", "end_date": "2020-02-02", "drop_date": "2020-02-02", @@ -19,4 +19,4 @@ "aws_secret_access_key": "FAKE_TEST_SECRET_ACCESS_KEY" }, "bucket_name": "test_bucket" -} \ No newline at end of file +} diff --git a/changehc/tests/test_download_ftp_files.py b/changehc/tests/test_download_ftp_files.py new file mode 100644 index 000000000..5808b4882 --- /dev/null +++ b/changehc/tests/test_download_ftp_files.py @@ -0,0 +1,63 @@ +# standard +import pytest +import mock +from datetime import datetime as dt +from datetime import timedelta + +# first party +from delphi_changehc.download_ftp_files import * + +class TestDownloadFTPFiles: + + class MockSFTP: + + # Mocks an SFTP connection + def __init__(self, attrs): + self.attrs = attrs + self.num_gets = 0 + + # Attrs are modified time and filename + def listdir_attr(self): + return self.attrs + + # Don't download anything, just note that method was called + def get(self, infile, outfile, callback=None): + self.num_gets += 1 + return + + + class FileAttr: + + def __init__(self, time, name): + self.st_mtime = time + self.filename = name + + + @mock.patch("os.path") + def test_get_files(self, mock_path): + + # When one new file is present, one file is downloaded + one_new = self.MockSFTP([self.FileAttr(dt.timestamp(dt.now()-timedelta(minutes=1)),"foo")]) + get_files_from_dir(one_new, "") + assert one_new.num_gets == 1 + + # When one new file and one old file are present, one file is downloaded + one_new_one_old = self.MockSFTP([self.FileAttr(dt.timestamp(dt.now()-timedelta(minutes=1)),"foo"), + self.FileAttr(dt.timestamp(dt.now()-timedelta(days=10)),"foo")]) + get_files_from_dir(one_new_one_old, "") + assert one_new_one_old.num_gets == 1 + + # When three new files are present, AssertionError + new_file1 = self.FileAttr(dt.timestamp(dt.now()-timedelta(minutes=1)),"foo1") + new_file2 = self.FileAttr(dt.timestamp(dt.now()-timedelta(minutes=1)),"foo2") + new_file3 = self.FileAttr(dt.timestamp(dt.now()-timedelta(minutes=1)),"foo3") + three_new = self.MockSFTP([new_file1, new_file2, new_file3]) + with pytest.raises(AssertionError): + get_files_from_dir(three_new,"") + + # When the file already exists, no files are downloaded + mock_path.exists.return_value = True + one_exists = self.MockSFTP([new_file1]) + get_files_from_dir(one_new, "") + assert one_exists.num_gets == 0 + diff --git a/changehc/tests/test_update_sensor.py b/changehc/tests/test_update_sensor.py index 1ca3e7880..b1329543f 100644 --- a/changehc/tests/test_update_sensor.py +++ b/changehc/tests/test_update_sensor.py @@ -268,7 +268,6 @@ def test_handle_wip_signal(self): assert signal_names[0].startswith("wip_") assert all(not s.startswith("wip_") for s in signal_names[1:]) # Test wip_signal = False (only unpublished signals should receive prefix) - # No CHC signal is published now, so both should get prefix signal_names = add_prefix(["xyzzy", SIGNALS[0]], False) assert signal_names[0].startswith("wip_") - assert all(s.startswith("wip_") for s in signal_names[1:]) + assert all(not s.startswith("wip_") for s in signal_names[1:])