diff --git a/safegraph/delphi_safegraph/constants.py b/safegraph/delphi_safegraph/constants.py
index 334ca1df1..c2fe606cb 100644
--- a/safegraph/delphi_safegraph/constants.py
+++ b/safegraph/delphi_safegraph/constants.py
@@ -1,4 +1,4 @@
-
+"""Constants for constructing Safegraph indicator."""
 HOME_DWELL = 'median_home_dwell_time'
 COMPLETELY_HOME = 'completely_home_prop'
 FULL_TIME_WORK = 'full_time_work_prop'
diff --git a/safegraph/delphi_safegraph/geo.py b/safegraph/delphi_safegraph/geo.py
index 67969630c..e1b2d8859 100644
--- a/safegraph/delphi_safegraph/geo.py
+++ b/safegraph/delphi_safegraph/geo.py
@@ -1,4 +1,5 @@
 # -*- coding: utf-8 -*-
+"""Geo location constants for constructing Safegraph indicator."""
 
 # https://code.activestate.com/recipes/577775-state-fips-codes-dict/
 STATE_TO_FIPS = {
@@ -62,3 +63,4 @@
 FIPS_TO_STATE = {v: k.lower() for k, v in STATE_TO_FIPS.items()}
+VALID_GEO_RESOLUTIONS = ('county', 'state')
diff --git a/safegraph/delphi_safegraph/process.py b/safegraph/delphi_safegraph/process.py
index 0da4be880..f51b1faff 100644
--- a/safegraph/delphi_safegraph/process.py
+++ b/safegraph/delphi_safegraph/process.py
@@ -1,13 +1,58 @@
-import covidcast
+"""Internal functions for creating Safegraph indicator."""
+import datetime
+import os
+from typing import Iterator, List
+
 import numpy as np
 import pandas as pd
+import covidcast
 
 from .constants import HOME_DWELL, COMPLETELY_HOME, FULL_TIME_WORK, PART_TIME_WORK
-from .geo import FIPS_TO_STATE
+from .geo import FIPS_TO_STATE, VALID_GEO_RESOLUTIONS
 
 # Magic number for modular arithmetic; CBG -> FIPS
 MOD = 10000000
 
+# Base file name for raw data CSVs.
+CSV_NAME = 'social-distancing.csv.gz'
+
+def validate(df):
+    """Confirms that a data frame has only one date."""
+    timestamps = df['date_range_start'].apply(date_from_timestamp)
+    assert len(timestamps.unique()) == 1
+
+
+def date_from_timestamp(timestamp) -> datetime.date:
+    """Extracts the date from a timestamp beginning with {YYYY}-{MM}-{DD}T."""
+    return datetime.date.fromisoformat(timestamp.split('T')[0])
+
+
+def files_in_past_week(current_filename) -> Iterator[str]:
+    """Yields paths of files from the previous 6 days.
+    Parameters
+    ----------
+    current_filename: str
+        name of CSV file.  Must be of the form
+        {path}/{YYYY}/{MM}/{DD}/{YYYY}-{MM}-{DD}-{CSV_NAME}
+    Yields
+    ------
+    File names corresponding to the 6 days prior to YYYY-MM-DD, most recent
+    first.
+    """
+    path, year, month, day, _ = current_filename.rsplit('/', 4)
+    current_date = datetime.date(int(year), int(month), int(day))
+    one_day = datetime.timedelta(days=1)
+    for _ in range(1, 7):
+        current_date = current_date - one_day
+        date_str = current_date.isoformat()
+        date_path = date_str.replace('-', '/')
+        new_filename = f'{path}/{date_path}/{date_str}-{CSV_NAME}'
+        yield new_filename
+
+
+def add_suffix(signals, suffix):
+    """Adds `suffix` to every element of `signals`."""
+    return [s + suffix for s in signals]
+
+
 def add_prefix(signal_names, wip_signal, prefix: str):
     """Adds prefix to signal if there is a WIP signal
     Parameters
     ----------
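For orientation, a doctest-style sketch of `files_in_past_week()` in action (the `raw` path prefix is hypothetical; the expected output mirrors `test_files_in_past_week` near the end of this patch):

    >>> from delphi_safegraph.process import files_in_past_week
    >>> list(files_in_past_week('raw/2020/07/04/2020-07-04-social-distancing.csv.gz'))
    ['raw/2020/07/03/2020-07-03-social-distancing.csv.gz',
     'raw/2020/07/02/2020-07-02-social-distancing.csv.gz',
     'raw/2020/07/01/2020-07-01-social-distancing.csv.gz',
     'raw/2020/06/30/2020-06-30-social-distancing.csv.gz',
     'raw/2020/06/29/2020-06-29-social-distancing.csv.gz',
     'raw/2020/06/28/2020-06-28-social-distancing.csv.gz']

The generated paths are not checked for existence here; `process()` below skips any that are missing.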
+ """ + path, year, month, day, _ = current_filename.rsplit('/', 4) + current_date = datetime.date(int(year), int(month), int(day)) + one_day = datetime.timedelta(days=1) + for _ in range(1, 7): + current_date = current_date - one_day + date_str = current_date.isoformat() + date_path = date_str.replace('-', '/') + new_filename = f'{path}/{date_path}/{date_str}-{CSV_NAME}' + yield new_filename + + +def add_suffix(signals, suffix): + """Adds `suffix` to every element of `signals`.""" + return [s + suffix for s in signals] + + def add_prefix(signal_names, wip_signal, prefix: str): """Adds prefix to signal if there is a WIP signal Parameters @@ -42,7 +87,7 @@ def add_prefix(signal_names, wip_signal, prefix: str): ] raise ValueError("Supply True | False or '' or [] | list()") -# Check if the signal name is public + def public_signal(signal_): """Checks if the signal name is already public using COVIDcast Parameters @@ -89,32 +134,29 @@ def construct_signals(cbg_df, signal_names): """ # Preparation - cbg_df['timestamp'] = cbg_df['date_range_start'].apply( - lambda x: str(x).split('T')[0]) cbg_df['county_fips'] = (cbg_df['origin_census_block_group'] // MOD).apply( lambda x: f'{int(x):05d}') # Transformation: create signal not available in raw data for signal in signal_names: - if signal.endswith(FULL_TIME_WORK): + if FULL_TIME_WORK in signal: cbg_df[signal] = (cbg_df['full_time_work_behavior_devices'] / cbg_df['device_count']) - elif signal.endswith(COMPLETELY_HOME): + elif COMPLETELY_HOME in signal: cbg_df[signal] = (cbg_df['completely_home_device_count'] / cbg_df['device_count']) - elif signal.endswith(PART_TIME_WORK): + elif PART_TIME_WORK in signal: cbg_df[signal] = (cbg_df['part_time_work_behavior_devices'] / cbg_df['device_count']) - elif signal.endswith(HOME_DWELL): + elif HOME_DWELL in signal: cbg_df[signal] = (cbg_df['median_home_dwell_time']) - # Subsetting - return cbg_df[['timestamp', 'county_fips'] + signal_names] + return cbg_df[['county_fips'] + signal_names] def aggregate(df, signal_names, geo_resolution='county'): - '''Aggregate signals to appropriate resolution and produce standard errors. + """Aggregate signals to appropriate resolution and produce standard errors. Parameters ---------- df: pd.DataFrame @@ -129,27 +171,22 @@ def aggregate(df, signal_names, geo_resolution='county'): pd.DataFrame: DataFrame with one row per geo_id, with columns for the individual signals, standard errors, and sample sizes. 
@@ -129,27 +171,22 @@ def aggregate(df, signal_names, geo_resolution='county'):
     pd.DataFrame:
         DataFrame with one row per geo_id, with columns for the individual
         signals, standard errors, and sample sizes.
-    '''
+    """
     # Prepare geo resolution
-    GEO_RESOLUTION = ('county', 'state')
     if geo_resolution == 'county':
         df['geo_id'] = df['county_fips']
     elif geo_resolution == 'state':
         df['geo_id'] = df['county_fips'].apply(lambda x: FIPS_TO_STATE[x[:2]])
     else:
-        raise ValueError(f'`geo_resolution` must be one of {GEO_RESOLUTION}.')
+        raise ValueError(
+            f'`geo_resolution` must be one of {VALID_GEO_RESOLUTIONS}.')
 
     # Aggregation and signal creation
-    df_mean = df.groupby(['geo_id', 'timestamp'])[
-        signal_names
-    ].mean()
-    df_sd = df.groupby(['geo_id', 'timestamp'])[
-        signal_names
-    ].std()
-    df_n = df.groupby(['geo_id', 'timestamp'])[
-        signal_names
-    ].count()
+    grouped_df = df.groupby(['geo_id'])[signal_names]
+    df_mean = grouped_df.mean()
+    df_sd = grouped_df.std()
+    df_n = grouped_df.count()
     agg_df = pd.DataFrame.join(df_mean, df_sd,
                                lsuffix='_mean', rsuffix='_sd')
     agg_df = pd.DataFrame.join(agg_df, df_n.rename({
@@ -161,35 +198,39 @@ def aggregate(df, signal_names, geo_resolution='county'):
     return agg_df.reset_index()
 
 
-def process(fname, signal_names, geo_resolutions, export_dir):
-    '''Process an input census block group-level CSV and export it. Assumes
-    that the input file has _only_ one date of data.
+def process_window(df_list: List[pd.DataFrame],
+                   signal_names: List[str],
+                   geo_resolutions: List[str],
+                   export_dir: str):
+    """Processes a list of input census block group-level data frames as a
+    single data set and exports it.  Assumes each data frame has _only_ one
+    date of data.
     Parameters
     ----------
-    export_dir
-        path where the output files are saved
-    signal_names : List[str]
+    df_list: List[pd.DataFrame]
+        list of census block group-level data frames.
+    signal_names: List[str]
         signal names to be processed
-    fname: str
-        Input filename.
     geo_resolutions: List[str]
         List of geo resolutions to export the data.
+    export_dir
+        path where the output files are saved
     Returns
     -------
-    None
-    '''
-    cbg_df = construct_signals(pd.read_csv(fname), signal_names)
-    unique_date = cbg_df['timestamp'].unique()
-    if len(unique_date) != 1:
-        raise ValueError(f'More than one timestamp found in input file {fname}.')
-    date = unique_date[0].replace('-', '')
+    None.  One file is written per (signal, resolution) pair, containing the
+    aggregated data from `df_list`.
+    """
+    for df in df_list:
+        validate(df)
+    date = date_from_timestamp(df_list[0].at[0, 'date_range_start'])
+    cbg_df = pd.concat(construct_signals(df, signal_names) for df in df_list)
     for geo_res in geo_resolutions:
-        df = aggregate(cbg_df, signal_names, geo_res)
+        aggregated_df = aggregate(cbg_df, signal_names, geo_res)
         for signal in signal_names:
-            df_export = df[
+            df_export = aggregated_df[
                 ['geo_id']
                 + [f'{signal}_{x}' for x in ('mean', 'se', 'n')]
-                ].rename({
+            ].rename({
                 f'{signal}_mean': 'val',
                 f'{signal}_se': 'se',
                 f'{signal}_n': 'sample_size',
@@ -197,3 +238,56 @@ def process(fname, signal_names, geo_resolutions, export_dir):
             df_export.to_csv(f'{export_dir}/{date}_{geo_res}_{signal}.csv',
                              na_rep='NA',
                              index=False, )
+
+
+def process(current_filename: str,
+            previous_filenames: List[str],
+            signal_names: List[str],
+            wip_signal,
+            geo_resolutions: List[str],
+            export_dir: str):
+    """Creates and exports signals for a single day, as well as signals
+    averaged over the preceding week.
+    Parameters
+    ----------
+    current_filename: str
+        path to file holding the target date's data.
+    previous_filenames: List[str]
+        paths to files holding data from each day in the week preceding the
+        target date.
+    signal_names: List[str]
+        signal names to be processed for a single date.
+        A second version of each such signal named {SIGNAL}_7d_avg will be
+        created averaging {SIGNAL} over the past 7 days.
+    wip_signal : List[str] or bool
+        list of signals to be flagged as work-in-progress, OR
+        True, to flag all signals in the registry, OR
+        False, to flag only signals that have never been published
+    geo_resolutions: List[str]
+        List of geo resolutions to export the data.
+    export_dir
+        path where the output files are saved.
+    Returns
+    -------
+    None.  For each (signal, resolution) pair, one file is written for the
+    single date values to {export_dir}/{date}_{resolution}_{signal}.csv and
+    one for the data averaged over the previous week to
+    {export_dir}/{date}_{resolution}_{signal}_7d_avg.csv.
+    """
+    past_week = [pd.read_csv(current_filename)]
+    for fname in previous_filenames:
+        if os.path.exists(fname):
+            past_week.append(pd.read_csv(fname))
+
+    # First process the current file alone...
+    process_window(past_week[:1],
+                   add_prefix(signal_names, wip_signal, 'wip_'),
+                   geo_resolutions,
+                   export_dir)
+    # ...then as part of the whole window.
+    process_window(past_week,
+                   add_prefix(add_suffix(signal_names, '_7d_avg'),
+                              wip_signal,
+                              'wip_'),
+                   geo_resolutions,
+                   export_dir)
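Two details of `process()` are worth spelling out. First, the weekly signal names are built by composing `add_suffix()` and `add_prefix()`; assuming the list branch of `add_prefix()` prefixes exactly the signals named in `wip_signal` (as `test_handle_wip_signal` below asserts), the names come out as:

    from delphi_safegraph.process import add_prefix, add_suffix

    signals = ['median_home_dwell_time', 'completely_home_prop']
    wip_signal = ['completely_home_prop_7d_avg']
    print(add_prefix(signals, wip_signal, 'wip_'))
    # ['median_home_dwell_time', 'completely_home_prop']
    print(add_prefix(add_suffix(signals, '_7d_avg'), wip_signal, 'wip_'))
    # ['median_home_dwell_time_7d_avg', 'wip_completely_home_prop_7d_avg']

Second, the 7-day figures pool the CBG-level rows from all available days into a single mean and standard deviation via `pd.concat`, rather than averaging seven daily aggregates.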
diff --git a/safegraph/delphi_safegraph/run.py b/safegraph/delphi_safegraph/run.py
index a1a59c378..3fa4105af 100644
--- a/safegraph/delphi_safegraph/run.py
+++ b/safegraph/delphi_safegraph/run.py
@@ -5,15 +5,15 @@
 import glob
 import multiprocessing as mp
 import subprocess
-from functools import partial
 
 from delphi_utils import read_params
 
 from .constants import SIGNALS, GEO_RESOLUTIONS
-from .process import process, add_prefix
+from .process import process, files_in_past_week
 
 
-def run_module():
+def run_module():
+    """Creates the Safegraph indicator."""
     params = read_params()
     export_dir = params["export_dir"]
     raw_data_dir = params["raw_data_dir"]
@@ -24,11 +24,22 @@ def run_module():
     aws_endpoint = params["aws_endpoint"]
     wip_signal = params["wip_signal"]
 
-    process_file = partial(process,
-                           signal_names=add_prefix(SIGNALS, wip_signal, prefix='wip_'),
-                           geo_resolutions=GEO_RESOLUTIONS,
-                           export_dir=export_dir,
-                           )
+    def process_file(current_filename):
+        """Wrapper around `process()` that only takes a single argument.
+
+        A single-argument function is necessary to use `pool.map()` below.
+        Because each call to `process()` has two arguments that are dependent
+        on the input file name (`current_filename` and `previous_filenames`),
+        we choose to use this wrapper rather than something like
+        `functools.partial()`.
+        """
+        return process(current_filename,
+                       files_in_past_week(current_filename),
+                       signal_names=SIGNALS,
+                       wip_signal=wip_signal,
+                       geo_resolutions=GEO_RESOLUTIONS,
+                       export_dir=export_dir,
+                       )
 
     # Update raw data
     # Why call subprocess rather than using a native Python client, e.g. boto3?
@@ -43,6 +54,7 @@
             'AWS_DEFAULT_REGION': aws_default_region,
         },
         shell=True,
+        check=True,
     )
 
     files = glob.glob(f'{raw_data_dir}/social-distancing/**/*.csv.gz',
diff --git a/safegraph/tests/params.json.template b/safegraph/tests/params.json.template
index 7e6096821..6839ac4e6 100644
--- a/safegraph/tests/params.json.template
+++ b/safegraph/tests/params.json.template
@@ -8,5 +8,8 @@
     "aws_secret_access_key": "",
     "aws_default_region": "",
     "aws_endpoint": "",
-    "wip_signal" : ""
+    "wip_signal" : ["median_home_dwell_time_7d_avg",
+                    "completely_home_prop_7d_avg",
+                    "part_time_work_prop_7d_avg",
+                    "full_time_work_prop_7d_avg"]
 }
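Given the `wip_signal` list above, a run for a hypothetical target date of 2020-06-12 would export file pairs like the following per geo resolution (naming per the `process()` docstring; only the `_7d_avg` variants are named in `wip_signal`, so only they receive the `wip_` prefix):

    # Single-day signal, not named in wip_signal -> no prefix:
    #   {export_dir}/2020-06-12_county_median_home_dwell_time.csv
    # The same signal averaged over the 7-day window, flagged work-in-progress:
    #   {export_dir}/2020-06-12_county_wip_median_home_dwell_time_7d_avg.csv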
diff --git a/safegraph/tests/raw_data/small_raw_data_0.csv b/safegraph/tests/raw_data/small_raw_data_0.csv
new file mode 100644
index 000000000..deae4aafc
--- /dev/null
+++ b/safegraph/tests/raw_data/small_raw_data_0.csv
@@ -0,0 +1,4 @@
+origin_census_block_group,date_range_start,date_range_end,device_count,completely_home_device_count,median_home_dwell_time,part_time_work_behavior_devices,full_time_work_behavior_devices
+10539707003,2020-06-12T00:00:00-05:00,2020-06-13T00:00:00-05:00,100,15,6,35,45
+131510702031,2020-06-12T00:00:00-05:00,2020-06-13T00:00:00-05:00,100,5,3,5,5
+131510702031,2020-06-12T00:00:00-05:00,2020-06-13T00:00:00-05:00,100,6,4,6,6
diff --git a/safegraph/tests/raw_data/small_raw_data_1.csv b/safegraph/tests/raw_data/small_raw_data_1.csv
new file mode 100644
index 000000000..f59a1652e
--- /dev/null
+++ b/safegraph/tests/raw_data/small_raw_data_1.csv
@@ -0,0 +1,2 @@
+origin_census_block_group,date_range_start,date_range_end,device_count,completely_home_device_count,median_home_dwell_time,part_time_work_behavior_devices,full_time_work_behavior_devices
+10730144081,2020-06-11T00:00:00-05:00,2020-06-13T00:00:00-05:00,100,5,3,15,25
\ No newline at end of file
diff --git a/safegraph/tests/raw_data/small_raw_data_3.csv b/safegraph/tests/raw_data/small_raw_data_3.csv
new file mode 100644
index 000000000..ad3004d97
--- /dev/null
+++ b/safegraph/tests/raw_data/small_raw_data_3.csv
@@ -0,0 +1,3 @@
+origin_census_block_group,date_range_start,date_range_end,device_count,completely_home_device_count,median_home_dwell_time,part_time_work_behavior_devices,full_time_work_behavior_devices
+420430239002,2020-06-10T00:00:00-04:00,2020-06-13T00:00:00-04:00,100,10,7,20,30
+420430239002,2020-06-10T00:00:00-04:00,2020-06-13T00:00:00-04:00,100,20,8,30,40
\ No newline at end of file
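For orientation: `small_raw_data_0.csv` holds the target date 2020-06-12 (counties in Alabama and Georgia), `small_raw_data_1.csv` holds 2020-06-11 (Alabama), and `small_raw_data_3.csv` holds 2020-06-10 (Pennsylvania); `small_raw_data_2.csv` deliberately does not exist, so `test_process` below exercises the missing-file branch of `process()`. A quick check of the CBG -> state arithmetic:

    # County FIPS is the CBG id floor-divided by MOD = 10000000, zero-padded;
    # the first two digits give the state: 01 = al, 13 = ga, 42 = pa.
    for cbg in (10539707003, 131510702031, 10730144081, 420430239002):
        print(f'{cbg} -> {cbg // 10000000:05d}')
    # 10539707003 -> 01053 (al)
    # 131510702031 -> 13151 (ga)
    # 10730144081 -> 01073 (al)
    # 420430239002 -> 42043 (pa)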
diff --git a/safegraph/tests/test_process.py b/safegraph/tests/test_process.py
index 0f6fab3fe..e6d6ddc0e 100644
--- a/safegraph/tests/test_process.py
+++ b/safegraph/tests/test_process.py
@@ -1,56 +1,64 @@
-import pytest
-
-from os import listdir
-from os.path import join
-
+"""Tests for Safegraph process functions."""
 import numpy as np
 import pandas as pd
+
 from delphi_safegraph.process import (
-    construct_signals,
+    add_prefix,
     aggregate,
-    add_prefix
+    construct_signals,
+    files_in_past_week,
+    process,
+    process_window
 )
 from delphi_safegraph.run import SIGNALS
-from delphi_utils import read_params
 
-signal_names = SIGNALS
 
 class TestProcess:
+    """Tests for processing Safegraph indicators."""
+
     def test_construct_signals_present(self):
+        """Tests that all signals are constructed."""
         cbg_df = construct_signals(pd.read_csv('raw_data/sample_raw_data.csv'),
-                                   signal_names)
+                                   SIGNALS)
         assert 'completely_home_prop' in set(cbg_df.columns)
         assert 'full_time_work_prop' in set(cbg_df.columns)
         assert 'part_time_work_prop' in set(cbg_df.columns)
         assert 'median_home_dwell_time' in set(cbg_df.columns)
 
     def test_construct_signals_proportions(self):
+        """Tests that constructed signals are actual proportions."""
         cbg_df = construct_signals(pd.read_csv('raw_data/sample_raw_data.csv'),
-                                   signal_names)
+                                   SIGNALS)
         assert np.all(cbg_df['completely_home_prop'].values <= 1)
         assert np.all(cbg_df['full_time_work_prop'].values <= 1)
         assert np.all(cbg_df['part_time_work_prop'].values <= 1)
 
     def test_aggregate_county(self):
+        """Tests that aggregation at the county level creates non-zero-valued
+        signals."""
         cbg_df = construct_signals(pd.read_csv('raw_data/sample_raw_data.csv'),
-                                   signal_names)
-        df = aggregate(cbg_df, signal_names, 'county')
+                                   SIGNALS)
+        df = aggregate(cbg_df, SIGNALS, 'county')
 
-        assert np.all(df[f'{signal_names[0]}_n'].values > 0)
-        x = df[f'{signal_names[0]}_se'].values
+        assert np.all(df[f'{SIGNALS[0]}_n'].values > 0)
+        x = df[f'{SIGNALS[0]}_se'].values
         assert np.all(x[~np.isnan(x)] >= 0)
 
     def test_aggregate_state(self):
+        """Tests that aggregation at the state level creates non-zero-valued
+        signals."""
         cbg_df = construct_signals(pd.read_csv('raw_data/sample_raw_data.csv'),
-                                   signal_names)
-        df = aggregate(cbg_df, signal_names, 'state')
+                                   SIGNALS)
+        df = aggregate(cbg_df, SIGNALS, 'state')
 
-        assert np.all(df[f'{signal_names[0]}_n'].values > 0)
-        x = df[f'{signal_names[0]}_se'].values
+        assert np.all(df[f'{SIGNALS[0]}_n'].values > 0)
+        x = df[f'{SIGNALS[0]}_se'].values
         assert np.all(x[~np.isnan(x)] >= 0)
 
     def test_handle_wip_signal(self):
+        """Tests that `add_prefix()` derives work-in-progress signals."""
         # Test wip_signal = True
         signal_names = add_prefix(SIGNALS, True, prefix="wip_")
         assert all(s.startswith("wip_") for s in signal_names)
         # Test wip_signal = list
@@ -62,5 +70,136 @@ def test_handle_wip_signal(self):
         assert signal_names[0].startswith("wip_")
         assert all(not s.startswith("wip_") for s in signal_names[1:])
 
+    def test_files_in_past_week(self):
+        """Tests that `files_in_past_week()` finds the file names corresponding
+        to the previous 6 days."""
+        # Week that stretches over a month boundary.
+        assert tuple(files_in_past_week(
+            'x/y/z/2020/07/04/2020-07-04-social-distancing.csv.gz')) ==\
+            ('x/y/z/2020/07/03/2020-07-03-social-distancing.csv.gz',
+             'x/y/z/2020/07/02/2020-07-02-social-distancing.csv.gz',
+             'x/y/z/2020/07/01/2020-07-01-social-distancing.csv.gz',
+             'x/y/z/2020/06/30/2020-06-30-social-distancing.csv.gz',
+             'x/y/z/2020/06/29/2020-06-29-social-distancing.csv.gz',
+             'x/y/z/2020/06/28/2020-06-28-social-distancing.csv.gz')
+        # Week that stretches over a year boundary.
+        assert tuple(files_in_past_week(
+            'x/y/z/2020/01/04/2020-01-04-social-distancing.csv.gz')) ==\
+            ('x/y/z/2020/01/03/2020-01-03-social-distancing.csv.gz',
+             'x/y/z/2020/01/02/2020-01-02-social-distancing.csv.gz',
+             'x/y/z/2020/01/01/2020-01-01-social-distancing.csv.gz',
+             'x/y/z/2019/12/31/2019-12-31-social-distancing.csv.gz',
+             'x/y/z/2019/12/30/2019-12-30-social-distancing.csv.gz',
+             'x/y/z/2019/12/29/2019-12-29-social-distancing.csv.gz')
+        # Week that includes a leap day.
+        assert tuple(files_in_past_week(
+            'x/y/z/2020/03/01/2020-03-01-social-distancing.csv.gz')) ==\
+            ('x/y/z/2020/02/29/2020-02-29-social-distancing.csv.gz',
+             'x/y/z/2020/02/28/2020-02-28-social-distancing.csv.gz',
+             'x/y/z/2020/02/27/2020-02-27-social-distancing.csv.gz',
+             'x/y/z/2020/02/26/2020-02-26-social-distancing.csv.gz',
+             'x/y/z/2020/02/25/2020-02-25-social-distancing.csv.gz',
+             'x/y/z/2020/02/24/2020-02-24-social-distancing.csv.gz')
+
+    def test_process_window(self, tmp_path):
+        """Tests that processing over a window correctly aggregates signals."""
+        export_dir = tmp_path / 'export'
+        export_dir.mkdir()
+        df1 = pd.DataFrame(data={
+            'date_range_start': ['2020-02-14T00:00:00-05:00:00']*3,
+            'origin_census_block_group': [10539707003,
+                                          10539707003,
+                                          10730144081],
+            'device_count': [100, 200, 1000],
+            'completely_home_device_count': [2, 12, 40]
+        })
+        df2 = pd.DataFrame(data={
+            'date_range_start': ['2020-02-14T00:00:00-05:00:00'],
+            'origin_census_block_group': [10730144081],
+            'device_count': [2000],
+            'completely_home_device_count': [480]
+        })
+        process_window([df1, df2], ['completely_home_prop'], ['county'],
+                       export_dir)
+        expected = pd.DataFrame(data={
+            'geo_id': [1053, 1073],
+            'val': [0.04, 0.14],
+            'se': [0.02, 0.10],
+            'sample_size': [2, 2]
+        })
+        actual = pd.read_csv(
+            export_dir / '2020-02-14_county_completely_home_prop.csv')
+        pd.testing.assert_frame_equal(expected, actual)
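The expected frame above can be reproduced by hand, which also documents the standard-error convention: the exported `se` matches sd / sqrt(n) with the sample standard deviation (ddof=1, pandas' default), presumably computed in the unchanged tail of `aggregate()`:

    import numpy as np

    # County 01053: completely-home proportions 2/100 and 12/200.
    print(np.mean([0.02, 0.06]))                        # ~ 0.04
    print(np.std([0.02, 0.06], ddof=1) / np.sqrt(2))    # ~ 0.02
    # County 01073: 40/1000 from df1 and 480/2000 from df2.
    print(np.mean([0.04, 0.24]))                        # ~ 0.14
    print(np.std([0.04, 0.24], ddof=1) / np.sqrt(2))    # ~ 0.10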
+
+    def test_process(self, tmp_path):
+        """Tests that processing a list of current and previous file names
+        correctly reads and aggregates signals."""
+        export_dir = tmp_path / 'export'
+        export_dir.mkdir()
+        process('raw_data/small_raw_data_0.csv',
+                # File 2 does not exist.
+                ['raw_data/small_raw_data_1.csv',
+                 'raw_data/small_raw_data_2.csv',
+                 'raw_data/small_raw_data_3.csv', ],
+                SIGNALS,
+                ['median_home_dwell_time',
+                 'completely_home_prop_7d_avg'],
+                ['state'],
+                export_dir)
+        expected = {
+            'wip_median_home_dwell_time': pd.DataFrame(data={
+                'geo_id': ['al', 'ga'],
+                'val': [6, 3.5],
+                'se': [None, 0.5],
+                'sample_size': [1, 2]
+            }),
+            'completely_home_prop': pd.DataFrame(data={
+                'geo_id': ['al', 'ga'],
+                'val': [0.15, 0.055],
+                'se': [None, 0.005],
+                'sample_size': [1, 2]
+            }),
+            'part_time_work_prop': pd.DataFrame(data={
+                'geo_id': ['al', 'ga'],
+                'val': [0.35, 0.055],
+                'se': [None, 0.005],
+                'sample_size': [1, 2]
+            }),
+            'full_time_work_prop': pd.DataFrame(data={
+                'geo_id': ['al', 'ga'],
+                'val': [0.45, 0.055],
+                'se': [None, 0.005],
+                'sample_size': [1, 2]
+            }),
+            'median_home_dwell_time_7d_avg': pd.DataFrame(data={
+                'geo_id': ['al', 'ga', 'pa'],
+                'val': [4.5, 3.5, 7.5],
+                'se': [1.5, 0.5, 0.5],
+                'sample_size': [2, 2, 2]
+            }),
+            'wip_completely_home_prop_7d_avg': pd.DataFrame(data={
+                'geo_id': ['al', 'ga', 'pa'],
+                'val': [0.1, 0.055, 0.15],
+                'se': [0.05, 0.005, 0.05],
+                'sample_size': [2, 2, 2]
+            }),
+            'part_time_work_prop_7d_avg': pd.DataFrame(data={
+                'geo_id': ['al', 'ga', 'pa'],
+                'val': [0.25, 0.055, 0.25],
+                'se': [0.1, 0.005, 0.05],
+                'sample_size': [2, 2, 2]
+            }),
+            'full_time_work_prop_7d_avg': pd.DataFrame(data={
+                'geo_id': ['al', 'ga', 'pa'],
+                'val': [0.35, 0.055, 0.35],
+                'se': [0.1, 0.005, 0.05],
+                'sample_size': [2, 2, 2]
+            })
+        }
+        actual = {signal: pd.read_csv(
+            export_dir / f'2020-06-12_state_{signal}.csv')
+            for signal in expected}
+        for signal in expected:
+            pd.testing.assert_frame_equal(expected[signal], actual[signal])
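A spot-check of two of these expectations against the fixtures (illustrative only; the `se` of `None` for the single-row 'al' entries arises because one observation has an undefined sample standard deviation, exported as 'NA'):

    import numpy as np

    # 'ga' single-day completely_home_prop: two CBG rows in small_raw_data_0.csv.
    vals = [5 / 100, 6 / 100]
    print(np.mean(vals))                        # ~ 0.055
    print(np.std(vals, ddof=1) / np.sqrt(2))    # ~ 0.005
    # 'al' completely_home_prop_7d_avg pools the 2020-06-12 and 2020-06-11 rows.
    vals = [15 / 100, 5 / 100]
    print(np.mean(vals))                        # ~ 0.1
    print(np.std(vals, ddof=1) / np.sqrt(2))    # ~ 0.05

The `wip_` prefixes in the expected keys track the `wip_signal` argument exactly as sketched after the `process.py` diff above.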