Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/execution-report-heartbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ jobs:
RUNTIME_HEARTBEAT_FAIL_WORKFLOW_ON_ALERT: ${{ inputs.fail_workflow_on_alert || vars.RUNTIME_HEARTBEAT_FAIL_WORKFLOW_ON_ALERT || 'true' }}
RUNTIME_HEARTBEAT_ACCEPT_STATUSES: ${{ vars.RUNTIME_HEARTBEAT_ACCEPT_STATUSES }}
RUNTIME_HEARTBEAT_REJECT_STATUSES: ${{ vars.RUNTIME_HEARTBEAT_REJECT_STATUSES }}
RUNTIME_HEARTBEAT_SCHEDULER_AWARE: ${{ vars.RUNTIME_HEARTBEAT_SCHEDULER_AWARE || 'true' }}
RUNTIME_HEARTBEAT_SCHEDULER_LOCATION: ${{ vars.RUNTIME_HEARTBEAT_SCHEDULER_LOCATION || vars.CLOUD_RUN_REGION || 'us-central1' }}
CLOUD_RUN_SERVICE: ${{ vars.CLOUD_RUN_SERVICE }}
CLOUD_RUN_SERVICES: ${{ vars.CLOUD_RUN_SERVICES }}
CLOUD_RUN_SERVICE_TARGETS_JSON: ${{ vars.CLOUD_RUN_SERVICE_TARGETS_JSON }}
Expand Down
208 changes: 204 additions & 4 deletions scripts/execution_report_heartbeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import urllib.parse
import urllib.request
from typing import Any
from zoneinfo import ZoneInfo


DEFAULT_ACCEPT_STATUSES = {"ok", "skipped", "success", "completed", "no_action"}
Expand Down Expand Up @@ -90,10 +91,10 @@ def _base_report_uris() -> list[str]:
return unique


def _load_required_services() -> list[str]:
def _load_required_service_candidates() -> tuple[list[str], bool]:
explicit_services = _split_values(os.environ.get("RUNTIME_HEARTBEAT_REQUIRED_SERVICES"))
if explicit_services:
return _unique_values(explicit_services)
return _unique_values(explicit_services), True

