Skip to content

Commit bb8e17e

Browse files
authored
Merge pull request #331 from cmu-delphi/deploy-safegraph
Propagate safegraph 7-day average signals into main
2 parents 0783ebd + ee49484 commit bb8e17e

File tree

9 files changed

+330
-71
lines changed

9 files changed

+330
-71
lines changed

safegraph/delphi_safegraph/constants.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
1+
"""Constants for constructing Safegraph indicator."""
22

33
HOME_DWELL = 'median_home_dwell_time'
44
COMPLETELY_HOME = 'completely_home_prop'

safegraph/delphi_safegraph/geo.py

+2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
# -*- coding: utf-8 -*-
2+
"""Geo location constants for constructing Safegraph indicator."""
23

34
# https://code.activestate.com/recipes/577775-state-fips-codes-dict/
45
STATE_TO_FIPS = {
@@ -62,3 +63,4 @@
6263

6364
FIPS_TO_STATE = {v: k.lower() for k, v in STATE_TO_FIPS.items()}
6465

66+
VALID_GEO_RESOLUTIONS = ('county', 'state')

safegraph/delphi_safegraph/process.py

+136-42
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,58 @@
1-
import covidcast
1+
"""Internal functions for creating Safegraph indicator."""
2+
import datetime
3+
import os
4+
from typing import List
25
import numpy as np
36
import pandas as pd
7+
import covidcast
48

59
from .constants import HOME_DWELL, COMPLETELY_HOME, FULL_TIME_WORK, PART_TIME_WORK
6-
from .geo import FIPS_TO_STATE
10+
from .geo import FIPS_TO_STATE, VALID_GEO_RESOLUTIONS
711

812
# Magic number for modular arithmetic; CBG -> FIPS
913
MOD = 10000000
1014

15+
# Base file name for raw data CSVs.
16+
CSV_NAME = 'social-distancing.csv.gz'
17+
18+
def validate(df):
19+
"""Confirms that a data frame has only one date."""
20+
timestamps = df['date_range_start'].apply(date_from_timestamp)
21+
assert len(timestamps.unique()) == 1
22+
23+
24+
def date_from_timestamp(timestamp) -> datetime.date:
25+
"""Extracts the date from a timestamp beginning with {YYYY}-{MM}-{DD}T."""
26+
return datetime.date.fromisoformat(timestamp.split('T')[0])
27+
28+
29+
def files_in_past_week(current_filename) -> List[str]:
30+
"""Constructs file paths from previous 6 days.
31+
Parameters
32+
----------
33+
current_filename: str
34+
name of CSV file. Must be of the form
35+
{path}/{YYYY}/{MM}/{DD}/{YYYY}-{MM}-{DD}-{CSV_NAME}
36+
Returns
37+
-------
38+
List of file names corresponding to the 6 days prior to YYYY-MM-DD.
39+
"""
40+
path, year, month, day, _ = current_filename.rsplit('/', 4)
41+
current_date = datetime.date(int(year), int(month), int(day))
42+
one_day = datetime.timedelta(days=1)
43+
for _ in range(1, 7):
44+
current_date = current_date - one_day
45+
date_str = current_date.isoformat()
46+
date_path = date_str.replace('-', '/')
47+
new_filename = f'{path}/{date_path}/{date_str}-{CSV_NAME}'
48+
yield new_filename
49+
50+
51+
def add_suffix(signals, suffix):
52+
"""Adds `suffix` to every element of `signals`."""
53+
return [s + suffix for s in signals]
54+
55+
1156
def add_prefix(signal_names, wip_signal, prefix: str):
1257
"""Adds prefix to signal if there is a WIP signal
1358
Parameters
@@ -42,7 +87,7 @@ def add_prefix(signal_names, wip_signal, prefix: str):
4287
]
4388
raise ValueError("Supply True | False or '' or [] | list()")
4489

45-
# Check if the signal name is public
90+
4691
def public_signal(signal_):
4792
"""Checks if the signal name is already public using COVIDcast
4893
Parameters
@@ -89,32 +134,29 @@ def construct_signals(cbg_df, signal_names):
89134
"""
90135

91136
# Preparation
92-
cbg_df['timestamp'] = cbg_df['date_range_start'].apply(
93-
lambda x: str(x).split('T')[0])
94137
cbg_df['county_fips'] = (cbg_df['origin_census_block_group'] // MOD).apply(
95138
lambda x: f'{int(x):05d}')
96139

97140
# Transformation: create signal not available in raw data
98141
for signal in signal_names:
99-
if signal.endswith(FULL_TIME_WORK):
142+
if FULL_TIME_WORK in signal:
100143
cbg_df[signal] = (cbg_df['full_time_work_behavior_devices']
101144
/ cbg_df['device_count'])
102-
elif signal.endswith(COMPLETELY_HOME):
145+
elif COMPLETELY_HOME in signal:
103146
cbg_df[signal] = (cbg_df['completely_home_device_count']
104147
/ cbg_df['device_count'])
105-
elif signal.endswith(PART_TIME_WORK):
148+
elif PART_TIME_WORK in signal:
106149
cbg_df[signal] = (cbg_df['part_time_work_behavior_devices']
107150
/ cbg_df['device_count'])
108-
elif signal.endswith(HOME_DWELL):
151+
elif HOME_DWELL in signal:
109152
cbg_df[signal] = (cbg_df['median_home_dwell_time'])
110153

111-
112154
# Subsetting
113-
return cbg_df[['timestamp', 'county_fips'] + signal_names]
155+
return cbg_df[['county_fips'] + signal_names]
114156

115157

116158
def aggregate(df, signal_names, geo_resolution='county'):
117-
'''Aggregate signals to appropriate resolution and produce standard errors.
159+
"""Aggregate signals to appropriate resolution and produce standard errors.
118160
Parameters
119161
----------
120162
df: pd.DataFrame
@@ -129,27 +171,22 @@ def aggregate(df, signal_names, geo_resolution='county'):
129171
pd.DataFrame:
130172
DataFrame with one row per geo_id, with columns for the individual
131173
signals, standard errors, and sample sizes.
132-
'''
174+
"""
133175
# Prepare geo resolution
134-
GEO_RESOLUTION = ('county', 'state')
135176
if geo_resolution == 'county':
136177
df['geo_id'] = df['county_fips']
137178
elif geo_resolution == 'state':
138179
df['geo_id'] = df['county_fips'].apply(lambda x:
139180
FIPS_TO_STATE[x[:2]])
140181
else:
141-
raise ValueError(f'`geo_resolution` must be one of {GEO_RESOLUTION}.')
182+
raise ValueError(
183+
f'`geo_resolution` must be one of {VALID_GEO_RESOLUTIONS}.')
142184

143185
# Aggregation and signal creation
144-
df_mean = df.groupby(['geo_id', 'timestamp'])[
145-
signal_names
146-
].mean()
147-
df_sd = df.groupby(['geo_id', 'timestamp'])[
148-
signal_names
149-
].std()
150-
df_n = df.groupby(['geo_id', 'timestamp'])[
151-
signal_names
152-
].count()
186+
grouped_df = df.groupby(['geo_id'])[signal_names]
187+
df_mean = grouped_df.mean()
188+
df_sd = grouped_df.std()
189+
df_n = grouped_df.count()
153190
agg_df = pd.DataFrame.join(df_mean, df_sd,
154191
lsuffix='_mean', rsuffix='_sd')
155192
agg_df = pd.DataFrame.join(agg_df, df_n.rename({
@@ -161,39 +198,96 @@ def aggregate(df, signal_names, geo_resolution='county'):
161198
return agg_df.reset_index()
162199

163200

164-
def process(fname, signal_names, geo_resolutions, export_dir):
165-
'''Process an input census block group-level CSV and export it. Assumes
166-
that the input file has _only_ one date of data.
201+
def process_window(df_list: List[pd.DataFrame],
202+
signal_names: List[str],
203+
geo_resolutions: List[str],
204+
export_dir: str):
205+
"""Processes a list of input census block group-level data frames as a
206+
single data set and exports it. Assumes each data frame has _only_ one
207+
date of data.
167208
Parameters
168209
----------
169-
export_dir
170-
path where the output files are saved
171-
signal_names : List[str]
210+
cbg_df: pd.DataFrame
211+
list of census block group-level frames.
212+
signal_names: List[str]
172213
signal names to be processed
173-
fname: str
174-
Input filename.
175214
geo_resolutions: List[str]
176215
List of geo resolutions to export the data.
216+
export_dir
217+
path where the output files are saved
177218
Returns
178219
-------
179-
None
180-
'''
181-
cbg_df = construct_signals(pd.read_csv(fname), signal_names)
182-
unique_date = cbg_df['timestamp'].unique()
183-
if len(unique_date) != 1:
184-
raise ValueError(f'More than one timestamp found in input file {fname}.')
185-
date = unique_date[0].replace('-', '')
220+
None. One file is written per (signal, resolution) pair containing the
221+
aggregated data from `df`.
222+
"""
223+
for df in df_list:
224+
validate(df)
225+
date = date_from_timestamp(df_list[0].at[0, 'date_range_start'])
226+
cbg_df = pd.concat(construct_signals(df, signal_names) for df in df_list)
186227
for geo_res in geo_resolutions:
187-
df = aggregate(cbg_df, signal_names, geo_res)
228+
aggregated_df = aggregate(cbg_df, signal_names, geo_res)
188229
for signal in signal_names:
189-
df_export = df[
230+
df_export = aggregated_df[
190231
['geo_id']
191232
+ [f'{signal}_{x}' for x in ('mean', 'se', 'n')]
192-
].rename({
233+
].rename({
193234
f'{signal}_mean': 'val',
194235
f'{signal}_se': 'se',
195236
f'{signal}_n': 'sample_size',
196237
}, axis=1)
197238
df_export.to_csv(f'{export_dir}/{date}_{geo_res}_{signal}.csv',
198239
na_rep='NA',
199240
index=False, )
241+
242+
243+
def process(current_filename: str,
244+
previous_filenames: List[str],
245+
signal_names: List[str],
246+
wip_signal,
247+
geo_resolutions: List[str],
248+
export_dir: str):
249+
"""Creates and exports signals corresponding both to a single day as well
250+
as averaged over the previous week.
251+
Parameters
252+
----------
253+
current_filename: str
254+
path to file holding the target date's data.
255+
previous_filenames: List[str]
256+
paths to files holding data from each day in the week preceding the
257+
target date.
258+
signal_names: List[str]
259+
signal names to be processed for a single date.
260+
A second version of each such signal named {SIGNAL}_7d_avg will be
261+
created averaging {SIGNAL} over the past 7 days.
262+
wip_signal : List[str] or bool
263+
a list of wip signals: [], OR
264+
all signals in the registry: True OR
265+
only signals that have never been published: False
266+
geo_resolutions: List[str]
267+
List of geo resolutions to export the data.
268+
export_dir
269+
path where the output files are saved.
270+
Returns
271+
-------
272+
None. For each (signal, resolution) pair, one file is written for the
273+
single date values to {export_dir}/{date}_{resolution}_{signal}.csv and
274+
one for the data averaged over the previous week to
275+
{export_dir}/{date}_{resolution}_{signal}_7d_avg.csv.
276+
"""
277+
past_week = [pd.read_csv(current_filename)]
278+
for fname in previous_filenames:
279+
if os.path.exists(fname):
280+
past_week.append(pd.read_csv(fname))
281+
282+
# First process the current file alone...
283+
process_window(past_week[:1],
284+
add_prefix(signal_names, wip_signal, 'wip_'),
285+
geo_resolutions,
286+
export_dir)
287+
# ...then as part of the whole window.
288+
process_window(past_week,
289+
add_prefix(add_suffix(signal_names, '_7d_avg'),
290+
wip_signal,
291+
'wip_'),
292+
geo_resolutions,
293+
export_dir)

safegraph/delphi_safegraph/run.py

+20-8
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,15 @@
55
import glob
66
import multiprocessing as mp
77
import subprocess
8-
from functools import partial
98

109
from delphi_utils import read_params
1110

1211
from .constants import SIGNALS, GEO_RESOLUTIONS
13-
from .process import process, add_prefix
12+
from .process import process, files_in_past_week
1413

15-
def run_module():
1614

15+
def run_module():
16+
"""Creates the Safegraph indicator."""
1717
params = read_params()
1818
export_dir = params["export_dir"]
1919
raw_data_dir = params["raw_data_dir"]
@@ -24,11 +24,22 @@ def run_module():
2424
aws_endpoint = params["aws_endpoint"]
2525
wip_signal = params["wip_signal"]
2626

27-
process_file = partial(process,
28-
signal_names=add_prefix(SIGNALS, wip_signal, prefix='wip_'),
29-
geo_resolutions=GEO_RESOLUTIONS,
30-
export_dir=export_dir,
31-
)
27+
def process_file(current_filename):
28+
"""Wrapper around `process()` that only takes a single argument.
29+
30+
A single argument function is necessary to use `pool.map()` below.
31+
Because each call to `process()` has two arguments that are dependent
32+
on the input file name (`current_filename` and `previous_filenames`),
33+
we choose to use this wrapper rather than something like
34+
`functools.partial()`.
35+
"""
36+
return process(current_filename,
37+
files_in_past_week(current_filename),
38+
signal_names=SIGNALS,
39+
wip_signal=wip_signal,
40+
geo_resolutions=GEO_RESOLUTIONS,
41+
export_dir=export_dir,
42+
)
3243

3344
# Update raw data
3445
# Why call subprocess rather than using a native Python client, e.g. boto3?
@@ -43,6 +54,7 @@ def run_module():
4354
'AWS_DEFAULT_REGION': aws_default_region,
4455
},
4556
shell=True,
57+
check=True,
4658
)
4759

4860
files = glob.glob(f'{raw_data_dir}/social-distancing/**/*.csv.gz',

safegraph/tests/params.json.template

+4-1
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,8 @@
88
"aws_secret_access_key": "",
99
"aws_default_region": "",
1010
"aws_endpoint": "",
11-
"wip_signal" : ""
11+
"wip_signal" : ["median_home_dwell_time_7d_avg",
12+
"completely_home_prop_7d_avg",
13+
"part_time_work_prop_7d_avg",
14+
"full_time_work_prop_7d_avg"]
1215
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
origin_census_block_group,date_range_start,date_range_end,device_count,completely_home_device_count,median_home_dwell_time,part_time_work_behavior_devices,full_time_work_behavior_devices
2+
10539707003,2020-06-12T00:00:00-05:00,2020-06-13T00:00:00-05:00,100,15,6,35,45
3+
131510702031,2020-06-12T00:00:00-05:00,2020-06-13T00:00:00-05:00,100,5,3,5,5
4+
131510702031,2020-06-12T00:00:00-05:00,2020-06-13T00:00:00-05:00,100,6,4,6,6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
origin_census_block_group,date_range_start,date_range_end,device_count,completely_home_device_count,median_home_dwell_time,part_time_work_behavior_devices,full_time_work_behavior_devices
2+
10730144081,2020-06-11T00:00:00-05:00,2020-06-13T00:00:00-05:00,100,5,3,15,25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
origin_census_block_group,date_range_start,date_range_end,device_count,completely_home_device_count,median_home_dwell_time,part_time_work_behavior_devices,full_time_work_behavior_devices
2+
420430239002,2020-06-10T00:00:00-04:00,2020-06-13T00:00:00-04:00,100,10,7,20,30
3+
420430239002,2020-06-10T00:00:00-04:00,2020-06-13T00:00:00-04:00,100,20,8,30,40

0 commit comments

Comments
 (0)