Skip to content

Commit 206dfa8

Browse files
authored
Merge pull request #138 from cmu-delphi/climatological_backfill
Climatological backfill
2 parents e49d428 + 5f3baaa commit 206dfa8

21 files changed

+1367
-1170
lines changed

Makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ dashboard:
8686
Rscript scripts/dashboard.R
8787

8888
update_site:
89-
Rscript -e "source('R/utils.R'); update_site()"
89+
Rscript -e "suppressPackageStartupMessages(source(here::here('R', 'load_all.R'))); update_site()"
9090

9191
netlify:
9292
netlify deploy --dir=reports --prod

R/aux_data_utils.R

+91
Original file line numberDiff line numberDiff line change
@@ -617,3 +617,94 @@ process_nhsn_data <- function(raw_nhsn_data) {
617617
mutate(version = fixed_version) %>%
618618
relocate(geo_value, disease, time_value, version)
619619
}
620+
621+
622+
# for filenames of the form nhsn_data_2024-11-19_16-29-43.191649.rds
623+
get_version_timestamp <- function(filename) ymd_hms(str_match(filename, "[0-9]{4}-..-.._..-..-..\\.[^.^_]*"))
624+
625+
#' all in one function to get and cache a nhsn archive from raw files
626+
#' @description
627+
#' This takes in all of the raw data files for the nhsn data, creates a
628+
#' quasi-archive (it keeps one example per version-day, rather than one per
629+
#' change), and puts it in the bucket. The stored value has the columns
630+
#' geo_value, time_value, disease, endpoint (either basic or prelim), version,
631+
#' version_timestamp (to enable keeping the most recent value), and value.
632+
#' The returned value on the other hand is an actual epi_archive, only
633+
#' containing the data for `disease_name`.
634+
create_nhsn_data_archive <- function(disease_name) {
635+
if (aws.s3::head_object("archive_timestamped.parquet", bucket = "forecasting-team-data")) {
636+
aws.s3::save_object("archive_timestamped.parquet", bucket = "forecasting-team-data", file = here::here("cache/archive_timestamped.parquet"))
637+
previous_archive <- qs::qread(here::here("cache/archive_timestamped.parquet"))
638+
last_timestamp <- max(previous_archive$version_timestamp)
639+
} else {
640+
# there is no remote
641+
previous_archive <- NULL
642+
last_timestamp <- as.Date("1000-01-01")
643+
}
644+
new_data <- aws.s3::get_bucket_df(bucket = "forecasting-team-data", prefix = "nhsn_data_") %>%
645+
filter(get_version_timestamp(Key) > last_timestamp) %>%
646+
pull(Key) %>%
647+
lapply(
648+
function(filename) {
649+
version_timestamp <- get_version_timestamp(filename)
650+
res <- NULL
651+
tryCatch(
652+
{
653+
s3load(object = filename, bucket = "forecasting-team-data")
654+
if (grepl("prelim", filename)) {
655+
res <- epi_data_raw_prelim
656+
endpoint_val <- "prelim"
657+
} else {
658+
res <- epi_data_raw
659+
endpoint_val <- "basic"
660+
}
661+
res <- res %>%
662+
process_nhsn_data() %>%
663+
select(geo_value, disease, time_value, value) %>%
664+
mutate(version_timestamp = version_timestamp, endpoint = endpoint_val)
665+
},
666+
error = function(cond) {}
667+
)
668+
res
669+
}
670+
)
671+
# drop any duplicates on the same day
672+
compactified <-
673+
new_data %>%
674+
bind_rows()
675+
if (nrow(compactified) == 0) {
676+
one_per_day <- previous_archive
677+
} else {
678+
compactified <-
679+
compactified %>%
680+
arrange(geo_value, time_value, disease, endpoint, version_timestamp) %>%
681+
mutate(version = as.Date(version_timestamp)) %>%
682+
filter(if_any(
683+
c(everything(), -endpoint, -version_timestamp), # all non-version, non-endpoint columns
684+
~ !epiprocess:::is_locf(., .Machine$double.eps^0.5)
685+
))
686+
687+
unchanged <- previous_archive %>% filter(!(version %in% unique(compactified$version)))
688+
# only keep the last value for a given version (so across version_timestamps)
689+
# we only need to do this for the versions in compactified, as the other versions can't possibly change
690+
one_per_day <-
691+
previous_archive %>%
692+
filter(version %in% unique(compactified$version)) %>%
693+
bind_rows(compactified) %>%
694+
group_by(geo_value, disease, time_value, version) %>%
695+
arrange(version_timestamp) %>%
696+
filter(row_number() == n()) %>%
697+
ungroup() %>%
698+
bind_rows(unchanged)
699+
qs::qsave(one_per_day, here::here("cache/archive_timestamped.parquet"))
700+
aws.s3::put_object(here::here("cache/archive_timestamped.parquet"), "archive_timestamped.parquet", bucket = "forecasting-team-data")
701+
}
702+
one_per_day %>%
703+
filter(disease == disease_name) %>%
704+
select(-version_timestamp, -endpoint, -disease) %>%
705+
mutate(
706+
geo_value = ifelse(geo_value == "usa", "us", geo_value)
707+
) %>%
708+
filter(geo_value != "mp") %>%
709+
as_epi_archive(compactify = TRUE)
710+
}

