From 6a616c49f720e748dcaf31ce9063b0299a5ae26d Mon Sep 17 00:00:00 2001
From: Mike O'Brien
Date: Mon, 12 Oct 2020 10:23:13 -0400
Subject: [PATCH 01/18] refactor safegraph.process to pave the way for
 multifile processing

---
 safegraph/delphi_safegraph/process.py | 123 ++++++++++++++++++++------
 safegraph/tests/test_process.py       |  22 ++---
 2 files changed, 106 insertions(+), 39 deletions(-)

diff --git a/safegraph/delphi_safegraph/process.py b/safegraph/delphi_safegraph/process.py
index 0da4be880..10e86157b 100644
--- a/safegraph/delphi_safegraph/process.py
+++ b/safegraph/delphi_safegraph/process.py
@@ -1,6 +1,9 @@
-import covidcast
+import datetime
+import os
+from typing import List
 import numpy as np
 import pandas as pd
+import covidcast
 
 from .constants import HOME_DWELL, COMPLETELY_HOME, FULL_TIME_WORK, PART_TIME_WORK
 from .geo import FIPS_TO_STATE
@@ -8,6 +11,51 @@
 # Magic number for modular arithmetic; CBG -> FIPS
 MOD = 10000000
 
+
+def date_from_fname(fname) -> datetime.date:
+    _, year, month, day, __ = fname.rsplit('/', 4)
+    return datetime.date(int(year), int(month), int(day))
+
+
+def date_from_timestamp(timestamp) -> datetime.date:
+    return datetime.date.fromisoformat(timestamp.split('T')[0])
+
+
+def read_and_validate_file(fname) -> pd.DataFrame:
+    df = pd.read_csv(fname)
+    unique_date = df['timestamp'].unique()
+    if len(unique_date) != 1:
+        raise ValueError(
+            f'More than one timestamp found in input file {fname}.')
+    # assert date_from_timestamp(unique_date[0]) == date_from_fname(fname),\
+    #     f'The name of input file {fname} does not correspond to the date'\
+    #     'contained inside.'
+    return df
+
+
+def files_in_past_week(current_filename) -> List[str]:
+    """Constructs file paths from previous 6 days.
+    Parameters
+    ----------
+    current_filename: str
+        name of CSV file. Must be of the form
+        {path}/{YYYY}/{MM}/{DD}/{YYYY}-{MM}-{DD}-social-distancing.csv.gz
+    Returns
+    -------
+    List of file names corresponding to the 6 days prior to YYYY-MM-DD.
+    """
+    path, year, month, day, _ = current_filename.rsplit('/', 4)
+    current_date = datetime.date(int(year), int(month), int(day))
+    one_day = datetime.timedelta(days=1)
+    for _ in range(1, 7):
+        current_date = current_date - one_day
+        y, m, d = (current_date.year, current_date.month, current_date.day)
+        new_filename = f'{path}/{y}/{m}/{d}/{current_date.isoformat()}-'\
+            'social-distancing.csv.gz'
+        if os.path.exists(new_filename):
+            yield new_filename
+
+
 def add_prefix(signal_names, wip_signal, prefix: str):
     """Adds prefix to signal if there is a WIP signal
     Parameters
@@ -42,7 +90,7 @@
     ]
     raise ValueError("Supply True | False or '' or [] | list()")
 
-# Check if the signal name is public
+
 def public_signal(signal_):
     """Checks if the signal name is already public using COVIDcast
     Parameters
@@ -89,8 +137,6 @@
     """
 
     # Preparation
-    cbg_df['timestamp'] = cbg_df['date_range_start'].apply(
-        lambda x: str(x).split('T')[0])
     cbg_df['county_fips'] = (cbg_df['origin_census_block_group'] // MOD).apply(
        lambda x: f'{int(x):05d}')
 
@@ -108,9 +154,8 @@
         elif signal.endswith(HOME_DWELL):
             cbg_df[signal] = (cbg_df['median_home_dwell_time'])
 
-    # Subsetting
-    return cbg_df[['timestamp', 'county_fips'] + signal_names]
+    return cbg_df[['county_fips'] + signal_names]
 
 
 def aggregate(df, signal_names, geo_resolution='county'):
@@ -141,15 +186,10 @@
         raise ValueError(f'`geo_resolution` must be one of {GEO_RESOLUTION}.')
 
     # Aggregation and signal creation
-    df_mean = df.groupby(['geo_id', 'timestamp'])[
-        signal_names
-    ].mean()
-    df_sd = df.groupby(['geo_id', 'timestamp'])[
-        signal_names
-    ].std()
-    df_n = df.groupby(['geo_id', 'timestamp'])[
-        signal_names
-    ].count()
+    grouped_df = df.groupby(['geo_id'])[signal_names]
+    df_mean = grouped_df.mean()
+    df_sd = grouped_df.std()
+    df_n = grouped_df.count()
     agg_df = pd.DataFrame.join(df_mean, df_sd,
                                lsuffix='_mean', rsuffix='_sd')
     agg_df = pd.DataFrame.join(agg_df, df_n.rename({
@@ -161,8 +201,8 @@
     return agg_df.reset_index()
 
 
-def process(fname, signal_names, geo_resolutions, export_dir):
-    '''Process an input census block group-level CSV and export it. Assumes
+def process_single_date(df, signal_names, geo_resolutions, export_dir):
+    """Process an input census block group-level CSV and export it. Assumes
     that the input file has _only_ one date of data.
     Parameters
     ----------
@@ -170,26 +210,42 @@
     export_dir
         path where the output files are saved
     signal_names : List[str]
         signal names to be processed
-    fname: str
-        Input filename.
+    cbg_df: pd.DataFrame
+        census block group-level CSV.
     geo_resolutions: List[str]
        List of geo resolutions to export the data.
     Returns
     -------
     None
