From 4abe9762efbc0275060c67dc8c2168fe2de9e255 Mon Sep 17 00:00:00 2001 From: thodson-usgs Date: Thu, 23 Apr 2026 09:47:44 -0500 Subject: [PATCH 1/5] Add waterdata.get_nearest_continuous helper For each target timestamp, fetch the nearest continuous observation in a single round trip. Builds a CQL OR-chain of per-target bracketed windows, pipes it through ``get_continuous`` (which already auto-chunks long filters across multiple sub-requests), then selects the single observation closest to each target client-side. Exists because the Water Data API matches a single-instant ``time=`` parameter exactly (10:30:31 returns zero rows on a 15-minute gauge), does not implement ``sortby`` for arbitrary queryables, and does not expose a ``T_NEAREST`` CQL function. The narrow-window + client-side reduction is the one pattern that works today for multi-target nearest lookups in one API call. Tie handling is configurable via ``on_tie``: - "first" (default): keep the earlier observation - "last": keep the later observation - "mean": average numeric columns; set ``time`` to the target Default ``window="7min30s"`` matches the 15-minute gauge cadence so most targets' windows contain exactly one observation. Users with irregular-cadence gauges or known data gaps can widen to "15min" or "30min" at the cost of more bytes per response. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- dataretrieval/waterdata/__init__.py | 2 + dataretrieval/waterdata/api.py | 195 +++++++++++++++++++++- tests/waterdata_nearest_test.py | 246 ++++++++++++++++++++++++++++ 3 files changed, 442 insertions(+), 1 deletion(-) create mode 100644 tests/waterdata_nearest_test.py diff --git a/dataretrieval/waterdata/__init__.py b/dataretrieval/waterdata/__init__.py index 9bd80c91..cacf8f14 100644 --- a/dataretrieval/waterdata/__init__.py +++ b/dataretrieval/waterdata/__init__.py @@ -19,6 +19,7 @@ get_latest_continuous, get_latest_daily, get_monitoring_locations, + get_nearest_continuous, get_reference_table, get_samples, get_stats_date_range, @@ -47,6 +48,7 @@ "get_latest_continuous", "get_latest_daily", "get_monitoring_locations", + "get_nearest_continuous", "get_reference_table", "get_samples", "get_stats_date_range", diff --git a/dataretrieval/waterdata/api.py b/dataretrieval/waterdata/api.py index f9a66b31..b65d6b86 100644 --- a/dataretrieval/waterdata/api.py +++ b/dataretrieval/waterdata/api.py @@ -9,7 +9,7 @@ import json import logging from io import StringIO -from typing import get_args +from typing import Literal, get_args import pandas as pd import requests @@ -440,6 +440,199 @@ def get_continuous( return get_ogc_data(args, output_id, service) +def get_nearest_continuous( + targets, + monitoring_location_id: str | list[str] | None = None, + parameter_code: str | list[str] | None = None, + *, + window: str | pd.Timedelta = "7min30s", + on_tie: Literal["first", "last", "mean"] = "first", + **kwargs, +) -> tuple[pd.DataFrame, BaseMetadata]: + """For each target timestamp, return the nearest continuous observation. + + Builds one bracketed ``(time >= t-window AND time <= t+window)`` clause + per target, joins them as a top-level CQL ``OR`` filter, and lets + ``get_continuous`` (with its auto-chunking) fetch every observation + that falls in any window. 
Then, per ``(monitoring_location_id, target)`` + pair, picks the single observation with the smallest ``|time - target|``. + + The USGS continuous endpoint matches ``time`` parameters exactly rather + than fuzzily, and it does not implement ``sortby`` for arbitrary fields; + this function is the single-round-trip way to ask "what reading is + nearest this timestamp?" for many timestamps at once. + + Parameters + ---------- + targets : list-like of datetime-convertible + Target timestamps. Naive datetimes are treated as UTC. Accepts a + list, ``pandas.Series``, ``pandas.DatetimeIndex``, ``numpy.ndarray``, + or anything ``pandas.to_datetime`` consumes. + monitoring_location_id : string or list of strings, optional + Forwarded to ``get_continuous``. + parameter_code : string or list of strings, optional + Forwarded to ``get_continuous``. + window : string or ``pandas.Timedelta``, default ``"7min30s"`` + Half-window around each target. Must be small enough that every + target's window captures roughly one observation at the service + cadence. The 7min30s default matches a 15-minute continuous gauge; + use a larger value (e.g. ``"15min"``) when the gauge cadence is + longer or you need more resilience to data gaps. + on_tie : {"first", "last", "mean"}, default ``"first"`` + How to resolve ties when two observations are exactly equidistant + from a target (which happens when the target falls at the midpoint + between grid points — e.g. target ``10:22:30`` for a 15-minute + gauge). + + - ``"first"``: keep the earlier observation. + - ``"last"``: keep the later observation. + - ``"mean"``: average numeric columns; set the ``time`` column to + the target, since no real observation exists at the midpoint. + + **kwargs + Additional keyword arguments forwarded to ``get_continuous`` + (e.g. ``statistic_id``, ``approval_status``, ``properties``). + Passing ``time``, ``filter``, or ``filter_lang`` raises + ``TypeError`` — this function builds those itself. 
+ + Returns + ------- + df : ``pandas.DataFrame`` + One row per ``(target, monitoring_location_id)`` combination that + had at least one observation in its window. Rows are augmented + with a ``target_time`` column indicating which target they + correspond to. Targets with no observations in their window are + silently dropped. + md : :class:`~dataretrieval.utils.BaseMetadata` + Metadata from the underlying ``get_continuous`` call. + + Notes + ----- + *Window sizing and ties.* When ``window`` is exactly half the service + cadence, most targets' windows contain a single observation and + ``on_tie`` is moot. Ties arise only when a target sits exactly at the + window edge — rare in practice but possible. Setting ``window`` to a + full cadence (or larger) guarantees at least one observation per + target in steady state at the cost of more bytes per response. + + *Why windowed CQL rather than sort+limit.* The API's advertised + ``sortby`` parameter would make this a one-liner per target (``filter`` + by ``time <= t`` and ``limit 1``), but it is per-query — you would need + one HTTP round-trip per target. The CQL ``OR``-chain approach folds + all N targets into one request (auto-chunked when the URL is long). + + Examples + -------- + .. code:: + + >>> import pandas as pd + >>> from dataretrieval import waterdata + + >>> # Pair three off-grid timestamps with nearby observations + >>> targets = pd.to_datetime( + ... [ + ... "2023-06-15T10:30:31Z", + ... "2023-06-15T14:07:12Z", + ... "2023-06-16T03:45:19Z", + ... ] + ... ) + >>> df, md = waterdata.get_nearest_continuous( + ... targets, + ... monitoring_location_id="USGS-02238500", + ... parameter_code="00060", + ... ) + + >>> # Widen the window for an irregular-cadence gauge + >>> df, md = waterdata.get_nearest_continuous( + ... targets, + ... monitoring_location_id="USGS-02238500", + ... parameter_code="00060", + ... window="30min", + ... on_tie="mean", + ... 
) + """ + for forbidden in ("time", "filter", "filter_lang"): + if forbidden in kwargs: + raise TypeError( + f"get_nearest_continuous constructs its own {forbidden!r}; " + "do not pass it directly" + ) + if on_tie not in ("first", "last", "mean"): + raise ValueError(f"on_tie must be 'first', 'last', or 'mean'; got {on_tie!r}") + + targets = pd.to_datetime(list(targets), utc=True) + window_td = pd.Timedelta(window) + + if len(targets) == 0: + # Nothing to ask about — return an empty frame shaped like a real + # ``get_continuous`` response (via a trivially-empty time range). + df, md = get_continuous( + monitoring_location_id=monitoring_location_id, + parameter_code=parameter_code, + time="1900-01-01T00:00:00Z/1900-01-01T00:00:00Z", + **kwargs, + ) + return df.iloc[0:0], md + + filter_expr = " OR ".join( + f"(time >= '{(t - window_td).strftime('%Y-%m-%dT%H:%M:%SZ')}' " + f"AND time <= '{(t + window_td).strftime('%Y-%m-%dT%H:%M:%SZ')}')" + for t in targets + ) + + df, md = get_continuous( + monitoring_location_id=monitoring_location_id, + parameter_code=parameter_code, + filter=filter_expr, + filter_lang="cql-text", + **kwargs, + ) + + if df.empty: + return df, md + + df = df.copy() + df["time"] = pd.to_datetime(df["time"], utc=True) + + if "monitoring_location_id" in df.columns: + site_groups = list(df.groupby("monitoring_location_id", sort=False)) + else: + site_groups = [(None, df)] + + selected = [] + for _, site_df in site_groups: + for target in targets: + mask = (site_df["time"] >= target - window_td) & ( + site_df["time"] <= target + window_td + ) + window_df = site_df[mask] + if window_df.empty: + continue + deltas = (window_df["time"] - target).abs() + candidates = window_df[deltas == deltas.min()].sort_values("time") + + if len(candidates) == 1 or on_tie == "first": + row = candidates.iloc[0].copy() + elif on_tie == "last": + row = candidates.iloc[-1].copy() + else: # "mean" + row = candidates.iloc[0].copy() + for col in 
candidates.select_dtypes("number").columns: + row[col] = candidates[col].mean() + row["time"] = target + + row["target_time"] = target + selected.append(row) + + if not selected: + empty = df.iloc[0:0].copy() + empty["target_time"] = pd.Series(dtype="datetime64[ns, UTC]") + return empty, md + + result = pd.DataFrame(selected).reset_index(drop=True) + return result, md + + def get_monitoring_locations( monitoring_location_id: list[str] | None = None, agency_code: list[str] | None = None, diff --git a/tests/waterdata_nearest_test.py b/tests/waterdata_nearest_test.py new file mode 100644 index 00000000..bea82320 --- /dev/null +++ b/tests/waterdata_nearest_test.py @@ -0,0 +1,246 @@ +"""Tests for ``waterdata.get_nearest_continuous``. + +All network interaction is mocked at the ``get_continuous`` boundary, so +these run without an API key and without touching the USGS servers. +""" + +from unittest import mock + +import pandas as pd +import pytest + +from dataretrieval.waterdata.api import get_nearest_continuous + + +def _fake_df(rows): + """Build a minimal continuous-response-shaped DataFrame.""" + return pd.DataFrame( + { + "time": pd.to_datetime([r["time"] for r in rows], utc=True), + "value": [r["value"] for r in rows], + "monitoring_location_id": [r.get("site", "USGS-02238500") for r in rows], + } + ) + + +@pytest.fixture +def patch_get_continuous(): + """Replace ``waterdata.api.get_continuous`` with a controllable stub.""" + with mock.patch("dataretrieval.waterdata.api.get_continuous") as m: + yield m + + +def test_returns_nearest_per_target(patch_get_continuous): + targets = pd.to_datetime(["2023-06-15T10:30:31Z", "2023-06-15T10:45:16Z"], utc=True) + patch_get_continuous.return_value = ( + _fake_df( + [ + {"time": "2023-06-15T10:30:00Z", "value": 22.4}, + {"time": "2023-06-15T10:45:00Z", "value": 22.5}, + ] + ), + mock.Mock(), + ) + result, _ = get_nearest_continuous( + targets, + monitoring_location_id="USGS-02238500", + parameter_code="00060", + ) + assert 
len(result) == 2 + assert list(result["value"]) == [22.4, 22.5] + assert list(result["target_time"]) == list(targets) + + +def test_builds_one_or_clause_per_target(patch_get_continuous): + targets = pd.to_datetime(["2023-06-15T10:30:00Z", "2023-06-16T12:00:00Z"], utc=True) + patch_get_continuous.return_value = (_fake_df([]), mock.Mock()) + get_nearest_continuous( + targets, + monitoring_location_id="USGS-02238500", + parameter_code="00060", + window="7min30s", + ) + _, kwargs = patch_get_continuous.call_args + filter_expr = kwargs["filter"] + assert kwargs["filter_lang"] == "cql-text" + # Two windows — one top-level OR separator + assert filter_expr.count(") OR (") == 1 + # Each target produces >= and <= bounds + assert filter_expr.count("time >= '") == 2 + assert filter_expr.count("time <= '") == 2 + # Lower bound of the first window is 7:30 before the target + assert "'2023-06-15T10:22:30Z'" in filter_expr + assert "'2023-06-15T10:37:30Z'" in filter_expr + + +def test_tie_first_keeps_earlier(patch_get_continuous): + # Target at the midpoint between two grid points + targets = pd.to_datetime(["2023-06-15T10:22:30Z"], utc=True) + patch_get_continuous.return_value = ( + _fake_df( + [ + {"time": "2023-06-15T10:15:00Z", "value": 22.0}, + {"time": "2023-06-15T10:30:00Z", "value": 22.4}, + ] + ), + mock.Mock(), + ) + result, _ = get_nearest_continuous( + targets, + monitoring_location_id="USGS-02238500", + on_tie="first", + window="7min30s", + ) + assert len(result) == 1 + assert result.iloc[0]["value"] == 22.0 + assert result.iloc[0]["time"] == pd.Timestamp("2023-06-15T10:15:00Z") + + +def test_tie_last_keeps_later(patch_get_continuous): + targets = pd.to_datetime(["2023-06-15T10:22:30Z"], utc=True) + patch_get_continuous.return_value = ( + _fake_df( + [ + {"time": "2023-06-15T10:15:00Z", "value": 22.0}, + {"time": "2023-06-15T10:30:00Z", "value": 22.4}, + ] + ), + mock.Mock(), + ) + result, _ = get_nearest_continuous( + targets, + 
monitoring_location_id="USGS-02238500", + on_tie="last", + window="7min30s", + ) + assert result.iloc[0]["value"] == 22.4 + assert result.iloc[0]["time"] == pd.Timestamp("2023-06-15T10:30:00Z") + + +def test_tie_mean_averages_numeric_and_uses_target_time(patch_get_continuous): + targets = pd.to_datetime(["2023-06-15T10:22:30Z"], utc=True) + patch_get_continuous.return_value = ( + _fake_df( + [ + {"time": "2023-06-15T10:15:00Z", "value": 22.0}, + {"time": "2023-06-15T10:30:00Z", "value": 22.4}, + ] + ), + mock.Mock(), + ) + result, _ = get_nearest_continuous( + targets, + monitoring_location_id="USGS-02238500", + on_tie="mean", + window="7min30s", + ) + assert result.iloc[0]["value"] == pytest.approx(22.2) + # Time is set to the target since no real observation sits at the midpoint + assert result.iloc[0]["time"] == targets[0] + + +def test_target_without_observations_is_dropped(patch_get_continuous): + targets = pd.to_datetime(["2023-06-15T10:30:31Z", "2023-07-15T10:30:31Z"], utc=True) + # Only the June target has nearby data; July returns nothing. 
+ patch_get_continuous.return_value = ( + _fake_df([{"time": "2023-06-15T10:30:00Z", "value": 22.4}]), + mock.Mock(), + ) + result, _ = get_nearest_continuous(targets, monitoring_location_id="USGS-02238500") + assert len(result) == 1 + assert result.iloc[0]["target_time"] == targets[0] + + +def test_multi_site_returns_row_per_target_per_site(patch_get_continuous): + targets = pd.to_datetime(["2023-06-15T10:30:31Z"], utc=True) + patch_get_continuous.return_value = ( + _fake_df( + [ + {"time": "2023-06-15T10:30:00Z", "value": 22.4, "site": "USGS-1"}, + {"time": "2023-06-15T10:30:00Z", "value": 99.9, "site": "USGS-2"}, + ] + ), + mock.Mock(), + ) + result, _ = get_nearest_continuous( + targets, + monitoring_location_id=["USGS-1", "USGS-2"], + parameter_code="00060", + ) + assert len(result) == 2 + assert set(result["monitoring_location_id"]) == {"USGS-1", "USGS-2"} + + +def test_empty_targets_returns_empty_frame_without_building_filter( + patch_get_continuous, +): + patch_get_continuous.return_value = (_fake_df([]), mock.Mock()) + result, _ = get_nearest_continuous([], monitoring_location_id="USGS-02238500") + assert result.empty + # The one call that happens uses a trivial time= window, not a filter. 
+ _, kwargs = patch_get_continuous.call_args + assert "filter" not in kwargs + assert "time" in kwargs + + +def test_rejects_time_kwarg(patch_get_continuous): + with pytest.raises(TypeError, match="time"): + get_nearest_continuous( + [pd.Timestamp("2023-06-15", tz="UTC")], + monitoring_location_id="USGS-02238500", + time="2023-06-01/2023-07-01", + ) + + +def test_rejects_filter_kwarg(patch_get_continuous): + with pytest.raises(TypeError, match="filter"): + get_nearest_continuous( + [pd.Timestamp("2023-06-15", tz="UTC")], + monitoring_location_id="USGS-02238500", + filter="x = 1", + ) + + +def test_rejects_invalid_on_tie(patch_get_continuous): + with pytest.raises(ValueError, match="on_tie"): + get_nearest_continuous( + [pd.Timestamp("2023-06-15", tz="UTC")], + monitoring_location_id="USGS-02238500", + on_tie="random", + ) + + +def test_accepts_naive_datetimes_as_utc(patch_get_continuous): + """Naive inputs must be treated as UTC (matching pandas default).""" + naive = [pd.Timestamp("2023-06-15T10:30:00")] + patch_get_continuous.return_value = ( + _fake_df([{"time": "2023-06-15T10:30:00Z", "value": 22.4}]), + mock.Mock(), + ) + result, _ = get_nearest_continuous(naive, monitoring_location_id="USGS-02238500") + assert len(result) == 1 + + +def test_accepts_list_of_strings(patch_get_continuous): + patch_get_continuous.return_value = ( + _fake_df([{"time": "2023-06-15T10:30:00Z", "value": 22.4}]), + mock.Mock(), + ) + result, _ = get_nearest_continuous( + ["2023-06-15T10:30:31Z"], monitoring_location_id="USGS-02238500" + ) + assert len(result) == 1 + + +def test_forwards_kwargs_to_get_continuous(patch_get_continuous): + patch_get_continuous.return_value = (_fake_df([]), mock.Mock()) + get_nearest_continuous( + [pd.Timestamp("2023-06-15", tz="UTC")], + monitoring_location_id="USGS-02238500", + parameter_code="00060", + statistic_id="00011", + approval_status="Approved", + ) + _, kwargs = patch_get_continuous.call_args + assert kwargs["statistic_id"] == "00011" + assert 
kwargs["approval_status"] == "Approved" From 019e93dd862a72248c722c534736dd8355a78267 Mon Sep 17 00:00:00 2001 From: thodson-usgs Date: Thu, 23 Apr 2026 10:51:00 -0500 Subject: [PATCH 2/5] Accept window in HH:MM:SS form; switch default to "00:07:30" MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ``pandas.Timedelta`` already accepts ``"00:07:30"`` identically to ``"7min30s"``, so no behaviour change is needed — just switch the default and the docstring examples to the more readable form. Added a regression test that asserts the two spellings produce the same CQL filter so future refactors can't drift. Co-Authored-By: Claude Opus 4.7 (1M context) --- dataretrieval/waterdata/api.py | 18 ++++++++++-------- tests/waterdata_nearest_test.py | 27 +++++++++++++++++++++++---- 2 files changed, 33 insertions(+), 12 deletions(-) diff --git a/dataretrieval/waterdata/api.py b/dataretrieval/waterdata/api.py index b65d6b86..5f9f0bd3 100644 --- a/dataretrieval/waterdata/api.py +++ b/dataretrieval/waterdata/api.py @@ -445,7 +445,7 @@ def get_nearest_continuous( monitoring_location_id: str | list[str] | None = None, parameter_code: str | list[str] | None = None, *, - window: str | pd.Timedelta = "7min30s", + window: str | pd.Timedelta = "00:07:30", on_tie: Literal["first", "last", "mean"] = "first", **kwargs, ) -> tuple[pd.DataFrame, BaseMetadata]: @@ -472,12 +472,14 @@ def get_nearest_continuous( Forwarded to ``get_continuous``. parameter_code : string or list of strings, optional Forwarded to ``get_continuous``. - window : string or ``pandas.Timedelta``, default ``"7min30s"`` - Half-window around each target. Must be small enough that every - target's window captures roughly one observation at the service - cadence. The 7min30s default matches a 15-minute continuous gauge; - use a larger value (e.g. ``"15min"``) when the gauge cadence is - longer or you need more resilience to data gaps. 
+ window : string or ``pandas.Timedelta``, default ``"00:07:30"`` + Half-window around each target, in ``HH:MM:SS`` form (or any + ``pandas.Timedelta``-parseable string: ``"7min30s"``, + ``"450s"``, etc.). Must be small enough that every target's + window captures roughly one observation at the service cadence. + The ``"00:07:30"`` default matches a 15-minute continuous gauge; + use a larger value (e.g. ``"00:15:00"``) when the gauge cadence + is longer or you need more resilience to data gaps. on_tie : {"first", "last", "mean"}, default ``"first"`` How to resolve ties when two observations are exactly equidistant from a target (which happens when the target falls at the midpoint @@ -547,7 +549,7 @@ def get_nearest_continuous( ... targets, ... monitoring_location_id="USGS-02238500", ... parameter_code="00060", - ... window="30min", + ... window="00:30:00", ... on_tie="mean", ... ) """ diff --git a/tests/waterdata_nearest_test.py b/tests/waterdata_nearest_test.py index bea82320..ce616edf 100644 --- a/tests/waterdata_nearest_test.py +++ b/tests/waterdata_nearest_test.py @@ -58,7 +58,7 @@ def test_builds_one_or_clause_per_target(patch_get_continuous): targets, monitoring_location_id="USGS-02238500", parameter_code="00060", - window="7min30s", + window="00:07:30", ) _, kwargs = patch_get_continuous.call_args filter_expr = kwargs["filter"] @@ -89,7 +89,7 @@ def test_tie_first_keeps_earlier(patch_get_continuous): targets, monitoring_location_id="USGS-02238500", on_tie="first", - window="7min30s", + window="00:07:30", ) assert len(result) == 1 assert result.iloc[0]["value"] == 22.0 @@ -111,7 +111,7 @@ def test_tie_last_keeps_later(patch_get_continuous): targets, monitoring_location_id="USGS-02238500", on_tie="last", - window="7min30s", + window="00:07:30", ) assert result.iloc[0]["value"] == 22.4 assert result.iloc[0]["time"] == pd.Timestamp("2023-06-15T10:30:00Z") @@ -132,7 +132,7 @@ def test_tie_mean_averages_numeric_and_uses_target_time(patch_get_continuous): targets, 
monitoring_location_id="USGS-02238500", on_tie="mean", - window="7min30s", + window="00:07:30", ) assert result.iloc[0]["value"] == pytest.approx(22.2) # Time is set to the target since no real observation sits at the midpoint @@ -232,6 +232,25 @@ def test_accepts_list_of_strings(patch_get_continuous): assert len(result) == 1 +def test_window_accepts_hhmmss_and_shorthand_equivalently(patch_get_continuous): + """``window="00:07:30"`` and ``window="7min30s"`` are the same duration + as far as ``pandas.Timedelta`` is concerned, so the two forms must + produce identical CQL filters.""" + targets = pd.to_datetime(["2023-06-15T10:30:00Z"], utc=True) + patch_get_continuous.return_value = (_fake_df([]), mock.Mock()) + + get_nearest_continuous(targets, monitoring_location_id="USGS-1", window="00:07:30") + filter_hhmmss = patch_get_continuous.call_args.kwargs["filter"] + + get_nearest_continuous(targets, monitoring_location_id="USGS-1", window="7min30s") + filter_shorthand = patch_get_continuous.call_args.kwargs["filter"] + + assert filter_hhmmss == filter_shorthand + # And the bounds should be 7:30 away from the target + assert "'2023-06-15T10:22:30Z'" in filter_hhmmss + assert "'2023-06-15T10:37:30Z'" in filter_hhmmss + + def test_forwards_kwargs_to_get_continuous(patch_get_continuous): patch_get_continuous.return_value = (_fake_df([]), mock.Mock()) get_nearest_continuous( From 8aff5228417be59e157d095653528015a9d8112c Mon Sep 17 00:00:00 2001 From: thodson-usgs Date: Thu, 23 Apr 2026 11:01:48 -0500 Subject: [PATCH 3/5] Use ISO 8601 duration as the window default MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Switch the default from "00:07:30" to "PT7M30S" so the user-visible contract points at an actual international standard (ISO 8601 duration) rather than a pandas-specific colon form. 
``pandas.Timedelta`` still accepts all the other forms users may already have typed — ISO 8601, HH:MM:SS, shorthand ("7min30s", "450s"), or a ``pd.Timedelta`` directly — and a parametrized test now exercises each shape to lock in the "whatever ``pd.Timedelta`` takes" contract. Co-Authored-By: Claude Opus 4.7 (1M context) --- dataretrieval/waterdata/api.py | 27 ++++++++++++++-------- tests/waterdata_nearest_test.py | 41 ++++++++++++++++++--------------- 2 files changed, 40 insertions(+), 28 deletions(-) diff --git a/dataretrieval/waterdata/api.py b/dataretrieval/waterdata/api.py index 5f9f0bd3..debe3ff4 100644 --- a/dataretrieval/waterdata/api.py +++ b/dataretrieval/waterdata/api.py @@ -445,7 +445,7 @@ def get_nearest_continuous( monitoring_location_id: str | list[str] | None = None, parameter_code: str | list[str] | None = None, *, - window: str | pd.Timedelta = "00:07:30", + window: str | pd.Timedelta = "PT7M30S", on_tie: Literal["first", "last", "mean"] = "first", **kwargs, ) -> tuple[pd.DataFrame, BaseMetadata]: @@ -472,14 +472,21 @@ def get_nearest_continuous( Forwarded to ``get_continuous``. parameter_code : string or list of strings, optional Forwarded to ``get_continuous``. - window : string or ``pandas.Timedelta``, default ``"00:07:30"`` - Half-window around each target, in ``HH:MM:SS`` form (or any - ``pandas.Timedelta``-parseable string: ``"7min30s"``, - ``"450s"``, etc.). Must be small enough that every target's - window captures roughly one observation at the service cadence. - The ``"00:07:30"`` default matches a 15-minute continuous gauge; - use a larger value (e.g. ``"00:15:00"``) when the gauge cadence - is longer or you need more resilience to data gaps. + window : string or ``pandas.Timedelta``, default ``"PT7M30S"`` + Half-window around each target, as an ISO 8601 duration + (``"PT7M30S"``, ``"PT15M"``, ``"PT1H"``, etc.). 
Also accepts + any other form ``pandas.Timedelta`` parses — ``HH:MM:SS`` + (``"00:07:30"``), pandas shorthand (``"7min30s"``, + ``"450s"``), or a ``pd.Timedelta`` directly. See the + `pandas.Timedelta docs + `_ + for the full grammar. + + Must be small enough that every target's window captures + roughly one observation at the service cadence. The default + matches a 15-minute continuous gauge; widen (e.g. + ``"PT15M"``) for irregular cadences or resilience to data + gaps. on_tie : {"first", "last", "mean"}, default ``"first"`` How to resolve ties when two observations are exactly equidistant from a target (which happens when the target falls at the midpoint @@ -549,7 +556,7 @@ def get_nearest_continuous( ... targets, ... monitoring_location_id="USGS-02238500", ... parameter_code="00060", - ... window="00:30:00", + ... window="PT30M", ... on_tie="mean", ... ) """ diff --git a/tests/waterdata_nearest_test.py b/tests/waterdata_nearest_test.py index ce616edf..c9424f5e 100644 --- a/tests/waterdata_nearest_test.py +++ b/tests/waterdata_nearest_test.py @@ -58,7 +58,7 @@ def test_builds_one_or_clause_per_target(patch_get_continuous): targets, monitoring_location_id="USGS-02238500", parameter_code="00060", - window="00:07:30", + window="PT7M30S", ) _, kwargs = patch_get_continuous.call_args filter_expr = kwargs["filter"] @@ -89,7 +89,7 @@ def test_tie_first_keeps_earlier(patch_get_continuous): targets, monitoring_location_id="USGS-02238500", on_tie="first", - window="00:07:30", + window="PT7M30S", ) assert len(result) == 1 assert result.iloc[0]["value"] == 22.0 @@ -111,7 +111,7 @@ def test_tie_last_keeps_later(patch_get_continuous): targets, monitoring_location_id="USGS-02238500", on_tie="last", - window="00:07:30", + window="PT7M30S", ) assert result.iloc[0]["value"] == 22.4 assert result.iloc[0]["time"] == pd.Timestamp("2023-06-15T10:30:00Z") @@ -132,7 +132,7 @@ def test_tie_mean_averages_numeric_and_uses_target_time(patch_get_continuous): targets, 
monitoring_location_id="USGS-02238500", on_tie="mean", - window="00:07:30", + window="PT7M30S", ) assert result.iloc[0]["value"] == pytest.approx(22.2) # Time is set to the target since no real observation sits at the midpoint @@ -232,23 +232,28 @@ def test_accepts_list_of_strings(patch_get_continuous): assert len(result) == 1 -def test_window_accepts_hhmmss_and_shorthand_equivalently(patch_get_continuous): - """``window="00:07:30"`` and ``window="7min30s"`` are the same duration - as far as ``pandas.Timedelta`` is concerned, so the two forms must - produce identical CQL filters.""" +@pytest.mark.parametrize( + "window", + [ + "00:07:30", # HH:MM:SS + "7min30s", # pandas shorthand + "450s", # seconds shorthand + "PT7M30S", # ISO 8601 duration + pd.Timedelta(minutes=7, seconds=30), # Timedelta object + ], +) +def test_window_accepts_any_pandas_timedelta_form(patch_get_continuous, window): + """Every representation ``pandas.Timedelta`` parses must produce the + same CQL filter. Documents the public contract: ``window`` is + whatever ``pd.Timedelta(window)`` returns.""" targets = pd.to_datetime(["2023-06-15T10:30:00Z"], utc=True) patch_get_continuous.return_value = (_fake_df([]), mock.Mock()) - get_nearest_continuous(targets, monitoring_location_id="USGS-1", window="00:07:30") - filter_hhmmss = patch_get_continuous.call_args.kwargs["filter"] - - get_nearest_continuous(targets, monitoring_location_id="USGS-1", window="7min30s") - filter_shorthand = patch_get_continuous.call_args.kwargs["filter"] - - assert filter_hhmmss == filter_shorthand - # And the bounds should be 7:30 away from the target - assert "'2023-06-15T10:22:30Z'" in filter_hhmmss - assert "'2023-06-15T10:37:30Z'" in filter_hhmmss + get_nearest_continuous(targets, monitoring_location_id="USGS-1", window=window) + filter_expr = patch_get_continuous.call_args.kwargs["filter"] + # Bounds are 7:30 away from the target regardless of input spelling + assert "'2023-06-15T10:22:30Z'" in filter_expr + assert 
"'2023-06-15T10:37:30Z'" in filter_expr def test_forwards_kwargs_to_get_continuous(patch_get_continuous): From 2613792d0fa8f1e2905b522cc7ea850c94efeb84 Mon Sep 17 00:00:00 2001 From: thodson-usgs Date: Thu, 23 Apr 2026 15:25:49 -0500 Subject: [PATCH 4/5] Simplify get_nearest_continuous; add NEWS entry MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Split the helper's body into four private functions so the top-level flow reads as a short recipe: - ``_check_nearest_kwargs`` reject kwargs the helper owns (``time``/``filter``/``filter_lang``); validate ``on_tie`` - ``_build_window_or_filter`` CQL ``OR``-chain of bracketed time windows, one per target - ``_pick_nearest_row`` window → nearest row, with the three tie-resolution branches isolated - ``_empty_nearest_result`` empty frame with a ``target_time`` column, used wherever no match lands Drops the nested ``for site → for target → mask → tie-branch`` loop in favor of a flat list-comprehension + walrus against the new helper. Fixes a fragile ``pd.to_datetime(list(targets), utc=True)`` (a numpy ``datetime64`` array would round-trip through ``list`` as tz-stripped scalars) — now passes the input directly to ``pd.to_datetime`` and wraps in ``pd.DatetimeIndex``. Swaps ``df = df.copy(); df["time"] = ...`` for ``df.assign(time=...)`` to avoid the full-frame copy. Also NEWS.md: add a short entry describing the new helper. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- NEWS.md | 2 + dataretrieval/waterdata/api.py | 152 +++++++++++++++++++++------------ 2 files changed, 98 insertions(+), 56 deletions(-) diff --git a/NEWS.md b/NEWS.md index 48c91ea4..2e2dba99 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,3 +1,5 @@ +**04/23/2026:** Added `waterdata.get_nearest_continuous(targets, ...)` — for each of N target timestamps, fetches the single continuous observation closest to that timestamp in one HTTP round-trip (auto-chunked when the resulting CQL filter is long, via the facility added in #238). The helper is designed for workflows that pair many discrete-measurement timestamps with surrounding instantaneous data, which the OGC `time` parameter can't express since it only accepts one instant or one interval per request. Ties at window midpoints are resolved per a configurable `on_tie` ∈ {`"first"`, `"last"`, `"mean"`}; the default `window="PT7M30S"` matches a 15-minute continuous gauge. + **04/22/2026:** Highlights since the `v1.1.0` release (2025-11-26), which shipped the `waterdata` module: - Added `get_channel` for channel-measurement data (#218) and `get_stats_por` / `get_stats_date_range` for period-of-record and daily statistics (#207). diff --git a/dataretrieval/waterdata/api.py b/dataretrieval/waterdata/api.py index debe3ff4..4529426e 100644 --- a/dataretrieval/waterdata/api.py +++ b/dataretrieval/waterdata/api.py @@ -560,35 +560,23 @@ def get_nearest_continuous( ... on_tie="mean", ... 
) """ - for forbidden in ("time", "filter", "filter_lang"): - if forbidden in kwargs: - raise TypeError( - f"get_nearest_continuous constructs its own {forbidden!r}; " - "do not pass it directly" - ) - if on_tie not in ("first", "last", "mean"): - raise ValueError(f"on_tie must be 'first', 'last', or 'mean'; got {on_tie!r}") - - targets = pd.to_datetime(list(targets), utc=True) + _check_nearest_kwargs(kwargs, on_tie) + targets = pd.DatetimeIndex(pd.to_datetime(targets, utc=True)) window_td = pd.Timedelta(window) if len(targets) == 0: - # Nothing to ask about — return an empty frame shaped like a real - # ``get_continuous`` response (via a trivially-empty time range). + # Issue a trivial-range request so the caller still receives a + # real ``BaseMetadata``; return an empty frame with the same + # shape a real response would have. df, md = get_continuous( monitoring_location_id=monitoring_location_id, parameter_code=parameter_code, time="1900-01-01T00:00:00Z/1900-01-01T00:00:00Z", **kwargs, ) - return df.iloc[0:0], md - - filter_expr = " OR ".join( - f"(time >= '{(t - window_td).strftime('%Y-%m-%dT%H:%M:%SZ')}' " - f"AND time <= '{(t + window_td).strftime('%Y-%m-%dT%H:%M:%SZ')}')" - for t in targets - ) + return _empty_nearest_result(df), md + filter_expr = _build_window_or_filter(targets, window_td) df, md = get_continuous( monitoring_location_id=monitoring_location_id, parameter_code=parameter_code, @@ -596,50 +584,102 @@ def get_nearest_continuous( filter_lang="cql-text", **kwargs, ) - if df.empty: - return df, md + return _empty_nearest_result(df), md - df = df.copy() - df["time"] = pd.to_datetime(df["time"], utc=True) + df = df.assign(time=pd.to_datetime(df["time"], utc=True)) + site_groups = ( + df.groupby("monitoring_location_id", sort=False) + if "monitoring_location_id" in df.columns + else [(None, df)] + ) - if "monitoring_location_id" in df.columns: - site_groups = list(df.groupby("monitoring_location_id", sort=False)) - else: - site_groups = [(None, df)] 
+ selected = [ + row + for _, site_df in site_groups + for target in targets + if (row := _pick_nearest_row(site_df, target, window_td, on_tie)) is not None + ] + if not selected: + return _empty_nearest_result(df), md + return pd.DataFrame(selected).reset_index(drop=True), md + + +_VALID_ON_TIE = ("first", "last", "mean") - selected = [] - for _, site_df in site_groups: - for target in targets: - mask = (site_df["time"] >= target - window_td) & ( - site_df["time"] <= target + window_td + +def _check_nearest_kwargs(kwargs: dict, on_tie: str) -> None: + """Reject kwargs the helper owns; validate ``on_tie``.""" + for forbidden in ("time", "filter", "filter_lang"): + if forbidden in kwargs: + raise TypeError( + f"get_nearest_continuous constructs its own {forbidden!r}; " + "do not pass it directly" ) - window_df = site_df[mask] - if window_df.empty: - continue - deltas = (window_df["time"] - target).abs() - candidates = window_df[deltas == deltas.min()].sort_values("time") - - if len(candidates) == 1 or on_tie == "first": - row = candidates.iloc[0].copy() - elif on_tie == "last": - row = candidates.iloc[-1].copy() - else: # "mean" - row = candidates.iloc[0].copy() - for col in candidates.select_dtypes("number").columns: - row[col] = candidates[col].mean() - row["time"] = target - - row["target_time"] = target - selected.append(row) + if on_tie not in _VALID_ON_TIE: + raise ValueError(f"on_tie must be one of {_VALID_ON_TIE}; got {on_tie!r}") - if not selected: - empty = df.iloc[0:0].copy() - empty["target_time"] = pd.Series(dtype="datetime64[ns, UTC]") - return empty, md - result = pd.DataFrame(selected).reset_index(drop=True) - return result, md +def _build_window_or_filter(targets: pd.DatetimeIndex, window_td: pd.Timedelta) -> str: + """Build the CQL OR-chain of ``time >= ... AND time <= ...`` windows. 
+ + ``get_continuous`` auto-chunks the result if the full URL would + exceed the server's length limit, so this is always safe to build + as one string even for many targets. + """ + return " OR ".join( + f"(time >= '{(t - window_td).strftime('%Y-%m-%dT%H:%M:%SZ')}' " + f"AND time <= '{(t + window_td).strftime('%Y-%m-%dT%H:%M:%SZ')}')" + for t in targets + ) + + +def _pick_nearest_row( + site_df: pd.DataFrame, + target: pd.Timestamp, + window_td: pd.Timedelta, + on_tie: str, +) -> pd.Series | None: + """Return the single row within ``window_td`` of ``target``, or ``None``. + + Resolves ties (two rows equidistant from ``target``) per ``on_tie``. + The returned row carries a ``target_time`` column identifying which + target it was selected for. + """ + in_window = site_df[ + (site_df["time"] >= target - window_td) + & (site_df["time"] <= target + window_td) + ] + if in_window.empty: + return None + deltas = (in_window["time"] - target).abs() + candidates = in_window[deltas == deltas.min()].sort_values("time") + + if len(candidates) == 1 or on_tie == "first": + row = candidates.iloc[0].copy() + elif on_tie == "last": + row = candidates.iloc[-1].copy() + else: # "mean" — synthesize a row whose numeric cols are averaged and + # whose ``time`` is the target (no real observation sits at the midpoint). + row = candidates.iloc[0].copy() + for col in candidates.select_dtypes("number").columns: + row[col] = candidates[col].mean() + row["time"] = target + + row["target_time"] = target + return row + + +def _empty_nearest_result(template: pd.DataFrame | None = None) -> pd.DataFrame: + """Empty frame with a ``target_time`` column, for no-match cases. + + When ``template`` is provided, preserve its columns/dtypes so the + returned frame matches the shape of a real ``get_continuous`` + response. 
+    """
+    base = pd.DataFrame() if template is None else template.iloc[0:0].copy()
+    base["target_time"] = pd.Series(dtype="datetime64[ns, UTC]")
+    return base
 
 
 def get_monitoring_locations(

From 1618f6bd97094c73c407af55add134cd94c8dd47 Mon Sep 17 00:00:00 2001
From: thodson-usgs
Date: Thu, 23 Apr 2026 15:34:58 -0500
Subject: [PATCH 5/5] Raise on empty targets instead of issuing a no-op request

Calling ``get_nearest_continuous`` with an empty ``targets`` is almost
always a caller bug (an unfiltered frame, a typo, a mis-named column).
The previous code papered over it by firing a trivial-range HTTP request
(``time=1900-01-01/1900-01-01``) purely so the caller received a real
``BaseMetadata`` object.

That pattern wastes a round-trip on a nonsensical input and hides the
bug. Raise ``ValueError`` on empty ``targets`` instead. This shrinks
the function body and makes the caller's mistake loud.

Co-Authored-By: Claude Opus 4.7 (1M context)
---
 dataretrieval/waterdata/api.py  | 11 +----------
 tests/waterdata_nearest_test.py | 17 +++++++----------
 2 files changed, 8 insertions(+), 20 deletions(-)

diff --git a/dataretrieval/waterdata/api.py b/dataretrieval/waterdata/api.py
index 4529426e..370d610c 100644
--- a/dataretrieval/waterdata/api.py
+++ b/dataretrieval/waterdata/api.py
@@ -565,16 +565,7 @@ def get_nearest_continuous(
     window_td = pd.Timedelta(window)
 
     if len(targets) == 0:
-        # Issue a trivial-range request so the caller still receives a
-        # real ``BaseMetadata``; return an empty frame with the same
-        # shape a real response would have.
- df, md = get_continuous( - monitoring_location_id=monitoring_location_id, - parameter_code=parameter_code, - time="1900-01-01T00:00:00Z/1900-01-01T00:00:00Z", - **kwargs, - ) - return _empty_nearest_result(df), md + raise ValueError("targets must contain at least one timestamp") filter_expr = _build_window_or_filter(targets, window_td) df, md = get_continuous( diff --git a/tests/waterdata_nearest_test.py b/tests/waterdata_nearest_test.py index c9424f5e..9873d821 100644 --- a/tests/waterdata_nearest_test.py +++ b/tests/waterdata_nearest_test.py @@ -171,16 +171,13 @@ def test_multi_site_returns_row_per_target_per_site(patch_get_continuous): assert set(result["monitoring_location_id"]) == {"USGS-1", "USGS-2"} -def test_empty_targets_returns_empty_frame_without_building_filter( - patch_get_continuous, -): - patch_get_continuous.return_value = (_fake_df([]), mock.Mock()) - result, _ = get_nearest_continuous([], monitoring_location_id="USGS-02238500") - assert result.empty - # The one call that happens uses a trivial time= window, not a filter. - _, kwargs = patch_get_continuous.call_args - assert "filter" not in kwargs - assert "time" in kwargs +def test_empty_targets_raises(patch_get_continuous): + """An empty ``targets`` is a call with no useful work to do and + almost always a caller bug — raise rather than silently issuing a + no-op HTTP request.""" + with pytest.raises(ValueError, match="targets"): + get_nearest_continuous([], monitoring_location_id="USGS-02238500") + patch_get_continuous.assert_not_called() def test_rejects_time_kwarg(patch_get_continuous):