Feat/cal itp import #1670

Open
ianktc wants to merge 4 commits into main from feat/cal-itp-import

ianktc commented Apr 24, 2026

Summary:

Closes #1642

This pull request adds support for importing feeds from Cal-ITP into the system. It consists of an import handler and its tests.

Cal-ITP Import Feature:

  • functions-python/tasks_executor/src/tasks/data_import/cal_itp/import_cal_itp_feeds.py contains the import handler and the associated import logic.
  • functions-python/tasks_executor/src/tasks/data_import/cal_itp/ckan_query.sql is the CKAN API SQL query that retrieves feeds from Cal-ITP.
  • functions-python/tasks_executor/tests/tasks/data_import/cal_itp/test_cal_itp_import.py contains the associated unit and e2e tests.
  • infra/functions-python/main.tf adds the Google Cloud Scheduler job that runs the import monthly.
  • functions-python/tasks_executor/src/main.py adds the handler to the task list.

Out of scope:

  • Redirecting MDB feeds to the new Cal-ITP feeds: the redirect and the CSV defining the redirect links will be included in a follow-up PR.
  • Licensing for Cal-ITP feeds: to be added in a follow-up PR after confirmation with Cal-ITP.

Copilot AI left a comment

Pull request overview

Adds a new Cal-ITP data import pipeline to the tasks executor, including the import implementation, CKAN query, tests, and a scheduled monthly execution in GCP.

Changes:

  • Introduces Cal-ITP import handler + CKAN SQL query for retrieving feed records.
  • Registers the new cal_itp_import task in the tasks executor and adds unit/e2e tests.
  • Adds a monthly Cloud Scheduler job to invoke the Cal-ITP import task.

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 8 comments.

| File | Description |
| --- | --- |
| infra/functions-python/main.tf | Adds a monthly Cloud Scheduler job to call the tasks executor with cal_itp_import. |
| functions-python/tasks_executor/src/main.py | Registers the new cal_itp_import task and handler. |
| functions-python/tasks_executor/src/tasks/data_import/cal_itp/import_cal_itp_feeds.py | Implements Cal-ITP dataset retrieval, filtering, upsert logic, and orchestration/commit hooks. |
| functions-python/tasks_executor/src/tasks/data_import/cal_itp/ckan_query.sql | Provides the CKAN datastore SQL used to retrieve Cal-ITP feed records. |
| functions-python/tasks_executor/tests/tasks/data_import/test_cal_itp_import.py | Adds helper/unit tests and an end-to-end DB test for the Cal-ITP import flow. |


import logging
import os
import json

Copilot AI Apr 24, 2026

json is imported but not used anywhere in this module. If unused-import linting is enabled, this will fail; please remove it (or use it if intended).

Suggested change
- import json

from tasks.data_import.data_import_utils import (
    get_or_create_feed,
    get_or_create_entity_type,
    get_license,

Copilot AI Apr 24, 2026

get_license is imported but only referenced in commented-out code. Remove the unused import until license handling is implemented to avoid unused-import lint failures.

Suggested change
-     get_license,

Comment on lines +547 to +551
self.assertEqual(rt.producer_url, "https://cal-itp.example/tu.pb")

# RT should be linked to the schedule feed
rt_sched_ids = [f.id for f in rt.gtfs_feeds]
self.assertEqual(rt_sched_ids, [sched.id])

Copilot AI Apr 24, 2026

The happy-path import test verifies the RT feed URL and schedule linkage, but it doesn’t assert that the RT feed’s entitytypes were populated (e.g. ["tu"] for trip_updates). Adding that assertion (as done in test_tdg_import.py) would catch regressions where entity_type parsing breaks and RT feeds end up with empty entity types.
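
The missing assertion could look roughly like the sketch below. It assumes the RT feed exposes an `entitytypes` collection whose items carry a `name` attribute, as the `["tu"]` example in the comment suggests. The `SimpleNamespace` objects and the `entity_type_names` helper are hypothetical stand-ins for illustration, not the repository's models or test utilities.

```python
import unittest
from types import SimpleNamespace


def entity_type_names(rt_feed):
    """Return the sorted entity type names attached to an RT feed (hypothetical helper)."""
    return sorted(et.name for et in rt_feed.entitytypes)


class TestRtEntityTypes(unittest.TestCase):
    def test_trip_updates_feed_has_tu_entity_type(self):
        # Stand-in for the RT feed that the e2e test loads from the DB.
        rt = SimpleNamespace(entitytypes=[SimpleNamespace(name="tu")])
        self.assertEqual(entity_type_names(rt), ["tu"])

    def test_empty_entity_types_are_detected(self):
        # The regression described above: parsing broke and the list is empty.
        rt = SimpleNamespace(entitytypes=[])
        self.assertNotEqual(entity_type_names(rt), ["tu"])
```

In the real test the assertion would presumably sit next to the existing `rt.producer_url` and `rt.gtfs_feeds` checks.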

Copilot generated this review using guidance from repository custom instructions.
Comment on lines +314 to +318
district_name = dataset.get("caltrans_district_name", []) or []
country_name = "United States"
state_province = "California"

locations: List[Any] = []

Copilot AI Apr 24, 2026

district_name falls back to [], but create_or_get_location(..., city_name=...) expects a string/None and will fail when building the location id (it joins components). Use None (or an empty string) as the fallback instead of a list, and avoid appending a None location to the list.
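
One way to apply this is sketched below, under stated assumptions: `normalize_city_name` and `build_locations` are hypothetical helpers, and `make_location` stands in for `create_or_get_location`, whose real signature is not shown in this review (the keyword names used here are guesses).

```python
from typing import Any, Callable, List, Optional


def normalize_city_name(raw: Any) -> Optional[str]:
    """Return a stripped string, or None when the value is missing or not a string."""
    if isinstance(raw, str) and raw.strip():
        return raw.strip()
    return None


def build_locations(dataset: dict, make_location: Callable[..., Any]) -> List[Any]:
    """Build the location list, skipping a location that could not be created."""
    city_name = normalize_city_name(dataset.get("caltrans_district_name"))
    # make_location is a stand-in; its keyword names are assumptions.
    location = make_location(
        country="United States",
        state_province="California",
        city_name=city_name,
    )
    return [location] if location is not None else []
```

Normalizing at the boundary keeps a list from ever reaching the code that joins the id components, and the final guard keeps `None` out of `locations`.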

_raw_resources.append({
    **_common_fields,
    "format": GTFS_REALTIME,
    "entity_type":f"{_rt_type}",

Copilot AI Apr 24, 2026

RT resources are built with entity_type as a string, but _get_entity_types_from_resource iterates that field assuming it’s a list of strings. Iterating a string yields characters, so entity types end up empty and the RT feed won’t get its entitytypes set correctly. Pass a list (e.g. [_rt_type]) or normalize string→list in _get_entity_types_from_resource.

Suggested change
-     "entity_type":f"{_rt_type}",
+     "entity_type": [_rt_type],
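
The alternative mentioned above, normalizing string to list on the reading side, could be sketched as follows. `normalize_entity_types` is a hypothetical helper, not a function from the PR. The sketch only shows why a bare string misbehaves when iterated and how to coerce it.

```python
from typing import Any, List


def normalize_entity_types(value: Any) -> List[str]:
    """Coerce an `entity_type` field into a list of non-empty strings."""
    if value is None:
        return []
    if isinstance(value, str):
        # Wrap a bare string so callers never iterate over its characters.
        return [value] if value else []
    return [item for item in value if isinstance(item, str) and item]


# Iterating a bare string yields single characters, not the type name.
assert list("trip_updates")[:2] == ["t", "r"]
assert normalize_entity_types("trip_updates") == ["trip_updates"]
assert normalize_entity_types(["trip_updates"]) == ["trip_updates"]
```

Passing a list at the call site fixes this one producer. Normalizing in the reader also protects against any other caller that passes a string.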

Comment on lines +347 to +369
service_id = resource.get("service_source_record_id")
res_format = resource.get("format")
if res_format == GTFS_SCHEDULE:
    feed_type = 'schedule'
    try:
        res_id = resource.get("schedule_source_record_id")
        res_name = resource.get("schedule_gtfs_dataset_name")
        res_url = resource.get("schedule_dataset_url")
    except Exception as e:
        raise InvalidCalItpFeedError(e)
elif res_format == GTFS_REALTIME:
    feed_type = next(
        (t for t in ENTITY_TYPES_MAP if resource.get(f"{t}_gtfs_dataset_name")),
        None,
    )
    if feed_type is None:
        raise InvalidCalItpFeedError("Cal-ITP RT resource has no recognised type in ENTITY_TYPES_MAP")
    try:
        res_id = resource.get(f"{feed_type}_source_record_id")
        res_name = resource.get(f"{feed_type}_gtfs_dataset_name")
        res_url = resource.get(f"{feed_type}_dataset_url")
    except Exception as e:
        raise InvalidCalItpFeedError(e)

Copilot AI Apr 24, 2026

_validate_required_cal_itp_fields doesn’t validate that required values are present/non-empty (and dict.get(...) won’t raise, so the try/except blocks won’t catch missing fields). This can allow service_id, res_id, res_name, or res_url to be None and later produce invalid stable_ids / DB rows. Add explicit checks and raise InvalidCalItpFeedError with a clear message when any required field is missing.

Suggested change
- service_id = resource.get("service_source_record_id")
- res_format = resource.get("format")
- if res_format == GTFS_SCHEDULE:
-     feed_type = 'schedule'
-     try:
-         res_id = resource.get("schedule_source_record_id")
-         res_name = resource.get("schedule_gtfs_dataset_name")
-         res_url = resource.get("schedule_dataset_url")
-     except Exception as e:
-         raise InvalidCalItpFeedError(e)
- elif res_format == GTFS_REALTIME:
-     feed_type = next(
-         (t for t in ENTITY_TYPES_MAP if resource.get(f"{t}_gtfs_dataset_name")),
-         None,
-     )
-     if feed_type is None:
-         raise InvalidCalItpFeedError("Cal-ITP RT resource has no recognised type in ENTITY_TYPES_MAP")
-     try:
-         res_id = resource.get(f"{feed_type}_source_record_id")
-         res_name = resource.get(f"{feed_type}_gtfs_dataset_name")
-         res_url = resource.get(f"{feed_type}_dataset_url")
-     except Exception as e:
-         raise InvalidCalItpFeedError(e)
+ def _get_required_field(field_name: str, context: str) -> str:
+     value = resource.get(field_name)
+     if value is None or (isinstance(value, str) and not value.strip()):
+         raise InvalidCalItpFeedError(
+             f"Cal-ITP resource is missing required field '{field_name}' for {context}"
+         )
+     return value
+ service_id = _get_required_field("service_source_record_id", "service")
+ res_format = resource.get("format")
+ if res_format == GTFS_SCHEDULE:
+     feed_type = "schedule"
+     res_id = _get_required_field("schedule_source_record_id", "schedule feed")
+     res_name = _get_required_field("schedule_gtfs_dataset_name", "schedule feed")
+     res_url = _get_required_field("schedule_dataset_url", "schedule feed")
+ elif res_format == GTFS_REALTIME:
+     feed_type = next(
+         (t for t in ENTITY_TYPES_MAP if resource.get(f"{t}_gtfs_dataset_name")),
+         None,
+     )
+     if feed_type is None:
+         raise InvalidCalItpFeedError(
+             "Cal-ITP RT resource has no recognised type in ENTITY_TYPES_MAP"
+         )
+     res_id = _get_required_field(
+         f"{feed_type}_source_record_id", f"realtime {feed_type} feed"
+     )
+     res_name = _get_required_field(
+         f"{feed_type}_gtfs_dataset_name", f"realtime {feed_type} feed"
+     )
+     res_url = _get_required_field(
+         f"{feed_type}_dataset_url", f"realtime {feed_type} feed"
+     )

Comment on lines +579 to +583
def _process_cal_itp_dataset(
    db_session: Session,
    session_http: requests.Session,
    dataset: dict,
    processed_stable_ids: Optional[set] = None,

Copilot AI Apr 24, 2026

processed_stable_ids is declared optional (Optional[set] = None) but later used unconditionally via .add(...). Either make it a required argument (no default) or initialize it to set() when None to avoid an AttributeError if _process_cal_itp_dataset is called directly.
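
The second option, initializing to `set()` when `None`, follows the usual pattern for optional mutable arguments. A minimal sketch, using a hypothetical `record_stable_id` function in place of `_process_cal_itp_dataset` and made-up ids:

```python
from typing import Optional, Set


def record_stable_id(
    stable_id: str, processed_stable_ids: Optional[Set[str]] = None
) -> Set[str]:
    """Add `stable_id` to the tracking set, creating the set when none was passed."""
    if processed_stable_ids is None:
        # A fresh set per call avoids both the AttributeError and shared state.
        processed_stable_ids = set()
    processed_stable_ids.add(stable_id)
    return processed_stable_ids


# Direct call without a set no longer fails.
assert record_stable_id("feed-1") == {"feed-1"}

# A caller-provided set is mutated in place, so the orchestrator still sees it.
shared = {"feed-0"}
record_stable_id("feed-1", shared)
assert shared == {"feed-0", "feed-1"}
```

If the orchestrator always passes the set, making the argument required is the simpler fix, because it turns a silent misuse into an immediate `TypeError`.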

Copilot uses AI. Check for mistakes.
Comment on lines +679 to +683
if res_format == GTFS_SCHEDULE:
    # Requirement: if GTFS url returns non zip, skip it
    status_code, content_type, detected_format = _probe_head_format(
        session_http, res_url
    )

Copilot AI Apr 24, 2026

The comment says “skip non zip” but the actual skip logic is commented out below, so non-zip schedule URLs will still be imported. Either enforce the requirement (enable the check) or remove/update the comment and dead code to match current behavior.
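
If the requirement is enforced, the check could take roughly the shape below. This is a sketch under several assumptions: `should_skip_schedule_url` is a hypothetical helper, the value `"zip"` for `detected_format` is a guess at what `_probe_head_format` reports, and treating non-2xx responses as skippable is a policy choice the PR does not state.

```python
from typing import Optional

# Media types commonly used for zip downloads (assumed list, not from the PR).
ZIP_CONTENT_TYPES = {"application/zip", "application/x-zip-compressed"}


def should_skip_schedule_url(
    status_code: Optional[int],
    content_type: Optional[str],
    detected_format: Optional[str],
) -> bool:
    """Return True when the probe result does not look like a zip download."""
    if status_code is None or not 200 <= status_code < 300:
        return True
    if detected_format is not None:
        # Trust the probe's own detection when it produced one.
        return detected_format.lower() != "zip"
    # Fall back to the Content-Type header, ignoring parameters such as charset.
    media_type = (content_type or "").split(";")[0].strip().lower()
    return media_type not in ZIP_CONTENT_TYPES
```

A strict check like this would also skip servers that return zip archives as `application/octet-stream`, which may be one reason the original check was disabled. Whichever way that is decided, the comment and the code should agree.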

Development

Successfully merging this pull request may close these issues.

Define the sync logic between CAOD and MobilityDatabase

2 participants