-    '''
-    cbg_df = construct_signals(pd.read_csv(fname), signal_names)
-    unique_date = cbg_df['timestamp'].unique()
-    if len(unique_date) != 1:
-        raise ValueError(f'More than one timestamp found in input file {fname}.')
-    date = unique_date[0].replace('-', '')
+    """
+    date = date_from_timestamp(df.at[0, 'timestamp'])
+    cbg_df = construct_signals(df, signal_names)
+    for geo_res in geo_resolutions:
+        aggregated_df = aggregate(cbg_df, signal_names, geo_res)
+        for signal in signal_names:
+            df_export = aggregated_df[
+                ['geo_id']
+                + [f'{signal}_{x}' for x in ('mean', 'se', 'n')]
+            ].rename({
+                f'{signal}_mean': 'val',
+                f'{signal}_se': 'se',
+                f'{signal}_n': 'sample_size',
+            }, axis=1)
+            df_export.to_csv(f'{export_dir}/{date}_{geo_res}_{signal}.csv',
+                             na_rep='NA',
+                             index=False, )
+
+
+def process_windowed_average(df_list, signal_names, geo_resolutions, export_dir):
+    date = date_from_timestamp(df_list[0].at[0, 'timestamp'])
+    cbg_df = pd.concat(construct_signals(df, signal_names) for df in df_list)
     for geo_res in geo_resolutions:
-        df = aggregate(cbg_df, signal_names, geo_res)
+        aggregated_df = aggregate(cbg_df, signal_names, geo_res)
         for signal in signal_names:
-            df_export = df[
+            df_export = aggregated_df[
                 ['geo_id']
                 + [f'{signal}_{x}' for x in ('mean', 'se', 'n')]
-            ].rename({
+            ].rename({
                 f'{signal}_mean': 'val',
                 f'{signal}_se': 'se',
                 f'{signal}_n': 'sample_size',
             }, axis=1)
@@ -197,3 +253,14 @@
             df_export.to_csv(f'{export_dir}/{date}_{geo_res}_{signal}.csv',
                              na_rep='NA',
                              index=False, )
+
+
+def process(fname, signal_names, geo_resolutions, export_dir):
+    past_week = [pd.read_csv(fname)]
+    # past_week.extend(pd.read_csv(f)
+    #                  for f in files_in_past_week(fname))
+
+    process_single_date(past_week[0], signal_names,
+                        geo_resolutions, export_dir)
+    # process_windowed_average(past_week, signal_names,
+    #                          geo_resolutions, export_dir)

diff --git a/safegraph/tests/test_process.py b/safegraph/tests/test_process.py
index 0f6fab3fe..b08ed2692 100644
--- a/safegraph/tests/test_process.py
+++ b/safegraph/tests/test_process.py
@@ -12,13 +12,12 @@
 )
 from delphi_safegraph.run import SIGNALS
 from delphi_utils import read_params
-signal_names = SIGNALS
 
 
 class TestProcess:
     def test_construct_signals_present(self):
         cbg_df = construct_signals(pd.read_csv('raw_data/sample_raw_data.csv'),
-                                   signal_names)
+                                   SIGNALS)
         assert 'completely_home_prop' in set(cbg_df.columns)
         assert 'full_time_work_prop' in set(cbg_df.columns)
         assert 'part_time_work_prop' in set(cbg_df.columns)
@@ -26,31 +25,32 @@
 
     def test_construct_signals_proportions(self):
         cbg_df = construct_signals(pd.read_csv('raw_data/sample_raw_data.csv'),
-                                   signal_names)
+                                   SIGNALS)
         assert np.all(cbg_df['completely_home_prop'].values <= 1)
         assert np.all(cbg_df['full_time_work_prop'].values <= 1)
         assert np.all(cbg_df['part_time_work_prop'].values <= 1)
 
     def test_aggregate_county(self):
         cbg_df = construct_signals(pd.read_csv('raw_data/sample_raw_data.csv'),
-                                   signal_names)
-        df = aggregate(cbg_df, signal_names, 'county')
+                                   SIGNALS)
+        df = aggregate(cbg_df, SIGNALS, 'county')
 
-        assert np.all(df[f'{signal_names[0]}_n'].values > 0)
-        x = df[f'{signal_names[0]}_se'].values
+        assert np.all(df[f'{SIGNALS[0]}_n'].values > 0)
+        x = df[f'{SIGNALS[0]}_se'].values
         assert np.all(x[~np.isnan(x)] >= 0)
 
     def test_aggregate_state(self):
         cbg_df = construct_signals(pd.read_csv('raw_data/sample_raw_data.csv'),
-                                   signal_names)
-        df = aggregate(cbg_df, signal_names, 'state')
+                                   SIGNALS)
+        df = aggregate(cbg_df, SIGNALS, 'state')
 
-        assert np.all(df[f'{signal_names[0]}_n'].values > 0)
-        x = df[f'{signal_names[0]}_se'].values
+        assert np.all(df[f'{SIGNALS[0]}_n'].values > 0)
+        x = df[f'{SIGNALS[0]}_se'].values
         assert np.all(x[~np.isnan(x)] >= 0)
 
     def test_handle_wip_signal(self):
         # Test wip_signal = True
+        signal_names = SIGNALS
         signal_names = add_prefix(SIGNALS, True, prefix="wip_")
         assert all(s.startswith("wip_") for s in signal_names)
         # Test wip_signal = list

From 45307ad8db588261312654ea6c28de851bccbea7 Mon Sep 17 00:00:00 2001
From: Mike O'Brien
Date: Mon, 12 Oct 2020 12:35:41 -0400
Subject: [PATCH 02/18] tests for finding the file names in the past week

---
 safegraph/delphi_safegraph/process.py |  8 ++++----
 safegraph/tests/test_process.py       | 14 +++++++++++---
 2 files changed, 15 insertions(+), 7 deletions(-)

diff --git a/safegraph/delphi_safegraph/process.py b/safegraph/delphi_safegraph/process.py
index 10e86157b..d3cebce6a 100644
--- a/safegraph/delphi_safegraph/process.py
+++ b/safegraph/delphi_safegraph/process.py
@@ -49,11 +49,11 @@
     one_day = datetime.timedelta(days=1)
     for _ in range(1, 7):
         current_date = current_date - one_day
-        y, m, d = (current_date.year, current_date.month, current_date.day)
-        new_filename = f'{path}/{y}/{m}/{d}/{current_date.isoformat()}-'\
+        date_str = current_date.isoformat()
+        date_path = date_str.replace('-', '/')
+        new_filename = f'{path}/{date_path}/{date_str}-'\
             'social-distancing.csv.gz'
-        if os.path.exists(new_filename):
-            yield new_filename
+        yield new_filename

diff --git a/safegraph/tests/test_process.py b/safegraph/tests/test_process.py
index b08ed2692..cd206a16f 100644
--- a/safegraph/tests/test_process.py
+++ b/safegraph/tests/test_process.py
@@ -8,7 +8,8 @@
 from delphi_safegraph.process import (
     construct_signals,
     aggregate,
-    add_prefix
+    add_prefix,
+    files_in_past_week
 )
 from delphi_safegraph.run import SIGNALS
 from delphi_utils import read_params
@@ -62,5 +63,12 @@
         assert signal_names[0].startswith("wip_")
         assert all(not s.startswith("wip_") for s in signal_names[1:])
 
-
-
+    def test_files_in_past_week(self):
+        assert tuple(files_in_past_week(
+            "data_dir/2020/07/04/2020-07-04-social-distancing.csv.gz")) ==\
+            ("data_dir/2020/07/03/2020-07-03-social-distancing.csv.gz",
+             "data_dir/2020/07/02/2020-07-02-social-distancing.csv.gz",
+             "data_dir/2020/07/01/2020-07-01-social-distancing.csv.gz",
+             "data_dir/2020/06/30/2020-06-30-social-distancing.csv.gz",
+             "data_dir/2020/06/29/2020-06-29-social-distancing.csv.gz",
+             "data_dir/2020/06/28/2020-06-28-social-distancing.csv.gz")

From efdf3fd86a679238f16744bfe878088e8125c663 Mon Sep 17 00:00:00 2001
From: Mike O'Brien
Date: Tue, 13 Oct 2020 09:47:37 -0400
Subject: [PATCH 03/18] testing process_window

---
 safegraph/delphi_safegraph/process.py | 81 ++++++++++-----------------
 safegraph/delphi_safegraph/run.py     | 17 +++---
 safegraph/tests/test_process.py       | 60 +++++++++++++++-----
 3 files changed, 84 insertions(+), 74 deletions(-)

diff --git a/safegraph/delphi_safegraph/process.py b/safegraph/delphi_safegraph/process.py
index d3cebce6a..3340db02b 100644
--- a/safegraph/delphi_safegraph/process.py
+++ b/safegraph/delphi_safegraph/process.py
@@ -1,5 +1,5 @@
+"""Internal functions for creating Safegraph indicator."""
 import datetime
-import os
 from typing import List
 import numpy as np
 import pandas as pd
@@ -12,27 +12,17 @@
 MOD = 10000000
 
 
-def date_from_fname(fname) -> datetime.date:
-    _, year, month, day, __ = fname.rsplit('/', 4)
-    return datetime.date(int(year), int(month), int(day))
+def validate(df):
+    """Confirms that a data frame has only one date."""
+    timestamps = df['date_range_start'].apply(date_from_timestamp)
+    assert len(timestamps.unique()) == 1
 
 
 def date_from_timestamp(timestamp) -> datetime.date:
+    """Extracts the date from a timestamp beginning with {YYYY}-{MM}-{DD}T."""
     return datetime.date.fromisoformat(timestamp.split('T')[0])
 
 
-def read_and_validate_file(fname) -> pd.DataFrame:
-    df = pd.read_csv(fname)
-    unique_date = df['timestamp'].unique()
-    if len(unique_date) != 1:
-        raise ValueError(
-            f'More than one timestamp found in input file {fname}.')
-    # assert date_from_timestamp(unique_date[0]) == date_from_fname(fname),\
-    #     f'The name of input file {fname} does not correspond to the date'\
-    #     'contained inside.'
-    return df
-
-
 def files_in_past_week(current_filename) -> List[str]:
     """Constructs file paths from previous 6 days.
     Parameters
 
-def process_single_date(df, signal_names, geo_resolutions, export_dir):
-    """Process an input census block group-level CSV and export it. Assumes
-    that the input file has _only_ one date of data.
+def process_window(df_list: List[pd.DataFrame],
+                   signal_names: List[str],
+                   geo_resolutions: List[str],
+                   export_dir: str):
+    """Processes a list of input census block group-level CSVs as a single
+    data set and exports it. Assumes each data file has _only_ one date
+    of data.
     Parameters
     ----------
     cbg_df: pd.DataFrame
-        census block group-level CSV.
+        list of census block group-level CSVs.
     signal_names: List[str]
+        signal names to be processed
     geo_resolutions: List[str]
         List of geo resolutions to export the data.
+    export_dir
+        path where the output files are saved
     Returns
     -------
     None
     """
+    for df in df_list:
+        validate(df)
+    date = date_from_timestamp(df_list[0].at[0, 'date_range_start'])
+    cbg_df = pd.concat(construct_signals(df, signal_names) for df in df_list)
     for geo_res in geo_resolutions:
         aggregated_df = aggregate(cbg_df, signal_names, geo_res)
         for signal in signal_names:
             df_export = aggregated_df[
                 ['geo_id']
                 + [f'{signal}_{x}' for x in ('mean', 'se', 'n')]
             ].rename({
                 f'{signal}_mean': 'val',
                 f'{signal}_se': 'se',
                 f'{signal}_n': 'sample_size',
             }, axis=1)
             df_export.to_csv(f'{export_dir}/{date}_{geo_res}_{signal}.csv',
                              na_rep='NA',
                              index=False, )
 
 
-def process(fname, signal_names, geo_resolutions, export_dir):
-    past_week = [pd.read_csv(fname)]
-    # past_week.extend(pd.read_csv(f)
-    #                  for f in files_in_past_week(fname))
-
-    process_single_date(past_week[0], signal_names,
-                        geo_resolutions, export_dir)
-    # process_windowed_average(past_week, signal_names,
-    #                          geo_resolutions, export_dir)
+def process(current_filename, previous_filenames, signal_names,
+            geo_resolutions, export_dir):
+    past_week = [pd.read_csv(current_filename)]
+    past_week.extend(pd.read_csv(f) for f in previous_filenames)
+
+    # First process the current file alone...
+    process_window(past_week[:1], signal_names, geo_resolutions, export_dir)
+    # ...then as part of the whole window.
+    process_window(past_week, signal_names, geo_resolutions, export_dir)

diff --git a/safegraph/delphi_safegraph/run.py b/safegraph/delphi_safegraph/run.py
index a1a59c378..07277bc22 100644
--- a/safegraph/delphi_safegraph/run.py
+++ b/safegraph/delphi_safegraph/run.py
@@ -5,12 +5,12 @@
 import glob
 import multiprocessing as mp
 import subprocess
-from functools import partial
 
 from delphi_utils import read_params
 
 from .constants import SIGNALS, GEO_RESOLUTIONS
-from .process import process, add_prefix
+from .process import process, add_prefix, files_in_past_week
 
 
 def run_module():
@@ -24,11 +24,14 @@
     aws_endpoint = params["aws_endpoint"]
     wip_signal = params["wip_signal"]
 
-    process_file = partial(process,
-                           signal_names=add_prefix(SIGNALS, wip_signal, prefix='wip_'),
-                           geo_resolutions=GEO_RESOLUTIONS,
-                           export_dir=export_dir,
-                           )
+    def process_file(current_filename):
+        return process(current_filename,
+                       files_in_past_week(current_filename),
+                       signal_names=add_prefix(
+                           SIGNALS, wip_signal, prefix='wip_'),
+                       geo_resolutions=GEO_RESOLUTIONS,
+                       export_dir=export_dir,
+                       )
 
     # Update raw data
     # Why call subprocess rather than using a native Python client, e.g. boto3?
diff --git a/safegraph/tests/test_process.py b/safegraph/tests/test_process.py
index cd206a16f..a981b86de 100644
--- a/safegraph/tests/test_process.py
+++ b/safegraph/tests/test_process.py
@@ -1,15 +1,15 @@
-import pytest
-
-from os import listdir
-from os.path import join
-
+"""Tests for Safegraph process functions."""
+import os
 import numpy as np
 import pandas as pd
+import pytest
+
 from delphi_safegraph.process import (
-    construct_signals,
-    aggregate,
     add_prefix,
-    files_in_past_week
+    aggregate,
+    construct_signals,
+    files_in_past_week,
+    process_window
 )
 from delphi_safegraph.run import SIGNALS
 from delphi_utils import read_params
 
     def test_files_in_past_week(self):
         assert tuple(files_in_past_week(
-            "data_dir/2020/07/04/2020-07-04-social-distancing.csv.gz")) ==\
-            ("data_dir/2020/07/03/2020-07-03-social-distancing.csv.gz",
-             "data_dir/2020/07/02/2020-07-02-social-distancing.csv.gz",
-             "data_dir/2020/07/01/2020-07-01-social-distancing.csv.gz",
-             "data_dir/2020/06/30/2020-06-30-social-distancing.csv.gz",
-             "data_dir/2020/06/29/2020-06-29-social-distancing.csv.gz",
-             "data_dir/2020/06/28/2020-06-28-social-distancing.csv.gz")
+            'x/y/z/2020/07/04/2020-07-04-social-distancing.csv.gz')) ==\
+            ('x/y/z/2020/07/03/2020-07-03-social-distancing.csv.gz',
+             'x/y/z/2020/07/02/2020-07-02-social-distancing.csv.gz',
+             'x/y/z/2020/07/01/2020-07-01-social-distancing.csv.gz',
+             'x/y/z/2020/06/30/2020-06-30-social-distancing.csv.gz',
+             'x/y/z/2020/06/29/2020-06-29-social-distancing.csv.gz',
+             'x/y/z/2020/06/28/2020-06-28-social-distancing.csv.gz')
+
+    def test_process_window(self, tmp_path):
+        export_dir = tmp_path / 'export'
+        export_dir.mkdir()
+        df1 = pd.DataFrame(data={
+            'date_range_start': ['2020-06-12T00:00:00-05:00:00']*3,
+            'origin_census_block_group': [10539707003, 10539707003, 10730144081],
+            'device_count': [10, 20, 100],
+            'completely_home_device_count': [20, 120, 400]
+        })
+        df2 = pd.DataFrame(data={
+            'date_range_start': ['2020-06-11T00:00:00-05:00:00'],
+            'origin_census_block_group': [10730144081],
+            'device_count': [200],
+            'completely_home_device_count': [4800]
+        })
+        process_window([df1, df2], ['completely_home_prop'], ['county'],
+                       export_dir)
+        expected = pd.DataFrame(data={
+            'geo_id': [1053, 1073],
+            'val': [4.0, 14.0],
+            'se': [2.0, 10.0],
+            'sample_size': [2, 2]
+        })
+        actual = pd.read_csv(
+            export_dir / '2020-06-12_county_completely_home_prop.csv')
+        print(expected)
+        print(actual)
+        assert set(expected.columns) == set(actual.columns)
+        assert expected.equals(actual)

From 7ed90e145f1d60ab2ad5bca5b46ccea19f6d1a86 Mon Sep 17 00:00:00 2001
From: Mike O'Brien
Date: Tue, 13 Oct 2020 10:18:18 -0400
Subject: [PATCH 04/18] comments and formatting for pylint compliance

---
 safegraph/delphi_safegraph/constants.py |  2 +-
 safegraph/delphi_safegraph/geo.py       |  2 +-
 safegraph/delphi_safegraph/process.py   | 40 ++++++++++++++++++++-----
 safegraph/delphi_safegraph/run.py       |  3 +-
 safegraph/tests/test_process.py         | 17 ++++++++---
 5 files changed, 50 insertions(+), 14 deletions(-)

diff --git a/safegraph/delphi_safegraph/constants.py b/safegraph/delphi_safegraph/constants.py
index 334ca1df1..c2fe606cb 100644
--- a/safegraph/delphi_safegraph/constants.py
+++ b/safegraph/delphi_safegraph/constants.py
@@ -1,4 +1,4 @@
-
+"""Constants for constructing Safegraph indicator."""
 HOME_DWELL = 'median_home_dwell_time'
 COMPLETELY_HOME = 'completely_home_prop'
 
diff --git a/safegraph/delphi_safegraph/geo.py b/safegraph/delphi_safegraph/geo.py
index 67969630c..a60a2d664 100644
--- a/safegraph/delphi_safegraph/geo.py
+++ b/safegraph/delphi_safegraph/geo.py
@@ -1,4 +1,5 @@
 # -*- coding: utf-8 -*-
+"""Geo location constants for constructing Safegraph indicator."""
 
 # https://code.activestate.com/recipes/577775-state-fips-codes-dict/
 STATE_TO_FIPS = {
@@ -61,4 +62,3 @@
 }
 
 FIPS_TO_STATE = {v: k.lower() for k, v in STATE_TO_FIPS.items()}
-
diff --git a/safegraph/delphi_safegraph/process.py b/safegraph/delphi_safegraph/process.py
index 3340db02b..3e83b8b25 100644
--- a/safegraph/delphi_safegraph/process.py
+++ b/safegraph/delphi_safegraph/process.py
@@ -10,6 +10,8 @@
 
 # Magic number for modular arithmetic; CBG -> FIPS
 MOD = 10000000
+# Geo resolutions allowed for aggregation.
+VALID_GEO_RESOLUTIONS = ('county', 'state')
 
 
 def validate(df):
@@ -149,7 +151,7 @@
 
 
 def aggregate(df, signal_names, geo_resolution='county'):
-    '''Aggregate signals to appropriate resolution and produce standard errors.
+    """Aggregate signals to appropriate resolution and produce standard errors.
     Parameters
     ----------
     df: pd.DataFrame
@@ -164,16 +166,16 @@
     pd.DataFrame:
         DataFrame with one row per geo_id, with columns for the individual
         signals, standard errors, and sample sizes.
-    '''
+    """
 
     # Prepare geo resolution
