diff --git a/NEWS.md b/NEWS.md index 48c91ea4..2e2dba99 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,3 +1,5 @@ +**04/23/2026:** Added `waterdata.get_nearest_continuous(targets, ...)` — for each of N target timestamps, fetches the single continuous observation closest to that timestamp in one HTTP round-trip (auto-chunked when the resulting CQL filter is long, via the facility added in #238). The helper is designed for workflows that pair many discrete-measurement timestamps with surrounding instantaneous data, which the OGC `time` parameter can't express since it only accepts one instant or one interval per request. Ties at window midpoints are resolved per a configurable `on_tie` ∈ {`"first"`, `"last"`, `"mean"`}; the default `window="PT7M30S"` matches a 15-minute continuous gauge. + **04/22/2026:** Highlights since the `v1.1.0` release (2025-11-26), which shipped the `waterdata` module: - Added `get_channel` for channel-measurement data (#218) and `get_stats_por` / `get_stats_date_range` for period-of-record and daily statistics (#207). 
diff --git a/dataretrieval/waterdata/__init__.py b/dataretrieval/waterdata/__init__.py index 9bd80c91..cacf8f14 100644 --- a/dataretrieval/waterdata/__init__.py +++ b/dataretrieval/waterdata/__init__.py @@ -19,6 +19,7 @@ get_latest_continuous, get_latest_daily, get_monitoring_locations, + get_nearest_continuous, get_reference_table, get_samples, get_stats_date_range, @@ -47,6 +48,7 @@ "get_latest_continuous", "get_latest_daily", "get_monitoring_locations", + "get_nearest_continuous", "get_reference_table", "get_samples", "get_stats_date_range", diff --git a/dataretrieval/waterdata/api.py b/dataretrieval/waterdata/api.py index f9a66b31..370d610c 100644 --- a/dataretrieval/waterdata/api.py +++ b/dataretrieval/waterdata/api.py @@ -9,7 +9,7 @@ import json import logging from io import StringIO -from typing import get_args +from typing import Literal, get_args import pandas as pd import requests @@ -440,6 +440,239 @@ def get_continuous( return get_ogc_data(args, output_id, service) +def get_nearest_continuous( + targets, + monitoring_location_id: str | list[str] | None = None, + parameter_code: str | list[str] | None = None, + *, + window: str | pd.Timedelta = "PT7M30S", + on_tie: Literal["first", "last", "mean"] = "first", + **kwargs, +) -> tuple[pd.DataFrame, BaseMetadata]: + """For each target timestamp, return the nearest continuous observation. + + Builds one bracketed ``(time >= t-window AND time <= t+window)`` clause + per target, joins them as a top-level CQL ``OR`` filter, and lets + ``get_continuous`` (with its auto-chunking) fetch every observation + that falls in any window. Then, per ``(monitoring_location_id, target)`` + pair, picks the single observation with the smallest ``|time - target|``. + + The USGS continuous endpoint matches ``time`` parameters exactly rather + than fuzzily, and it does not implement ``sortby`` for arbitrary fields; + this function is the single-round-trip way to ask "what reading is + nearest this timestamp?" 
for many timestamps at once. + + Parameters + ---------- + targets : list-like of datetime-convertible + Target timestamps. Naive datetimes are treated as UTC. Accepts a + list, ``pandas.Series``, ``pandas.DatetimeIndex``, ``numpy.ndarray``, + or anything ``pandas.to_datetime`` consumes. + monitoring_location_id : string or list of strings, optional + Forwarded to ``get_continuous``. + parameter_code : string or list of strings, optional + Forwarded to ``get_continuous``. + window : string or ``pandas.Timedelta``, default ``"PT7M30S"`` + Half-window around each target, as an ISO 8601 duration + (``"PT7M30S"``, ``"PT15M"``, ``"PT1H"``, etc.). Also accepts + any other form ``pandas.Timedelta`` parses — ``HH:MM:SS`` + (``"00:07:30"``), pandas shorthand (``"7min30s"``, + ``"450s"``), or a ``pd.Timedelta`` directly. See the + `pandas.Timedelta docs + <https://pandas.pydata.org/docs/reference/api/pandas.Timedelta.html>`_ + for the full grammar. + + Must be small enough that every target's window captures + roughly one observation at the service cadence. The default + matches a 15-minute continuous gauge; widen (e.g. + ``"PT15M"``) for irregular cadences or resilience to data + gaps. + on_tie : {"first", "last", "mean"}, default ``"first"`` + How to resolve ties when two observations are exactly equidistant + from a target (which happens when the target falls at the midpoint + between grid points — e.g. target ``10:22:30`` for a 15-minute + gauge). + + - ``"first"``: keep the earlier observation. + - ``"last"``: keep the later observation. + - ``"mean"``: average numeric columns; set the ``time`` column to + the target, since no real observation exists at the midpoint. + + **kwargs + Additional keyword arguments forwarded to ``get_continuous`` + (e.g. ``statistic_id``, ``approval_status``, ``properties``). + Passing ``time``, ``filter``, or ``filter_lang`` raises + ``TypeError`` — this function builds those itself. 
+ + Returns + ------- + df : ``pandas.DataFrame`` + One row per ``(target, monitoring_location_id)`` combination that + had at least one observation in its window. Rows are augmented + with a ``target_time`` column indicating which target they + correspond to. Targets with no observations in their window are + silently dropped. + md : :class:`~dataretrieval.utils.BaseMetadata` + Metadata from the underlying ``get_continuous`` call. + + Notes + ----- + *Window sizing and ties.* When ``window`` is exactly half the service + cadence, most targets' windows contain a single observation and + ``on_tie`` is moot. Ties arise only when a target sits exactly at the + window edge — rare in practice but possible. Setting ``window`` to a + full cadence (or larger) guarantees at least one observation per + target in steady state at the cost of more bytes per response. + + *Why windowed CQL rather than sort+limit.* The API's advertised + ``sortby`` parameter would make this a one-liner per target (``filter`` + by ``time <= t`` and ``limit 1``), but it is per-query — you would need + one HTTP round-trip per target. The CQL ``OR``-chain approach folds + all N targets into one request (auto-chunked when the URL is long). + + Examples + -------- + .. code:: + + >>> import pandas as pd + >>> from dataretrieval import waterdata + + >>> # Pair three off-grid timestamps with nearby observations + >>> targets = pd.to_datetime( + ... [ + ... "2023-06-15T10:30:31Z", + ... "2023-06-15T14:07:12Z", + ... "2023-06-16T03:45:19Z", + ... ] + ... ) + >>> df, md = waterdata.get_nearest_continuous( + ... targets, + ... monitoring_location_id="USGS-02238500", + ... parameter_code="00060", + ... ) + + >>> # Widen the window for an irregular-cadence gauge + >>> df, md = waterdata.get_nearest_continuous( + ... targets, + ... monitoring_location_id="USGS-02238500", + ... parameter_code="00060", + ... window="PT30M", + ... on_tie="mean", + ... 
) + """ + _check_nearest_kwargs(kwargs, on_tie) + targets = pd.DatetimeIndex(pd.to_datetime(targets, utc=True)) + window_td = pd.Timedelta(window) + + if len(targets) == 0: + raise ValueError("targets must contain at least one timestamp") + + filter_expr = _build_window_or_filter(targets, window_td) + df, md = get_continuous( + monitoring_location_id=monitoring_location_id, + parameter_code=parameter_code, + filter=filter_expr, + filter_lang="cql-text", + **kwargs, + ) + if df.empty: + return _empty_nearest_result(df), md + + df = df.assign(time=pd.to_datetime(df["time"], utc=True)) + site_groups = ( + df.groupby("monitoring_location_id", sort=False) + if "monitoring_location_id" in df.columns + else [(None, df)] + ) + + selected = [ + row + for _, site_df in site_groups + for target in targets + if (row := _pick_nearest_row(site_df, target, window_td, on_tie)) is not None + ] + if not selected: + return _empty_nearest_result(df), md + return pd.DataFrame(selected).reset_index(drop=True), md + + +_VALID_ON_TIE = ("first", "last", "mean") + + +def _check_nearest_kwargs(kwargs: dict, on_tie: str) -> None: + """Reject kwargs the helper owns; validate ``on_tie``.""" + for forbidden in ("time", "filter", "filter_lang"): + if forbidden in kwargs: + raise TypeError( + f"get_nearest_continuous constructs its own {forbidden!r}; " + "do not pass it directly" + ) + if on_tie not in _VALID_ON_TIE: + raise ValueError(f"on_tie must be one of {_VALID_ON_TIE}; got {on_tie!r}") + + +def _build_window_or_filter(targets: pd.DatetimeIndex, window_td: pd.Timedelta) -> str: + """Build the CQL OR-chain of ``time >= ... AND time <= ...`` windows. + + ``get_continuous`` auto-chunks the result if the full URL would + exceed the server's length limit, so this is always safe to build + as one string even for many targets. 
+ """ + return " OR ".join( + f"(time >= '{(t - window_td).strftime('%Y-%m-%dT%H:%M:%SZ')}' " + f"AND time <= '{(t + window_td).strftime('%Y-%m-%dT%H:%M:%SZ')}')" + for t in targets + ) + + +def _pick_nearest_row( + site_df: pd.DataFrame, + target: pd.Timestamp, + window_td: pd.Timedelta, + on_tie: str, +) -> pd.Series | None: + """Return the single row within ``window_td`` of ``target``, or ``None``. + + Resolves ties (two rows equidistant from ``target``) per ``on_tie``. + The returned row carries a ``target_time`` column identifying which + target it was selected for. + """ + in_window = site_df[ + (site_df["time"] >= target - window_td) + & (site_df["time"] <= target + window_td) + ] + if in_window.empty: + return None + deltas = (in_window["time"] - target).abs() + candidates = in_window[deltas == deltas.min()].sort_values("time") + + if len(candidates) == 1 or on_tie == "first": + row = candidates.iloc[0].copy() + elif on_tie == "last": + row = candidates.iloc[-1].copy() + else: # "mean" — synthesize a row whose numeric cols are averaged and + # whose ``time`` is the target (no real observation sits at the midpoint). + row = candidates.iloc[0].copy() + for col in candidates.select_dtypes("number").columns: + row[col] = candidates[col].mean() + row["time"] = target + + row["target_time"] = target + return row + + +def _empty_nearest_result(template: pd.DataFrame | None = None) -> pd.DataFrame: + """Empty frame with a ``target_time`` column, for no-match cases. + + When ``template`` is provided, preserve its columns/dtypes so the + returned frame matches the shape of a real ``get_continuous`` + response. 
+ """ + base = pd.DataFrame() if template is None else template.iloc[0:0].copy() + base["target_time"] = pd.Series(dtype="datetime64[ns, UTC]") + return base + + def get_monitoring_locations( monitoring_location_id: list[str] | None = None, agency_code: list[str] | None = None, diff --git a/tests/waterdata_nearest_test.py b/tests/waterdata_nearest_test.py new file mode 100644 index 00000000..9873d821 --- /dev/null +++ b/tests/waterdata_nearest_test.py @@ -0,0 +1,267 @@ +"""Tests for ``waterdata.get_nearest_continuous``. + +All network interaction is mocked at the ``get_continuous`` boundary, so +these run without an API key and without touching the USGS servers. +""" + +from unittest import mock + +import pandas as pd +import pytest + +from dataretrieval.waterdata.api import get_nearest_continuous + + +def _fake_df(rows): + """Build a minimal continuous-response-shaped DataFrame.""" + return pd.DataFrame( + { + "time": pd.to_datetime([r["time"] for r in rows], utc=True), + "value": [r["value"] for r in rows], + "monitoring_location_id": [r.get("site", "USGS-02238500") for r in rows], + } + ) + + +@pytest.fixture +def patch_get_continuous(): + """Replace ``waterdata.api.get_continuous`` with a controllable stub.""" + with mock.patch("dataretrieval.waterdata.api.get_continuous") as m: + yield m + + +def test_returns_nearest_per_target(patch_get_continuous): + targets = pd.to_datetime(["2023-06-15T10:30:31Z", "2023-06-15T10:45:16Z"], utc=True) + patch_get_continuous.return_value = ( + _fake_df( + [ + {"time": "2023-06-15T10:30:00Z", "value": 22.4}, + {"time": "2023-06-15T10:45:00Z", "value": 22.5}, + ] + ), + mock.Mock(), + ) + result, _ = get_nearest_continuous( + targets, + monitoring_location_id="USGS-02238500", + parameter_code="00060", + ) + assert len(result) == 2 + assert list(result["value"]) == [22.4, 22.5] + assert list(result["target_time"]) == list(targets) + + +def test_builds_one_or_clause_per_target(patch_get_continuous): + targets = 
pd.to_datetime(["2023-06-15T10:30:00Z", "2023-06-16T12:00:00Z"], utc=True) + patch_get_continuous.return_value = (_fake_df([]), mock.Mock()) + get_nearest_continuous( + targets, + monitoring_location_id="USGS-02238500", + parameter_code="00060", + window="PT7M30S", + ) + _, kwargs = patch_get_continuous.call_args + filter_expr = kwargs["filter"] + assert kwargs["filter_lang"] == "cql-text" + # Two windows — one top-level OR separator + assert filter_expr.count(") OR (") == 1 + # Each target produces >= and <= bounds + assert filter_expr.count("time >= '") == 2 + assert filter_expr.count("time <= '") == 2 + # Lower bound of the first window is 7:30 before the target + assert "'2023-06-15T10:22:30Z'" in filter_expr + assert "'2023-06-15T10:37:30Z'" in filter_expr + + +def test_tie_first_keeps_earlier(patch_get_continuous): + # Target at the midpoint between two grid points + targets = pd.to_datetime(["2023-06-15T10:22:30Z"], utc=True) + patch_get_continuous.return_value = ( + _fake_df( + [ + {"time": "2023-06-15T10:15:00Z", "value": 22.0}, + {"time": "2023-06-15T10:30:00Z", "value": 22.4}, + ] + ), + mock.Mock(), + ) + result, _ = get_nearest_continuous( + targets, + monitoring_location_id="USGS-02238500", + on_tie="first", + window="PT7M30S", + ) + assert len(result) == 1 + assert result.iloc[0]["value"] == 22.0 + assert result.iloc[0]["time"] == pd.Timestamp("2023-06-15T10:15:00Z") + + +def test_tie_last_keeps_later(patch_get_continuous): + targets = pd.to_datetime(["2023-06-15T10:22:30Z"], utc=True) + patch_get_continuous.return_value = ( + _fake_df( + [ + {"time": "2023-06-15T10:15:00Z", "value": 22.0}, + {"time": "2023-06-15T10:30:00Z", "value": 22.4}, + ] + ), + mock.Mock(), + ) + result, _ = get_nearest_continuous( + targets, + monitoring_location_id="USGS-02238500", + on_tie="last", + window="PT7M30S", + ) + assert result.iloc[0]["value"] == 22.4 + assert result.iloc[0]["time"] == pd.Timestamp("2023-06-15T10:30:00Z") + + +def 
test_tie_mean_averages_numeric_and_uses_target_time(patch_get_continuous): + targets = pd.to_datetime(["2023-06-15T10:22:30Z"], utc=True) + patch_get_continuous.return_value = ( + _fake_df( + [ + {"time": "2023-06-15T10:15:00Z", "value": 22.0}, + {"time": "2023-06-15T10:30:00Z", "value": 22.4}, + ] + ), + mock.Mock(), + ) + result, _ = get_nearest_continuous( + targets, + monitoring_location_id="USGS-02238500", + on_tie="mean", + window="PT7M30S", + ) + assert result.iloc[0]["value"] == pytest.approx(22.2) + # Time is set to the target since no real observation sits at the midpoint + assert result.iloc[0]["time"] == targets[0] + + +def test_target_without_observations_is_dropped(patch_get_continuous): + targets = pd.to_datetime(["2023-06-15T10:30:31Z", "2023-07-15T10:30:31Z"], utc=True) + # Only the June target has nearby data; July returns nothing. + patch_get_continuous.return_value = ( + _fake_df([{"time": "2023-06-15T10:30:00Z", "value": 22.4}]), + mock.Mock(), + ) + result, _ = get_nearest_continuous(targets, monitoring_location_id="USGS-02238500") + assert len(result) == 1 + assert result.iloc[0]["target_time"] == targets[0] + + +def test_multi_site_returns_row_per_target_per_site(patch_get_continuous): + targets = pd.to_datetime(["2023-06-15T10:30:31Z"], utc=True) + patch_get_continuous.return_value = ( + _fake_df( + [ + {"time": "2023-06-15T10:30:00Z", "value": 22.4, "site": "USGS-1"}, + {"time": "2023-06-15T10:30:00Z", "value": 99.9, "site": "USGS-2"}, + ] + ), + mock.Mock(), + ) + result, _ = get_nearest_continuous( + targets, + monitoring_location_id=["USGS-1", "USGS-2"], + parameter_code="00060", + ) + assert len(result) == 2 + assert set(result["monitoring_location_id"]) == {"USGS-1", "USGS-2"} + + +def test_empty_targets_raises(patch_get_continuous): + """An empty ``targets`` is a call with no useful work to do and + almost always a caller bug — raise rather than silently issuing a + no-op HTTP request.""" + with pytest.raises(ValueError, 
match="targets"): + get_nearest_continuous([], monitoring_location_id="USGS-02238500") + patch_get_continuous.assert_not_called() + + +def test_rejects_time_kwarg(patch_get_continuous): + with pytest.raises(TypeError, match="time"): + get_nearest_continuous( + [pd.Timestamp("2023-06-15", tz="UTC")], + monitoring_location_id="USGS-02238500", + time="2023-06-01/2023-07-01", + ) + + +def test_rejects_filter_kwarg(patch_get_continuous): + with pytest.raises(TypeError, match="filter"): + get_nearest_continuous( + [pd.Timestamp("2023-06-15", tz="UTC")], + monitoring_location_id="USGS-02238500", + filter="x = 1", + ) + + +def test_rejects_invalid_on_tie(patch_get_continuous): + with pytest.raises(ValueError, match="on_tie"): + get_nearest_continuous( + [pd.Timestamp("2023-06-15", tz="UTC")], + monitoring_location_id="USGS-02238500", + on_tie="random", + ) + + +def test_accepts_naive_datetimes_as_utc(patch_get_continuous): + """Naive inputs must be treated as UTC (matching pandas default).""" + naive = [pd.Timestamp("2023-06-15T10:30:00")] + patch_get_continuous.return_value = ( + _fake_df([{"time": "2023-06-15T10:30:00Z", "value": 22.4}]), + mock.Mock(), + ) + result, _ = get_nearest_continuous(naive, monitoring_location_id="USGS-02238500") + assert len(result) == 1 + + +def test_accepts_list_of_strings(patch_get_continuous): + patch_get_continuous.return_value = ( + _fake_df([{"time": "2023-06-15T10:30:00Z", "value": 22.4}]), + mock.Mock(), + ) + result, _ = get_nearest_continuous( + ["2023-06-15T10:30:31Z"], monitoring_location_id="USGS-02238500" + ) + assert len(result) == 1 + + +@pytest.mark.parametrize( + "window", + [ + "00:07:30", # HH:MM:SS + "7min30s", # pandas shorthand + "450s", # seconds shorthand + "PT7M30S", # ISO 8601 duration + pd.Timedelta(minutes=7, seconds=30), # Timedelta object + ], +) +def test_window_accepts_any_pandas_timedelta_form(patch_get_continuous, window): + """Every representation ``pandas.Timedelta`` parses must produce the + same CQL 
filter. Documents the public contract: ``window`` is + whatever ``pd.Timedelta(window)`` returns.""" + targets = pd.to_datetime(["2023-06-15T10:30:00Z"], utc=True) + patch_get_continuous.return_value = (_fake_df([]), mock.Mock()) + + get_nearest_continuous(targets, monitoring_location_id="USGS-1", window=window) + filter_expr = patch_get_continuous.call_args.kwargs["filter"] + # Bounds are 7:30 away from the target regardless of input spelling + assert "'2023-06-15T10:22:30Z'" in filter_expr + assert "'2023-06-15T10:37:30Z'" in filter_expr + + +def test_forwards_kwargs_to_get_continuous(patch_get_continuous): + patch_get_continuous.return_value = (_fake_df([]), mock.Mock()) + get_nearest_continuous( + [pd.Timestamp("2023-06-15", tz="UTC")], + monitoring_location_id="USGS-02238500", + parameter_code="00060", + statistic_id="00011", + approval_status="Approved", + ) + _, kwargs = patch_get_continuous.call_args + assert kwargs["statistic_id"] == "00011" + assert kwargs["approval_status"] == "Approved"