
Propagate safegraph 7-day average signals into main #331

Merged: 24 commits merged into main on Oct 20, 2020.

Commits (24)
4a58a16 Merge branch 'main' of github.com:cmu-delphi/covidcast-indicators int… (sgsmob, Oct 8, 2020)
c687428 Merge branch 'main' of github.com:cmu-delphi/covidcast-indicators int… (sgsmob, Oct 9, 2020)
ed4c306 Merge branch 'main' of github.com:cmu-delphi/covidcast-indicators int… (sgsmob, Oct 12, 2020)
6a616c4 refactor safegraph.process to pave the way for multifile processing (sgsmob, Oct 12, 2020)
45307ad tests for finding the file names in the past week (sgsmob, Oct 12, 2020)
efdf3fd testing process_window (sgsmob, Oct 13, 2020)
7ed90e1 comments and formatting for pylint compliance (sgsmob, Oct 13, 2020)
d0151e8 docstring updates (sgsmob, Oct 13, 2020)
8918ada lint compliance in test cases (sgsmob, Oct 13, 2020)
6b24185 move location of VALID_GEO_RESOLUTIONS (sgsmob, Oct 13, 2020)
10d3711 file existence checking in process (sgsmob, Oct 13, 2020)
8604ec1 Merge branch 'main' of github.com:cmu-delphi/covidcast-indicators int… (sgsmob, Oct 13, 2020)
8c665c6 refactor CSV name (sgsmob, Oct 13, 2020)
e0ed614 add test for process (sgsmob, Oct 14, 2020)
8db87c5 fix line too long (sgsmob, Oct 14, 2020)
e6502e5 remove extraneous prints (sgsmob, Oct 15, 2020)
922432b documentation on process_file wrapper (sgsmob, Oct 15, 2020)
7c8e702 Merge branch 'main' of github.com:cmu-delphi/covidcast-indicators int… (sgsmob, Oct 19, 2020)
90a22c4 don't overwrite files (sgsmob, Oct 19, 2020)
5117b9d remove unused import (sgsmob, Oct 19, 2020)
509f4d7 substring testing with 'in' (sgsmob, Oct 19, 2020)
9d6394a update tests to process to include wip and 7d_avg signals (sgsmob, Oct 19, 2020)
470e95d added wip signals to params file (sgsmob, Oct 19, 2020)
ee49484 Merge pull request #309 from sgsmob/weekday (krivard, Oct 20, 2020)
safegraph/delphi_safegraph/constants.py (2 changes: 1 addition & 1 deletion)
@@ -1,4 +1,4 @@

"""Constants for constructing Safegraph indicator."""

HOME_DWELL = 'median_home_dwell_time'
COMPLETELY_HOME = 'completely_home_prop'
safegraph/delphi_safegraph/geo.py (2 changes: 2 additions & 0 deletions)
@@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
"""Geo location constants for constructing Safegraph indicator."""

# https://code.activestate.com/recipes/577775-state-fips-codes-dict/
STATE_TO_FIPS = {
@@ -62,3 +63,4 @@

FIPS_TO_STATE = {v: k.lower() for k, v in STATE_TO_FIPS.items()}

VALID_GEO_RESOLUTIONS = ('county', 'state')
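
For context on how `VALID_GEO_RESOLUTIONS` and `FIPS_TO_STATE` are consumed by `aggregate()` in process.py below, here is a minimal sketch of the state lookup, assuming a three-entry fragment of `STATE_TO_FIPS` in place of the full mapping:

# Illustrative fragment; the real STATE_TO_FIPS covers every state.
STATE_TO_FIPS = {'AL': '01', 'PA': '42', 'WY': '56'}

FIPS_TO_STATE = {v: k.lower() for k, v in STATE_TO_FIPS.items()}

VALID_GEO_RESOLUTIONS = ('county', 'state')

# The first two digits of a county FIPS code identify its state:
assert FIPS_TO_STATE['42043'[:2]] == 'pa'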
safegraph/delphi_safegraph/process.py (178 changes: 136 additions & 42 deletions)
@@ -1,13 +1,58 @@
import covidcast
"""Internal functions for creating Safegraph indicator."""
import datetime
import os
from typing import List
import numpy as np
import pandas as pd
import covidcast

from .constants import HOME_DWELL, COMPLETELY_HOME, FULL_TIME_WORK, PART_TIME_WORK
from .geo import FIPS_TO_STATE
from .geo import FIPS_TO_STATE, VALID_GEO_RESOLUTIONS

# Magic number for modular arithmetic; CBG -> FIPS
MOD = 10000000

# Base file name for raw data CSVs.
CSV_NAME = 'social-distancing.csv.gz'

def validate(df):
    """Confirms that a data frame has only one date."""
    timestamps = df['date_range_start'].apply(date_from_timestamp)
    assert len(timestamps.unique()) == 1


def date_from_timestamp(timestamp) -> datetime.date:
    """Extracts the date from a timestamp beginning with {YYYY}-{MM}-{DD}T."""
    return datetime.date.fromisoformat(timestamp.split('T')[0])


def files_in_past_week(current_filename) -> List[str]:
    """Constructs file paths from previous 6 days.
    Parameters
    ----------
    current_filename: str
        name of CSV file. Must be of the form
        {path}/{YYYY}/{MM}/{DD}/{YYYY}-{MM}-{DD}-{CSV_NAME}
    Returns
    -------
    List of file names corresponding to the 6 days prior to YYYY-MM-DD.
    """
    path, year, month, day, _ = current_filename.rsplit('/', 4)
    current_date = datetime.date(int(year), int(month), int(day))
    one_day = datetime.timedelta(days=1)
    for _ in range(1, 7):
        current_date = current_date - one_day
        date_str = current_date.isoformat()
        date_path = date_str.replace('-', '/')
        new_filename = f'{path}/{date_path}/{date_str}-{CSV_NAME}'
        yield new_filename


def add_suffix(signals, suffix):
    """Adds `suffix` to every element of `signals`."""
    return [s + suffix for s in signals]


def add_prefix(signal_names, wip_signal, prefix: str):
    """Adds prefix to signal if there is a WIP signal
    Parameters
@@ -42,7 +87,7 @@ def add_prefix(signal_names, wip_signal, prefix: str)
    ]
    raise ValueError("Supply True | False or '' or [] | list()")

# Check if the signal name is public

def public_signal(signal_):
    """Checks if the signal name is already public using COVIDcast
    Parameters
@@ -89,32 +134,29 @@ def construct_signals(cbg_df, signal_names)
    """

    # Preparation
    cbg_df['timestamp'] = cbg_df['date_range_start'].apply(
        lambda x: str(x).split('T')[0])
    cbg_df['county_fips'] = (cbg_df['origin_census_block_group'] // MOD).apply(
        lambda x: f'{int(x):05d}')

    # Transformation: create signal not available in raw data
    for signal in signal_names:
        if signal.endswith(FULL_TIME_WORK):
        if FULL_TIME_WORK in signal:
            cbg_df[signal] = (cbg_df['full_time_work_behavior_devices']
                              / cbg_df['device_count'])
        elif signal.endswith(COMPLETELY_HOME):
        elif COMPLETELY_HOME in signal:
            cbg_df[signal] = (cbg_df['completely_home_device_count']
                              / cbg_df['device_count'])
        elif signal.endswith(PART_TIME_WORK):
        elif PART_TIME_WORK in signal:
            cbg_df[signal] = (cbg_df['part_time_work_behavior_devices']
                              / cbg_df['device_count'])
        elif signal.endswith(HOME_DWELL):
        elif HOME_DWELL in signal:
            cbg_df[signal] = (cbg_df['median_home_dwell_time'])


    # Subsetting
    return cbg_df[['timestamp', 'county_fips'] + signal_names]
    return cbg_df[['county_fips'] + signal_names]


def aggregate(df, signal_names, geo_resolution='county'):
    '''Aggregate signals to appropriate resolution and produce standard errors.
    """Aggregate signals to appropriate resolution and produce standard errors.
    Parameters
    ----------
    df: pd.DataFrame
@@ -129,27 +171,22 @@ def aggregate(df, signal_names, geo_resolution='county'):
    pd.DataFrame:
        DataFrame with one row per geo_id, with columns for the individual
        signals, standard errors, and sample sizes.
    '''
    """
    # Prepare geo resolution
    GEO_RESOLUTION = ('county', 'state')
    if geo_resolution == 'county':
        df['geo_id'] = df['county_fips']
    elif geo_resolution == 'state':
        df['geo_id'] = df['county_fips'].apply(lambda x:
                                               FIPS_TO_STATE[x[:2]])
    else:
        raise ValueError(f'`geo_resolution` must be one of {GEO_RESOLUTION}.')
        raise ValueError(
            f'`geo_resolution` must be one of {VALID_GEO_RESOLUTIONS}.')

    # Aggregation and signal creation
    df_mean = df.groupby(['geo_id', 'timestamp'])[
        signal_names
    ].mean()
    df_sd = df.groupby(['geo_id', 'timestamp'])[
        signal_names
    ].std()
    df_n = df.groupby(['geo_id', 'timestamp'])[
        signal_names
    ].count()
    grouped_df = df.groupby(['geo_id'])[signal_names]
    df_mean = grouped_df.mean()
    df_sd = grouped_df.std()
    df_n = grouped_df.count()
    agg_df = pd.DataFrame.join(df_mean, df_sd,
                               lsuffix='_mean', rsuffix='_sd')
    agg_df = pd.DataFrame.join(agg_df, df_n.rename({
@@ -161,39 +198,96 @@ def aggregate(df, signal_names, geo_resolution='county'):
    return agg_df.reset_index()


def process(fname, signal_names, geo_resolutions, export_dir):
    '''Process an input census block group-level CSV and export it. Assumes
    that the input file has _only_ one date of data.
def process_window(df_list: List[pd.DataFrame],
                   signal_names: List[str],
                   geo_resolutions: List[str],
                   export_dir: str):
    """Processes a list of input census block group-level data frames as a
    single data set and exports it. Assumes each data frame has _only_ one
    date of data.
    Parameters
    ----------
    export_dir
        path where the output files are saved
    signal_names : List[str]
    cbg_df: pd.DataFrame
        list of census block group-level frames.
    signal_names: List[str]
        signal names to be processed
    fname: str
        Input filename.
    geo_resolutions: List[str]
        List of geo resolutions to export the data.
    export_dir
        path where the output files are saved
    Returns
    -------
    None
    '''
    cbg_df = construct_signals(pd.read_csv(fname), signal_names)
    unique_date = cbg_df['timestamp'].unique()
    if len(unique_date) != 1:
        raise ValueError(f'More than one timestamp found in input file {fname}.')
    date = unique_date[0].replace('-', '')
    None. One file is written per (signal, resolution) pair containing the
    aggregated data from `df`.
    """
    for df in df_list:
        validate(df)
    date = date_from_timestamp(df_list[0].at[0, 'date_range_start'])
    cbg_df = pd.concat(construct_signals(df, signal_names) for df in df_list)
    for geo_res in geo_resolutions:
        df = aggregate(cbg_df, signal_names, geo_res)
        aggregated_df = aggregate(cbg_df, signal_names, geo_res)
        for signal in signal_names:
            df_export = df[
            df_export = aggregated_df[
                ['geo_id']
                + [f'{signal}_{x}' for x in ('mean', 'se', 'n')]
            ].rename({
            ].rename({
                f'{signal}_mean': 'val',
                f'{signal}_se': 'se',
                f'{signal}_n': 'sample_size',
            }, axis=1)
            df_export.to_csv(f'{export_dir}/{date}_{geo_res}_{signal}.csv',
                             na_rep='NA',
                             index=False, )


def process(current_filename: str,
            previous_filenames: List[str],
            signal_names: List[str],
            wip_signal,
            geo_resolutions: List[str],
            export_dir: str):
    """Creates and exports signals corresponding both to a single day as well
    as averaged over the previous week.
    Parameters
    ----------
    current_filename: str
        path to file holding the target date's data.
    previous_filenames: List[str]
        paths to files holding data from each day in the week preceding the
        target date.
    signal_names: List[str]
        signal names to be processed for a single date.
        A second version of each such signal named {SIGNAL}_7d_avg will be
        created averaging {SIGNAL} over the past 7 days.
    wip_signal : List[str] or bool
        a list of wip signals: [], OR
        all signals in the registry: True OR
        only signals that have never been published: False
    geo_resolutions: List[str]
        List of geo resolutions to export the data.
    export_dir
        path where the output files are saved.
    Returns
    -------
    None. For each (signal, resolution) pair, one file is written for the
    single date values to {export_dir}/{date}_{resolution}_{signal}.csv and
    one for the data averaged over the previous week to
    {export_dir}/{date}_{resolution}_{signal}_7d_avg.csv.
    """
    past_week = [pd.read_csv(current_filename)]
    for fname in previous_filenames:
        if os.path.exists(fname):
            past_week.append(pd.read_csv(fname))

    # First process the current file alone...
    process_window(past_week[:1],
                   add_prefix(signal_names, wip_signal, 'wip_'),
                   geo_resolutions,
                   export_dir)
    # ...then as part of the whole window.
    process_window(past_week,
                   add_prefix(add_suffix(signal_names, '_7d_avg'),
                              wip_signal,
                              'wip_'),
                   geo_resolutions,
                   export_dir)
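
To make the new weekly window concrete, the following is a self-contained sketch of the path arithmetic in `files_in_past_week` and of the CBG-to-FIPS truncation performed by `MOD` (the `raw/...` path is a hypothetical example, not a path from the PR):

import datetime

CSV_NAME = 'social-distancing.csv.gz'

def files_in_past_week(current_filename):
    """Yields the six file paths that precede current_filename by one day each."""
    path, year, month, day, _ = current_filename.rsplit('/', 4)
    current_date = datetime.date(int(year), int(month), int(day))
    for _ in range(1, 7):
        current_date -= datetime.timedelta(days=1)
        date_str = current_date.isoformat()
        yield f"{path}/{date_str.replace('-', '/')}/{date_str}-{CSV_NAME}"

print(next(files_in_past_week('raw/2020/10/02/2020-10-02-social-distancing.csv.gz')))
# raw/2020/10/01/2020-10-01-social-distancing.csv.gz, then five more paths
# walking backwards one day at a time, across month boundaries.

# MOD = 10000000 truncates a census block group ID to its
# five-digit, zero-padded county FIPS prefix:
assert f'{10539707003 // 10000000:05d}' == '01053'

Note that `process()` tolerates gaps in the window: prior files that do not exist are skipped via `os.path.exists`, so the 7-day average is taken over whichever of the seven days are actually present.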
safegraph/delphi_safegraph/run.py (28 changes: 20 additions & 8 deletions)
@@ -5,15 +5,15 @@
import glob
import multiprocessing as mp
import subprocess
from functools import partial

from delphi_utils import read_params

from .constants import SIGNALS, GEO_RESOLUTIONS
from .process import process, add_prefix
from .process import process, files_in_past_week

def run_module():

def run_module():
    """Creates the Safegraph indicator."""
    params = read_params()
    export_dir = params["export_dir"]
    raw_data_dir = params["raw_data_dir"]
@@ -24,11 +24,22 @@ def run_module():
    aws_endpoint = params["aws_endpoint"]
    wip_signal = params["wip_signal"]

    process_file = partial(process,
                           signal_names=add_prefix(SIGNALS, wip_signal, prefix='wip_'),
                           geo_resolutions=GEO_RESOLUTIONS,
                           export_dir=export_dir,
                           )
    def process_file(current_filename):
        """Wrapper around `process()` that only takes a single argument.

        A single argument function is necessary to use `pool.map()` below.
        Because each call to `process()` has two arguments that are dependent
        on the input file name (`current_filename` and `previous_filenames`),
        we choose to use this wrapper rather than something like
        `functools.partial()`.
        """
        return process(current_filename,
                       files_in_past_week(current_filename),
                       signal_names=SIGNALS,
                       wip_signal=wip_signal,
                       geo_resolutions=GEO_RESOLUTIONS,
                       export_dir=export_dir,
                       )

    # Update raw data
    # Why call subprocess rather than using a native Python client, e.g. boto3?
@@ -43,6 +54,7 @@ def run_module():
            'AWS_DEFAULT_REGION': aws_default_region,
        },
        shell=True,
        check=True,
    )

    files = glob.glob(f'{raw_data_dir}/social-distancing/**/*.csv.gz',
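
The diff is truncated before the worker pool is created, but the wrapper's docstring explains the constraint: `pool.map()` applies a single-argument callable to every input file. A minimal sketch of that driver pattern (the pool size and file list here are hypothetical, not quoted from the PR):

import glob
import multiprocessing as mp

def process_file(current_filename):
    # Stand-in for the closure defined inside run_module() above.
    print(f'processing {current_filename}')

if __name__ == '__main__':
    files = glob.glob('raw_data/social-distancing/**/*.csv.gz', recursive=True)
    with mp.Pool() as pool:
        pool.map(process_file, files)

A closure is used instead of `functools.partial` because two of `process()`'s arguments, `current_filename` and `previous_filenames`, both derive from the single value that `pool.map()` supplies.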
safegraph/tests/params.json.template (5 changes: 4 additions & 1 deletion)
@@ -8,5 +8,8 @@
    "aws_secret_access_key": "",
    "aws_default_region": "",
    "aws_endpoint": "",
    "wip_signal" : ""
    "wip_signal" : ["median_home_dwell_time_7d_avg",
                    "completely_home_prop_7d_avg",
                    "part_time_work_prop_7d_avg",
                    "full_time_work_prop_7d_avg"]
}
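
The four names added to `wip_signal` are exactly the 7-day-average signals this PR introduces, so they are exported with a `wip_` prefix while under review. A sketch of the resulting export names, assuming the list behavior documented in the `add_prefix` docstring (its list branch is elided from the diff, so `add_prefix_sketch` is a hypothetical stand-in):

def add_suffix(signals, suffix):
    """As in process.py: appends `suffix` to every signal name."""
    return [s + suffix for s in signals]

def add_prefix_sketch(signal_names, wip_signal, prefix='wip_'):
    # Hypothetical stand-in for add_prefix()'s list case: only the
    # signals named in wip_signal receive the prefix.
    return [prefix + s if s in wip_signal else s for s in signal_names]

SIGNALS = ['median_home_dwell_time', 'completely_home_prop',
           'part_time_work_prop', 'full_time_work_prop']
WIP = ['median_home_dwell_time_7d_avg', 'completely_home_prop_7d_avg',
       'part_time_work_prop_7d_avg', 'full_time_work_prop_7d_avg']

print(add_prefix_sketch(add_suffix(SIGNALS, '_7d_avg'), WIP))
# ['wip_median_home_dwell_time_7d_avg', 'wip_completely_home_prop_7d_avg',
#  'wip_part_time_work_prop_7d_avg', 'wip_full_time_work_prop_7d_avg']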
safegraph/tests/raw_data/small_raw_data_0.csv (4 changes: 4 additions & 0 deletions)
@@ -0,0 +1,4 @@
origin_census_block_group,date_range_start,date_range_end,device_count,completely_home_device_count,median_home_dwell_time,part_time_work_behavior_devices,full_time_work_behavior_devices
10539707003,2020-06-12T00:00:00-05:00,2020-06-13T00:00:00-05:00,100,15,6,35,45
131510702031,2020-06-12T00:00:00-05:00,2020-06-13T00:00:00-05:00,100,5,3,5,5
131510702031,2020-06-12T00:00:00-05:00,2020-06-13T00:00:00-05:00,100,6,4,6,6
safegraph/tests/raw_data/small_raw_data_1.csv (2 changes: 2 additions & 0 deletions)
@@ -0,0 +1,2 @@
origin_census_block_group,date_range_start,date_range_end,device_count,completely_home_device_count,median_home_dwell_time,part_time_work_behavior_devices,full_time_work_behavior_devices
10730144081,2020-06-11T00:00:00-05:00,2020-06-13T00:00:00-05:00,100,5,3,15,25
safegraph/tests/raw_data/small_raw_data_3.csv (3 changes: 3 additions & 0 deletions)
@@ -0,0 +1,3 @@
origin_census_block_group,date_range_start,date_range_end,device_count,completely_home_device_count,median_home_dwell_time,part_time_work_behavior_devices,full_time_work_behavior_devices
420430239002,2020-06-10T00:00:00-04:00,2020-06-13T00:00:00-04:00,100,10,7,20,30
420430239002,2020-06-10T00:00:00-04:00,2020-06-13T00:00:00-04:00,100,20,8,30,40