-    GEO_RESOLUTION = ('county', 'state')
     if geo_resolution == 'county':
         df['geo_id'] = df['county_fips']
     elif geo_resolution == 'state':
         df['geo_id'] = df['county_fips'].apply(lambda x: FIPS_TO_STATE[x[:2]])
     else:
-        raise ValueError(f'`geo_resolution` must be one of {GEO_RESOLUTION}.')
+        raise ValueError(
+            f'`geo_resolution` must be one of {VALID_GEO_RESOLUTIONS}.')
 
     # Aggregation and signal creation
     grouped_df = df.groupby(['geo_id'])[signal_names]
@@ -210,7 +212,8 @@
         path where the output files are saved
     Returns
     -------
-    None
+    None. One file is written per (signal, resolution) pair containing the
+    aggregated data from `df`.
     """
     for df in df_list:
         validate(df)
@@ -235,8 +238,31 @@
                              index=False, )
 
 
-def process(current_filename, previous_filenames, signal_names,
-            geo_resolutions, export_dir):
+def process(current_filename: str,
+            previous_filenames: List[str],
+            signal_names: List[str],
+            geo_resolutions: List[str],
+            export_dir: str):
+    """Creates and exports signals corresponding both to a single day as well
+    as averaged over the previous week.
+    Parameters
+    ----------
+    current_filename: str
+        path to file holding the target date's data.
+    previous_filenames: List[str]
+        paths to files holding data from each day in the week preceding the
+        target date.
+    signal_names: List[str]
+        signal names to be processed
+    geo_resolutions: List[str]
+        List of geo resolutions to export the data.
+    export_dir
+        path where the output files are saved.
+    Returns
+    -------
+    None. Two files are written per (signal, resolution) pair, one for the
+    single date values and one for the data averaged over the previous week.
+    """
     past_week = [pd.read_csv(current_filename)]
     past_week.extend(pd.read_csv(f) for f in previous_filenames)
 
diff --git a/safegraph/delphi_safegraph/run.py b/safegraph/delphi_safegraph/run.py
index 07277bc22..1d4ec829b 100644
--- a/safegraph/delphi_safegraph/run.py
+++ b/safegraph/delphi_safegraph/run.py
@@ -13,7 +13,7 @@
 
 
 def run_module():
-
+    """Creates the Safegraph indicator."""
     params = read_params()
     export_dir = params["export_dir"]
     raw_data_dir = params["raw_data_dir"]
@@ -46,6 +46,7 @@
             'AWS_DEFAULT_REGION': aws_default_region,
         },
         shell=True,
+        check=True,
     )
 
     files = glob.glob(f'{raw_data_dir}/social-distancing/**/*.csv.gz',

diff --git a/safegraph/tests/test_process.py b/safegraph/tests/test_process.py
index a981b86de..e2fd10af0 100644
--- a/safegraph/tests/test_process.py
+++ b/safegraph/tests/test_process.py
@@ -1,8 +1,6 @@
 """Tests for Safegraph process functions."""
-import os
 import numpy as np
 import pandas as pd
-import pytest
 
 from delphi_safegraph.process import (
     add_prefix,
@@ -12,11 +10,11 @@
     process_window
 )
 from delphi_safegraph.run import SIGNALS
-from delphi_utils import read_params
 
 
 class TestProcess:
     def test_construct_signals_present(self):
+        """Tests that all signals are constructed."""
         cbg_df = construct_signals(pd.read_csv('raw_data/sample_raw_data.csv'),
                                    SIGNALS)
         assert 'completely_home_prop' in set(cbg_df.columns)
         assert 'full_time_work_prop' in set(cbg_df.columns)
         assert 'part_time_work_prop' in set(cbg_df.columns)
         assert 'median_home_dwell_time' in set(cbg_df.columns)
 
     def test_construct_signals_proportions(self):
+        """Tests that constructed signals are actual proportions."""
         cbg_df = construct_signals(pd.read_csv('raw_data/sample_raw_data.csv'),
                                    SIGNALS)
         assert np.all(cbg_df['completely_home_prop'].values <= 1)
         assert np.all(cbg_df['full_time_work_prop'].values <= 1)
         assert np.all(cbg_df['part_time_work_prop'].values <= 1)
 
     def test_aggregate_county(self):
+        """Tests that aggregation at the county level creates non-zero-valued
+        signals."""
         cbg_df = construct_signals(pd.read_csv('raw_data/sample_raw_data.csv'),
                                    SIGNALS)
         df = aggregate(cbg_df, SIGNALS, 'county')
 
         assert np.all(df[f'{SIGNALS[0]}_n'].values > 0)
         x = df[f'{SIGNALS[0]}_se'].values
         assert np.all(x[~np.isnan(x)] >= 0)
 
     def test_aggregate_state(self):
+        """Tests that aggregation at the state level creates non-zero-valued
+        signals."""
         cbg_df = construct_signals(pd.read_csv('raw_data/sample_raw_data.csv'),
                                    SIGNALS)
         df = aggregate(cbg_df, SIGNALS, 'state')
 
         assert np.all(df[f'{SIGNALS[0]}_n'].values > 0)
         x = df[f'{SIGNALS[0]}_se'].values
         assert np.all(x[~np.isnan(x)] >= 0)
 
     def test_handle_wip_signal(self):
+        """Tests that `add_prefix()` derives work-in-progress signals."""
         # Test wip_signal = True
         signal_names = SIGNALS
         signal_names = add_prefix(SIGNALS, True, prefix="wip_")
         assert all(s.startswith("wip_") for s in signal_names)
 
     def test_files_in_past_week(self):
+        """Tests that `files_in_past_week()` finds the file names corresponding
+        to the previous 6 days."""
         assert tuple(files_in_past_week(
             'x/y/z/2020/07/04/2020-07-04-social-distancing.csv.gz')) ==\
             ('x/y/z/2020/07/03/2020-07-03-social-distancing.csv.gz',
 
     def test_process_window(self, tmp_path):
+        """Tests that processing over a window correctly aggregates signals."""
         export_dir = tmp_path / 'export'
         export_dir.mkdir()
         df1 = pd.DataFrame(data={
             'date_range_start': ['2020-06-12T00:00:00-05:00:00']*3,
-            'origin_census_block_group': [10539707003, 10539707003, 10730144081],
+            'origin_census_block_group': [10539707003,
+                                          10539707003,
+                                          10730144081],
             'device_count': [10, 20, 100],
             'completely_home_device_count': [20, 120, 400]
         })

From d0151e84d0087398bb833fc2fe1d2f826012137c Mon Sep 17 00:00:00 2001
From: sgsmob
Date: Tue, 13 Oct 2020 11:09:33 -0400
Subject: [PATCH 05/18] docstring updates

Co-authored-by: krivard

---
 safegraph/delphi_safegraph/process.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/safegraph/delphi_safegraph/process.py b/safegraph/delphi_safegraph/process.py
index 3e83b8b25..ba79ad673 100644
--- a/safegraph/delphi_safegraph/process.py
+++ b/safegraph/delphi_safegraph/process.py
@@ -197,8 +197,8 @@
     signal_names: List[str],
     geo_resolutions: List[str],
     export_dir: str):
-    """Processes a list of input census block group-level CSVs as a single
-    data set and exports it. Assumes each data file has _only_ one date
+    """Processes a list of input census block group-level data frames as a single
+    data set and exports it. Assumes each data frame has _only_ one date
     of data.
     Parameters
     ----------

From 8918ada249ecdcb79185e4140174ba2c5996b94e Mon Sep 17 00:00:00 2001
From: Mike O'Brien
Date: Tue, 13 Oct 2020 11:15:18 -0400
Subject: [PATCH 06/18] lint compliance in test cases

---
 safegraph/tests/test_process.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/safegraph/tests/test_process.py b/safegraph/tests/test_process.py
index e2fd10af0..435149639 100644
--- a/safegraph/tests/test_process.py
+++ b/safegraph/tests/test_process.py
 
 class TestProcess:
+    """Tests for processing Safegraph indicators."""
     def test_construct_signals_present(self):
         """Tests that all signals are constructed."""
         cbg_df = construct_signals(pd.read_csv('raw_data/sample_raw_data.csv'),
 
     def test_aggregate_state(self):
-        """Tests that aggregation at the state level creates non-zero-valued 
+        """Tests that aggregation at the state level creates non-zero-valued
         signals."""
         cbg_df = construct_signals(pd.read_csv('raw_data/sample_raw_data.csv'),
 
     def test_files_in_past_week(self):
-        """Tests that `files_in_past_week()` finds the file names corresponding 
+        """Tests that `files_in_past_week()` finds the file names corresponding
         to the previous 6 days."""

From 6b241853b169a9599386fdb5af9f40c84b3870fe Mon Sep 17 00:00:00 2001
From: Mike O'Brien
Date: Tue, 13 Oct 2020 11:16:01 -0400
Subject: [PATCH 07/18] move location of VALID_GEO_RESOLUTIONS

---
 safegraph/delphi_safegraph/geo.py     |  2 ++
 safegraph/delphi_safegraph/process.py | 12 +++++-------
 2 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/safegraph/delphi_safegraph/geo.py b/safegraph/delphi_safegraph/geo.py
index a60a2d664..e1b2d8859 100644
--- a/safegraph/delphi_safegraph/geo.py
+++ b/safegraph/delphi_safegraph/geo.py
@@ -62,3 +62,5 @@
 }
 
 FIPS_TO_STATE = {v: k.lower() for k, v in STATE_TO_FIPS.items()}
+
+VALID_GEO_RESOLUTIONS = ('county', 'state')

diff --git a/safegraph/delphi_safegraph/process.py b/safegraph/delphi_safegraph/process.py
index ba79ad673..6ae1497fa 100644
--- a/safegraph/delphi_safegraph/process.py
+++ b/safegraph/delphi_safegraph/process.py
@@ -6,12 +6,10 @@
 import covidcast
 
 from .constants import HOME_DWELL, COMPLETELY_HOME, FULL_TIME_WORK, PART_TIME_WORK
-from .geo import FIPS_TO_STATE
+from .geo import FIPS_TO_STATE, VALID_GEO_RESOLUTIONS
 
 # Magic number for modular arithmetic; CBG -> FIPS
 MOD = 10000000
-# Geo resolutions allowed for aggregation.
-VALID_GEO_RESOLUTIONS = ('county', 'state')
 
 
 def validate(df):
@@ -195,13 +193,13 @@
     signal_names: List[str],
     geo_resolutions: List[str],
     export_dir: str):
-    """Processes a list of input census block group-level data frames as a single
-    data set and exports it. Assumes each data frame has _only_ one date
-    of data.
+    """Processes a list of input census block group-level data frames as a
+    single data set and exports it. Assumes each data frame has _only_ one
+    date of data.
     Parameters
     ----------
     cbg_df: pd.DataFrame
-        list of census block group-level CSVs.
+        list of census block group-level frames.
     signal_names: List[str]
         signal names to be processed
     geo_resolutions: List[str]

From 10d3711717682dffd8f1b179c6fc1557e96a26eb Mon Sep 17 00:00:00 2001
From: Mike O'Brien
Date: Tue, 13 Oct 2020 14:51:58 -0400
Subject: [PATCH 08/18] file existence checking in process

---
 safegraph/delphi_safegraph/process.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/safegraph/delphi_safegraph/process.py b/safegraph/delphi_safegraph/process.py
index 6ae1497fa..a687d4996 100644
--- a/safegraph/delphi_safegraph/process.py
+++ b/safegraph/delphi_safegraph/process.py
@@ -1,5 +1,6 @@
 """Internal functions for creating Safegraph indicator."""
 import datetime
+import os
 from typing import List
 import numpy as np
 import pandas as pd
@@ -259,7 +260,9 @@
     single date values and one for the data averaged over the previous week.
     """
     past_week = [pd.read_csv(current_filename)]
-    past_week.extend(pd.read_csv(f) for f in previous_filenames)
+    for fname in previous_filenames:
+        if os.path.exists(fname):
+            past_week.append(pd.read_csv(fname))
 
     # First process the current file alone...

From 8c665c635c985b38308b2179736262af6c12a712 Mon Sep 17 00:00:00 2001
From: Mike O'Brien
Date: Tue, 13 Oct 2020 16:21:00 -0400
Subject: [PATCH 09/18] refactor CSV name

---
 safegraph/delphi_safegraph/process.py |  7 ++++---
 safegraph/tests/test_process.py       | 19 +++++++++++++++++++
 2 files changed, 23 insertions(+), 3 deletions(-)

diff --git a/safegraph/delphi_safegraph/process.py b/safegraph/delphi_safegraph/process.py
index a687d4996..921bb1fb7 100644
--- a/safegraph/delphi_safegraph/process.py
+++ b/safegraph/delphi_safegraph/process.py
@@ -12,6 +12,8 @@
 
 # Magic number for modular arithmetic; CBG -> FIPS
 MOD = 10000000
+# Base file name for raw data CSVs.
+CSV_NAME = 'social-distancing.csv.gz'
 
 def validate(df):
     """Confirms that a data frame has only one date."""
@@ -30,7 +32,7 @@
     ----------
     current_filename: str
         name of CSV file. Must be of the form
-        {path}/{YYYY}/{MM}/{DD}/{YYYY}-{MM}-{DD}-social-distancing.csv.gz
+        {path}/{YYYY}/{MM}/{DD}/{YYYY}-{MM}-{DD}-{CSV_NAME}
     Returns
     -------
     List of file names corresponding to the 6 days prior to YYYY-MM-DD.
@@ -42,8 +44,7 @@
         current_date = current_date - one_day
         date_str = current_date.isoformat()
         date_path = date_str.replace('-', '/')
-        new_filename = f'{path}/{date_path}/{date_str}-'\
-            'social-distancing.csv.gz'
+        new_filename = f'{path}/{date_path}/{date_str}-{CSV_NAME}'
         yield new_filename

diff --git a/safegraph/tests/test_process.py b/safegraph/tests/test_process.py
index 435149639..66378187d 100644
--- a/safegraph/tests/test_process.py
+++ b/safegraph/tests/test_process.py
     def test_files_in_past_week(self):
         """Tests that `files_in_past_week()` finds the file names corresponding
         to the previous 6 days."""
+        # Week that stretches over a month boundary.
         assert tuple(files_in_past_week(
             'x/y/z/2020/07/04/2020-07-04-social-distancing.csv.gz')) ==\
             ('x/y/z/2020/07/03/2020-07-03-social-distancing.csv.gz',
              'x/y/z/2020/07/02/2020-07-02-social-distancing.csv.gz',
              'x/y/z/2020/07/01/2020-07-01-social-distancing.csv.gz',
              'x/y/z/2020/06/30/2020-06-30-social-distancing.csv.gz',
              'x/y/z/2020/06/29/2020-06-29-social-distancing.csv.gz',
              'x/y/z/2020/06/28/2020-06-28-social-distancing.csv.gz')
+        # Week that stretches over a year boundary.
+        assert tuple(files_in_past_week(
+            'x/y/z/2020/01/04/2020-01-04-social-distancing.csv.gz')) ==\
+            ('x/y/z/2020/01/03/2020-01-03-social-distancing.csv.gz',
+             'x/y/z/2020/01/02/2020-01-02-social-distancing.csv.gz',
+             'x/y/z/2020/01/01/2020-01-01-social-distancing.csv.gz',
+             'x/y/z/2019/12/31/2019-12-31-social-distancing.csv.gz',
+             'x/y/z/2019/12/30/2019-12-30-social-distancing.csv.gz',
+             'x/y/z/2019/12/29/2019-12-29-social-distancing.csv.gz')
+        # Week that includes a leap day.
+        assert tuple(files_in_past_week(
+            'x/y/z/2020/03/01/2020-03-01-social-distancing.csv.gz')) ==\
+            ('x/y/z/2020/02/29/2020-02-29-social-distancing.csv.gz',
+             'x/y/z/2020/02/28/2020-02-28-social-distancing.csv.gz',
+             'x/y/z/2020/02/27/2020-02-27-social-distancing.csv.gz',
+             'x/y/z/2020/02/26/2020-02-26-social-distancing.csv.gz',
+             'x/y/z/2020/02/25/2020-02-25-social-distancing.csv.gz',
+             'x/y/z/2020/02/24/2020-02-24-social-distancing.csv.gz')

From e0ed614adbfb2036a40d10ea895ccad2cb564536 Mon Sep 17 00:00:00 2001
From: Mike O'Brien
Date: Wed, 14 Oct 2020 11:23:10 -0400
Subject: [PATCH 10/18] add test for process

---
 safegraph/tests/raw_data/small_raw_data_0.csv |  2 +
 safegraph/tests/raw_data/small_raw_data_1.csv |  2 +
 safegraph/tests/raw_data/small_raw_data_3.csv |  3 +
 safegraph/tests/test_process.py               | 69 ++++++++++++++++---
 4 files changed, 65 insertions(+), 11 deletions(-)
 create mode 100644 safegraph/tests/raw_data/small_raw_data_0.csv
 create mode 100644 safegraph/tests/raw_data/small_raw_data_1.csv
 create mode 100644 safegraph/tests/raw_data/small_raw_data_3.csv

diff --git a/safegraph/tests/raw_data/small_raw_data_0.csv b/safegraph/tests/raw_data/small_raw_data_0.csv
new file mode 100644
index 000000000..db325288b
--- /dev/null
+++ b/safegraph/tests/raw_data/small_raw_data_0.csv
@@ -0,0 +1,2 @@
+origin_census_block_group,date_range_start,date_range_end,device_count,completely_home_device_count,median_home_dwell_time,part_time_work_behavior_devices,full_time_work_behavior_devices
+10539707003,2020-06-12T00:00:00-05:00,2020-06-13T00:00:00-05:00,100,15,6,35,45
\ No newline at end of file
diff --git a/safegraph/tests/raw_data/small_raw_data_1.csv b/safegraph/tests/raw_data/small_raw_data_1.csv
new file mode 100644
index 000000000..f59a1652e
--- /dev/null
+++ b/safegraph/tests/raw_data/small_raw_data_1.csv
@@ -0,0 +1,2 @@
+origin_census_block_group,date_range_start,date_range_end,device_count,completely_home_device_count,median_home_dwell_time,part_time_work_behavior_devices,full_time_work_behavior_devices
+10730144081,2020-06-11T00:00:00-05:00,2020-06-13T00:00:00-05:00,100,5,3,15,25
\ No newline at end of file
diff --git a/safegraph/tests/raw_data/small_raw_data_3.csv b/safegraph/tests/raw_data/small_raw_data_3.csv
new file mode 100644
index 000000000..ad3004d97
--- /dev/null
+++ b/safegraph/tests/raw_data/small_raw_data_3.csv
@@ -0,0 +1,3 @@
+origin_census_block_group,date_range_start,date_range_end,device_count,completely_home_device_count,median_home_dwell_time,part_time_work_behavior_devices,full_time_work_behavior_devices
+420430239002,2020-06-10T00:00:00-04:00,2020-06-13T00:00:00-04:00,100,10,7,20,30
+420430239002,2020-06-10T00:00:00-04:00,2020-06-13T00:00:00-04:00,100,20,8,30,40
\ No newline at end of file
diff --git a/safegraph/tests/test_process.py b/safegraph/tests/test_process.py
index 66378187d..f092b0b30 100644
--- a/safegraph/tests/test_process.py
+++ b/safegraph/tests/test_process.py
@@ -7,6 +7,7 @@
     aggregate,
     construct_signals,
     files_in_past_week,
+    process,
     process_window
 )
 from delphi_safegraph.run import SIGNALS
 
 class TestProcess:
     """Tests for processing Safegraph indicators."""
