Conveyal Update + Automation #1440

Merged · 6 commits · Apr 8, 2025
14 changes: 8 additions & 6 deletions conveyal_update/conveyal_vars.py
@@ -2,19 +2,21 @@
from shared_utils import rt_dates

GCS_PATH = 'gs://calitp-analytics-data/data-analyses/conveyal_update/'
TARGET_DATE = rt_dates.DATES['oct2024']
TARGET_DATE = rt_dates.DATES['mar2025']
LOOKBACK_TIME = dt.timedelta(days=60)
OSM_FILE = 'us-west-latest.osm.pbf'
PUBLISHED_FEEDS_YML_PATH = "../gtfs_funnel/published_operators.yml"
# http://download.geofabrik.de/north-america/us-west-latest.osm.pbf
# first download with wget...
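# e.g.: wget http://download.geofabrik.de/north-america/us-west-latest.osm.pbf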

conveyal_regions = {}
# boundaries correspond to Conveyal Analysis regions
# conveyal_regions['norcal'] = {'north': 42.03909, 'south': 39.07038, 'east': -119.60541, 'west': -124.49158}
# conveyal_regions['central'] = {'north': 39.64165, 'south': 35.87347, 'east': -117.53174, 'west': -123.83789}
# conveyal_regions['socal'] = {'north': 35.8935, 'south': 32.5005, 'east': -114.13121, 'west': -121.46759}
# conveyal_regions['mojave'] = {'north': 37.81629, 'south': 34.89945, 'east': -114.59015, 'west': -118.38043}
conveyal_regions['norcal'] = {'north': 42.03909, 'south': 39.07038, 'east': -119.60541, 'west': -124.49158}
conveyal_regions['central'] = {'north': 39.64165, 'south': 35.87347, 'east': -117.53174, 'west': -123.83789}
conveyal_regions['socal'] = {'north': 35.8935, 'south': 32.5005, 'east': -114.13121, 'west': -121.46759}
conveyal_regions['mojave'] = {'north': 37.81629, 'south': 34.89945, 'east': -114.59015, 'west': -118.38043}
# special region for one-off SR99 CMCP
conveyal_regions['sr99'] = {'north': 38.71337, 'south': 34.81154, 'east': -118.66882, 'west': -121.66259}
# conveyal_regions['sr99'] = {'north': 38.71337, 'south': 34.81154, 'east': -118.66882, 'west': -121.66259}

# # special region for one-off Centennial Corridor
# conveyal_regions['bakersfield'] = {'north': 36.81, 'south': 34.13, 'east': -117.12, 'west': -120.65}
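For reference, a minimal sketch (not part of this diff) of how one of these bounding-box dicts maps onto a shapely polygon, mirroring what match_feeds_regions.py does downstream; the helper name region_to_box is illustrative only:

```python
from shapely.geometry import box, Polygon

def region_to_box(region: dict) -> Polygon:
    # shapely's box() takes (minx, miny, maxx, maxy) = (west, south, east, north)
    return box(region['west'], region['south'], region['east'], region['north'])

norcal = {'north': 42.03909, 'south': 39.07038, 'east': -119.60541, 'west': -124.49158}
print(region_to_box(norcal).bounds)
# (-124.49158, 39.07038, -119.60541, 42.03909)
```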
14 changes: 9 additions & 5 deletions conveyal_update/download_data.py
@@ -14,19 +14,22 @@
regions = conveyal_vars.conveyal_regions
TARGET_DATE = conveyal_vars.TARGET_DATE

regions_and_feeds = pd.read_parquet(f'{conveyal_vars.GCS_PATH}regions_feeds_{TARGET_DATE}.parquet')

def download_feed(row):
# need wildcard for file too -- not all are gtfs.zip!
uri = f'gs://calitp-gtfs-schedule-raw-v2/schedule/dt={row.date.strftime("%Y-%m-%d")}/*/base64_url={row.base64_url}/*.zip'
fs.get(uri, f'{row.path}/{row.gtfs_dataset_name.replace(" ", "_")}_{row.feed_key}_gtfs.zip')
# print(f'downloaded {row.path}/{row.feed_key}_gtfs.zip')
try:
uri = f'gs://calitp-gtfs-schedule-raw-v2/schedule/dt={row.date.strftime("%Y-%m-%d")}/*/base64_url={row.base64_url}/*.zip'
fs.get(uri, f'{row.path}/{row.gtfs_dataset_name.replace(" ", "_")}_{row.feed_key}_gtfs.zip')
# print(f'downloaded {row.path}/{row.feed_key}_gtfs.zip')
except Exception as e:
print(f'\ncould not download feed: {e}')

def download_region(feeds_df, region: str):

assert region in regions.keys()
path = f'./feeds_{feeds_df.date.iloc[0].strftime("%Y-%m-%d")}/{region}'
if not os.path.exists(path): os.makedirs(path)
if not os.path.exists(path):
os.makedirs(path)
region = (feeds_df >> filter(_.region == region)).copy()
region['path'] = path
region.progress_apply(download_feed, axis = 1)
@@ -43,6 +46,7 @@ def generate_script(regions):
f.write('\n'.join(cmds))

if __name__ == '__main__':
regions_and_feeds = pd.read_parquet(f'{conveyal_vars.GCS_PATH}regions_feeds_{TARGET_DATE}.parquet')

for region in tqdm(regions.keys()):
download_region(regions_and_feeds, region)
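As context for the try/except above, a minimal sketch of the wildcard download pattern, assuming fs is a gcsfs.GCSFileSystem (its construction sits outside the lines shown) and using hypothetical date, base64_url, and file names:

```python
import gcsfs

fs = gcsfs.GCSFileSystem()  # assumes default application credentials

# fsspec-style get() expands glob patterns; the trailing *.zip covers feeds not named gtfs.zip
uri = 'gs://calitp-gtfs-schedule-raw-v2/schedule/dt=2025-03-12/*/base64_url=abc123/*.zip'
try:
    fs.get(uri, './feeds_2025-03-12/norcal/Example_Transit_feedkey_gtfs.zip')
except Exception as e:
    # a single failed feed should not abort the rest of the regional download
    print(f'could not download feed: {e}')
```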
160 changes: 142 additions & 18 deletions conveyal_update/evaluate_feeds.py
@@ -1,14 +1,26 @@
import os
os.environ["CALITP_BQ_MAX_BYTES"] = str(800_000_000_000)
from shared_utils import gtfs_utils_v2

from shared_utils import gtfs_utils_v2
from calitp_data_analysis.tables import tbls
from calitp_data_analysis.sql import query_sql
from siuba import *
import pandas as pd
import datetime as dt

import conveyal_vars

TARGET_DATE = conveyal_vars.TARGET_DATE
REGIONAL_SUBFEED_NAME = "Regional Subfeed"
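# dt.date.weekday() and dt.datetime.weekday() return 0 for Monday, matching the GTFS calendar.txt day columns queried in get_old_feeds()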
INT_TO_GTFS_WEEKDAY = {
0: "monday",
1: "tuesday",
2: "wednesday",
3: "thursday",
4: "friday",
5: "saturday",
6: "sunday"
}

def check_defined_elsewhere(row, df):
'''
for feeds without service defined, check if the same service is captured in another feed that does include service
@@ -17,8 +29,6 @@ def check_defined_elsewhere(row, df):
row['service_any_feed'] = is_defined
return row

TARGET_DATE = conveyal_vars.TARGET_DATE

def get_feeds_check_service():
feeds_on_target = gtfs_utils_v2.schedule_daily_feed_to_gtfs_dataset_name(selected_date=TARGET_DATE)
feeds_on_target = feeds_on_target.rename(columns={'name':'gtfs_dataset_name'})
@@ -38,39 +48,153 @@ def get_feeds_check_service():
return feeds_on_target

def attach_transit_services(feeds_on_target: pd.DataFrame):

"""Associate each feed in feeds_on_target.gtfs_dataset_key with a transit service"""
target_dt = dt.datetime.combine(dt.date.fromisoformat(TARGET_DATE), dt.time(0))

services = (tbls.mart_transit_database.dim_gtfs_service_data()
>> filter(_._valid_from <= target_dt, _._valid_to > target_dt)
>> filter(
_._valid_from <= target_dt, _._valid_to > target_dt
)
# >> filter(_.gtfs_dataset_key == 'da7e9e09d3eec6c7686adc21c8b28b63') # test with BCT
# >> filter(_.service_key == '5bc7371dca26d74a99be945b18b3174e')
>> select(_.service_key, _.gtfs_dataset_key)
>> select(_.service_key, _.gtfs_dataset_key, _.customer_facing)
>> collect()
)

feeds_on_target = feeds_on_target >> left_join(_, services, on='gtfs_dataset_key')
return feeds_on_target
feeds_services_merged = feeds_on_target.merge(
services, how="left", on='gtfs_dataset_key', validate="one_to_many"
)
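# drop services that are neither customer facing nor regional subfeeds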
feeds_services_filtered = feeds_services_merged.loc[
feeds_services_merged["customer_facing"] | (feeds_services_merged["regional_feed_type"] == REGIONAL_SUBFEED_NAME)
].copy()
return feeds_services_filtered

def report_undefined(feeds_on_target: pd.DataFrame):
fname = 'no_apparent_service.csv'
def get_undefined_feeds(feeds_on_target: pd.DataFrame) -> pd.DataFrame:
"""Return feeds in feeds_on_target that do not have service and where service is not defined in another feed"""
undefined = feeds_on_target.apply(check_defined_elsewhere, axis=1, args=[feeds_on_target]) >> filter(-_.service_any_feed)
return undefined

def report_unavailable_feeds(feeds: pd.DataFrame, fname: str) -> None:
"""Create a csv report of unavailable or backdated feeds at the paths specified in fname"""
undefined = feeds.loc[
feeds["valid_date_other_than_service_date"] | feeds["no_schedule_feed_found"]
].copy()
if undefined.empty:
print('no undefined service feeds')
else:
print(undefined.columns)
print('these feeds have no service defined on target date, nor are their services captured in other feeds:')
# gtfs_dataset_name no longer present, this whole script should probably be updated/replaced
print(undefined >> select(_.gtfs_dataset_name, _.service_any_feed))
print(undefined.loc[undefined["no_schedule_feed_found"], "gtfs_dataset_name"].drop_duplicates())
print('these feeds have defined service, but only in a feed defined on a prior day')
print(undefined.loc[undefined["valid_date_other_than_service_date"], "gtfs_dataset_name"].drop_duplicates())
print(f'saving detailed csv to {fname}')
undefined.to_csv(fname)
return
undefined.to_csv(fname, index=False)

ISO_DATE_ONLY_FORMAT = "%Y-%m-%d"

def get_old_feeds(undefined_feeds_base64_urls: pd.Series, target_date: dt.date | dt.datetime, max_lookback_timedelta: dt.timedelta) -> pd.DataFrame:
"""
Search the warehouse for feeds downloaded within max_lookback_timedelta before target_date
whose calendar.txt defines service on target_date. These feeds will no longer be valid on
target_date, but Conveyal will accept them. This should not be used for feeds that are valid
on target_date, since it would needlessly substitute outdated feeds.
Note that this does not check calendar_dates.txt at present

Parameters:
undefined_feeds_base64_urls: a Pandas series containing base64 urls to feeds in the warehouse
target_date: a date or datetime where the feeds should be valid based on calendar.txt
max_lookback_timedelta: a timedelta defining the amount of time before target_date that a feed must have been available for

Returns:
A DataFrame with the following index and columns:
index: The base64 url of the feed, will match entries in undefined_feeds_base64_urls
feed_key: A key to dim_schedule_feeds matching the feed on the date it was last valid in the warehouse
date_processed: A date matching the date on which the feed was last valid in the warehouse
"""
base_64_urls_str = "('" + "', '".join(undefined_feeds_base64_urls) + "')"
day_of_the_week = INT_TO_GTFS_WEEKDAY[target_date.weekday()]
max_lookback_date = target_date - max_lookback_timedelta
target_date_iso = target_date.strftime(ISO_DATE_ONLY_FORMAT)
# Query for feeds whose calendar.txt defines service covering the target date
# (including its day of the week), and whose _valid_to falls between
# the max lookback date and the target date (inclusive)
query = f"""
SELECT
`mart_gtfs.dim_schedule_feeds`.base64_url AS base64_url,
`mart_gtfs.dim_schedule_feeds`.key AS feed_key,
MAX(`mart_gtfs.dim_schedule_feeds`._valid_to) AS valid_feed_date
FROM `mart_gtfs.dim_schedule_feeds`
LEFT JOIN `mart_gtfs.dim_calendar`
ON `mart_gtfs.dim_schedule_feeds`.key = `mart_gtfs.dim_calendar`.feed_key
WHERE `mart_gtfs.dim_schedule_feeds`.base64_url IN {base_64_urls_str}
AND `mart_gtfs.dim_schedule_feeds`._valid_to >= '{max_lookback_date}'
AND `mart_gtfs.dim_schedule_feeds`._valid_to <= '{target_date}'
AND `mart_gtfs.dim_calendar`.{day_of_the_week} = 1
AND `mart_gtfs.dim_calendar`.start_date <= '{target_date}'
AND `mart_gtfs.dim_calendar`.end_date >= '{target_date}'
GROUP BY
`mart_gtfs.dim_schedule_feeds`.base64_url,
`mart_gtfs.dim_schedule_feeds`.key
LIMIT 1000
"""
response = query_sql(
query
)
response_grouped = response.groupby("base64_url")
feed_info_by_url = response_grouped[["valid_feed_date", "feed_key"]].first()
feed_info_by_url["date_processed"] = feed_info_by_url["valid_feed_date"].dt.date - dt.timedelta(days=1)
# valid_feed_date is the day the feed becomes invalid, so the last day on which the feed *was* valid is the day before
return feed_info_by_url.drop("valid_feed_date", axis=1)

def merge_old_feeds(df_all_feeds: pd.DataFrame, df_undefined_feeds: pd.DataFrame, target_date: dt.date, max_lookback_timedelta: dt.timedelta) -> pd.DataFrame:
"""
Merge feeds from df_all_feeds with old feeds found as a result of calling get_old_feeds with df_undefined_feeds.base64_url

Params:
df_all_feeds: A DataFrame of feeds, must have feed_key, date, and base64_url as columns and must include the base64_urls in df_undefined_feeds
df_undefined_feeds: A DataFrame of feeds that are not valid on target_date, where an old feed should be searched for.
Must have base64_url as a column
target_date: a date or datetime on which the feeds in df_all_feeds should be valid
max_lookback_timedelta: a timedelta defining the amount of time before target_date that a feed must have been available for

Returns:
A DataFrame identical to df_all_feeds except with the following columns changed or added:
feed_key: Updated for the found feeds
date: Updated for the found feeds
no_schedule_feed_found: True if a feed was present in df_undefined_feeds but no older feed was found for it, otherwise False
valid_date_other_than_service_date: True if an older replacement feed was found (so date differs from target_date), otherwise False
"""
feed_search_result = get_old_feeds(
df_undefined_feeds["base64_url"],
target_date,
max_lookback_timedelta
)
feeds_merged = df_all_feeds.merge(
feed_search_result,
how="left",
left_on="base64_url",
right_index=True,
validate="many_to_one"
)
feeds_merged["feed_key"] = feeds_merged["feed_key_y"].fillna(feeds_merged["feed_key_x"])
feeds_merged["no_schedule_feed_found"] = (
(feeds_merged["base64_url"].isin(df_undefined_feeds["base64_url"])) & (~feeds_merged["base64_url"].isin(feed_search_result.index))
).fillna(False)
feeds_merged["date"] = feeds_merged["date_processed"].fillna(target_date)
feeds_merged["valid_date_other_than_service_date"] = feeds_merged["date"] != target_date

return feeds_merged.drop(
["date_processed", "feed_key_x", "feed_key_y"], axis=1
)

if __name__ == '__main__':

feeds_on_target = get_feeds_check_service()
feeds_on_target = attach_transit_services(feeds_on_target)
print(f'feeds on target date shape: {feeds_on_target.shape}')
report_undefined(feeds_on_target)
feeds_on_target.to_parquet(f'{conveyal_vars.GCS_PATH}feeds_{TARGET_DATE}.parquet')
undefined_feeds = get_undefined_feeds(feeds_on_target)
feeds_merged = merge_old_feeds(
feeds_on_target, undefined_feeds, dt.date.fromisoformat(TARGET_DATE), conveyal_vars.LOOKBACK_TIME
)
report_unavailable_feeds(feeds_merged, 'no_apparent_service.csv')
feeds_merged.to_parquet(f'{conveyal_vars.GCS_PATH}feeds_{TARGET_DATE}.parquet')
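A minimal, self-contained illustration (toy data, not from this PR) of the coalesce-after-merge pattern merge_old_feeds relies on, where feed_key_y from the lookup overrides feed_key_x when a replacement feed exists:

```python
import datetime as dt
import pandas as pd

target_date = dt.date(2025, 3, 12)  # hypothetical target date

all_feeds = pd.DataFrame({
    "base64_url": ["aaa", "bbb"],
    "feed_key": ["key_current_a", "key_current_b"],
})
# lookup result indexed by base64_url, shaped like the output of get_old_feeds
old_feeds = pd.DataFrame(
    {"feed_key": ["key_old_b"], "date_processed": [dt.date(2025, 2, 20)]},
    index=pd.Index(["bbb"], name="base64_url"),
)

merged = all_feeds.merge(old_feeds, how="left", left_on="base64_url", right_index=True)
merged["feed_key"] = merged["feed_key_y"].fillna(merged["feed_key_x"])  # prefer the older replacement key
merged["date"] = merged["date_processed"].fillna(target_date)
print(merged[["base64_url", "feed_key", "date"]])
# aaa keeps key_current_a / 2025-03-12; bbb gets key_old_b / 2025-02-20
```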

18 changes: 14 additions & 4 deletions conveyal_update/match_feeds_regions.py
@@ -13,7 +13,6 @@

regions = conveyal_vars.conveyal_regions
TARGET_DATE = conveyal_vars.TARGET_DATE
feeds_on_target = pd.read_parquet(f'{conveyal_vars.GCS_PATH}feeds_{TARGET_DATE}.parquet')

def create_region_gdf():
# https://shapely.readthedocs.io/en/stable/reference/shapely.box.html#shapely.box
@@ -23,17 +22,28 @@ def create_region_gdf():
df['bbox'] = df.apply(to_bbox, axis=1)
df['geometry'] = df.apply(lambda x: shapely.geometry.box(*x.bbox), axis = 1)
df = df >> select(-_.bbox)
region_gdf = gpd.GeoDataFrame(df, crs=geography_utils.WGS84).to_crs(geography_utils.CA_NAD83Albers)
region_gdf = gpd.GeoDataFrame(df, crs=geography_utils.WGS84).to_crs(geography_utils.CA_NAD83Albers_m)
return region_gdf

def get_stops_dates(feeds_on_target: pd.DataFrame, feed_key_column_name: str = "feed_key", date_column_name: str = "date"):
"""Get stops for the feeds in feeds_on_target based on their date"""
all_stops = feeds_on_target.groupby(date_column_name)[feed_key_column_name].apply(
lambda feed_key_column: gtfs_utils_v2.get_stops(
selected_date=feed_key_column.name,
operator_feeds=feed_key_column
)
)
return all_stops

def join_stops_regions(region_gdf: gpd.GeoDataFrame, feeds_on_target: pd.DataFrame):
all_stops = gtfs_utils_v2.get_stops(selected_date=TARGET_DATE, operator_feeds=feeds_on_target.feed_key).to_crs(geography_utils.CA_NAD83Albers)
#all_stops = gtfs_utils_v2.get_stops(selected_date=TARGET_DATE, operator_feeds=feeds_on_target.feed_key)
all_stops = get_stops_dates(feeds_on_target).to_crs(geography_utils.CA_NAD83Albers_m)
region_join = gpd.sjoin(region_gdf, all_stops)
regions_and_feeds = region_join >> distinct(_.region, _.feed_key)
return regions_and_feeds

if __name__ == '__main__':

feeds_on_target = pd.read_parquet(f'{conveyal_vars.GCS_PATH}feeds_{TARGET_DATE}.parquet')
region_gdf = create_region_gdf()
regions_and_feeds = join_stops_regions(region_gdf, feeds_on_target)
regions_and_feeds = regions_and_feeds >> inner_join(_, feeds_on_target >> select(_.feed_key, _.gtfs_dataset_name, _.base64_url,