Add NCHS mortality geo aggregation at the HHS and nation levels #1243

Open · wants to merge 5 commits into main
1 change: 1 addition & 0 deletions nchs_mortality/.pylintrc
@@ -4,6 +4,7 @@
disable=logging-format-interpolation,
too-many-locals,
too-many-arguments,
fixme,
Contributor Author commented:
Added this for a TODO I left in the code. Let me know if this is something we'd want to keep consistent across all indicators, and I can add it to the others.

# Allow pytest functions to be part of a class.
no-self-use,
# Allow pytest classes to have one test.
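For context, pylint's fixme check (message W0511) flags comments containing TODO/FIXME/XXX markers, so without this entry the TODO left in run.py (see below) would fail lint. A hypothetical illustration, not from the PR:

# example.py
def pull_data():
    # TODO: remove this workaround once the mapping is supported upstream
    return None

# With "fixme" enabled, pylint reports (roughly):
#   example.py:3:4: W0511: TODO: remove this workaround once the mapping is supported upstream (fixme)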
6 changes: 5 additions & 1 deletion nchs_mortality/delphi_nchs_mortality/constants.py
@@ -25,7 +25,11 @@
"prop"
]
INCIDENCE_BASE = 100000
GEO_RES = "state"
GEO_RES = [
"nation",
"hhs",
"state"
]

# this is necessary as a delimiter in the f-string expressions we use to
# construct detailed error reports
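With GEO_RES now a list, run.py (below) can iterate every metric/geography/sensor combination in a single loop via itertools.product. A minimal sketch of that pattern, using illustrative values (the real METRICS live in this constants.py):

from itertools import product

METRICS = ["covid_19_deaths", "percent_of_expected_deaths"]  # illustrative subset
GEO_RES = ["nation", "hhs", "state"]
SENSORS = ["num", "prop"]

# Yields the full cross-product: 2 * 3 * 2 = 12 (metric, geo, sensor) tuples.
for metric, geo, sensor in product(METRICS, GEO_RES, SENSORS):
    print(metric, geo, sensor)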
140 changes: 88 additions & 52 deletions nchs_mortality/delphi_nchs_mortality/run.py
@@ -7,9 +7,10 @@
import time
from datetime import datetime, date, timedelta
from typing import Dict, Any
from itertools import product

import numpy as np
from delphi_utils import S3ArchiveDiffer, get_structured_logger
from delphi_utils import S3ArchiveDiffer, get_structured_logger, GeoMapper

from .archive_diffs import arch_diffs
from .constants import (METRICS, SENSOR_NAME_MAP,
Expand Down Expand Up @@ -43,13 +44,14 @@ def run_module(params: Dict[str, Any]):
__name__, filename=params["common"].get("log_filename"),
log_exceptions=params["common"].get("log_exceptions", True))
export_start_date = params["indicator"]["export_start_date"]
if export_start_date == "latest":  # Find the previous Saturday
    export_start_date = date.today() - timedelta(
        days=date.today().weekday() + 2)
export_start_date = export_start_date.strftime('%Y-%m-%d')
daily_export_dir = params["common"]["daily_export_dir"]
token = params["indicator"]["token"]
test_file = params["indicator"].get("test_file", None)
gmpr = GeoMapper()

if "archive" in params:
daily_arch_diff = S3ArchiveDiffer(
@@ -60,52 +62,54 @@

stats = []
df_pull = pull_nchs_mortality_data(token, test_file)
for metric in METRICS:
for metric, geo, sensor in product(METRICS, GEO_RES, SENSORS):
if metric == 'percent_of_expected_deaths':
print(metric)
df = df_pull.copy()
df["val"] = df[metric]
df["se"] = np.nan
df["sample_size"] = np.nan
df = df[~df["val"].isnull()]
sensor_name = "_".join([SENSOR_NAME_MAP[metric]])
dates = export_csv(
df,
geo_name=GEO_RES,
export_dir=daily_export_dir,
start_date=datetime.strptime(export_start_date, "%Y-%m-%d"),
sensor=sensor_name,
)
if len(dates) > 0:
stats.append((max(dates), len(dates)))
else:
for sensor in SENSORS:
print(metric, sensor)
df = df_pull.copy()
if sensor == "num":
df["val"] = df[metric]
else:
df["val"] = df[metric] / df["population"] * INCIDENCE_BASE
df["se"] = np.nan
df["sample_size"] = np.nan
df = df[~df["val"].isnull()]
sensor_name = "_".join([SENSOR_NAME_MAP[metric], sensor])
dates = export_csv(
df,
geo_name=GEO_RES,
export_dir=daily_export_dir,
start_date=datetime.strptime(export_start_date, "%Y-%m-%d"),
sensor=sensor_name,
)
if len(dates) > 0:
stats.append((max(dates), len(dates)))

# Weekly run of archive utility on Monday
# - Does not upload to S3, that is handled by daily run of archive utility
# - Exports issues into receiving for the API
# Daily run of archiving utility
# - Uploads changed files to S3
# - Does not export any issues into receiving
continue
print(metric, sensor)
sensor_name = "_".join([SENSOR_NAME_MAP[metric], sensor])
df = _safe_copy_df(df_pull, metric)

if geo in ["hhs", "nation"]:
df = _map_from_state(df, geo, gmpr)

if sensor == "prop":
df["val"] = df["val"] / df["population"] * INCIDENCE_BASE

dates = export_csv(
df,
geo_name=geo,
export_dir=daily_export_dir,
start_date=datetime.strptime(export_start_date, "%Y-%m-%d"),
sensor=sensor_name,
)
if len(dates) > 0:
stats.append((max(dates), len(dates)))

for geo in GEO_RES:
metric = 'percent_of_expected_deaths'
print(metric)
sensor_name = "_".join([SENSOR_NAME_MAP[metric]])
df = _safe_copy_df(df_pull, metric)

if geo in ["hhs", "nation"]:
df = _map_from_state(df, geo, gmpr, weighted=True)

dates = export_csv(
df,
geo_name=geo,
export_dir=daily_export_dir,
start_date=datetime.strptime(export_start_date, "%Y-%m-%d"),
sensor=sensor_name,
)
if len(dates) > 0:
stats.append((max(dates), len(dates)))

# Weekly run of archive utility on Monday
# - Does not upload to S3, that is handled by daily run of archive utility
# - Exports issues into receiving for the API
# Daily run of archiving utility
# - Uploads changed files to S3
# - Does not export any issues into receiving
if "archive" in params:
arch_diffs(params, daily_arch_diff)

@@ -115,7 +119,39 @@ def run_module(params: Dict[str, Any]):
max_lag_in_days = min_max_date and (datetime.now() - min_max_date).days
formatted_min_max_date = min_max_date and min_max_date.strftime("%Y-%m-%d")
logger.info("Completed indicator run",
elapsed_time_in_seconds=elapsed_time_in_seconds,
csv_export_count=csv_export_count,
max_lag_in_days=max_lag_in_days,
oldest_final_export_date=formatted_min_max_date)


def _safe_copy_df(df, metric_col_name):
"""Create a copy of the given df, and drop rows where the metric is nan."""
df_copy = df.copy()
df_copy["se"] = np.nan
df_copy["sample_size"] = np.nan
df_copy["val"] = df_copy[metric_col_name]
return df_copy[~df_copy["val"].isnull()]


def _map_from_state(df, geo, gmpr, weighted=False):
"""Map from state_id to another given geocode.

The weighted flag is used when aggregating metrics that come as percentages
rather than raw counts and therefore need to be weighted by population when
combining.
"""
# TODO - this first mapping from state_id to state_code is necessary because
# the GeoMapper does not currently support going directly from state_id to hhs or
# nation. See issue #1255
df = gmpr.replace_geocode(
df, "state_id", "state_code", from_col="geo_id", date_col="timestamp")
if weighted:
df["weight"] = df["population"]
df = gmpr.replace_geocode(
df, "state_code", geo, data_cols=["val"], date_col="timestamp").rename(
columns={geo: "geo_id"})
if weighted:
df["val"] = df["val"] / df["population"]

return df
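A note on the weighted branch above: setting df["weight"] = df["population"] relies on GeoMapper.replace_geocode multiplying the listed data_cols by a "weight" column before summing (that is my reading of delphi_utils; the PR itself does not spell it out), so the final division by the aggregated population yields a population-weighted mean. A self-contained pandas sketch of the arithmetic the helper is expected to produce, with toy numbers and no delphi_utils dependency:

import pandas as pd

# Two states in the same (hypothetical) region reporting a percentage metric.
df = pd.DataFrame({
    "geo_id": ["pa", "nj"],
    "val": [90.0, 110.0],                   # e.g. percent_of_expected_deaths
    "population": [13_000_000, 9_000_000],  # illustrative populations
})

# Population-weighted mean: sum(val * pop) / sum(pop) -- not a plain average.
weighted = (df["val"] * df["population"]).sum() / df["population"].sum()
print(round(weighted, 2))  # 98.18, pulled toward PA's value by its larger weight

An unweighted sum is correct for the count-based "num" signals, but percentages must be combined this way, which is why _map_from_state takes the weighted flag.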
20 changes: 16 additions & 4 deletions nchs_mortality/tests/test_run.py
@@ -36,17 +36,29 @@ def test_output_files_exist(self, run_as_module, date):
'deaths_pneumonia_or_flu_or_covid_incidence']
sensors = ["num", "prop"]

expected_files = []
expected_files_nation = []
expected_files_state = []
expected_files_hhs = []
for d in dates:
for metric in metrics:
if metric == "deaths_percent_of_expected":
expected_files += ["weekly_" + d + "_state_" \
expected_files_nation += ["weekly_" + d + "_nation_" \
+ metric + ".csv"]
expected_files_state += ["weekly_" + d + "_state_" \
+ metric + ".csv"]
expected_files_hhs += ["weekly_" + d + "_hhs_" \
+ metric + ".csv"]
else:
for sensor in sensors:
expected_files += ["weekly_" + d + "_state_" \
expected_files_nation += ["weekly_" + d + "_nation_" \
+ metric + "_" + sensor + ".csv"]
expected_files_state += ["weekly_" + d + "_state_" \
+ metric + "_" + sensor + ".csv"]
expected_files_hhs += ["weekly_" + d + "_hhs_" \
+ metric + "_" + sensor + ".csv"]
assert set(expected_files).issubset(set(csv_files))
assert set(expected_files_nation).issubset(set(csv_files))
assert set(expected_files_state).issubset(set(csv_files))
assert set(expected_files_hhs).issubset(set(csv_files))

@pytest.mark.parametrize("date", ["2020-09-14", "2020-09-18"])
def test_output_file_format(self, run_as_module, date):
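Reviewer sketch (not part of the PR): the three per-geo lists all follow one naming scheme, weekly_<date>_<geo>_<signal>.csv, so they could be generated in a single pass over the geographies; this assumes the surrounding test's dates, metrics, sensors, and csv_files variables:

geos = ["nation", "state", "hhs"]
expected_files = {geo: [] for geo in geos}
for d in dates:
    for metric in metrics:
        # deaths_percent_of_expected has no num/prop variants
        if metric == "deaths_percent_of_expected":
            signals = [metric]
        else:
            signals = [metric + "_" + sensor for sensor in sensors]
        for geo in geos:
            expected_files[geo] += ["weekly_" + d + "_" + geo + "_" + sig + ".csv"
                                    for sig in signals]
for geo in geos:
    assert set(expected_files[geo]).issubset(set(csv_files))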