+
     def test_construct_signals_present(self):
         """Tests that all signals are constructed."""
 
     def test_process_window(self, tmp_path):
         """Tests that processing over a window correctly aggregates signals."""
         export_dir = tmp_path / 'export'
         export_dir.mkdir()
         df1 = pd.DataFrame(data={
-            'date_range_start': ['2020-06-12T00:00:00-05:00:00']*3,
+            'date_range_start': ['2020-02-14T00:00:00-05:00:00']*3,
             'origin_census_block_group': [10539707003,
                                           10539707003,
                                           10730144081],
-            'device_count': [10, 20, 100],
-            'completely_home_device_count': [20, 120, 400]
+            'device_count': [100, 200, 1000],
+            'completely_home_device_count': [2, 12, 40]
         })
         df2 = pd.DataFrame(data={
-            'date_range_start': ['2020-06-11T00:00:00-05:00:00'],
+            'date_range_start': ['2020-02-14T00:00:00-05:00:00'],
             'origin_census_block_group': [10730144081],
-            'device_count': [200],
-            'completely_home_device_count': [4800]
+            'device_count': [2000],
+            'completely_home_device_count': [480]
         })
         process_window([df1, df2], ['completely_home_prop'], ['county'],
                        export_dir)
         expected = pd.DataFrame(data={
             'geo_id': [1053, 1073],
-            'val': [4.0, 14.0],
-            'se': [2.0, 10.0],
+            'val': [0.04, 0.14],
+            'se': [0.02, 0.10],
             'sample_size': [2, 2]
         })
         actual = pd.read_csv(
-            export_dir / '2020-06-12_county_completely_home_prop.csv')
+            export_dir / '2020-02-14_county_completely_home_prop.csv')
         print(expected)
         print(actual)
