Conversation
Pull request overview
Adds a new Cal-ITP data import pipeline to the tasks executor, including the import implementation, CKAN query, tests, and a scheduled monthly execution in GCP.
Changes:
- Introduces the Cal-ITP import handler + CKAN SQL query for retrieving feed records.
- Registers the new `cal_itp_import` task in the tasks executor and adds unit/e2e tests.
- Adds a monthly Cloud Scheduler job to invoke the Cal-ITP import task.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| infra/functions-python/main.tf | Adds a monthly Cloud Scheduler job to call the tasks executor with cal_itp_import. |
| functions-python/tasks_executor/src/main.py | Registers the new cal_itp_import task and handler. |
| functions-python/tasks_executor/src/tasks/data_import/cal_itp/import_cal_itp_feeds.py | Implements Cal-ITP dataset retrieval, filtering, upsert logic, and orchestration/commit hooks. |
| functions-python/tasks_executor/src/tasks/data_import/cal_itp/ckan_query.sql | Provides the CKAN datastore SQL used to retrieve Cal-ITP feed records. |
| functions-python/tasks_executor/tests/tasks/data_import/test_cal_itp_import.py | Adds helper/unit tests and an end-to-end DB test for the Cal-ITP import flow. |
```python
import logging
import os
import json
```

`json` is imported but not used anywhere in this module. If unused-import linting is enabled, this will fail; please remove it (or use it if intended).

Suggested change:
```diff
- import json
```
```python
from tasks.data_import.data_import_utils import (
    get_or_create_feed,
    get_or_create_entity_type,
    get_license,
```

`get_license` is imported but only referenced in commented-out code. Remove the unused import until license handling is implemented to avoid unused-import lint failures.

Suggested change:
```diff
-     get_license,
```
```python
self.assertEqual(rt.producer_url, "https://cal-itp.example/tu.pb")

# RT should be linked to the schedule feed
rt_sched_ids = [f.id for f in rt.gtfs_feeds]
self.assertEqual(rt_sched_ids, [sched.id])
```

The happy-path import test verifies the RT feed URL and schedule linkage, but it doesn't assert that the RT feed's `entitytypes` were populated (e.g. `["tu"]` for trip_updates). Adding that assertion (as done in test_tdg_import.py) would catch regressions where entity_type parsing breaks and RT feeds end up with empty entity types.
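The missing assertion could look like the sketch below. The `entitytypes` relationship and its `.name` attribute are assumed from the review comment, and a stand-in object replaces the real feed model, which lives in the project:

```python
from types import SimpleNamespace

# Stand-in for the RT feed returned by the import; the `entitytypes`
# relationship and `.name` attribute are assumptions based on the review.
rt = SimpleNamespace(entitytypes=[SimpleNamespace(name="tu")])

# The suggested regression guard: the RT feed should carry its parsed
# entity types, not an empty list.
entity_names = sorted(et.name for et in rt.entitytypes)
assert entity_names == ["tu"]
```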
```python
district_name = dataset.get("caltrans_district_name", []) or []
country_name = "United States"
state_province = "California"

locations: List[Any] = []
```

`district_name` falls back to `[]`, but `create_or_get_location(..., city_name=...)` expects a string/None and will fail when building the location id (it joins components). Use `None` (or an empty string) as the fallback instead of a list, and avoid appending a `None` location to the list.
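A minimal sketch of the suggested fallback, assuming `dataset` is the plain dict parsed from CKAN. The helper name is hypothetical, and the list-unwrapping branch is an assumption based on the original `or []` fallback:

```python
def get_district_name(dataset: dict):
    """Return the district name as a string, or None when absent/empty."""
    value = dataset.get("caltrans_district_name")
    # The original fallback suggests CKAN may return a list here; if so,
    # take the first element rather than passing a list downstream.
    if isinstance(value, list):
        value = value[0] if value else None
    # Normalize empty strings to None so the location-id join never sees [].
    return value or None

print(get_district_name({}))                                # None
print(get_district_name({"caltrans_district_name": "D4"}))  # D4
```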
```python
_raw_resources.append({
    **_common_fields,
    "format": GTFS_REALTIME,
    "entity_type":f"{_rt_type}",
```

RT resources are built with `entity_type` as a string, but `_get_entity_types_from_resource` iterates that field assuming it's a list of strings. Iterating a string yields characters, so entity types end up empty and the RT feed won't get its `entitytypes` set correctly. Pass a list (e.g. `[_rt_type]`) or normalize string→list in `_get_entity_types_from_resource`.

Suggested change:
```diff
-     "entity_type":f"{_rt_type}",
+     "entity_type": [_rt_type],
```
```python
service_id = resource.get("service_source_record_id")
res_format = resource.get("format")
if res_format == GTFS_SCHEDULE:
    feed_type = 'schedule'
    try:
        res_id = resource.get("schedule_source_record_id")
        res_name = resource.get("schedule_gtfs_dataset_name")
        res_url = resource.get("schedule_dataset_url")
    except Exception as e:
        raise InvalidCalItpFeedError(e)
elif res_format == GTFS_REALTIME:
    feed_type = next(
        (t for t in ENTITY_TYPES_MAP if resource.get(f"{t}_gtfs_dataset_name")),
        None,
    )
    if feed_type is None:
        raise InvalidCalItpFeedError("Cal-ITP RT resource has no recognised type in ENTITY_TYPES_MAP")
    try:
        res_id = resource.get(f"{feed_type}_source_record_id")
        res_name = resource.get(f"{feed_type}_gtfs_dataset_name")
        res_url = resource.get(f"{feed_type}_dataset_url")
    except Exception as e:
        raise InvalidCalItpFeedError(e)
```

`_validate_required_cal_itp_fields` doesn't validate that required values are present/non-empty (and `dict.get(...)` won't raise, so the try/except blocks won't catch missing fields). This can allow `service_id`, `res_id`, `res_name`, or `res_url` to be `None` and later produce invalid stable_ids / DB rows. Add explicit checks and raise `InvalidCalItpFeedError` with a clear message when any required field is missing.

Suggested change (replaces the block above):
```python
def _get_required_field(field_name: str, context: str) -> str:
    value = resource.get(field_name)
    if value is None or (isinstance(value, str) and not value.strip()):
        raise InvalidCalItpFeedError(
            f"Cal-ITP resource is missing required field '{field_name}' for {context}"
        )
    return value

service_id = _get_required_field("service_source_record_id", "service")
res_format = resource.get("format")
if res_format == GTFS_SCHEDULE:
    feed_type = "schedule"
    res_id = _get_required_field("schedule_source_record_id", "schedule feed")
    res_name = _get_required_field("schedule_gtfs_dataset_name", "schedule feed")
    res_url = _get_required_field("schedule_dataset_url", "schedule feed")
elif res_format == GTFS_REALTIME:
    feed_type = next(
        (t for t in ENTITY_TYPES_MAP if resource.get(f"{t}_gtfs_dataset_name")),
        None,
    )
    if feed_type is None:
        raise InvalidCalItpFeedError(
            "Cal-ITP RT resource has no recognised type in ENTITY_TYPES_MAP"
        )
    res_id = _get_required_field(
        f"{feed_type}_source_record_id", f"realtime {feed_type} feed"
    )
    res_name = _get_required_field(
        f"{feed_type}_gtfs_dataset_name", f"realtime {feed_type} feed"
    )
    res_url = _get_required_field(
        f"{feed_type}_dataset_url", f"realtime {feed_type} feed"
    )
```
```python
def _process_cal_itp_dataset(
    db_session: Session,
    session_http: requests.Session,
    dataset: dict,
    processed_stable_ids: Optional[set] = None,
```

`processed_stable_ids` is declared optional (`Optional[set] = None`) but later used unconditionally via `.add(...)`. Either make it a required argument (no default) or initialize it to `set()` when `None` to avoid an `AttributeError` if `_process_cal_itp_dataset` is called directly.
```python
if res_format == GTFS_SCHEDULE:
    # Requirement: if GTFS url returns non zip, skip it
    status_code, content_type, detected_format = _probe_head_format(
        session_http, res_url
    )
```

The comment says "skip non zip" but the actual skip logic is commented out below, so non-zip schedule URLs will still be imported. Either enforce the requirement (enable the check) or remove/update the comment and dead code to match current behavior.
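If the requirement is enforced, the skip decision could be factored into a small predicate. This is a sketch under the assumption that `_probe_head_format` returns `(status_code, content_type, detected_format)` as the call above suggests; the accepted content-type set is also an assumption:

```python
# Content types commonly used for zip downloads (assumed set; some hosts
# serve zips as octet-stream).
ZIP_CONTENT_TYPES = {
    "application/zip",
    "application/x-zip-compressed",
    "application/octet-stream",
}

def should_skip_schedule(status_code, content_type, detected_format) -> bool:
    """Return True when a schedule URL should be skipped as non-zip."""
    if status_code is None or status_code >= 400:
        return True  # unreachable URL
    if detected_format == "zip":
        return False  # magic-byte detection is authoritative when available
    # Fall back to the declared content type, ignoring charset parameters.
    media_type = (content_type or "").split(";")[0].strip().lower()
    return media_type not in ZIP_CONTENT_TYPES

print(should_skip_schedule(200, "application/zip", None))  # False
print(should_skip_schedule(200, "text/html", None))        # True
```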
Summary:
Closes #1642
This pull request introduces support for importing data from Cal-ITP into the system, adding an import handler and its tests.

Cal-ITP Import Feature:
- `functions-python/tasks_executor/src/tasks/data_import/cal_itp/import_cal_itp_feeds.py` contains the import handler and associated import logic
- `functions-python/tasks_executor/src/tasks/data_import/cal_itp/ckan_query.sql` is the CKAN API SQL query to retrieve feeds from Cal-ITP
- `functions-python/tasks_executor/tests/tasks/data_import/cal_itp/test_cal_itp_import.py` contains the associated unit and e2e tests
- `infra/functions-python/main.tf` includes the Google Cloud Scheduler job to run the import monthly
- `functions-python/tasks_executor/src/main.py` adds the handler to the task list

Out of scope:
- Redirecting MDB feeds to new Cal-ITP: the redirect and the csv defining redirect links will be included in a follow-up PR
- Licensing for Cal-ITP feeds (follow-up PR after confirmation with Cal-ITP)