diff --git a/.github/workflows/execution-report-heartbeat.yml b/.github/workflows/execution-report-heartbeat.yml index 0f995a8..57b398b 100644 --- a/.github/workflows/execution-report-heartbeat.yml +++ b/.github/workflows/execution-report-heartbeat.yml @@ -40,6 +40,8 @@ jobs: RUNTIME_HEARTBEAT_FAIL_WORKFLOW_ON_ALERT: ${{ inputs.fail_workflow_on_alert || vars.RUNTIME_HEARTBEAT_FAIL_WORKFLOW_ON_ALERT || 'true' }} RUNTIME_HEARTBEAT_ACCEPT_STATUSES: ${{ vars.RUNTIME_HEARTBEAT_ACCEPT_STATUSES }} RUNTIME_HEARTBEAT_REJECT_STATUSES: ${{ vars.RUNTIME_HEARTBEAT_REJECT_STATUSES }} + RUNTIME_HEARTBEAT_SCHEDULER_AWARE: ${{ vars.RUNTIME_HEARTBEAT_SCHEDULER_AWARE || 'true' }} + RUNTIME_HEARTBEAT_SCHEDULER_LOCATION: ${{ vars.RUNTIME_HEARTBEAT_SCHEDULER_LOCATION || vars.CLOUD_RUN_REGION || 'us-central1' }} CLOUD_RUN_SERVICE: ${{ vars.CLOUD_RUN_SERVICE }} CLOUD_RUN_SERVICES: ${{ vars.CLOUD_RUN_SERVICES }} CLOUD_RUN_SERVICE_TARGETS_JSON: ${{ vars.CLOUD_RUN_SERVICE_TARGETS_JSON }} diff --git a/scripts/execution_report_heartbeat.py b/scripts/execution_report_heartbeat.py index 377df6c..066166c 100644 --- a/scripts/execution_report_heartbeat.py +++ b/scripts/execution_report_heartbeat.py @@ -12,6 +12,7 @@ import urllib.parse import urllib.request from typing import Any +from zoneinfo import ZoneInfo DEFAULT_ACCEPT_STATUSES = {"ok", "skipped", "success", "completed", "no_action"} @@ -90,10 +91,10 @@ def _base_report_uris() -> list[str]: return unique -def _load_required_services() -> list[str]: +def _load_required_service_candidates() -> tuple[list[str], bool]: explicit_services = _split_values(os.environ.get("RUNTIME_HEARTBEAT_REQUIRED_SERVICES")) if explicit_services: - return _unique_values(explicit_services) + return _unique_values(explicit_services), True services = [] for name in ( @@ -127,7 +128,36 @@ def _load_required_services() -> list[str]: except json.JSONDecodeError: pass - return _unique_values(services) + return _unique_values(services), False + + +def _load_required_services( + *, + project: str | None = None, + since: dt.datetime | None = None, + now: dt.datetime | None = None, +) -> list[str]: + services, explicit = _load_required_service_candidates() + if explicit or not services: + return services + if not _env_bool("RUNTIME_HEARTBEAT_SCHEDULER_AWARE", True): + return services + if since is None or now is None: + return services + try: + return _filter_scheduler_due_services( + services, + project=project, + since=since, + now=now, + ) + except RuntimeError as exc: + print( + f"Unable to resolve Cloud Scheduler-backed heartbeat services: {exc}; " + "falling back to all configured services.", + file=sys.stderr, + ) + return services def _unique_values(values: list[str]) -> list[str]: @@ -140,6 +170,176 @@ def _unique_values(values: list[str]) -> list[str]: return unique +def _scheduler_location() -> str: + return ( + os.environ.get("RUNTIME_HEARTBEAT_SCHEDULER_LOCATION") + or os.environ.get("CLOUD_RUN_REGION") + or "us-central1" + ) + + +def _list_scheduler_jobs(*, project: str | None) -> list[dict[str, Any]]: + command = [ + "gcloud", + "scheduler", + "jobs", + "list", + "--location", + _scheduler_location(), + "--format=json", + ] + if project: + command.extend(["--project", project]) + result = _run_gcloud(command) + if result.returncode != 0: + detail = (result.stderr or result.stdout or "").strip() + raise RuntimeError(detail or "gcloud scheduler jobs list failed") + if not result.stdout.strip(): + return [] + try: + payload = json.loads(result.stdout) + except json.JSONDecodeError as exc: + raise RuntimeError(f"gcloud scheduler jobs list returned invalid JSON: {exc}") from exc + return payload if isinstance(payload, list) else [] + + +def _scheduler_job_targets_strategy_run(job: dict[str, Any], service: str) -> bool: + if str(job.get("state") or "").strip().upper() not in {"", "ENABLED"}: + return False + uri = str((job.get("httpTarget") or {}).get("uri") or "").strip() + if not uri: + return False + parsed = urllib.parse.urlparse(uri) + path = parsed.path or "/" + if path != "/": + return False + service_text = str(service or "").strip().lower() + return bool(service_text and service_text in parsed.netloc.lower()) + + +def _cron_token_value(token: str, *, names: dict[str, int] | None = None) -> int: + normalized = token.strip().lower() + if names and normalized in names: + return names[normalized] + return int(normalized) + + +def _cron_field_values( + field: str, + *, + minimum: int, + maximum: int, + names: dict[str, int] | None = None, +) -> set[int] | None: + text = str(field or "").strip().lower() + if text in {"", "*"}: + return None + values: set[int] = set() + for raw_part in text.split(","): + part = raw_part.strip() + if not part: + continue + base, raw_step = part, "1" + if "/" in part: + base, raw_step = part.split("/", 1) + step = max(1, int(raw_step)) + if base == "*": + start, end = minimum, maximum + elif "-" in base: + raw_start, raw_end = base.split("-", 1) + start = _cron_token_value(raw_start, names=names) + end = _cron_token_value(raw_end, names=names) + else: + start = end = _cron_token_value(base, names=names) + for value in range(start, end + 1, step): + if minimum <= value <= maximum: + values.add(value) + elif maximum == 6 and value == 7: + values.add(0) + return values + + +def _cron_matches(schedule: str, value: dt.datetime) -> bool: + fields = str(schedule or "").split() + if len(fields) != 5: + return False + minute, hour, day_of_month, month, day_of_week = fields + dow_names = { + "sun": 0, + "mon": 1, + "tue": 2, + "wed": 3, + "thu": 4, + "fri": 5, + "sat": 6, + } + minute_values = _cron_field_values(minute, minimum=0, maximum=59) + hour_values = _cron_field_values(hour, minimum=0, maximum=23) + dom_values = _cron_field_values(day_of_month, minimum=1, maximum=31) + month_values = _cron_field_values(month, minimum=1, maximum=12) + dow_values = _cron_field_values(day_of_week, minimum=0, maximum=6, names=dow_names) + if minute_values is not None and value.minute not in minute_values: + return False + if hour_values is not None and value.hour not in hour_values: + return False + if month_values is not None and value.month not in month_values: + return False + + dom_matches = dom_values is None or value.day in dom_values + cron_weekday = value.isoweekday() % 7 + dow_matches = dow_values is None or cron_weekday in dow_values + if dom_values is not None and dow_values is not None: + return dom_matches or dow_matches + return dom_matches and dow_matches + + +def _scheduler_job_due_between( + job: dict[str, Any], + *, + since: dt.datetime, + now: dt.datetime, +) -> bool: + schedule = str(job.get("schedule") or "").strip() + if not schedule: + return False + try: + timezone = ZoneInfo(str(job.get("timeZone") or "UTC")) + except Exception: # noqa: BLE001 + timezone = dt.timezone.utc + + since_utc = since.astimezone(dt.timezone.utc) + now_utc = now.astimezone(dt.timezone.utc) + cursor = since_utc.replace(second=0, microsecond=0) + if cursor < since_utc: + cursor += dt.timedelta(minutes=1) + while cursor <= now_utc: + if _cron_matches(schedule, cursor.astimezone(timezone)): + return True + cursor += dt.timedelta(minutes=1) + return False + + +def _filter_scheduler_due_services( + services: list[str], + *, + project: str | None, + since: dt.datetime, + now: dt.datetime, +) -> list[str]: + jobs = _list_scheduler_jobs(project=project) + due_services = [] + for service in services: + service_jobs = [ + job for job in jobs if _scheduler_job_targets_strategy_run(job, service) + ] + if not service_jobs or any( + _scheduler_job_due_between(job, since=since, now=now) + for job in service_jobs + ): + due_services.append(service) + return due_services + + def _report_globs(since: dt.datetime, now: dt.datetime) -> list[str]: explicit = _split_values(os.environ.get("RUNTIME_HEARTBEAT_GCS_GLOBS")) if explicit: @@ -332,10 +532,10 @@ def main() -> int: lookback_hours = float(os.environ.get("RUNTIME_HEARTBEAT_LOOKBACK_HOURS") or "36") max_reports = int(os.environ.get("RUNTIME_HEARTBEAT_MAX_REPORTS_TO_READ") or "20") fail_workflow = _env_bool("RUNTIME_HEARTBEAT_FAIL_WORKFLOW_ON_ALERT", True) - required_services = _load_required_services() now = dt.datetime.now(dt.timezone.utc) since = now - dt.timedelta(hours=lookback_hours) + required_services = _load_required_services(project=project, since=since, now=now) globs = _report_globs(since, now) if not globs: raise SystemExit("No heartbeat GCS report URI configured") diff --git a/tests/test_execution_report_heartbeat.py b/tests/test_execution_report_heartbeat.py index 046b5c8..d2ed260 100644 --- a/tests/test_execution_report_heartbeat.py +++ b/tests/test_execution_report_heartbeat.py @@ -1,5 +1,6 @@ from __future__ import annotations +import datetime as dt import json from scripts import execution_report_heartbeat as heartbeat @@ -38,3 +39,78 @@ def test_required_services_fall_back_to_cloud_run_targets(monkeypatch): ) assert heartbeat._load_required_services() == ["svc-a", "svc-b"] + + +def test_scheduler_aware_required_services_only_include_due_main_schedulers(monkeypatch): + monkeypatch.delenv("RUNTIME_HEARTBEAT_REQUIRED_SERVICES", raising=False) + monkeypatch.setenv( + "CLOUD_RUN_SERVICE_TARGETS_JSON", + json.dumps( + { + "targets": [ + {"service": "svc-daily"}, + {"service": "svc-monthly"}, + ] + } + ), + ) + monkeypatch.setattr( + heartbeat, + "_list_scheduler_jobs", + lambda **_kwargs: [ + { + "state": "ENABLED", + "schedule": "45 15 * * 1-5", + "timeZone": "America/New_York", + "httpTarget": {"uri": "https://svc-daily.example.run.app/"}, + }, + { + "state": "ENABLED", + "schedule": "45 15 26 * *", + "timeZone": "America/New_York", + "httpTarget": {"uri": "https://svc-monthly.example.run.app/"}, + }, + { + "state": "ENABLED", + "schedule": "35 9,15 25-30 * *", + "timeZone": "America/New_York", + "httpTarget": {"uri": "https://svc-monthly.example.run.app/probe"}, + }, + ], + ) + + required = heartbeat._load_required_services( + project="project-1", + since=dt.datetime(2026, 6, 5, 0, 0, tzinfo=dt.timezone.utc), + now=dt.datetime(2026, 6, 6, 2, 0, tzinfo=dt.timezone.utc), + ) + + assert required == ["svc-daily"] + + +def test_scheduler_aware_required_services_include_monthly_service_when_due(monkeypatch): + monkeypatch.delenv("RUNTIME_HEARTBEAT_REQUIRED_SERVICES", raising=False) + monkeypatch.setenv( + "CLOUD_RUN_SERVICE_TARGETS_JSON", + json.dumps({"targets": [{"service": "svc-monthly"}]}), + ) + monkeypatch.setattr( + heartbeat, + "_list_scheduler_jobs", + lambda **_kwargs: [ + { + "state": "ENABLED", + "schedule": "45 15 26 * *", + "timeZone": "America/New_York", + "httpTarget": {"uri": "https://svc-monthly.example.run.app/"}, + }, + ], + ) + + required = heartbeat._load_required_services( + project="project-1", + since=dt.datetime(2026, 6, 26, 19, 0, tzinfo=dt.timezone.utc), + now=dt.datetime(2026, 6, 26, 20, 0, tzinfo=dt.timezone.utc), + ) + + assert required == ["svc-monthly"]