-        assert set(expected.columns) == set(actual.columns)
-        assert expected.equals(actual)
+        pd.testing.assert_frame_equal(expected, actual)
+
+    def test_process(self, tmp_path):
+        """Tests that processing a list of current and previous file names
+        correctly reads and aggregates signals."""
+        export_dir = tmp_path / 'export'
+        export_dir.mkdir()
+
+        process('raw_data/small_raw_data_0.csv',
+                # File 2 does not exist.
+                ['raw_data/small_raw_data_1.csv',
+                 'raw_data/small_raw_data_2.csv',
+                 'raw_data/small_raw_data_3.csv', ],
+                SIGNALS,
+                ['state'],
+                export_dir)
+
+        expected = {
+            'median_home_dwell_time': pd.DataFrame(data={
+                'geo_id': ['al', 'pa'],
+                'val': [4.5, 7.5],
+                'se': [1.5, 0.5],
+                'sample_size': [2, 2]
+            }),
+            'completely_home_prop': pd.DataFrame(data={
+                'geo_id': ['al', 'pa'],
+                'val': [0.1, 0.15],
+                'se': [0.05, 0.05],
+                'sample_size': [2, 2]
+            }),
+            'part_time_work_prop': pd.DataFrame(data={
+                'geo_id': ['al', 'pa'],
+                'val': [0.25, 0.25],
+                'se': [0.1, 0.05],
+                'sample_size': [2, 2]
+            }),
+            'full_time_work_prop': pd.DataFrame(data={
+                'geo_id': ['al', 'pa'],
+                'val': [0.35, 0.35],
+                'se': [0.1, 0.05],
+                'sample_size': [2, 2]
+            })
+        }
+        actual = {signal: pd.read_csv(
+            export_dir / f'2020-06-12_state_{signal}.csv') for signal in SIGNALS}
+        for signal in SIGNALS:
+            pd.testing.assert_frame_equal(expected[signal], actual[signal])

