Commit e65c5e3

Merge pull request #1440 from cal-itp/mar-2025-conveyal-update
Conveyal Update + Automation
2 parents 5728cb6 + 7a825a2 commit e65c5e3

4 files changed, +173 -33 lines changed

Diff for: conveyal_update/conveyal_vars.py (+8 -6)

@@ -2,19 +2,21 @@
 from shared_utils import rt_dates
 
 GCS_PATH = 'gs://calitp-analytics-data/data-analyses/conveyal_update/'
-TARGET_DATE = rt_dates.DATES['oct2024']
+TARGET_DATE = rt_dates.DATES['mar2025']
+LOOKBACK_TIME = dt.timedelta(days=60)
 OSM_FILE = 'us-west-latest.osm.pbf'
+PUBLISHED_FEEDS_YML_PATH = "../gtfs_funnel/published_operators.yml"
 # http://download.geofabrik.de/north-america/us-west-latest.osm.pbf
 # first download with wget...
 
 conveyal_regions = {}
 # boundaries correspond to Conveyal Analysis regions
-# conveyal_regions['norcal'] = {'north': 42.03909, 'south': 39.07038, 'east': -119.60541, 'west': -124.49158}
-# conveyal_regions['central'] = {'north': 39.64165, 'south': 35.87347, 'east': -117.53174, 'west': -123.83789}
-# conveyal_regions['socal'] = {'north': 35.8935, 'south': 32.5005, 'east': -114.13121, 'west': -121.46759}
-# conveyal_regions['mojave'] = {'north': 37.81629, 'south': 34.89945, 'east': -114.59015, 'west': -118.38043}
+conveyal_regions['norcal'] = {'north': 42.03909, 'south': 39.07038, 'east': -119.60541, 'west': -124.49158}
+conveyal_regions['central'] = {'north': 39.64165, 'south': 35.87347, 'east': -117.53174, 'west': -123.83789}
+conveyal_regions['socal'] = {'north': 35.8935, 'south': 32.5005, 'east': -114.13121, 'west': -121.46759}
+conveyal_regions['mojave'] = {'north': 37.81629, 'south': 34.89945, 'east': -114.59015, 'west': -118.38043}
 # special region for one-off SR99 CMCP
-conveyal_regions['sr99'] = {'north': 38.71337, 'south': 34.81154, 'east': -118.66882, 'west': -121.66259}
+# conveyal_regions['sr99'] = {'north': 38.71337, 'south': 34.81154, 'east': -118.66882, 'west': -121.66259}
 
 # # special region for one-off Centennial Corridor
 # conveyal_regions['bakersfield'] = {'north': 36.81, 'south': 34.13, 'east': -117.12, 'west': -120.65}

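The config changes above move the pipeline's target service date from October 2024 to March 2025, add a 60-day lookback window, enable the four statewide Conveyal regions, and retire the one-off SR99 region. A minimal sketch of how the two new constants combine downstream (assuming conveyal_vars is importable and, per the diff, TARGET_DATE is an ISO date string from rt_dates.DATES):

    # Sketch only, not part of the commit: derive the earliest feed date the
    # lookback window will accept. TARGET_DATE is an ISO string per the diff.
    import datetime as dt

    import conveyal_vars

    target_date = dt.date.fromisoformat(conveyal_vars.TARGET_DATE)
    earliest_acceptable = target_date - conveyal_vars.LOOKBACK_TIME  # 60 days earlier
    print(f"accepting feeds last valid between {earliest_acceptable} and {target_date}")

evaluate_feeds.py (below) applies this same window when it searches the warehouse for older, still-usable feeds.
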
Diff for: conveyal_update/download_data.py (+9 -5)

@@ -14,19 +14,22 @@
 regions = conveyal_vars.conveyal_regions
 TARGET_DATE = conveyal_vars.TARGET_DATE
 
-regions_and_feeds = pd.read_parquet(f'{conveyal_vars.GCS_PATH}regions_feeds_{TARGET_DATE}.parquet')
 
 def download_feed(row):
     # need wildcard for file too -- not all are gtfs.zip!
-    uri = f'gs://calitp-gtfs-schedule-raw-v2/schedule/dt={row.date.strftime("%Y-%m-%d")}/*/base64_url={row.base64_url}/*.zip'
-    fs.get(uri, f'{row.path}/{row.gtfs_dataset_name.replace(" ", "_")}_{row.feed_key}_gtfs.zip')
-    # print(f'downloaded {row.path}/{row.feed_key}_gtfs.zip')
+    try:
+        uri = f'gs://calitp-gtfs-schedule-raw-v2/schedule/dt={row.date.strftime("%Y-%m-%d")}/*/base64_url={row.base64_url}/*.zip'
+        fs.get(uri, f'{row.path}/{row.gtfs_dataset_name.replace(" ", "_")}_{row.feed_key}_gtfs.zip')
+        # print(f'downloaded {row.path}/{row.feed_key}_gtfs.zip')
+    except Exception as e:
+        print(f'\n could not download feed at {e}')
 
 def download_region(feeds_df, region: str):
 
     assert region in regions.keys()
     path = f'./feeds_{feeds_df.date.iloc[0].strftime("%Y-%m-%d")}/{region}'
-    if not os.path.exists(path): os.makedirs(path)
+    if not os.path.exists(path):
+        os.makedirs(path)
     region = (feeds_df >> filter(_.region == region)).copy()
     region['path'] = path
     region.progress_apply(download_feed, axis = 1)
@@ -43,6 +46,7 @@ def generate_script(regions):
        f.write('\n'.join(cmds))
 
 if __name__ == '__main__':
+    regions_and_feeds = pd.read_parquet(f'{conveyal_vars.GCS_PATH}regions_feeds_{TARGET_DATE}.parquet')
 
    for region in tqdm(regions.keys()):
        download_region(regions_and_feeds, region)

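Two behavioral changes here: the regions/feeds parquet is now read only when the script runs as __main__ (so importing the module no longer requires the file to exist), and each per-feed GCS fetch is wrapped in try/except so one missing zip no longer aborts a whole region's download. A small sketch, under the assumption that gcsfs is installed and the caller is authenticated, of previewing what a download_feed-style wildcard URI would match; the row values are hypothetical placeholders:

    # Sketch only: check which zip(s) the wildcard resolves to before fetching.
    # Bucket layout follows the diff above; the values below are made up.
    import gcsfs

    fs = gcsfs.GCSFileSystem()

    date_str = "2025-03-15"        # hypothetical dt= partition
    base64_url = "aHR0cHM6Ly9..."  # hypothetical, deliberately truncated
    uri = f"gs://calitp-gtfs-schedule-raw-v2/schedule/dt={date_str}/*/base64_url={base64_url}/*.zip"

    matches = fs.glob(uri)
    print(matches or "no feed found for this date/url -- download_feed would log and skip it")
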
Diff for: conveyal_update/evaluate_feeds.py (+142 -18)

@@ -1,14 +1,26 @@
 import os
 os.environ["CALITP_BQ_MAX_BYTES"] = str(800_000_000_000)
-from shared_utils import gtfs_utils_v2
 
+from shared_utils import gtfs_utils_v2
 from calitp_data_analysis.tables import tbls
+from calitp_data_analysis.sql import query_sql
 from siuba import *
 import pandas as pd
 import datetime as dt
-
 import conveyal_vars
 
+TARGET_DATE = conveyal_vars.TARGET_DATE
+REGIONAL_SUBFEED_NAME = "Regional Subfeed"
+INT_TO_GTFS_WEEKDAY = {
+    0: "monday",
+    1: "tuesday",
+    2: "wednesday",
+    3: "thursday",
+    4: "friday",
+    5: "saturday",
+    6: "sunday"
+}
+
 def check_defined_elsewhere(row, df):
     '''
     for feeds without service defined, check if the same service is captured in another feed that does include service
@@ -17,8 +29,6 @@ def check_defined_elsewhere(row, df):
     row['service_any_feed'] = is_defined
     return row
 
-TARGET_DATE = conveyal_vars.TARGET_DATE
-
 def get_feeds_check_service():
     feeds_on_target = gtfs_utils_v2.schedule_daily_feed_to_gtfs_dataset_name(selected_date=TARGET_DATE)
     feeds_on_target = feeds_on_target.rename(columns={'name':'gtfs_dataset_name'})
@@ -38,39 +48,153 @@ def get_feeds_check_service():
     return feeds_on_target
 
 def attach_transit_services(feeds_on_target: pd.DataFrame):
-
+    """Associate each feed in feeds_on_target.gtfs_dataset_key with a transit service"""
     target_dt = dt.datetime.combine(dt.date.fromisoformat(TARGET_DATE), dt.time(0))
 
     services = (tbls.mart_transit_database.dim_gtfs_service_data()
-        >> filter(_._valid_from <= target_dt, _._valid_to > target_dt)
+        >> filter(
+            _._valid_from <= target_dt, _._valid_to > target_dt
+        )
         # >> filter(_.gtfs_dataset_key == 'da7e9e09d3eec6c7686adc21c8b28b63') # test with BCT
         # >> filter(_.service_key == '5bc7371dca26d74a99be945b18b3174e')
-        >> select(_.service_key, _.gtfs_dataset_key)
+        >> select(_.service_key, _.gtfs_dataset_key, _.customer_facing)
        >> collect()
    )
 
-    feeds_on_target = feeds_on_target >> left_join(_, services, on='gtfs_dataset_key')
-    return feeds_on_target
+    feeds_services_merged = feeds_on_target.merge(
+        services, how="left", on='gtfs_dataset_key', validate="one_to_many"
+    )
+    feeds_services_filtered = feeds_services_merged.loc[
+        feeds_services_merged["customer_facing"] | (feeds_services_merged["regional_feed_type"] == REGIONAL_SUBFEED_NAME)
+    ].copy()
+    return feeds_services_filtered
 
-def report_undefined(feeds_on_target: pd.DataFrame):
-    fname = 'no_apparent_service.csv'
+def get_undefined_feeds(feeds_on_target: pd.DataFrame) -> pd.DataFrame:
+    """Return feeds in feeds_on_target that do not have service and where service is not defined in another feed"""
     undefined = feeds_on_target.apply(check_defined_elsewhere, axis=1, args=[feeds_on_target]) >> filter(-_.service_any_feed)
+    return undefined
+
+def report_unavailable_feeds(feeds: pd.DataFrame, fname: str) -> None:
+    """Create a csv report of unavailable or backdated feeds at the path specified in fname"""
+    undefined = feeds.loc[
+        feeds["valid_date_other_than_service_date"] | feeds["no_schedule_feed_found"]
+    ].copy()
     if undefined.empty:
         print('no undefined service feeds')
     else:
-        print(undefined.columns)
         print('these feeds have no service defined on target date, nor are their services captured in other feeds:')
-        # gtfs_dataset_name no longer present, this whole script should probably be updated/replaced
-        print(undefined >> select(_.gtfs_dataset_name, _.service_any_feed))
+        print(undefined.loc[undefined["no_schedule_feed_found"], "gtfs_dataset_name"].drop_duplicates())
+        print('these feeds have defined service, but only in a feed defined on a prior day')
+        print(undefined.loc[undefined["valid_date_other_than_service_date"], "gtfs_dataset_name"].drop_duplicates())
     print(f'saving detailed csv to {fname}')
-    undefined.to_csv(fname)
-    return
+    undefined.to_csv(fname, index=False)
+
+ISO_DATE_ONLY_FORMAT = "%y-%m-%d"
+
+def get_old_feeds(undefined_feeds_base64_urls: pd.Series, target_date: dt.date | dt.datetime, max_lookback_timedelta: dt.timedelta) -> pd.Series:
+    """
+    Search the warehouse for feeds downloaded within the time before target_date
+    defined by max_lookback_timedelta that have service as defined in calendar.txt
+    on target_date. These feeds will not be valid on target_date, but will be accepted by Conveyal.
+    This should not be used if the feeds are valid on the target_date, since this will provide needlessly
+    invalid feeds. Note that this does not check calendar_dates.txt at present.
+
+    Parameters:
+        undefined_feeds_base64_urls: a Pandas series containing base64 urls to feeds in the warehouse
+        target_date: a date or datetime where the feeds should be valid based on calendar.txt
+        max_lookback_timedelta: a timedelta defining the amount of time before target_date that a feed must have been available for
+
+    Returns:
+    A DataFrame with the following index and columns:
+        index: The base64 url of the feed, will match entries in undefined_feeds_base64_urls
+        feed_key: A key to dim_schedule_feeds matching the feed on the date it was last valid in the warehouse
+        date_processed: A datetime date matching the date on which the feed was last valid in the warehouse
+    """
+    base_64_urls_str = "('" + "', '".join(undefined_feeds_base64_urls) + "')"
+    day_of_the_week = INT_TO_GTFS_WEEKDAY[target_date.weekday()]
+    max_lookback_date = target_date - max_lookback_timedelta
+    target_date_iso = target_date.strftime(ISO_DATE_ONLY_FORMAT)
+    # Query feeds for the newest feed where service is defined on the target_date,
+    # that have service on the day of the week of the target date, and
+    # that are valid before (inclusive) the target date and after (inclusive) the max look back date,
+    query = f"""
+    SELECT
+        `mart_gtfs.dim_schedule_feeds`.base64_url AS base64_url,
+        `mart_gtfs.dim_schedule_feeds`.key as feed_key,
+        MAX(`mart_gtfs.dim_schedule_feeds`._valid_to) AS valid_feed_date
+    from `mart_gtfs.dim_schedule_feeds`
+    LEFT JOIN `mart_gtfs.dim_calendar`
+        ON `mart_gtfs.dim_schedule_feeds`.key = `mart_gtfs.dim_calendar`.feed_key
+    WHERE `mart_gtfs.dim_schedule_feeds`.base64_url IN {base_64_urls_str}
+        AND `mart_gtfs.dim_schedule_feeds`._valid_to >= '{max_lookback_date}'
+        AND `mart_gtfs.dim_schedule_feeds`._valid_to <= '{target_date}'
+        AND `mart_gtfs.dim_calendar`.{day_of_the_week} = 1
+        AND `mart_gtfs.dim_calendar`.start_date <= '{target_date}'
+        AND `mart_gtfs.dim_calendar`.end_date >= '{target_date}'
+    GROUP BY
+        `mart_gtfs.dim_schedule_feeds`.base64_url,
+        `mart_gtfs.dim_schedule_feeds`.key
+    LIMIT 1000
+    """
+    response = query_sql(
+        query
+    )
+    response_grouped = response.groupby("base64_url")
+    feed_info_by_url = response_grouped[["valid_feed_date", "feed_key"]].first()
+    feed_info_by_url["date_processed"] = feed_info_by_url["valid_feed_date"].dt.date - dt.timedelta(days=1)
+    # we have the day the feed becomes invalid, so the day we are interested in where the feed *is* valid is the day after
+    return feed_info_by_url.drop("valid_feed_date", axis=1)
+
+def merge_old_feeds(df_all_feeds: pd.DataFrame, df_undefined_feeds: pd.DataFrame, target_date: dt.date, max_lookback_timedelta: dt.timedelta) -> pd.DataFrame:
+    """
+    Merge feeds from df_all_feeds with old feeds found as a result of calling get_old_feeds with df_undefined_feeds.base64_url
+
+    Params:
+        df_all_feeds: A DataFrame of feeds, must have feed_key, date, and base64_url as columns and must include the base64_urls in df_undefined_feeds
+        df_undefined_feeds: A DataFrame of feeds that are not valid on target_date, where an old feed should be searched for.
+            Must have base64_url as a column
+        target_date: a date or datetime where the feed should be valid based on its target date
+        max_lookback_timedelta: a timedelta defining the amount of time before target_date that a feed must have been available for
+
+    Returns:
+    A DataFrame identical to df_all_feeds except with the following columns changed or added:
+        feed_key: Updated for the found feeds
+        date: Updated for the found feeds
+        no_schedule_feed_found: True if a schedule feed was present in df_undefined_feeds but was not associated with an older feed, otherwise false
+        valid_date_other_than_service_date: True if a new feed was found, otherwise false
+    """
+    feed_search_result = get_old_feeds(
+        df_undefined_feeds["base64_url"],
+        target_date,
+        max_lookback_timedelta
+    )
+    feeds_merged = df_all_feeds.merge(
+        feed_search_result,
+        how="left",
+        left_on="base64_url",
+        right_index=True,
+        validate="many_to_one"
+    )
+    feeds_merged["feed_key"] = feeds_merged["feed_key_y"].fillna(feeds_merged["feed_key_x"])
+    feeds_merged["no_schedule_feed_found"] = (
+        (feeds_merged["base64_url"].isin(df_undefined_feeds["base64_url"])) & (~feeds_merged["base64_url"].isin(feed_search_result.index))
+    ).fillna(False)
+    feeds_merged["date"] = feeds_merged["date_processed"].fillna(target_date)
+    feeds_merged["valid_date_other_than_service_date"] = feeds_merged["date"] != target_date
+
+    return feeds_merged.drop(
+        ["date_processed", "feed_key_x", "feed_key_y"], axis=1
+    )
 
 if __name__ == '__main__':
 
     feeds_on_target = get_feeds_check_service()
     feeds_on_target = attach_transit_services(feeds_on_target)
     print(f'feeds on target date shape: {feeds_on_target.shape}')
-    report_undefined(feeds_on_target)
-    feeds_on_target.to_parquet(f'{conveyal_vars.GCS_PATH}feeds_{TARGET_DATE}.parquet')
+    undefined_feeds = get_undefined_feeds(feeds_on_target)
+    feeds_merged = merge_old_feeds(
+        feeds_on_target, undefined_feeds, dt.date.fromisoformat(TARGET_DATE), conveyal_vars.LOOKBACK_TIME
+    )
+    report_unavailable_feeds(feeds_merged, 'no_apparent_service.csv')
+    feeds_merged.to_parquet(f'{conveyal_vars.GCS_PATH}feeds_{TARGET_DATE}.parquet')

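The bulk of the new logic lives in get_old_feeds and merge_old_feeds: feeds with no service on the target date are looked up in the warehouse over the 60-day lookback window, and any match contributes an older feed_key plus the date on which that feed was last valid. The column handling in merge_old_feeds reduces to a left merge followed by coalescing and flagging; a toy, self-contained illustration of that pattern (placeholder keys and dates, not warehouse data):

    # Toy sketch of the merge/coalesce/flag pattern merge_old_feeds uses.
    import datetime as dt
    import pandas as pd

    target_date = dt.date(2025, 3, 15)  # placeholder, not the real TARGET_DATE

    all_feeds = pd.DataFrame({"base64_url": ["aaa", "bbb", "ccc"],
                              "feed_key": ["k1", "k2", "k3"]})
    undefined = all_feeds[all_feeds["base64_url"].isin(["bbb", "ccc"])]
    # pretend the warehouse search only found an older feed for 'bbb'
    found = pd.DataFrame({"feed_key": ["k2_old"],
                          "date_processed": [dt.date(2025, 2, 20)]},
                         index=pd.Index(["bbb"], name="base64_url"))

    merged = all_feeds.merge(found, how="left", left_on="base64_url", right_index=True)
    merged["feed_key"] = merged["feed_key_y"].fillna(merged["feed_key_x"])
    merged["no_schedule_feed_found"] = (merged["base64_url"].isin(undefined["base64_url"])
                                        & ~merged["base64_url"].isin(found.index))
    merged["date"] = merged["date_processed"].fillna(target_date)
    merged["valid_date_other_than_service_date"] = merged["date"] != target_date
    print(merged[["base64_url", "feed_key", "date",
                  "no_schedule_feed_found", "valid_date_other_than_service_date"]])

Here 'bbb' picks up the backdated key and date (flagged valid_date_other_than_service_date), 'ccc' keeps the target date but is flagged no_schedule_feed_found, and 'aaa' passes through untouched; report_unavailable_feeds prints and saves the two flagged groups.
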
Diff for: conveyal_update/match_feeds_regions.py (+14 -4)

@@ -13,7 +13,6 @@
 
 regions = conveyal_vars.conveyal_regions
 TARGET_DATE = conveyal_vars.TARGET_DATE
-feeds_on_target = pd.read_parquet(f'{conveyal_vars.GCS_PATH}feeds_{TARGET_DATE}.parquet')
 
 def create_region_gdf():
     # https://shapely.readthedocs.io/en/stable/reference/shapely.box.html#shapely.box
@@ -23,17 +22,28 @@ def create_region_gdf():
     df['bbox'] = df.apply(to_bbox, axis=1)
     df['geometry'] = df.apply(lambda x: shapely.geometry.box(*x.bbox), axis = 1)
     df = df >> select(-_.bbox)
-    region_gdf = gpd.GeoDataFrame(df, crs=geography_utils.WGS84).to_crs(geography_utils.CA_NAD83Albers)
+    region_gdf = gpd.GeoDataFrame(df, crs=geography_utils.WGS84).to_crs(geography_utils.CA_NAD83Albers_m)
     return region_gdf
 
+def get_stops_dates(feeds_on_target: pd.DataFrame, feed_key_column_name: str = "feed_key", date_column_name: str = "date"):
+    """Get stops for the feeds in feeds_on_target based on their date"""
+    all_stops = feeds_on_target.groupby(date_column_name)[feed_key_column_name].apply(
+        lambda feed_key_column: gtfs_utils_v2.get_stops(
+            selected_date=feed_key_column.name,
+            operator_feeds=feed_key_column
+        )
+    )
+    return all_stops
+
 def join_stops_regions(region_gdf: gpd.GeoDataFrame, feeds_on_target: pd.DataFrame):
-    all_stops = gtfs_utils_v2.get_stops(selected_date=TARGET_DATE, operator_feeds=feeds_on_target.feed_key).to_crs(geography_utils.CA_NAD83Albers)
+    #all_stops = gtfs_utils_v2.get_stops(selected_date=TARGET_DATE, operator_feeds=feeds_on_target.feed_key)
+    all_stops = get_stops_dates(feeds_on_target).to_crs(geography_utils.CA_NAD83Albers_m)
     region_join = gpd.sjoin(region_gdf, all_stops)
     regions_and_feeds = region_join >> distinct(_.region, _.feed_key)
     return regions_and_feeds
 
 if __name__ == '__main__':
-
+    feeds_on_target = pd.read_parquet(f'{conveyal_vars.GCS_PATH}feeds_{TARGET_DATE}.parquet')
     region_gdf = create_region_gdf()
     regions_and_feeds = join_stops_regions(region_gdf, feeds_on_target)
     regions_and_feeds = regions_and_feeds >> inner_join(_, feeds_on_target >> select(_.feed_key, _.gtfs_dataset_name, _.base64_url,

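Because feeds can now carry different dates (backdated feeds from evaluate_feeds), get_stops_dates fetches stops per date group rather than once for TARGET_DATE, and both layers are projected to CA_NAD83Albers_m before the spatial join. The groupby trick it relies on is that pandas sets each group's .name to the group key inside apply; a toy illustration with a stand-in for gtfs_utils_v2.get_stops (the real call queries the warehouse):

    # Toy sketch of the per-date groupby pattern used by get_stops_dates.
    import pandas as pd

    feeds = pd.DataFrame({"feed_key": ["f1", "f2", "f3"],
                          "date": ["2025-03-15", "2025-03-15", "2025-02-20"]})  # placeholders

    def fake_get_stops(selected_date, operator_feeds):
        # stand-in for gtfs_utils_v2.get_stops: one row per requested feed key
        return pd.DataFrame({"feed_key": list(operator_feeds),
                             "date_queried": selected_date})

    all_stops = feeds.groupby("date")["feed_key"].apply(
        lambda keys: fake_get_stops(selected_date=keys.name, operator_feeds=keys)
    )
    print(all_stops)

Each group of feed keys is queried with its own date, and the results come back concatenated, ready for the sjoin against the region boxes.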