Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions changelog.d/3440.changed.md
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
Dual-write live simulation and report create/update traffic into the new run tables, keep parent run pointers in sync, and harden report mutations to remain country-scoped and transactionally consistent.

Preserve explicit report definitions and execution metadata across later syncs, key new report creation and alias validation by canonical report identity, and resolve report reads through canonical parents plus display-run selection instead of recreating current-version parent rows.
2 changes: 2 additions & 0 deletions policyengine_api/data/initialise.sql
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ CREATE TABLE IF NOT EXISTS report_outputs (
report_spec_json JSON DEFAULT NULL,
report_spec_schema_version INT DEFAULT NULL,
report_spec_status VARCHAR(32) DEFAULT NULL,
report_identity_hash VARCHAR(64) DEFAULT NULL,
report_identity_schema_version INT DEFAULT NULL,
active_run_id CHAR(36) DEFAULT NULL,
latest_successful_run_id CHAR(36) DEFAULT NULL
);
Expand Down
2 changes: 2 additions & 0 deletions policyengine_api/data/initialise_local.sql
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ CREATE TABLE IF NOT EXISTS report_outputs (
report_spec_json JSON DEFAULT NULL,
report_spec_schema_version INT DEFAULT NULL,
report_spec_status VARCHAR(32) DEFAULT NULL,
report_identity_hash VARCHAR(64) DEFAULT NULL,
report_identity_schema_version INT DEFAULT NULL,
active_run_id CHAR(36) DEFAULT NULL,
latest_successful_run_id CHAR(36) DEFAULT NULL
);
Expand Down
50 changes: 49 additions & 1 deletion policyengine_api/routes/report_output_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,25 @@

# Blueprint that the report route handlers in this module register on, and the
# service instance those handlers delegate to.
report_output_bp = Blueprint("report_output", __name__)
report_output_service = ReportOutputService()
# Optional request-payload keys that _parse_report_run_metadata copies out of a
# request body; each value must be a string or null.
RUN_METADATA_FIELDS = (
"country_package_version",
"policyengine_version",
"data_version",
"runtime_app_name",
"resolved_dataset",
)


def _parse_report_run_metadata(payload: dict) -> dict[str, str | None]:
    """Collect the run-metadata fields that are present in a request payload.

    Only keys listed in ``RUN_METADATA_FIELDS`` are considered, and keys that
    are absent from ``payload`` are left out of the result entirely, so the
    caller can tell "not supplied" apart from an explicit ``None``.

    Args:
        payload: Decoded request body.

    Returns:
        A mapping of each supplied metadata field to its string value or
        ``None``.

    Raises:
        BadRequest: If a supplied field is neither a string nor ``None``.
    """
    parsed: dict[str, str | None] = {}
    # The generator keeps the membership test interleaved with validation,
    # one field at a time, in RUN_METADATA_FIELDS order.
    for key in (name for name in RUN_METADATA_FIELDS if name in payload):
        raw = payload.get(key)
        if not (raw is None or isinstance(raw, str)):
            raise BadRequest(f"{key} must be a string or null")
        parsed[key] = raw
    return parsed


@report_output_bp.route("/<country_id>/report", methods=["POST"])
Expand All @@ -33,6 +52,8 @@ def create_report_output(country_id: str) -> Response:
simulation_1_id = payload.get("simulation_1_id")
simulation_2_id = payload.get("simulation_2_id") # Optional
year = payload.get("year", CURRENT_YEAR) # Default to current year as string
report_spec_payload = payload.get("report_spec")
report_spec_schema_version = payload.get("report_spec_schema_version")

# Validate required fields
if simulation_1_id is None:
Expand All @@ -43,21 +64,44 @@ def create_report_output(country_id: str) -> Response:
raise BadRequest("simulation_2_id must be an integer or null")
if not isinstance(year, str):
raise BadRequest("year must be a string")
if report_spec_payload is not None and not isinstance(report_spec_payload, dict):
raise BadRequest("report_spec must be an object")
if report_spec_schema_version is not None and not isinstance(
report_spec_schema_version, int
):
raise BadRequest("report_spec_schema_version must be an integer")

report_spec = None
if report_spec_payload is not None:
try:
report_spec = report_output_service.parse_report_spec_payload(
report_spec_payload,
(
report_spec_schema_version
if report_spec_schema_version is not None
else 1
),
)
except ValueError as exc:
raise BadRequest(str(exc)) from exc

try:
# Check if report already exists with these simulation IDs and year
existing_report = report_output_service.find_existing_report_output(
existing_report = report_output_service.find_existing_report_output_for_create(
country_id=country_id,
simulation_1_id=simulation_1_id,
simulation_2_id=simulation_2_id,
year=year,
report_spec=report_spec,
)

if existing_report:
existing_report = (
report_output_service.ensure_report_output_dual_write_state(
existing_report["id"],
country_id=country_id,
explicit_report_spec=report_spec,
report_spec_schema_version=report_spec_schema_version,
)
)
# Report already exists, return it with 200 status
Expand All @@ -79,6 +123,8 @@ def create_report_output(country_id: str) -> Response:
simulation_1_id=simulation_1_id,
simulation_2_id=simulation_2_id,
year=year,
report_spec=report_spec,
report_spec_schema_version=report_spec_schema_version,
)