From 8db87c55aa44e0538fccfb27692241ab8009492f Mon Sep 17 00:00:00 2001
From: Mike O'Brien
Date: Wed, 14 Oct 2020 14:01:06 -0400
Subject: [PATCH 11/18] fix line too long

---
 safegraph/tests/test_process.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/safegraph/tests/test_process.py b/safegraph/tests/test_process.py
index f092b0b30..ae2ba2ef1 100644
--- a/safegraph/tests/test_process.py
+++ b/safegraph/tests/test_process.py
@@ -175,6 +175,7 @@
         actual = {signal: pd.read_csv(
-            export_dir / f'2020-06-12_state_{signal}.csv') for signal in SIGNALS}
+            export_dir / f'2020-06-12_state_{signal}.csv')
+            for signal in SIGNALS}
         for signal in SIGNALS:
             pd.testing.assert_frame_equal(expected[signal], actual[signal])

From e6502e5a4831a6e794804caf18160655f76dff0e Mon Sep 17 00:00:00 2001
From: Mike O'Brien
Date: Thu, 15 Oct 2020 09:39:44 -0400
Subject: [PATCH 12/18] remove extraneous prints

---
 safegraph/tests/test_process.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/safegraph/tests/test_process.py b/safegraph/tests/test_process.py
index ae2ba2ef1..97fac6369 100644
--- a/safegraph/tests/test_process.py
+++ b/safegraph/tests/test_process.py
@@ -129,8 +129,6 @@
         actual = pd.read_csv(
             export_dir / '2020-02-14_county_completely_home_prop.csv')
