diff --git a/docs/CICs/UNFAOPostProcessorManager.md b/docs/CICs/UNFAOPostProcessorManager.md index 568bbaa..76a7cda 100644 --- a/docs/CICs/UNFAOPostProcessorManager.md +++ b/docs/CICs/UNFAOPostProcessorManager.md @@ -156,7 +156,7 @@ The input-integrity guards (S0–S6, epic #51) are representation-free invariant - Partner-specific output formats are **evolving** — the UN FAO schema may change (see C-24, D-06 for schema divergence investigation) - The source of forecast data (Appwrite bucket/collection) is **evolving** — operational configuration - Null validation is **active** (C-01 resolved 2026-06-02) -- The enrichment source is the **precomputed GAUL lookup table** (`GaulLookupEnricher`, ADR-011), as of the Stage 3 swap; the runtime mapper (`mapping.py`) remains in the repo but is no longer used by this manager and is slated for removal after one verified production cycle +- The enrichment source is the **precomputed GAUL lookup table** (`GaulLookupEnricher`, ADR-011), as of the Stage 3 swap; the old runtime mapper was **removed** (C-39 / PR #42) — it no longer exists in the repo --- diff --git a/views_postprocessing/unfao/managers/README.md b/views_postprocessing/unfao/managers/README.md index 59156ad..1812df1 100644 --- a/views_postprocessing/unfao/managers/README.md +++ b/views_postprocessing/unfao/managers/README.md @@ -1,387 +1,65 @@ # UNFAOPostProcessorManager -A postprocessor manager for transforming ViEWS pipeline predictions into UN FAO-compatible formats with geographic metadata enrichment. +The manager for the UN FAO delivery: it reads finished VIEWS forecasts + historical +actuals, enriches them with GAUL geographic metadata, enforces input-integrity invariants, +and uploads the result to the FAO Appwrite store. -## Overview +> For the big picture — how this repo relates to pipeline-core / faoapi / datafactory and +> where its seams are — read [`docs/architecture/role_and_seams.md`](../../../docs/architecture/role_and_seams.md). +> For the class contract (guarantees, failure modes), see the +> [CIC](../../../docs/CICs/UNFAOPostProcessorManager.md). This file is the operational summary. -The `UNFAOPostProcessorManager` is a specialized postprocessor that prepares VIEWS conflict prediction data for delivery to the United Nations Food and Agriculture Organization (UN FAO). It handles: +## What it is -- Reading historical observation data from Viewser -- Downloading forecast predictions from the Appwrite datastore -- Enriching data with geographic metadata (coordinates, country codes, admin boundaries) -- Validating output schema compliance -- Uploading processed data to the UN FAO Appwrite storage bucket +`UNFAOPostProcessorManager` is a **concrete pipeline-core postprocessor** — it subclasses +`PostprocessorManager` + `ForecastingModelManager` (Template Method) and fills the +`read → transform → validate → save` lifecycle. It is a thin orchestrator: the +representation-free input-integrity rules live in `views_postprocessing/delivery/` and are +**called** by the manager (via the `unfao/extraction.py` seam), never inherited into it. -## Architecture +It does **not** transform prediction values (no collapse, no reconciliation — those are +downstream). It joins metadata, guards integrity, and delivers. -``` -┌─────────────────────────────────────────────────────────────────┐ -│ UNFAOPostProcessorManager │ -│ (extends PostprocessorManager + ForecastingModelManager) │ -├─────────────────────────────────────────────────────────────────┤ -│ │ -│ ┌──────────────────┐ ┌──────────────────┐ │ -│ │ Historical Data │ │ Forecast Data │ │ -│ │ (Viewser) │ │ (Appwrite) │ │ -│ └────────┬─────────┘ └────────┬─────────┘ │ -│ │ │ │ -│ └───────────┬───────────┘ │ -│ ▼ │ -│ ┌───────────────────────┐ │ -│ │ GaulLookupEnricher │ │ -│ │ (Geographic Metadata) │ │ -│ └───────────┬───────────┘ │ -│ ▼ │ -│ ┌───────────────────────┐ │ -│ │ Validation Layer │ │ -│ └───────────┬───────────┘ │ -│ ▼ │ -│ ┌───────────────────────┐ │ -│ │ UN FAO Appwrite │ │ -│ │ (Output Storage) │ │ -│ └───────────────────────┘ │ -└─────────────────────────────────────────────────────────────────┘ -``` - -## Data Flow - -1. **Read Phase** (`_read`) - - Fetches historical data from ViewsER via `ViewsDataLoader` - - Downloads latest forecast predictions from Appwrite production forecasts bucket - -2. **Transform Phase** (`_transform`) - - Enriches both historical and forecast dataframes with geographic metadata - - Uses `GaulLookupEnricher` to merge a precomputed GAUL lookup onto PRIO-GRID cells (ADR-011) - -3. **Validate Phase** (`_validate`) - - Ensures all required metadata columns are present - - Checks for schema compliance before output - -4. **Save Phase** (`_save`) - - Saves processed data to local parquet files - - Uploads to UN FAO-specific Appwrite bucket with metadata - -## Installation - -Part of `views-postprocessing` package: - -```bash -pip install views-postprocessing -``` - -## Configuration - -### Required Environment Variables +## Stages -The manager requires several environment variables for Appwrite connectivity. These should be set in the `.env` file found at the root of views-models: +| Stage | Method(s) | What happens | +|-------|-----------|--------------| +| **Read** | `_read_historical_data`, `_read_forecast_data` | Historical actuals via the inherited `ViewsDataLoader`; the forecast file from the Appwrite prediction store. The selected forecast file's **identity** (name/loa) is asserted before download (`delivery.identity`, C-25). | +| **Transform** | `_transform` → `_append_metadata` | Joins the 9 GAUL metadata columns via `GaulLookupEnricher` (a precomputed parquet lookup, ADR-011). | +| **Validate** | `_validate`, `_check_coverage` | Null-gate on the 9 metadata columns; region **coverage** + GAUL-**excluded-cell** guards (`delivery.coverage`, C-34 / C-30). | +| **Clip** | `_clip_observed_history` | Drops fabricated zero-padded tail months from the *historical* actuals (`delivery.observed_range`, C-26); the forecast is untouched. The boundary (`last_valid_month_id`) is read from the producer (datafactory) via `unfao/source_metadata.py`. | +| **Save** | `_save` | Writes timestamped parquet and uploads to the FAO bucket, stamping each upload with **structured provenance** (`delivery.provenance`, C-15). | -```bash -# Appwrite Connection -APPWRITE_ENDPOINT=https://cloud.appwrite.io/v1 -APPWRITE_DATASTORE_PROJECT_ID=your_project_id -APPWRITE_DATASTORE_API_KEY=your_api_key +The 9 metadata columns are the single-source contract in `unfao/gaul_schema.py` +(`METADATA_COLS`). -# Production Forecasts Bucket (Input) -APPWRITE_PROD_FORECASTS_BUCKET_ID=production_forecasts -APPWRITE_PROD_FORECASTS_BUCKET_NAME=Production Forecasts -APPWRITE_PROD_FORECASTS_COLLECTION_ID=forecasts_metadata -APPWRITE_PROD_FORECASTS_COLLECTION_NAME=Forecasts Metadata - -# UN FAO Bucket (Output) -APPWRITE_UNFAO_BUCKET_ID=unfao_data -APPWRITE_UNFAO_BUCKET_NAME=UN FAO Data -APPWRITE_UNFAO_COLLECTION_ID=unfao_metadata -APPWRITE_UNFAO_COLLECTION_NAME=UN FAO Metadata - -# Metadata Database -APPWRITE_METADATA_DATABASE_ID=file_metadata -APPWRITE_METADATA_DATABASE_NAME=File Metadata -``` - -## Usage - -### Basic Usage +## Running it ```python from views_pipeline_core.managers.postprocessor import PostprocessorPathManager from views_postprocessing.unfao.managers import UNFAOPostProcessorManager -# Initialize with path manager -path_manager = PostprocessorPathManager("un_fao") -manager = UNFAOPostProcessorManager( - model_path=path_manager, - wandb_notifications=True, - use_prediction_store=False -) - -# Execute the full pipeline -manager.execute() -``` - -### CLI Execution - -From the postprocessor directory: - -```bash -python main.py -``` - -### Step-by-Step Execution - -```python -# Initialize -manager = UNFAOPostProcessorManager(path_manager) - -# Read data -manager._read() # Loads historical + forecast data - -# Transform (add geographic metadata) -manager._transform() - -# Validate schema -manager._validate() - -# Save and upload -manager._save() -``` - -## Class Reference - -### UNFAOPostProcessorManager - -```python -class UNFAOPostProcessorManager(PostprocessorManager, ForecastingModelManager): - def __init__( - self, - model_path: PostprocessorPathManager, - wandb_notifications: bool = True, - use_prediction_store: bool = False, - ) -> None -``` - -#### Parameters - -| Parameter | Type | Default | Description | -|-----------|------|---------|-------------| -| `model_path` | PostprocessorPathManager | Required | Path manager for the postprocessor | -| `wandb_notifications` | bool | True | Enable Weights & Biases logging | -| `use_prediction_store` | bool | False | Whether to use prediction store (legacy) | - -#### Attributes - -| Attribute | Type | Description | -|-----------|------|-------------| -| `_historical_dataframe` | pd.DataFrame | Historical observation data from ViewsER | -| `_forecast_dataframe` | pd.DataFrame | Forecast predictions from ensemble | -| `_historical_dataset` | PGMDataset | Wrapped historical data with utilities | -| `_forecast_dataset` | PGMDataset | Wrapped forecast data with utilities | -| `_enricher` | GaulLookupEnricher | Precomputed-lookup geographic enrichment (ADR-011) | -| `ensemble_path_manager` | EnsemblePathManager | Path manager for source ensemble | - -## Methods - -### _read_historical_data() - -Fetches historical observation data from ViewsER. - -```python -manager._read_historical_data() -# Populates: _historical_dataframe, _historical_dataset -``` - -**Process:** -1. Uses `ViewsDataLoader` to fetch data for "forecasting" partition -2. Reads saved dataframe from `data_raw` directory -3. Creates `PGMDataset` wrapper with configured targets - -### _read_forecast_data() - -Downloads latest forecast predictions from Appwrite. - -```python -manager._read_forecast_data() -# Populates: _forecast_dataframe, _forecast_dataset +manager = UNFAOPostProcessorManager(model_path=PostprocessorPathManager("un_fao")) +manager.execute() # read → transform → validate → save ``` -**Process:** -1. Reads ensemble name from config -2. Initializes `EnsemblePathManager` for the source ensemble -3. Configures Appwrite connection from ensemble's `.env` -4. Downloads latest forecast file with `category="forecast"` filter -5. Creates `PGMDataset` wrapper +In production this is invoked by **views-models** (`postprocessors/un_fao/main.py`), not +directly. The manager cannot be instantiated without `views-pipeline-core` + Appwrite env, +so its stage logic is covered by replica tests (`tests/test_validation.py`, +`tests/test_append_metadata.py`) and the input-integrity e2e suite +(`tests/test_input_integrity_e2e.py`) — see the CIC §10. -**Raises:** -- `ValueError`: If ensemble name not configured -- `Exception`: If download fails - -### _append_metadata(dataset) - -Enriches a dataset with geographic metadata. - -```python -enriched_df = manager._append_metadata(dataset) -``` - -**Parameters:** -- `dataset`: PGMDataset to enrich - -**Returns:** DataFrame with added metadata columns - -**Added Columns:** -| Column | Description | -|--------|-------------| -| `pg_xcoord` | PRIO-GRID cell X coordinate (longitude) | -| `pg_ycoord` | PRIO-GRID cell Y coordinate (latitude) | -| `country_iso_a3` | ISO 3166-1 alpha-3 country code | -| `admin1_gaul1_code` | GAUL level 1 admin code | -| `admin1_gaul1_name` | GAUL level 1 admin name | -| `admin1_gaul0_code` | Parent country GAUL code | -| `admin1_gaul0_name` | Parent country name | -| `admin2_gaul2_code` | GAUL level 2 admin code | -| `admin2_gaul2_name` | GAUL level 2 admin name | - -### _transform() - -Applies geographic metadata to both dataframes. - -```python -manager._transform() -# Updates: _historical_dataframe, _forecast_dataframe -``` - -### _validate() - -Validates that required metadata columns are present. - -```python -manager._validate() -``` - -**Required Columns:** -- `pg_xcoord`, `pg_ycoord` -- `country_iso_a3` -- `admin1_gaul1_code`, `admin1_gaul1_name` -- `admin1_gaul0_code`, `admin1_gaul0_name` -- `admin2_gaul2_code`, `admin2_gaul2_name` - -**Raises:** -- `ValueError`: If any required column is missing - -### _save() - -Saves and uploads processed data to UN FAO Appwrite bucket. - -```python -manager._save() -``` - -**Output Files:** -- `historical_dataset_{timestamp}.parquet` - Historical data with metadata -- `forecast_dataset_{timestamp}.parquet` - Forecast data with metadata - -**Upload Metadata:** -| Field | Historical | Forecast | -|-------|-----------|----------| -| `name` | Postprocessor name | Ensemble name | -| `loa` | "pgm" | "pgm" | -| `type` | "model" | "model" | -| `category` | "historical" | "forecast" | -| `targets` | From config | Prediction columns | - -## Output Schema Example - -The processed dataframes have the following structure: - -``` -Index: (month_id, priogrid_gid) - -Columns: -├── Target Variables -│ ├── ged_sb_dep (or configured targets) -│ ├── ged_ns_dep -│ └── ged_os_dep -│ -├── Predictions (forecast only) -│ ├── pred_ln_sb_best -│ ├── pred_ln_ns_best -│ ├── pred_ln_os_best -│ ├── pred_ln_sb_prob -│ ├── pred_ln_ns_prob -│ └── pred_ln_os_prob -│ -└── Geographic Metadata - ├── pg_xcoord - ├── pg_ycoord - ├── country_iso_a3 - ├── admin1_gaul1_code - ├── admin1_gaul1_name - ├── admin1_gaul0_code - ├── admin1_gaul0_name - ├── admin2_gaul2_code - └── admin2_gaul2_name -``` - -## Error Handling - -### Common Errors - -**Appwrite Connection Failed:** -``` -Error while trying to download the latest forecast data... -``` -*Solution:* Verify environment variables are set correctly in the ensemble's `.env` file - -**Missing Metadata Columns:** -``` -ValueError: Historical dataframe is missing required metadata column: country_iso_a3 -``` -*Solution:* Ensure the `GaulLookupEnricher`'s lookup table (`views_postprocessing/data/gaul_lookup.parquet`) is present and covers the requested cells - -## Dependencies - -- `views-pipeline-core`: Core pipeline infrastructure -- `views-postprocessing`: Postprocessor base classes and mapping utilities -- `pandas`: DataFrame operations -- `polars`: Alternative DataFrame operations -- `python-dotenv`: Environment variable management - -## Example Workflow - -```python -import logging -from views_pipeline_core.managers.postprocessor import PostprocessorPathManager -from views_postprocessing.unfao.managers import UNFAOPostProcessorManager - -# Setup logging -logging.basicConfig(level=logging.INFO) - -# Initialize -path_manager = PostprocessorPathManager("un_fao") -manager = UNFAOPostProcessorManager(path_manager) +## Configuration -# Full execution -try: - # 1. Load data - manager._read() - print(f"Historical shape: {manager._historical_dataframe.shape}") - print(f"Forecast shape: {manager._forecast_dataframe.shape}") - - # 2. Add geographic metadata - manager._transform() - print(f"Added columns: {manager._historical_dataframe.columns.tolist()}") - - # 3. Validate output - manager._validate() - print("Validation passed!") - - # 4. Save and upload - manager._save() - print("Data uploaded to UN FAO bucket") - -except Exception as e: - print(f"Pipeline failed: {e}") -``` +Appwrite settings are read from the environment via `os.getenv` (no startup validation yet — +tracked in #11). The full variable set is documented in the repo +[README](../../../README.md#configuration); note `APPWRITE_PROD_FORECASTS_COLLECTION_ID` is +flagged there as needs-verify (the previously-documented value was found absent in live +Appwrite). -## See Also +## See also -- [GaulLookupEnricher](../../../docs/CICs/GaulLookupEnricher.md) - Precomputed-lookup enrichment (ADR-011) \ No newline at end of file +- [`role_and_seams.md`](../../../docs/architecture/role_and_seams.md) — role + seams +- [CIC: UNFAOPostProcessorManager](../../../docs/CICs/UNFAOPostProcessorManager.md) — class contract +- [CIC: GaulLookupEnricher](../../../docs/CICs/GaulLookupEnricher.md) — the enrichment join +- ADR-011 (mapper → lookup), ADR-012 (current ontology)