services = []
for name in (
Expand Down Expand Up @@ -127,7 +128,36 @@ def _load_required_services() -> list[str]:
except json.JSONDecodeError:
pass

return _unique_values(services)
return _unique_values(services), False


def _load_required_services(
    *,
    project: str | None = None,
    since: dt.datetime | None = None,
    now: dt.datetime | None = None,
) -> list[str]:
    """Return the services that must have produced a report.

    The candidate list comes from ``_load_required_service_candidates``. It is
    returned unchanged when it was configured explicitly, when it is empty,
    when ``RUNTIME_HEARTBEAT_SCHEDULER_AWARE`` is switched off, or when the
    lookback window (``since``/``now``) is not supplied. Otherwise the list is
    narrowed through ``_filter_scheduler_due_services``.

    Args:
        project: Optional project identifier forwarded to the scheduler lookup.
        since: Start of the lookback window.
        now: End of the lookback window.

    Returns:
        The required service names.
    """
    candidates, is_explicit = _load_required_service_candidates()

    # The environment flag is only consulted for a non-empty, non-explicit
    # candidate list, thanks to short-circuit evaluation.
    if (
        is_explicit
        or not candidates
        or not _env_bool("RUNTIME_HEARTBEAT_SCHEDULER_AWARE", True)
        or since is None
        or now is None
    ):
        return candidates

    try:
        due_only = _filter_scheduler_due_services(
            candidates,
            project=project,
            since=since,
            now=now,
        )
    except RuntimeError as lookup_error:
        # Best effort: a failed scheduler lookup must not hide any service.
        warning = (
            f"Unable to resolve Cloud Scheduler-backed heartbeat services: {lookup_error}; "
            "falling back to all configured services."
        )
        print(warning, file=sys.stderr)
        return candidates
    else:
        return due_only


def _unique_values(values: list[str]) -> list[str]:
Expand All @@ -140,6 +170,176 @@ def _unique_values(values: list[str]) -> list[str]:
return unique


def _scheduler_location() -> str:
    """Return the location passed to the scheduler job listing.

    The first non-empty value among the environment variables
    ``RUNTIME_HEARTBEAT_SCHEDULER_LOCATION`` and ``CLOUD_RUN_REGION`` wins;
    when neither is set, ``"us-central1"`` is returned.
    """
    for variable in ("RUNTIME_HEARTBEAT_SCHEDULER_LOCATION", "CLOUD_RUN_REGION"):
        configured = os.environ.get(variable)
        if configured:
            return configured
    return "us-central1"


def _list_scheduler_jobs(*, project: str | None) -> list[dict[str, Any]]:
    """List scheduler jobs by running ``gcloud scheduler jobs list``.

    Args:
        project: When truthy, appended to the command as ``--project``.

    Returns:
        The decoded JSON payload when it is a list; an empty list when the
        command prints nothing or the payload is some other JSON type.

    Raises:
        RuntimeError: If the command exits non-zero or prints invalid JSON.
    """
    argv = [
        "gcloud",
        "scheduler",
        "jobs",
        "list",
        "--location",
        _scheduler_location(),
        "--format=json",
    ]
    if project:
        argv += ["--project", project]

    # NOTE(review): _run_gcloud is defined elsewhere; it is used here as if it
    # returns an object with returncode/stdout/stderr -- confirm.
    completed = _run_gcloud(argv)
    if completed.returncode != 0:
        failure_text = (completed.stderr or completed.stdout or "").strip()
        raise RuntimeError(failure_text or "gcloud scheduler jobs list failed")

    if not completed.stdout.strip():
        return []

    try:
        decoded = json.loads(completed.stdout)
    except json.JSONDecodeError as exc:
        raise RuntimeError(f"gcloud scheduler jobs list returned invalid JSON: {exc}") from exc

    if isinstance(decoded, list):
        return decoded
    return []


def _scheduler_job_targets_strategy_run(job: dict[str, Any], service: str) -> bool:
    """Tell whether ``job`` is an active HTTP job aimed at the root of ``service``.

    A job qualifies when its ``state`` is empty or ``ENABLED``, its
    ``httpTarget.uri`` has an empty or ``/`` path, and the lower-cased service
    name occurs as a substring of the URI's network location.

    Args:
        job: One scheduler job description (a decoded JSON object).
        service: Service name to look for in the target host.

    Returns:
        ``True`` when every condition above holds, otherwise ``False``.
    """
    state = str(job.get("state") or "").strip().upper()
    if state and state != "ENABLED":
        return False

    http_target = job.get("httpTarget") or {}
    target_uri = str(http_target.get("uri") or "").strip()
    if not target_uri:
        return False

    url = urllib.parse.urlparse(target_uri)
    # Only the root path counts; any other path is rejected.
    if url.path not in ("", "/"):
        return False

    needle = str(service or "").strip().lower()
    if not needle:
        return False
    return needle in url.netloc.lower()


def _cron_token_value(token: str, *, names: dict[str, int] | None = None) -> int:
normalized = token.strip().lower()
if names and normalized in names:
return names[normalized]
return int(normalized)


def _cron_field_values(
field: str,
*,
minimum: int,
maximum: int,
names: dict[str, int] | None = None,
) -> set[int] | None:
text = str(field or "").strip().lower()
if text in {"", "*"}:
return None
values: set[int] = set()
for raw_part in text.split(","):
part = raw_part.strip()
if not part:
continue
base, raw_step = part, "1"
if "/" in part:
base, raw_step = part.split("/", 1)
step = max(1, int(raw_step))
if base == "*":
start, end = minimum, maximum
elif "-" in base:
raw_start, raw_end = base.split("-", 1)
start = _cron_token_value(raw_start, names=names)
end = _cron_token_value(raw_end, names=names)
else:
start = end = _cron_token_value(base, names=names)
for value in range(start, end + 1, step):
if minimum <= value <= maximum:
values.add(value)
elif maximum == 6 and value == 7:
values.add(0)
return values


def _cron_matches(schedule: str, value: dt.datetime) -> bool:
    """Tell whether a five-field cron ``schedule`` fires at ``value``.

    Fields are minute, hour, day-of-month, month and day-of-week. Month and
    day-of-week accept three-letter English names (``jan``-``dec``,
    ``sun``-``sat``) as well as numbers; names are matched case-insensitively.

    Args:
        schedule: Cron expression; anything that does not split into exactly
            five whitespace-separated fields never matches.
        value: The instant to test. Only its minute, hour, day, month and
            weekday are inspected, so it should already be expressed in the
            schedule's time zone.

    Returns:
        ``True`` when every restricted field permits ``value``.

    Raises:
        ValueError: If a field contains a token that is neither a known name
            nor an integer.
    """
    fields = str(schedule or "").split()
    if len(fields) != 5:
        return False
    minute, hour, day_of_month, month, day_of_week = fields
    # Month names are valid in cron schedules; without this table a schedule
    # such as "0 9 1 jan *" would fail integer parsing.
    month_names = {
        "jan": 1,
        "feb": 2,
        "mar": 3,
        "apr": 4,
        "may": 5,
        "jun": 6,
        "jul": 7,
        "aug": 8,
        "sep": 9,
        "oct": 10,
        "nov": 11,
        "dec": 12,
    }
    dow_names = {
        "sun": 0,
        "mon": 1,
        "tue": 2,
        "wed": 3,
        "thu": 4,
        "fri": 5,
        "sat": 6,
    }
    minute_values = _cron_field_values(minute, minimum=0, maximum=59)
    hour_values = _cron_field_values(hour, minimum=0, maximum=23)
    dom_values = _cron_field_values(day_of_month, minimum=1, maximum=31)
    month_values = _cron_field_values(month, minimum=1, maximum=12, names=month_names)
    dow_values = _cron_field_values(day_of_week, minimum=0, maximum=6, names=dow_names)
    if minute_values is not None and value.minute not in minute_values:
        return False
    if hour_values is not None and value.hour not in hour_values:
        return False
    if month_values is not None and value.month not in month_values:
        return False

    dom_matches = dom_values is None or value.day in dom_values
    # isoweekday() is 1..7 with Sunday = 7; modulo 7 yields cron's Sunday = 0.
    cron_weekday = value.isoweekday() % 7
    dow_matches = dow_values is None or cron_weekday in dow_values
    # When both day fields are restricted, a match on either one is enough;
    # otherwise the unrestricted field is always satisfied and the other decides.
    if dom_values is not None and dow_values is not None:
        return dom_matches or dow_matches
    return dom_matches and dow_matches


def _scheduler_job_due_between(
    job: dict[str, Any],
    *,
    since: dt.datetime,
    now: dt.datetime,
) -> bool:
    """Tell whether ``job`` has at least one scheduled run in ``[since, now]``.

    The window is walked minute by minute, and each minute is converted to the
    job's ``timeZone`` (UTC when missing or unknown) before being tested
    against the job's cron ``schedule``.

    Args:
        job: One scheduler job description (a decoded JSON object).
        since: Inclusive start of the window.
        now: Inclusive end of the window.

    Returns:
        ``False`` when the job has no schedule or no minute in the window
        matches. ``True`` when a minute matches, and also when the schedule
        cannot be parsed: an unreadable schedule is reported on stderr and the
        job is treated as due so that its service is not silently exempted.
    """
    schedule = str(job.get("schedule") or "").strip()
    if not schedule:
        return False
    try:
        timezone = ZoneInfo(str(job.get("timeZone") or "UTC"))
    except Exception:  # noqa: BLE001
        # Unknown or malformed zone names fall back to UTC.
        timezone = dt.timezone.utc

    since_utc = since.astimezone(dt.timezone.utc)
    now_utc = now.astimezone(dt.timezone.utc)
    # Start at the first whole minute that is not before `since`.
    cursor = since_utc.replace(second=0, microsecond=0)
    if cursor < since_utc:
        cursor += dt.timedelta(minutes=1)
    try:
        while cursor <= now_utc:
            if _cron_matches(schedule, cursor.astimezone(timezone)):
                return True
            cursor += dt.timedelta(minutes=1)
    except ValueError as exc:
        # A schedule with unsupported tokens must not crash the heartbeat;
        # fail open and keep the service required.
        print(
            f"Unable to parse Cloud Scheduler schedule {schedule!r}: {exc}; "
            "treating the job as due.",
            file=sys.stderr,
        )
        return True
    return False


def _filter_scheduler_due_services(
    services: list[str],
    *,
    project: str | None,
    since: dt.datetime,
    now: dt.datetime,
) -> list[str]:
    """Keep the services that are expected to have run inside the window.

    A service is kept when no listed scheduler job targets it, or when at
    least one of the jobs targeting it is due between ``since`` and ``now``.
    Input order is preserved.

    Args:
        services: Candidate service names.
        project: Forwarded to the scheduler job listing.
        since: Start of the window.
        now: End of the window.

    Returns:
        The subset of ``services`` that remains required.
    """
    all_jobs = _list_scheduler_jobs(project=project)

    def _still_required(name: str) -> bool:
        matching = [
            entry
            for entry in all_jobs
            if _scheduler_job_targets_strategy_run(entry, name)
        ]
        if not matching:
            # No scheduler job is known for this service, so it stays required.
            return True
        return any(
            _scheduler_job_due_between(entry, since=since, now=now)
            for entry in matching
        )

    return [name for name in services if _still_required(name)]


def _report_globs(since: dt.datetime, now: dt.datetime) -> list[str]:
explicit = _split_values(os.environ.get("RUNTIME_HEARTBEAT_GCS_GLOBS"))
if explicit:
Expand Down Expand Up @@ -332,10 +532,10 @@ def main() -> int:
lookback_hours = float(os.environ.get("RUNTIME_HEARTBEAT_LOOKBACK_HOURS") or "36")
max_reports = int(os.environ.get("RUNTIME_HEARTBEAT_MAX_REPORTS_TO_READ") or "20")
fail_workflow = _env_bool("RUNTIME_HEARTBEAT_FAIL_WORKFLOW_ON_ALERT", True)
required_services = _load_required_services()

now = dt.datetime.now(dt.timezone.utc)
since = now - dt.timedelta(hours=lookback_hours)
required_services = _load_required_services(project=project, since=since, now=now)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Handle empty due service sets as success

When scheduler-aware filtering determines that none of the configured services has a main scheduler due in the lookback window (for example a repo with only monthly schedulers on an ordinary weekday, or a skipped holiday window), this assignment leaves required_services as an empty list. The rest of main() then treats that the same as “no service filter configured” and still scans for any acceptable recent report, so it can emit a missing-report alert even though no scheduler was expected to produce one. Preserve the distinction between “no services configured” and “configured services, but none due” and short-circuit the latter as OK.

Useful? React with 👍 / 👎.

globs = _report_globs(since, now)
if not globs:
raise SystemExit("No heartbeat GCS report URI configured")
Expand Down
76 changes: 76 additions & 0 deletions tests/test_execution_report_heartbeat.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import datetime as dt
import json

from scripts import execution_report_heartbeat as heartbeat
Expand Down Expand Up @@ -38,3 +39,78 @@ def test_required_services_fall_back_to_cloud_run_targets(monkeypatch):
)