R/forecasters/forecaster_climatological.R

+17-15
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
11
climate_linear_ensembled <- function(epi_data,
2-
outcome,
3-
extra_sources = "",
4-
ahead = 7,
5-
trainer = parsnip::linear_reg(),
6-
quantile_levels = covidhub_probs(),
7-
filter_source = "",
8-
filter_agg_level = "",
9-
scale_method = c("quantile", "std", "none"),
10-
center_method = c("median", "mean", "none"),
11-
nonlin_method = c("quart_root", "none"),
12-
drop_non_seasons = FALSE,
13-
residual_tail = 0.99,
14-
residual_center = 0.35,
15-
...) {
2+
outcome,
3+
extra_sources = "",
4+
ahead = 7,
5+
trainer = parsnip::linear_reg(),
6+
quantile_levels = covidhub_probs(),
7+
filter_source = "",
8+
filter_agg_level = "",
9+
scale_method = c("quantile", "std", "none"),
10+
center_method = c("median", "mean", "none"),
11+
nonlin_method = c("quart_root", "none"),
12+
drop_non_seasons = FALSE,
13+
residual_tail = 0.99,
14+
residual_center = 0.35,
15+
...) {
1616
scale_method <- arg_match(scale_method)
1717
center_method <- arg_match(center_method)
1818
nonlin_method <- arg_match(nonlin_method)
@@ -49,7 +49,9 @@ climate_linear_ensembled <- function(epi_data,
4949
}
5050
learned_params <- calculate_whitening_params(season_data, outcome, scale_method, center_method, nonlin_method)
5151
epi_data %<>% data_whitening(outcome, learned_params, nonlin_method)
52-
epi_data <- epi_data %>% select(geo_value, source, time_value, season, value = !!outcome) %>% mutate(epiweek = epiweek(time_value))
52+
epi_data <- epi_data %>%
53+
select(geo_value, source, time_value, season, value = !!outcome) %>%
54+
mutate(epiweek = epiweek(time_value))
5355
pred_climate <- climatological_model(epi_data, ahead) %>% mutate(forecaster = "climate")
5456
pred_geo_climate <- climatological_model(epi_data, ahead, geo_agg = FALSE) %>% mutate(forecaster = "climate_geo")
5557
pred_linear <- forecaster_baseline_linear(epi_data, ahead, residual_tail = residual_tail, residual_center = residual_center) %>% mutate(forecaster = "linear")

R/forecasters/forecaster_flusion.R

+2-2
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ flusion <- function(epi_data,
4040
adding_source <- FALSE
4141
if (!("source" %in% names(epi_data))) {
4242
adding_source <- TRUE
43-
epi_data$source <- c("none")
43+
epi_data$source <- c("nhsn")
4444
attributes(epi_data)$metadata$other_keys <- "source"
4545
}
4646
if ("season_week" %nin% names(epi_data) | "season" %nin% names(epi_data)) {
@@ -146,7 +146,7 @@ flusion <- function(epi_data,
146146
preproc %<>%
147147
add_role(all_of(starts_with("slide_value")), new_role = "pre-predictor")
148148
# one-hot encoding of the data source
149-
if (all(levels(epi_data$source) != "none") && dummy_source) {
149+
if (all(levels(epi_data$source) != "nhsn") && dummy_source) {
150150
preproc %<>% step_dummy(source, one_hot = TRUE, keep_original_cols = TRUE, role = "pre-predictor")
151151
}
152152
# one-hot encoding of location

R/forecasters/forecaster_no_recent_outcome.R

+7-4
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ no_recent_outcome <- function(epi_data,
4242
adding_source <- FALSE
4343
if (!("source" %in% names(epi_data))) {
4444
adding_source <- TRUE
45-
epi_data$source <- c("none")
45+
epi_data$source <- c("nhsn")
4646
attributes(epi_data)$metadata$other_keys <- "source"
4747
}
4848
if ("season_week" %nin% names(epi_data)) {
@@ -59,10 +59,13 @@ no_recent_outcome <- function(epi_data,
5959
args_input[["quantile_levels"]] <- quantile_levels
6060
args_list <- do.call(default_args_list, args_input)
6161
# if you want to hardcode particular predictors in a particular forecaster
62-
predictors <- extra_sources[[1]]
63-
# TODO: Partial match quantile_level coming from here (on Dmitry's machine)
62+
predictors <- c(outcome, extra_sources[[1]])
6463
c(args_list, tmp_pred, trainer) %<-% sanitize_args_predictors_trainer(epi_data, outcome, predictors, trainer, args_list)
65-
64+
if (extra_sources[[1]] == "") {
65+
predictors <- character()
66+
} else {
67+
predictors <- extra_sources[[1]]
68+
}
6669
# end of the copypasta
6770
# finally, any other pre-processing (e.g. smoothing) that isn't performed by
6871
# epipredict

R/forecasters/forecaster_scaled_pop.R

+1-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ scaled_pop <- function(epi_data,
5454
scale_method = c("none", "quantile", "std"),
5555
center_method = c("median", "mean", "none"),
5656
nonlin_method = c("quart_root", "none"),
57-
trainer = parsnip::linear_reg(),
57+
trainer = epipredict::quantile_reg(),
5858
quantile_levels = covidhub_probs(),
5959
filter_source = "",
6060
filter_agg_level = "",

R/forecasters/forecaster_scaled_pop_seasonal.R

+1-1
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ scaled_pop_seasonal <- function(epi_data,
4848
seasonal_backward_window = 5 * 7,
4949
seasonal_forward_window = 3 * 7,
5050
train_residual = FALSE,
51-
trainer = parsnip::linear_reg(),
51+
trainer = epipredict::quantile_reg(),
5252
quantile_levels = covidhub_probs(),
5353
filter_source = "",
5454
filter_agg_level = "",

R/forecasters/forecaster_smoothed_scaled.R

+1-1
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ smoothed_scaled <- function(epi_data,
9494
adding_source <- FALSE
9595
if (!("source" %in% names(epi_data))) {
9696
adding_source <- TRUE
97-
epi_data$source <- c("none")
97+
epi_data$source <- c("nhsn")
9898
attributes(epi_data)$metadata$other_keys <- "source"
9999
}
100100
args_input[["ahead"]] <- ahead

R/utils.R

+28-10
Original file line numberDiff line numberDiff line change
@@ -172,18 +172,19 @@ data_substitutions <- function(dataset, disease, forecast_generation_date) {
172172
}
173173

174174
parse_prod_weights <- function(filename = here::here("covid_geo_exclusions.csv"),
175-
gen_forecast_date) {
175+
forecast_date_int, forecaster_fns) {
176+
forecast_date <- as.Date(forecast_date_int)
176177
all_states <- c(
177178
unique(readr::read_csv("https://raw.githubusercontent.com/cmu-delphi/covidcast-indicators/refs/heads/main/_delphi_utils_python/delphi_utils/data/2020/state_pop.csv", show_col_types = FALSE)$state_id),
178179
"usa", "us"
179180
)
180181
all_prod_weights <- readr::read_csv(filename, comment = "#", show_col_types = FALSE)
181182
# if we haven't set specific weights, use the overall defaults
182-
useful_prod_weights <- filter(all_prod_weights, forecast_date == gen_forecast_date)
183+
useful_prod_weights <- filter(all_prod_weights, forecast_date == forecast_date)
183184
if (nrow(useful_prod_weights) == 0) {
184185
useful_prod_weights <- all_prod_weights %>%
185186
filter(forecast_date == min(forecast_date)) %>%
186-
mutate(forecast_date = gen_forecast_date)
187+
mutate(forecast_date = forecast_date)
187188
}
188189
useful_prod_weights %>%
189190
mutate(
@@ -196,7 +197,7 @@ parse_prod_weights <- function(filename = here::here("covid_geo_exclusions.csv")
196197
unnest_longer(forecaster) %>%
197198
group_by(forecast_date, forecaster, geo_value) %>%
198199
summarize(weight = min(weight), .groups = "drop") %>%
199-
mutate(forecast_date = as.Date(forecast_date)) %>%
200+
mutate(forecast_date = as.Date(forecast_date_int)) %>%
200201
group_by(forecast_date, geo_value) %>%
201202
mutate(weight = ifelse(near(weight, 0), 0, weight / sum(weight)))
202203
}
@@ -280,7 +281,8 @@ write_submission_file <- function(pred, forecast_reference_date, submission_dire
280281
#' Utility to get the reference date for a given date. This is the last day of
281282
#' the epiweek that the date falls in.
282283
get_forecast_reference_date <- function(date) {
283-
MMWRweek::MMWRweek2Date(epiyear(date), epiweek(date)) + 6
284+
date <- as.Date(date)
285+
MMWRweek::MMWRweek2Date(lubridate::epiyear(date), lubridate::epiweek(date)) + 6
284286
}
285287

286288
update_site <- function() {
@@ -303,15 +305,31 @@ update_site <- function() {
303305
stop("Template file does not exist.")
304306
}
305307
report_md_content <- readLines(template_path)
306-
307308
# Get the list of files in the reports directory
308-
report_files <- dir_ls(reports_dir, regexp = ".*_prod.html")
309+
report_files <- dir_ls(reports_dir, regexp = ".*_prod_on_.*.html")
310+
report_table <- tibble(
311+
filename = report_files,
312+
dates = str_match_all(filename, "[0-9]{4}-..-..")
313+
) %>%
314+
unnest_wider(dates, names_sep = "_") %>%
315+
rename(forecast_date = dates_1, generation_date = dates_2) %>%
316+
mutate(
317+
forecast_date = ymd(forecast_date),
318+
generation_date = ymd(generation_date),
319+
disease = str_match(filename, "flu|covid")
320+
)
309321

310-
# Extract dates and sort files by date in descending order
311-
report_files <- report_files[order(as.Date(str_extract(report_files, "\\d{4}-\\d{2}-\\d{2}")), decreasing = FALSE)]
322+
# use the most recently generated forecast, and sort descending on the
323+
# forecast date
324+
used_reports <- report_table %>%
325+
group_by(forecast_date, disease) %>%
326+
arrange(generation_date) %>%
327+
filter(generation_date == max(generation_date)) %>%
328+
ungroup() %>%
329+
arrange(forecast_date)
312330

313331
# Process each report file
314-
for (report_file in report_files) {
332+
for (report_file in used_reports$filename) {
315333
file_name <- path_file(report_file)
316334
file_parts <- str_split(fs::path_ext_remove(file_name), "_", simplify = TRUE)
317335
date <- file_parts[1]

scripts/covid_hosp_explore.R

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ if (!exists("ref_time_values_")) {
99
start_date <- as.Date("2023-10-04")
1010
end_date <- as.Date("2024-04-24")
1111
date_step <- 7L
12-
#ref_time_values_ <- as.Date(c("2023-11-08", "2023-11-22"))
12+
# ref_time_values_ <- as.Date(c("2023-11-08", "2023-11-22"))
1313
}
1414
time_value_adjust <- 3 # this moves the week marker from Saturday to Wednesday
1515

0 commit comments

Comments
 (0)