-        print(expected)
-        print(actual)
         pd.testing.assert_frame_equal(expected, actual)

From 922432b7d18384468b3a94013e32eb243f82c90c Mon Sep 17 00:00:00 2001
From: Mike O'Brien
Date: Thu, 15 Oct 2020 11:17:03 -0400
Subject: [PATCH 13/18] documentation on process_file wrapper

---
 safegraph/delphi_safegraph/run.py | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/safegraph/delphi_safegraph/run.py b/safegraph/delphi_safegraph/run.py
index 1d4ec829b..546394cac 100644
--- a/safegraph/delphi_safegraph/run.py
+++ b/safegraph/delphi_safegraph/run.py
@@ -25,6 +25,14 @@
     wip_signal = params["wip_signal"]
 
     def process_file(current_filename):
+        """Wrapper around `process()` that only takes a single argument.
+
+        A single argument function is necessary to use `pool.map()` below.
+        Because each call to `process()` has two arguments that are dependent
+        on the input file name (`current_filename` and `previous_filenames`),
+        we choose to use this wrapper rather than something like
+        `functools.partial()`.
+        """
         return process(current_filename,
                        files_in_past_week(current_filename),
                        signal_names=add_prefix(
                            SIGNALS, wip_signal, prefix='wip_'),
                        geo_resolutions=GEO_RESOLUTIONS,
                        export_dir=export_dir,
                        )

From 90a22c467252130bcbc7ab639cb4d8ac465dde6e Mon Sep 17 00:00:00 2001
From: Mike O'Brien
Date: Mon, 19 Oct 2020 15:53:07 -0400
Subject: [PATCH 14/18] don't overwrite files

---
 safegraph/delphi_safegraph/process.py | 40 +++++++++++++++++++++------
 safegraph/delphi_safegraph/run.py     |  4 +--
 2 files changed, 33 insertions(+), 11 deletions(-)

diff --git a/safegraph/delphi_safegraph/process.py b/safegraph/delphi_safegraph/process.py
index 921bb1fb7..2fee972d5 100644
--- a/safegraph/delphi_safegraph/process.py
+++ b/safegraph/delphi_safegraph/process.py
@@ -48,6 +48,11 @@
         yield new_filename
 
 
+def add_suffix(signals, suffix):
+    """Adds `suffix` to every element of `signals`."""
+    return [s + suffix for s in signals]
+
+
 def add_prefix(signal_names, wip_signal, prefix: str):
     """Adds prefix to signal if there is a WIP signal
     Parameters
@@ -134,16 +139,16 @@
 
     # Transformation: create signal not available in raw data
     for signal in signal_names:
-        if signal.endswith(FULL_TIME_WORK):
+        if signal.contains(FULL_TIME_WORK):
             cbg_df[signal] = (cbg_df['full_time_work_behavior_devices']
                               / cbg_df['device_count'])
-        elif signal.endswith(COMPLETELY_HOME):
+        elif signal.contains(COMPLETELY_HOME):
             cbg_df[signal] = (cbg_df['completely_home_device_count']
                               / cbg_df['device_count'])
-        elif signal.endswith(PART_TIME_WORK):
+        elif signal.contains(PART_TIME_WORK):
             cbg_df[signal] = (cbg_df['part_time_work_behavior_devices']
                               / cbg_df['device_count'])
-        elif signal.endswith(HOME_DWELL):
+        elif signal.contains(HOME_DWELL):
             cbg_df[signal] = (cbg_df['median_home_dwell_time'])
 
     # Subsetting
@@ -238,6 +243,7 @@
 def process(current_filename: str,
             previous_filenames: List[str],
             signal_names: List[str],
+            wip_signal,
             geo_resolutions: List[str],
             export_dir: str):
     """Creates and exports signals corresponding both to a single day as well
@@ -250,15 +256,23 @@
         paths to files holding data from each day in the week preceding the
         target date.
     signal_names: List[str]
-        signal names to be processed
+        signal names to be processed for a single date.
+        A second version of each such signal named {SIGNAL}_7d_avg will be
+        created averaging {SIGNAL} over the past 7 days.
+    wip_signal : List[str] or bool
+        a list of wip signals: [], OR
+        all signals in the registry: True OR
+        only signals that have never been published: False
     geo_resolutions: List[str]
         List of geo resolutions to export the data.
     export_dir
         path where the output files are saved.
     Returns
     -------