response_body = dict(
Expand Down Expand Up @@ -156,6 +202,7 @@ def update_report_output(country_id: str) -> Response:
report_id = payload.get("id")
output = payload.get("output")
error_message = payload.get("error_message")
version_manifest_overrides = _parse_report_run_metadata(payload)
print(f"Updating report #{report_id} for country {country_id}")

# Validate status if provided
Expand All @@ -181,6 +228,7 @@ def update_report_output(country_id: str) -> Response:
status=status,
output=output,
error_message=error_message,
version_manifest_overrides=version_manifest_overrides,
)

if not success:
Expand Down
20 changes: 20 additions & 0 deletions policyengine_api/routes/simulation_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,24 @@

# Blueprint that the simulation route handlers in this module register on, and
# the service instance those handlers delegate to.
simulation_bp = Blueprint("simulation", __name__)
simulation_service = SimulationService()
# Optional request-payload keys that _parse_simulation_run_metadata copies out
# of a request body; each value must be a string or null.
RUN_METADATA_FIELDS = (
"country_package_version",
"policyengine_version",
"data_version",
"runtime_app_name",
)


def _parse_simulation_run_metadata(payload: dict) -> dict[str, str | None]:
    """Extract the supplied run-metadata fields from a request payload.

    Each name in ``RUN_METADATA_FIELDS`` is looked up in ``payload``; names
    that are missing are skipped rather than defaulted, so the returned
    mapping contains only what the client actually sent.

    Args:
        payload: Decoded request body.

    Returns:
        A mapping of each supplied metadata field to its string value or
        ``None``.

    Raises:
        BadRequest: If a supplied field is neither a string nor ``None``.
    """
    overrides: dict[str, str | None] = {}
    for name in RUN_METADATA_FIELDS:
        if name in payload:
            candidate = payload.get(name)
            if candidate is None or isinstance(candidate, str):
                overrides[name] = candidate
            else:
                raise BadRequest(f"{name} must be a string or null")
    return overrides


@simulation_bp.route("/<country_id>/simulation", methods=["POST"])
Expand Down Expand Up @@ -161,6 +179,7 @@ def update_simulation(country_id: str) -> Response:
simulation_id = payload.get("id")
output = payload.get("output")
error_message = payload.get("error_message")
version_manifest_overrides = _parse_simulation_run_metadata(payload)
print(f"Updating simulation #{simulation_id} for country {country_id}")

# Validate status if provided
Expand All @@ -186,6 +205,7 @@ def update_simulation(country_id: str) -> Response:
status=status,
output=output,
error_message=error_message,
version_manifest_overrides=version_manifest_overrides,
)

if not success:
Expand Down
53 changes: 44 additions & 9 deletions policyengine_api/services/report_output_alias_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,53 @@ class ReportOutputAliasService:
def _get_report_output_row(self, report_output_id: int) -> dict | None:
    """Fetch one ``report_outputs`` row by primary key.

    The statement previously carried two consecutive ``SELECT`` column lists,
    which is not a valid single query. They are merged into one list holding
    every column either list named, so callers comparing the logical key
    (country, simulations, year) and callers comparing the canonical report
    identity both find the keys they read.

    Args:
        report_output_id: Value matched against ``report_outputs.id``.

    Returns:
        The row as a plain ``dict``, or ``None`` when no row matches.
    """
    row: Row | None = database.query(
        """
        SELECT id, country_id, simulation_1_id, simulation_2_id, year,
               report_identity_hash, report_identity_schema_version
        FROM report_outputs
        WHERE id = ?
        """,
        (report_output_id,),
    ).fetchone()
    return dict(row) if row is not None else None

def _validate_alias_identity_compatibility(
self,
legacy_report_output: dict,
canonical_report_output: dict,
) -> None:
if legacy_report_output["country_id"] != canonical_report_output["country_id"]:
raise ValueError(
"Legacy and canonical report outputs must describe the same report"
)

if (
legacy_report_output["report_identity_hash"] is None
or legacy_report_output["report_identity_schema_version"] is None
):
raise ValueError(
"Legacy report output must have canonical report identity before "
"aliasing"
)

if (
canonical_report_output["report_identity_hash"] is None
or canonical_report_output["report_identity_schema_version"] is None
):
raise ValueError(
"Canonical report output must have canonical report identity before "
"aliasing"
)

if (
legacy_report_output["report_identity_hash"]
!= canonical_report_output["report_identity_hash"]
or legacy_report_output["report_identity_schema_version"]
!= canonical_report_output["report_identity_schema_version"]
):
raise ValueError(
"Legacy and canonical report outputs must share canonical report "
"identity"
)

def get_alias(self, legacy_report_output_id: int) -> dict | None:
row: Row | None = database.query(
"""
Expand Down Expand Up @@ -78,14 +117,10 @@ def set_alias(
f"#{existing_alias['canonical_report_output_id']}"
)

logical_key = ("country_id", "simulation_1_id", "simulation_2_id", "year")
if any(
legacy_report_output[field] != canonical_report_output[field]
for field in logical_key
):
raise ValueError(
"Legacy and canonical report outputs must describe the same report"
)
self._validate_alias_identity_compatibility(
legacy_report_output,
canonical_report_output,
)
database.query(
"""
INSERT INTO legacy_report_output_aliases
Expand Down
Loading