diff --git a/conveyal_update/conveyal_vars.py b/conveyal_update/conveyal_vars.py
index c7996961d..a5d6b3c24 100644
--- a/conveyal_update/conveyal_vars.py
+++ b/conveyal_update/conveyal_vars.py
@@ -2,19 +2,21 @@ from shared_utils import rt_dates
 
 GCS_PATH = 'gs://calitp-analytics-data/data-analyses/conveyal_update/'
-TARGET_DATE = rt_dates.DATES['oct2024']
+TARGET_DATE = rt_dates.DATES['mar2025']
+LOOKBACK_TIME = dt.timedelta(days=60)
 OSM_FILE = 'us-west-latest.osm.pbf'
+PUBLISHED_FEEDS_YML_PATH = "../gtfs_funnel/published_operators.yml"
 # http://download.geofabrik.de/north-america/us-west-latest.osm.pbf
 # first download with wget...
 
 conveyal_regions = {}
 # boundaries correspond to Conveyal Analysis regions
-# conveyal_regions['norcal'] = {'north': 42.03909, 'south': 39.07038, 'east': -119.60541, 'west': -124.49158}
-# conveyal_regions['central'] = {'north': 39.64165, 'south': 35.87347, 'east': -117.53174, 'west': -123.83789}
-# conveyal_regions['socal'] = {'north': 35.8935, 'south': 32.5005, 'east': -114.13121, 'west': -121.46759}
-# conveyal_regions['mojave'] = {'north': 37.81629, 'south': 34.89945, 'east': -114.59015, 'west': -118.38043}
+conveyal_regions['norcal'] = {'north': 42.03909, 'south': 39.07038, 'east': -119.60541, 'west': -124.49158}
+conveyal_regions['central'] = {'north': 39.64165, 'south': 35.87347, 'east': -117.53174, 'west': -123.83789}
+conveyal_regions['socal'] = {'north': 35.8935, 'south': 32.5005, 'east': -114.13121, 'west': -121.46759}
+conveyal_regions['mojave'] = {'north': 37.81629, 'south': 34.89945, 'east': -114.59015, 'west': -118.38043}
 # special region for one-off SR99 CMCP
-conveyal_regions['sr99'] = {'north': 38.71337, 'south': 34.81154, 'east': -118.66882, 'west': -121.66259}
+# conveyal_regions['sr99'] = {'north': 38.71337, 'south': 34.81154, 'east': -118.66882, 'west': -121.66259}
 # # special region for one-off Centennial Corridor
 # conveyal_regions['bakersfield'] = {'north': 36.81, 'south': 34.13, 'east': -117.12, 'west': -120.65}
\ No newline at end of file
diff --git a/conveyal_update/download_data.py b/conveyal_update/download_data.py
index 3dcdf0f84..1b5880115 100644
--- a/conveyal_update/download_data.py
+++ b/conveyal_update/download_data.py
@@ -14,19 +14,22 @@ regions = conveyal_vars.conveyal_regions
 TARGET_DATE = conveyal_vars.TARGET_DATE
 
-regions_and_feeds = pd.read_parquet(f'{conveyal_vars.GCS_PATH}regions_feeds_{TARGET_DATE}.parquet')
 
 def download_feed(row):
     # need wildcard for file too -- not all are gtfs.zip!
-    uri = f'gs://calitp-gtfs-schedule-raw-v2/schedule/dt={row.date.strftime("%Y-%m-%d")}/*/base64_url={row.base64_url}/*.zip'
-    fs.get(uri, f'{row.path}/{row.gtfs_dataset_name.replace(" ", "_")}_{row.feed_key}_gtfs.zip')
-    # print(f'downloaded {row.path}/{row.feed_key}_gtfs.zip')
+    try:
+        uri = f'gs://calitp-gtfs-schedule-raw-v2/schedule/dt={row.date.strftime("%Y-%m-%d")}/*/base64_url={row.base64_url}/*.zip'
+        fs.get(uri, f'{row.path}/{row.gtfs_dataset_name.replace(" ", "_")}_{row.feed_key}_gtfs.zip')
+        # print(f'downloaded {row.path}/{row.feed_key}_gtfs.zip')
+    except Exception as e:
+        print(f'\n could not download feed at {e}')
 
 def download_region(feeds_df, region: str):
     assert region in regions.keys()
     path = f'./feeds_{feeds_df.date.iloc[0].strftime("%Y-%m-%d")}/{region}'