-    None. Two files are written per (signal, resolution) pair, one for the
-    single date values and one for the data averaged over the previous week.
+    None. For each (signal, resolution) pair, one file is written for the
+    single date values to {export_dir}/{date}_{resolution}_{signal}.csv and
+    one for the data averaged over the previous week to
+    {export_dir}/{date}_{resolution}_{signal}_7d_avg.csv.
    """
     past_week = [pd.read_csv(current_filename)]
     for fname in previous_filenames:
         if os.path.exists(fname):
             past_week.append(pd.read_csv(fname))
 
     # First process the current file alone...
-    process_window(past_week[:1], signal_names, geo_resolutions, export_dir)
+    process_window(past_week[:1],
+                   add_prefix(signal_names, wip_signal, 'wip_'),
+                   geo_resolutions,
+                   export_dir)
     # ...then as part of the whole window.
-    process_window(past_week, signal_names, geo_resolutions, export_dir)
+    process_window(past_week,
+                   add_prefix(add_suffix(signal_names, '_7d_avg'),
+                              wip_signal,
+                              'wip_'),
+                   geo_resolutions,
+                   export_dir)

diff --git a/safegraph/delphi_safegraph/run.py b/safegraph/delphi_safegraph/run.py
index 546394cac..0a713095d 100644
--- a/safegraph/delphi_safegraph/run.py
+++ b/safegraph/delphi_safegraph/run.py
@@ -35,8 +35,8 @@
         return process(current_filename,
                        files_in_past_week(current_filename),
-                       signal_names=add_prefix(
-                           SIGNALS, wip_signal, prefix='wip_'),
+                       signal_names=SIGNALS,
+                       wip_signal=wip_signal,
                        geo_resolutions=GEO_RESOLUTIONS,
                        export_dir=export_dir,
                        )

From 5117b9d9d111004dfbcd6cabc803332fa1686e23 Mon Sep 17 00:00:00 2001
From: Mike O'Brien
Date: Mon, 19 Oct 2020 15:56:37 -0400
Subject: [PATCH 15/18] remove unused import

---
 safegraph/delphi_safegraph/run.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/safegraph/delphi_safegraph/run.py b/safegraph/delphi_safegraph/run.py
index 0a713095d..3fa4105af 100644
--- a/safegraph/delphi_safegraph/run.py
+++ b/safegraph/delphi_safegraph/run.py
@@ -9,7 +9,7 @@
 from delphi_utils import read_params
 
 from .constants import SIGNALS, GEO_RESOLUTIONS
-from .process import process, add_prefix, files_in_past_week
+from .process import process, files_in_past_week
 
 
 def run_module():

From 509f4d74944049e7540d14c1f6df93fb9ea155dc Mon Sep 17 00:00:00 2001
From: Mike O'Brien
Date: Mon, 19 Oct 2020 16:23:01 -0400
Subject: [PATCH 16/18] substring testing with 'in'

---
 safegraph/delphi_safegraph/process.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/safegraph/delphi_safegraph/process.py b/safegraph/delphi_safegraph/process.py
index 2fee972d5..f51b1faff 100644
--- a/safegraph/delphi_safegraph/process.py
+++ b/safegraph/delphi_safegraph/process.py
@@ -139,16 +139,16 @@
 
     # Transformation: create signal not available in raw data
     for signal in signal_names:
-        if signal.contains(FULL_TIME_WORK):
+        if FULL_TIME_WORK in signal:
             cbg_df[signal] = (cbg_df['full_time_work_behavior_devices']
                               / cbg_df['device_count'])
-        elif signal.contains(COMPLETELY_HOME):
+        elif COMPLETELY_HOME in signal:
             cbg_df[signal] = (cbg_df['completely_home_device_count']
                               / cbg_df['device_count'])
-        elif signal.contains(PART_TIME_WORK):
+        elif PART_TIME_WORK in signal:
             cbg_df[signal] = (cbg_df['part_time_work_behavior_devices']
                               / cbg_df['device_count'])
-        elif signal.contains(HOME_DWELL):
+        elif HOME_DWELL in signal:
             cbg_df[signal] = (cbg_df['median_home_dwell_time'])

From 9d6394adbb8f5f473c781b743d04a055d4b15c28 Mon Sep 17 00:00:00 2001
From: Mike O'Brien
Date: Mon, 19 Oct 2020 16:23:36 -0400
Subject: [PATCH 17/18] update tests to process to include wip and 7d_avg
 signals

---
 safegraph/tests/raw_data/small_raw_data_0.csv |  4 +-
 safegraph/tests/test_process.py               | 64 +++++++++++++------
 2 files changed, 48 insertions(+), 20 deletions(-)

diff --git a/safegraph/tests/raw_data/small_raw_data_0.csv b/safegraph/tests/raw_data/small_raw_data_0.csv
index db325288b..deae4aafc 100644
--- a/safegraph/tests/raw_data/small_raw_data_0.csv
+++ b/safegraph/tests/raw_data/small_raw_data_0.csv
@@ -1,2 +1,4 @@
 origin_census_block_group,date_range_start,date_range_end,device_count,completely_home_device_count,median_home_dwell_time,part_time_work_behavior_devices,full_time_work_behavior_devices
-10539707003,2020-06-12T00:00:00-05:00,2020-06-13T00:00:00-05:00,100,15,6,35,45
\ No newline at end of file
+10539707003,2020-06-12T00:00:00-05:00,2020-06-13T00:00:00-05:00,100,15,6,35,45
+131510702031,2020-06-12T00:00:00-05:00,2020-06-13T00:00:00-05:00,100,5,3,5,5
+131510702031,2020-06-12T00:00:00-05:00,2020-06-13T00:00:00-05:00,100,6,4,6,6

diff --git a/safegraph/tests/test_process.py b/safegraph/tests/test_process.py
index 97fac6369..e6d6ddc0e 100644
--- a/safegraph/tests/test_process.py
+++ b/safegraph/tests/test_process.py
         process('raw_data/small_raw_data_0.csv',
                 # File 2 does not exist.
                 ['raw_data/small_raw_data_1.csv',
                  'raw_data/small_raw_data_2.csv',
                  'raw_data/small_raw_data_3.csv', ],
                 SIGNALS,
+                ['median_home_dwell_time',
+                 'completely_home_prop_7d_avg'],
                 ['state'],
                 export_dir)
 
         expected = {
-            'median_home_dwell_time': pd.DataFrame(data={
-                'geo_id': ['al', 'pa'],
-                'val': [4.5, 7.5],
-                'se': [1.5, 0.5],
-                'sample_size': [2, 2]
+            'wip_median_home_dwell_time': pd.DataFrame(data={
+                'geo_id': ['al', 'ga'],
+                'val': [6, 3.5],
+                'se': [None, 0.5],
+                'sample_size': [1, 2]
             }),
             'completely_home_prop': pd.DataFrame(data={
-                'geo_id': ['al', 'pa'],
-                'val': [0.1, 0.15],
-                'se': [0.05, 0.05],
-                'sample_size': [2, 2]
+                'geo_id': ['al', 'ga'],
+                'val': [0.15, 0.055],
+                'se': [None, 0.005],
+                'sample_size': [1, 2]
             }),
             'part_time_work_prop': pd.DataFrame(data={
-                'geo_id': ['al', 'pa'],
-                'val': [0.25, 0.25],
-                'se': [0.1, 0.05],
-                'sample_size': [2, 2]
+                'geo_id': ['al', 'ga'],
+                'val': [0.35, 0.055],
+                'se': [None, 0.005],
+                'sample_size': [1, 2]
             }),
             'full_time_work_prop': pd.DataFrame(data={
-                'geo_id': ['al', 'pa'],
-                'val': [0.35, 0.35],
-                'se': [0.1, 0.05],
-                'sample_size': [2, 2]
+                'geo_id': ['al', 'ga'],
+                'val': [0.45, 0.055],
+                'se': [None, 0.005],
+                'sample_size': [1, 2]
+            }),
+            'median_home_dwell_time_7d_avg': pd.DataFrame(data={
+                'geo_id': ['al', 'ga', 'pa'],
+                'val': [4.5, 3.5, 7.5],
+                'se': [1.5, 0.5, 0.5],
+                'sample_size': [2, 2, 2]
+            }),
+            'wip_completely_home_prop_7d_avg': pd.DataFrame(data={
+                'geo_id': ['al', 'ga', 'pa'],
+                'val': [0.1, 0.055, 0.15],
+                'se': [0.05, 0.005, 0.05],
+                'sample_size': [2, 2, 2]
+            }),
+            'part_time_work_prop_7d_avg': pd.DataFrame(data={
+                'geo_id': ['al', 'ga', 'pa'],
+                'val': [0.25, 0.055, 0.25],
+                'se': [0.1, 0.005, 0.05],
+                'sample_size': [2, 2, 2]
+            }),
+            'full_time_work_prop_7d_avg': pd.DataFrame(data={
+                'geo_id': ['al', 'ga', 'pa'],
+                'val': [0.35, 0.055, 0.35],
+                'se': [0.1, 0.005, 0.05],
+                'sample_size': [2, 2, 2]
             })
         }
         actual = {signal: pd.read_csv(
             export_dir / f'2020-06-12_state_{signal}.csv')
-            for signal in SIGNALS}
-        for signal in SIGNALS:
+            for signal in expected}
+        for signal in expected:
             pd.testing.assert_frame_equal(expected[signal], actual[signal])

From 470e95dfad06d50de0db4f627b1209c02efade2d Mon Sep 17 00:00:00 2001
From: Mike O'Brien
Date: Mon, 19 Oct 2020 16:27:46 -0400
Subject: [PATCH 18/18] added wip signals to params file

---
 safegraph/tests/params.json.template | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/safegraph/tests/params.json.template b/safegraph/tests/params.json.template
index 7e6096821..6839ac4e6 100644
--- a/safegraph/tests/params.json.template
+++ b/safegraph/tests/params.json.template
@@ -8,5 +8,8 @@
     "aws_secret_access_key": "",
     "aws_default_region": "",
     "aws_endpoint": "",
-    "wip_signal" : ""
+    "wip_signal" : ["median_home_dwell_time_7d_avg",
"completely_home_prop_7d_avg", + "part_time_work_prop_7d_avg", + "full_time_work_prop_7d_avg"] }