assert heartbeat._load_required_services() == ["svc-a", "svc-b"]


def test_scheduler_aware_required_services_only_include_due_main_schedulers(monkeypatch):
    """Only the service whose root-path scheduler fires in the window is required.

    Two services are configured. The weekday job for ``svc-daily`` is expected
    to fire inside the window, the day-26 job for ``svc-monthly`` is not, and
    the third job targets ``/probe`` rather than the root path.
    """
    new_york = "America/New_York"
    scheduler_jobs = [
        {
            "state": "ENABLED",
            "schedule": "45 15 * * 1-5",
            "timeZone": new_york,
            "httpTarget": {"uri": "https://svc-daily.example.run.app/"},
        },
        {
            "state": "ENABLED",
            "schedule": "45 15 26 * *",
            "timeZone": new_york,
            "httpTarget": {"uri": "https://svc-monthly.example.run.app/"},
        },
        {
            "state": "ENABLED",
            "schedule": "35 9,15 25-30 * *",
            "timeZone": new_york,
            "httpTarget": {"uri": "https://svc-monthly.example.run.app/probe"},
        },
    ]
    targets_json = json.dumps(
        {
            "targets": [
                {"service": "svc-daily"},
                {"service": "svc-monthly"},
            ]
        }
    )

    monkeypatch.delenv("RUNTIME_HEARTBEAT_REQUIRED_SERVICES", raising=False)
    monkeypatch.setenv("CLOUD_RUN_SERVICE_TARGETS_JSON", targets_json)
    monkeypatch.setattr(
        heartbeat,
        "_list_scheduler_jobs",
        lambda **_kwargs: scheduler_jobs,
    )

    window_start = dt.datetime(2026, 6, 5, 0, 0, tzinfo=dt.timezone.utc)
    window_end = dt.datetime(2026, 6, 6, 2, 0, tzinfo=dt.timezone.utc)
    required = heartbeat._load_required_services(
        project="project-1",
        since=window_start,
        now=window_end,
    )

    assert required == ["svc-daily"]


def test_scheduler_aware_required_services_include_monthly_service_when_due(monkeypatch):
    """A monthly service is required when its day-26 schedule falls in the window."""
    monthly_job = {
        "state": "ENABLED",
        "schedule": "45 15 26 * *",
        "timeZone": "America/New_York",
        "httpTarget": {"uri": "https://svc-monthly.example.run.app/"},
    }
    targets_json = json.dumps({"targets": [{"service": "svc-monthly"}]})

    monkeypatch.delenv("RUNTIME_HEARTBEAT_REQUIRED_SERVICES", raising=False)
    monkeypatch.setenv("CLOUD_RUN_SERVICE_TARGETS_JSON", targets_json)
    monkeypatch.setattr(
        heartbeat,
        "_list_scheduler_jobs",
        lambda **_kwargs: [monthly_job],
    )

    window_start = dt.datetime(2026, 6, 26, 19, 0, tzinfo=dt.timezone.utc)
    window_end = dt.datetime(2026, 6, 26, 20, 0, tzinfo=dt.timezone.utc)
    required = heartbeat._load_required_services(
        project="project-1",
        since=window_start,
        now=window_end,
    )

    assert required == ["svc-monthly"]