-    if not os.path.exists(path): os.makedirs(path)
+    if not os.path.exists(path):
+        os.makedirs(path)
     region = (feeds_df >> filter(_.region == region)).copy()
     region['path'] = path
     region.progress_apply(download_feed, axis = 1)
@@ -43,6 +46,7 @@ def generate_script(regions):
         f.write('\n'.join(cmds))
 
 if __name__ == '__main__':
+    regions_and_feeds = pd.read_parquet(f'{conveyal_vars.GCS_PATH}regions_feeds_{TARGET_DATE}.parquet')
     for region in tqdm(regions.keys()):
         download_region(regions_and_feeds, region)
diff --git a/conveyal_update/evaluate_feeds.py b/conveyal_update/evaluate_feeds.py
index 3e1fe3ae7..6a852d8d2 100644
--- a/conveyal_update/evaluate_feeds.py
+++ b/conveyal_update/evaluate_feeds.py
@@ -1,14 +1,26 @@
 import os
 os.environ["CALITP_BQ_MAX_BYTES"] = str(800_000_000_000)
 
-from shared_utils import gtfs_utils_v2
+from shared_utils import gtfs_utils_v2
 from calitp_data_analysis.tables import tbls
+from calitp_data_analysis.sql import query_sql
 from siuba import *
 import pandas as pd
 import datetime as dt
-
 import conveyal_vars
 
