Climatological backfill #138

Merged · 23 commits · Jan 23, 2025
Changes from 19 commits
2 changes: 1 addition & 1 deletion Makefile
@@ -86,7 +86,7 @@ dashboard:
Rscript scripts/dashboard.R

update_site:
Rscript -e "source('R/utils.R'); update_site()"
Rscript -e "suppressPackageStartupMessages(source(here::here('R', 'load_all.R'))); update_site()"

netlify:
netlify deploy --dir=reports --prod
91 changes: 91 additions & 0 deletions R/aux_data_utils.R
@@ -617,3 +617,94 @@ process_nhsn_data <- function(raw_nhsn_data) {
mutate(version = fixed_version) %>%
relocate(geo_value, disease, time_value, version)
}


# for filenames of the form nhsn_data_2024-11-19_16-29-43.191649.rds
get_version_timestamp <- function(filename) ymd_hms(str_match(filename, "[0-9]{4}-..-.._..-..-..\\.[^.^_]*"))
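
A quick sanity check of the parser (a sketch; assumes `stringr` and `lubridate` are attached, as elsewhere in this repo):

```r
library(stringr)
library(lubridate)

# str_match() pulls "2024-11-19_16-29-43.191649" out of the filename;
# ymd_hms() is lenient about the underscore/hyphen separators and keeps
# the fractional seconds.
get_version_timestamp("nhsn_data_2024-11-19_16-29-43.191649.rds")
#> [1] "2024-11-19 16:29:43 UTC"
```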

#' All-in-one function to get and cache an NHSN archive from raw files
#' @description
#' This takes in all of the raw data files for the NHSN data, creates a
#' quasi-archive (it keeps one example per version-day, rather than one per
#' change), and puts it in the bucket. The stored value has the columns
#' geo_value, time_value, disease, endpoint (either basic or prelim), version,
#' version_timestamp (to enable keeping the most recent value), and value.
#' The returned value, on the other hand, is an actual epi_archive containing
#' only the data for `disease_name`.
create_nhsn_data_archive <- function(disease_name) {
if (aws.s3::head_object("archive_timestamped.parquet", bucket = "forecasting-team-data")) {
aws.s3::save_object("archive_timestamped.parquet", bucket = "forecasting-team-data", file = here::here("cache/archive_timestamped.parquet"))
    # note: despite the .parquet name, this file is written with qs::qsave() below, so read it with qs
    previous_archive <- qs::qread(here::here("cache/archive_timestamped.parquet"))
last_timestamp <- max(previous_archive$version_timestamp)
} else {
    # there is no remote archive yet, so rebuild from scratch
previous_archive <- NULL
last_timestamp <- as.Date("1000-01-01")
}
new_data <- aws.s3::get_bucket_df(bucket = "forecasting-team-data", prefix = "nhsn_data_") %>%
filter(get_version_timestamp(Key) > last_timestamp) %>%
pull(Key) %>%
lapply(
function(filename) {
version_timestamp <- get_version_timestamp(filename)
res <- NULL
tryCatch(
{
s3load(object = filename, bucket = "forecasting-team-data")
if (grepl("prelim", filename)) {
res <- epi_data_raw_prelim
endpoint_val <- "prelim"
} else {
res <- epi_data_raw
endpoint_val <- "basic"
}
res <- res %>%
process_nhsn_data() %>%
select(geo_value, disease, time_value, value) %>%
mutate(version_timestamp = version_timestamp, endpoint = endpoint_val)
},
          error = function(cond) {} # swallow read errors; res stays NULL and this file is skipped
)
res
}
)
  # bind the new snapshots together; rows unchanged from the previous version (LOCF) are dropped below
compactified <-
new_data %>%
bind_rows()
if (nrow(compactified) == 0) {
one_per_day <- previous_archive
} else {
compactified <-
compactified %>%
arrange(geo_value, time_value, disease, endpoint, version_timestamp) %>%
mutate(version = as.Date(version_timestamp)) %>%
filter(if_any(
c(everything(), -endpoint, -version_timestamp), # all non-version, non-endpoint columns
~ !epiprocess:::is_locf(., .Machine$double.eps^0.5)
))

unchanged <- previous_archive %>% filter(!(version %in% unique(compactified$version)))
# only keep the last value for a given version (so across version_timestamps)
# we only need to do this for the versions in compactified, as the other versions can't possibly change
one_per_day <-
previous_archive %>%
filter(version %in% unique(compactified$version)) %>%
bind_rows(compactified) %>%
group_by(geo_value, disease, time_value, version) %>%
arrange(version_timestamp) %>%
filter(row_number() == n()) %>%
ungroup() %>%
bind_rows(unchanged)
qs::qsave(one_per_day, here::here("cache/archive_timestamped.parquet"))
aws.s3::put_object(here::here("cache/archive_timestamped.parquet"), "archive_timestamped.parquet", bucket = "forecasting-team-data")
}
one_per_day %>%
filter(disease == disease_name) %>%
select(-version_timestamp, -endpoint, -disease) %>%
mutate(
geo_value = ifelse(geo_value == "usa", "us", geo_value)
) %>%
filter(geo_value != "mp") %>%
as_epi_archive(compactify = TRUE)
}
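
Downstream usage might look like the following sketch (the disease label is a placeholder; actual values depend on what `process_nhsn_data()` writes into the `disease` column):

```r
# Build (or incrementally update) the cached archive and pull one disease out
# as a proper epi_archive. "nhsn_flu" is a hypothetical label.
flu_archive <- create_nhsn_data_archive(disease_name = "nhsn_flu")

# Snapshot the data exactly as it looked on a given version date:
flu_as_of <- epiprocess::epix_as_of(flu_archive, as.Date("2025-01-18"))
```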
32 changes: 17 additions & 15 deletions R/forecasters/forecaster_climatological.R
@@ -1,18 +1,18 @@
(indentation-only change: the parameter list was re-indented, so the old and new lines are identical once leading whitespace is stripped; shown once)
climate_linear_ensembled <- function(epi_data,
outcome,
extra_sources = "",
ahead = 7,
trainer = parsnip::linear_reg(),
quantile_levels = covidhub_probs(),
filter_source = "",
filter_agg_level = "",
scale_method = c("quantile", "std", "none"),
center_method = c("median", "mean", "none"),
nonlin_method = c("quart_root", "none"),
drop_non_seasons = FALSE,
residual_tail = 0.99,
residual_center = 0.35,
...) {
scale_method <- arg_match(scale_method)
center_method <- arg_match(center_method)
nonlin_method <- arg_match(nonlin_method)
@@ -49,7 +49,9 @@ climate_linear_ensembled <- function(epi_data,
}
learned_params <- calculate_whitening_params(season_data, outcome, scale_method, center_method, nonlin_method)
epi_data %<>% data_whitening(outcome, learned_params, nonlin_method)
- epi_data <- epi_data %>% select(geo_value, source, time_value, season, value = !!outcome) %>% mutate(epiweek = epiweek(time_value))
+ epi_data <- epi_data %>%
+ select(geo_value, source, time_value, season, value = !!outcome) %>%
+ mutate(epiweek = epiweek(time_value))
pred_climate <- climatological_model(epi_data, ahead) %>% mutate(forecaster = "climate")
pred_geo_climate <- climatological_model(epi_data, ahead, geo_agg = FALSE) %>% mutate(forecaster = "climate_geo")
pred_linear <- forecaster_baseline_linear(epi_data, ahead, residual_tail = residual_tail, residual_center = residual_center) %>% mutate(forecaster = "linear")
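
The `arg_match()` calls at the top of this function are rlang's standard pattern for validated option arguments: the vector default doubles as the list of allowed values, and the first element wins when the caller passes nothing. A minimal sketch:

```r
library(rlang)

f <- function(scale_method = c("quantile", "std", "none")) {
  # collapses the default vector to its first element, or validates a
  # caller-supplied value against the allowed set
  scale_method <- arg_match(scale_method)
  scale_method
}

f()            # "quantile" -- the first element is the default
f("std")       # "std"
# f("median")  # error: `scale_method` must be one of "quantile", "std", or "none"
```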
4 changes: 2 additions & 2 deletions R/forecasters/forecaster_flusion.R
@@ -40,7 +40,7 @@ flusion <- function(epi_data,
adding_source <- FALSE
if (!("source" %in% names(epi_data))) {
adding_source <- TRUE
epi_data$source <- c("none")
epi_data$source <- c("nhsn")
attributes(epi_data)$metadata$other_keys <- "source"
}
if ("season_week" %nin% names(epi_data) | "season" %nin% names(epi_data)) {
@@ -146,7 +146,7 @@ flusion <- function(epi_data,
preproc %<>%
add_role(all_of(starts_with("slide_value")), new_role = "pre-predictor")
# one-hot encoding of the data source
- if (all(levels(epi_data$source) != "none") && dummy_source) {
+ if (all(levels(epi_data$source) != "nhsn") && dummy_source) {
preproc %<>% step_dummy(source, one_hot = TRUE, keep_original_cols = TRUE, role = "pre-predictor")
}
# one-hot encoding of location
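
For context on the `step_dummy()` branch above, here is a minimal sketch (standalone recipes code, not this repo's preprocessing pipeline) of what one-hot encoding a `source` factor produces:

```r
library(recipes)
library(tibble)

df <- tibble(
  source = factor(c("nhsn", "flusurv", "nhsn")),
  y = c(1.0, 2.0, 3.0)
)

# one_hot = TRUE makes one 0/1 column per level (no reference level dropped);
# keep_original_cols = TRUE retains the source column alongside them.
recipe(y ~ ., data = df) %>%
  step_dummy(source, one_hot = TRUE, keep_original_cols = TRUE) %>%
  prep() %>%
  bake(new_data = NULL)
# -> keeps `source` and adds indicator columns source_flusurv and source_nhsn
```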
11 changes: 7 additions & 4 deletions R/forecasters/forecaster_no_recent_outcome.R
@@ -42,7 +42,7 @@ no_recent_outcome <- function(epi_data,
adding_source <- FALSE
if (!("source" %in% names(epi_data))) {
adding_source <- TRUE
epi_data$source <- c("none")
epi_data$source <- c("nhsn")
attributes(epi_data)$metadata$other_keys <- "source"
}
if ("season_week" %nin% names(epi_data)) {
@@ -59,10 +59,13 @@
args_input[["quantile_levels"]] <- quantile_levels
args_list <- do.call(default_args_list, args_input)
# if you want to hardcode particular predictors in a particular forecaster
- predictors <- extra_sources[[1]]
+ # TODO: Partial match quantile_level coming from here (on Dmitry's machine)
+ predictors <- c(outcome, extra_sources[[1]])
c(args_list, tmp_pred, trainer) %<-% sanitize_args_predictors_trainer(epi_data, outcome, predictors, trainer, args_list)

if (extra_sources[[1]] == "") {
predictors <- character()
} else {
predictors <- extra_sources[[1]]
}
# end of the copypasta
# finally, any other pre-processing (e.g. smoothing) that isn't performed by
# epipredict
2 changes: 1 addition & 1 deletion R/forecasters/forecaster_scaled_pop.R
@@ -54,7 +54,7 @@ scaled_pop <- function(epi_data,
scale_method = c("none", "quantile", "std"),
center_method = c("median", "mean", "none"),
nonlin_method = c("quart_root", "none"),
- trainer = parsnip::linear_reg(),
+ trainer = epipredict::quantile_reg(),
quantile_levels = covidhub_probs(),
filter_source = "",
filter_agg_level = "",
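
The default-trainer swap here (and in `scaled_pop_seasonal` below) trades a conditional-mean fit for direct quantile fits. A sketch of the difference, assuming a recent epipredict where the wrapper takes `quantile_levels`:

```r
# parsnip::linear_reg() fits a single least-squares line, and prediction
# quantiles must then be backed out from residuals; quantile_reg() instead
# fits one quantile-regression line per requested level (via quantreg).
trainer <- epipredict::quantile_reg(quantile_levels = c(0.1, 0.25, 0.5, 0.75, 0.9))
```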
2 changes: 1 addition & 1 deletion R/forecasters/forecaster_scaled_pop_seasonal.R
@@ -48,7 +48,7 @@ scaled_pop_seasonal <- function(epi_data,
seasonal_backward_window = 5 * 7,
seasonal_forward_window = 3 * 7,
train_residual = FALSE,
- trainer = parsnip::linear_reg(),
+ trainer = epipredict::quantile_reg(),
quantile_levels = covidhub_probs(),
filter_source = "",
filter_agg_level = "",
2 changes: 1 addition & 1 deletion R/forecasters/forecaster_smoothed_scaled.R
@@ -94,7 +94,7 @@ smoothed_scaled <- function(epi_data,
adding_source <- FALSE
if (!("source" %in% names(epi_data))) {
adding_source <- TRUE
epi_data$source <- c("none")
epi_data$source <- c("nhsn")
attributes(epi_data)$metadata$other_keys <- "source"
}
args_input[["ahead"]] <- ahead
37 changes: 28 additions & 9 deletions R/utils.R
@@ -172,18 +172,18 @@ data_substitutions <- function(dataset, disease, forecast_generation_date) {
}

parse_prod_weights <- function(filename = here::here("covid_geo_exclusions.csv"),
- gen_forecast_date) {
+ forecast_date) {
all_states <- c(
unique(readr::read_csv("https://raw.githubusercontent.com/cmu-delphi/covidcast-indicators/refs/heads/main/_delphi_utils_python/delphi_utils/data/2020/state_pop.csv", show_col_types = FALSE)$state_id),
"usa", "us"
)
all_prod_weights <- readr::read_csv(filename, comment = "#", show_col_types = FALSE)
# if we haven't set specific weights, use the overall defaults
- useful_prod_weights <- filter(all_prod_weights, forecast_date == gen_forecast_date)
+ # .env$ keeps the function argument from being masked by the column of the same name
+ useful_prod_weights <- filter(all_prod_weights, forecast_date == .env$forecast_date)
if (nrow(useful_prod_weights) == 0) {
useful_prod_weights <- all_prod_weights %>%
filter(forecast_date == min(forecast_date)) %>%
- mutate(forecast_date = gen_forecast_date)
+ mutate(forecast_date = .env$forecast_date)
}
useful_prod_weights %>%
mutate(
@@ -280,7 +280,10 @@ write_submission_file <- function(pred, forecast_reference_date, submission_dire
#' Utility to get the reference date for a given date. This is the last day of
#' the epiweek that the date falls in.
get_forecast_reference_date <- function(date) {
- MMWRweek::MMWRweek2Date(epiyear(date), epiweek(date)) + 6
+ if (!lubridate::is.Date(date)) {
+ date <- as.Date(date)
+ }
+ MMWRweek::MMWRweek2Date(lubridate::epiyear(date), lubridate::epiweek(date)) + 6
}
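
For instance (hedged on the epiweek convention MMWRweek implements: weeks run Sunday through Saturday):

```r
# 2025-01-22 is a Wednesday; its MMWR week runs 2025-01-19 (Sun) through
# 2025-01-25 (Sat), so the reference date is that closing Saturday.
get_forecast_reference_date("2025-01-22")
#> [1] "2025-01-25"
```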

update_site <- function() {
@@ -303,15 +306,31 @@
stop("Template file does not exist.")
}
report_md_content <- readLines(template_path)

# Get the list of files in the reports directory
report_files <- dir_ls(reports_dir, regexp = ".*_prod.html")
report_files <- dir_ls(reports_dir, regexp = ".*_prod_on_.*.html")
+ report_table <- tibble(
+ filename = report_files,
+ dates = str_match_all(filename, "[0-9]{4}-..-..")
+ ) %>%
+ unnest_wider(dates, names_sep = "_") %>%
+ rename(forecast_date = dates_1, generation_date = dates_2) %>%
+ mutate(
+ forecast_date = ymd(forecast_date),
+ generation_date = ymd(generation_date),
+ disease = str_match(filename, "flu|covid")
+ )

- # Extract dates and sort files by date in descending order
- report_files <- report_files[order(as.Date(str_extract(report_files, "\d{4}-\d{2}-\d{2}")), decreasing = FALSE)]
+ # for each forecast date, use the most recently generated report, then sort
+ # ascending on forecast date
+ used_reports <- report_table %>%
+ group_by(forecast_date, disease) %>%
+ arrange(generation_date) %>%
+ filter(generation_date == max(generation_date)) %>%
+ ungroup() %>%
+ arrange(forecast_date)

# Process each report file
- for (report_file in report_files) {
+ for (report_file in used_reports$filename) {
file_name <- path_file(report_file)
file_parts <- str_split(fs::path_ext_remove(file_name), "_", simplify = TRUE)
date <- file_parts[1]
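
The dedup rule above can be checked in isolation; a toy sketch with made-up filenames:

```r
library(dplyr)
library(tibble)

report_table <- tribble(
  ~filename,                       ~forecast_date,        ~generation_date,      ~disease,
  "flu_prod_on_2025-01-20.html",   as.Date("2025-01-18"), as.Date("2025-01-20"), "flu",
  "flu_prod_on_2025-01-21.html",   as.Date("2025-01-18"), as.Date("2025-01-21"), "flu",
  "covid_prod_on_2025-01-13.html", as.Date("2025-01-11"), as.Date("2025-01-13"), "covid"
)

# keep only the newest generation per (forecast_date, disease) pair
report_table %>%
  group_by(forecast_date, disease) %>%
  filter(generation_date == max(generation_date)) %>%
  ungroup() %>%
  arrange(forecast_date)
# -> the 2025-01-20 flu report is dropped in favor of the regenerated
#    2025-01-21 one; the covid report is untouched.
```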
2 changes: 1 addition & 1 deletion scripts/covid_hosp_explore.R
@@ -9,7 +9,7 @@ if (!exists("ref_time_values_")) {
start_date <- as.Date("2023-10-04")
end_date <- as.Date("2024-04-24")
date_step <- 7L
- #ref_time_values_ <- as.Date(c("2023-11-08", "2023-11-22"))
+ # ref_time_values_ <- as.Date(c("2023-11-08", "2023-11-22"))
}
time_value_adjust <- 3 # this moves the week marker from Saturday to Wednesday

Expand Down