+TARGET_DATE = conveyal_vars.TARGET_DATE
+REGIONAL_SUBFEED_NAME = "Regional Subfeed"
+INT_TO_GTFS_WEEKDAY = {
+    0: "monday",
+    1: "tuesday",
+    2: "wednesday",
+    3: "thursday",
+    4: "friday",
+    5: "saturday",
+    6: "sunday"
+}
+
 def check_defined_elsewhere(row, df):
     '''
     for feeds without service defined, check if the same service is captured in another feed that does include service
@@ -17,8 +29,6 @@ def check_defined_elsewhere(row, df):
     row['service_any_feed'] = is_defined
     return row
 
-TARGET_DATE = conveyal_vars.TARGET_DATE
-
 def get_feeds_check_service():
     feeds_on_target = gtfs_utils_v2.schedule_daily_feed_to_gtfs_dataset_name(selected_date=TARGET_DATE)
     feeds_on_target = feeds_on_target.rename(columns={'name':'gtfs_dataset_name'})
@@ -38,39 +48,153 @@ def get_feeds_check_service():
     return feeds_on_target
 
 def attach_transit_services(feeds_on_target: pd.DataFrame):
-
+    """Associate each feed in feeds_on_target.gtfs_dataset_key with a transit service"""
     target_dt = dt.datetime.combine(dt.date.fromisoformat(TARGET_DATE), dt.time(0))
 
     services = (tbls.mart_transit_database.dim_gtfs_service_data()
-        >> filter(_._valid_from <= target_dt, _._valid_to > target_dt)
+        >> filter(
+            _._valid_from <= target_dt, _._valid_to > target_dt
+        )
         # >> filter(_.gtfs_dataset_key == 'da7e9e09d3eec6c7686adc21c8b28b63') # test with BCT
         # >> filter(_.service_key == '5bc7371dca26d74a99be945b18b3174e')
-        >> select(_.service_key, _.gtfs_dataset_key)
+        >> select(_.service_key, _.gtfs_dataset_key, _.customer_facing)
         >> collect()
     )
-    feeds_on_target = feeds_on_target >> left_join(_, services, on='gtfs_dataset_key')
-    return feeds_on_target
+    feeds_services_merged = feeds_on_target.merge(
+        services, how="left",
+        on='gtfs_dataset_key', validate="one_to_many"
+    )
+    feeds_services_filtered = feeds_services_merged.loc[
+        feeds_services_merged["customer_facing"] | (feeds_services_merged["regional_feed_type"] == REGIONAL_SUBFEED_NAME)
+    ].copy()
+    return feeds_services_filtered
 
-def report_undefined(feeds_on_target: pd.DataFrame):
-    fname = 'no_apparent_service.csv'
+def get_undefined_feeds(feeds_on_target: pd.DataFrame) -> pd.DataFrame:
+    """Return feeds in feeds_on_target that do not have service and where service is not defined in another feed"""
     undefined = feeds_on_target.apply(check_defined_elsewhere, axis=1, args=[feeds_on_target]) >> filter(-_.service_any_feed)
+    return undefined
+
+def report_unavailable_feeds(feeds: pd.DataFrame, fname: str) -> None:
+    """Create a csv report of unavailable or backdated feeds at the path specified in fname"""
+    undefined = feeds.loc[
+        feeds["valid_date_other_than_service_date"] | feeds["no_schedule_feed_found"]
+    ].copy()
     if undefined.empty:
         print('no undefined service feeds')
     else:
-        print(undefined.columns)
         print('these feeds have no service defined on target date, nor are their services captured in other feeds:')
-        # gtfs_dataset_name no longer present, this whole script should probably be updated/replaced
-        print(undefined >> select(_.gtfs_dataset_name, _.service_any_feed))
+        print(undefined.loc[undefined["no_schedule_feed_found"], "gtfs_dataset_name"].drop_duplicates())
+        print('these feeds have defined service, but only in a feed defined on a prior day')
+        print(undefined.loc[undefined["valid_date_other_than_service_date"], "gtfs_dataset_name"].drop_duplicates())
         print(f'saving detailed csv to {fname}')
-    undefined.to_csv(fname)
-    return
+    undefined.to_csv(fname, index=False)
+
+ISO_DATE_ONLY_FORMAT = "%Y-%m-%d"
+
+def get_old_feeds(undefined_feeds_base64_urls: pd.Series, target_date: dt.date | dt.datetime, max_lookback_timedelta: dt.timedelta) -> pd.DataFrame:
+    """
+    Search the warehouse for feeds downloaded within the time before target_date
+    defined by max_lookback_timedelta that have service as defined in calendar.txt
+    on target_date. These feeds will not be valid on target_date, but will be accepted by Conveyal.
+    This should not be used if the feeds are valid on the target_date, since this will provide needlessly
+    invalid feeds.
+    Note that this does not check calendar_dates.txt at present.
+
+    Parameters:
+    undefined_feeds_base64_urls: a Pandas Series containing base64 urls to feeds in the warehouse
+    target_date: a date or datetime where the feeds should be valid based on calendar.txt
+    max_lookback_timedelta: a timedelta defining the amount of time before target_date that a feed must have been available for
+
+    Returns:
+    A DataFrame with the following index and columns:
+        index: The base64 url of the feed, will match entries in undefined_feeds_base64_urls
+        feed_key: A key to dim_schedule_feeds matching the feed on the date it was last valid in the warehouse
+        date_processed: A datetime date matching the date on which the feed was last valid in the warehouse
+    """
+    base_64_urls_str = "('" + "', '".join(undefined_feeds_base64_urls) + "')"
+    day_of_the_week = INT_TO_GTFS_WEEKDAY[target_date.weekday()]
+    max_lookback_date = target_date - max_lookback_timedelta
+    target_date_iso = target_date.strftime(ISO_DATE_ONLY_FORMAT)
+    # Query feeds for the newest feed where service is defined on the target_date,
+    # that have service on the day of the week of the target date, and
+    # that are valid before (inclusive) the target date and after (inclusive) the max lookback date
+    query = f"""
+        SELECT
+            `mart_gtfs.dim_schedule_feeds`.base64_url AS base64_url,
+            `mart_gtfs.dim_schedule_feeds`.key AS feed_key,
+            MAX(`mart_gtfs.dim_schedule_feeds`._valid_to) AS valid_feed_date
+        FROM `mart_gtfs.dim_schedule_feeds`
+        LEFT JOIN `mart_gtfs.dim_calendar`
+            ON `mart_gtfs.dim_schedule_feeds`.key = `mart_gtfs.dim_calendar`.feed_key
+        WHERE `mart_gtfs.dim_schedule_feeds`.base64_url IN {base_64_urls_str}
+            AND `mart_gtfs.dim_schedule_feeds`._valid_to >= '{max_lookback_date}'
+            AND `mart_gtfs.dim_schedule_feeds`._valid_to <= '{target_date}'
+            AND `mart_gtfs.dim_calendar`.{day_of_the_week} = 1
+            AND `mart_gtfs.dim_calendar`.start_date <= '{target_date}'
+            AND `mart_gtfs.dim_calendar`.end_date >= '{target_date}'
+        GROUP BY
+            `mart_gtfs.dim_schedule_feeds`.base64_url,
+            `mart_gtfs.dim_schedule_feeds`.key
+        LIMIT 1000
+    """
+    response = query_sql(
+        query
+    )
+    response_grouped = response.groupby("base64_url")
+    feed_info_by_url = response_grouped[["valid_feed_date", "feed_key"]].first()
+    feed_info_by_url["date_processed"] = feed_info_by_url["valid_feed_date"].dt.date - dt.timedelta(days=1)
+    # we have the day the feed becomes invalid, so the day we are interested in, where the feed *is* valid, is the day before
+    return feed_info_by_url.drop("valid_feed_date", axis=1)
+
+def merge_old_feeds(df_all_feeds: pd.DataFrame, df_undefined_feeds: pd.DataFrame, target_date: dt.date, max_lookback_timedelta: dt.timedelta) -> pd.DataFrame:
+    """
+    Merge feeds from df_all_feeds with old feeds found as a result of calling get_old_feeds with df_undefined_feeds.base64_url
+
+    Params:
+    df_all_feeds: A DataFrame of feeds, must have feed_key, date, and base64_url as columns and must include the base64_urls in df_undefined_feeds
+    df_undefined_feeds: A DataFrame of feeds that are not valid on target_date, where an old feed should be searched for.
+        Must have base64_url as a column
+    target_date: a date or datetime on which the feeds should have valid service
+    max_lookback_timedelta: a timedelta defining the amount of time before target_date that a feed must have been available for
+
+    Returns:
+    A DataFrame identical to df_all_feeds except with the following columns changed or added:
+        feed_key: Updated for the found feeds
+        date: Updated for the found feeds
+        no_schedule_feed_found: True if a schedule feed was present in df_undefined_feeds but was not associated with an older feed, otherwise False
+        valid_date_other_than_service_date: True if a replacement (older) feed was found, otherwise False
+    """
+    feed_search_result = get_old_feeds(
+        df_undefined_feeds["base64_url"],
+        target_date,
+        max_lookback_timedelta
+    )
+    feeds_merged = df_all_feeds.merge(
+        feed_search_result,
+        how="left",
+        left_on="base64_url",
+        right_index=True,
+        validate="many_to_one"
+    )
+    feeds_merged["feed_key"] = feeds_merged["feed_key_y"].fillna(feeds_merged["feed_key_x"])
+    feeds_merged["no_schedule_feed_found"] = (
+        (feeds_merged["base64_url"].isin(df_undefined_feeds["base64_url"]))
+        & (~feeds_merged["base64_url"].isin(feed_search_result.index))
+    ).fillna(False)
+    feeds_merged["date"] = feeds_merged["date_processed"].fillna(target_date)
+    feeds_merged["valid_date_other_than_service_date"] = feeds_merged["date"] != target_date
+
+    return feeds_merged.drop(
+        ["date_processed", "feed_key_x", "feed_key_y"], axis=1
+    )
 
 if __name__ == '__main__':
     feeds_on_target = get_feeds_check_service()
     feeds_on_target = attach_transit_services(feeds_on_target)
     print(f'feeds on target date shape: {feeds_on_target.shape}')
-    report_undefined(feeds_on_target)
-    feeds_on_target.to_parquet(f'{conveyal_vars.GCS_PATH}feeds_{TARGET_DATE}.parquet')
+    undefined_feeds = get_undefined_feeds(feeds_on_target)
+    feeds_merged = merge_old_feeds(
+        feeds_on_target, undefined_feeds, dt.date.fromisoformat(TARGET_DATE), conveyal_vars.LOOKBACK_TIME
+    )
+    report_unavailable_feeds(feeds_merged, 'no_apparent_service.csv')
+    feeds_merged.to_parquet(f'{conveyal_vars.GCS_PATH}feeds_{TARGET_DATE}.parquet')
\ No newline at end of file
diff --git a/conveyal_update/match_feeds_regions.py b/conveyal_update/match_feeds_regions.py
index 7f980437c..8e201accd 100644
--- a/conveyal_update/match_feeds_regions.py
+++ b/conveyal_update/match_feeds_regions.py
@@ -13,7 +13,6 @@ regions = conveyal_vars.conveyal_regions
 TARGET_DATE = conveyal_vars.TARGET_DATE
 
-feeds_on_target = pd.read_parquet(f'{conveyal_vars.GCS_PATH}feeds_{TARGET_DATE}.parquet')
 
 def create_region_gdf():
     # https://shapely.readthedocs.io/en/stable/reference/shapely.box.html#shapely.box
@@ -23,17 +22,28 @@ def create_region_gdf():
     df['bbox'] = df.apply(to_bbox, axis=1)
     df['geometry'] = df.apply(lambda x: shapely.geometry.box(*x.bbox), axis = 1)
     df = df >> select(-_.bbox)
-    region_gdf = gpd.GeoDataFrame(df, crs=geography_utils.WGS84).to_crs(geography_utils.CA_NAD83Albers)
+    region_gdf = gpd.GeoDataFrame(df, crs=geography_utils.WGS84).to_crs(geography_utils.CA_NAD83Albers_m)
     return region_gdf
 
+def get_stops_dates(feeds_on_target: pd.DataFrame, feed_key_column_name: str = "feed_key", date_column_name: str = "date"):
+    """Get stops for the feeds in feeds_on_target based on their date"""
+    all_stops = feeds_on_target.groupby(date_column_name)[feed_key_column_name].apply(
+        lambda feed_key_column: gtfs_utils_v2.get_stops(
+            selected_date=feed_key_column.name,
+            operator_feeds=feed_key_column
+        )
+    )
+    return all_stops
+
 def join_stops_regions(region_gdf: gpd.GeoDataFrame, feeds_on_target: pd.DataFrame):
-    all_stops = gtfs_utils_v2.get_stops(selected_date=TARGET_DATE, operator_feeds=feeds_on_target.feed_key).to_crs(geography_utils.CA_NAD83Albers)
+    # all_stops = gtfs_utils_v2.get_stops(selected_date=TARGET_DATE, operator_feeds=feeds_on_target.feed_key)
+    all_stops = get_stops_dates(feeds_on_target).to_crs(geography_utils.CA_NAD83Albers_m)
     region_join = gpd.sjoin(region_gdf, all_stops)
     regions_and_feeds = region_join >> distinct(_.region, _.feed_key)
     return regions_and_feeds
 
 if __name__ == '__main__':
-
+    feeds_on_target = pd.read_parquet(f'{conveyal_vars.GCS_PATH}feeds_{TARGET_DATE}.parquet')
     region_gdf = create_region_gdf()
     regions_and_feeds = join_stops_regions(region_gdf, feeds_on_target)
     regions_and_feeds = regions_and_feeds >> inner_join(_, feeds_on_target >> select(_.feed_key, _.gtfs_dataset_name, _.base64_url,