diff --git a/CHANGELOG.md b/CHANGELOG.md index a5d7c651..0661441e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,11 @@ +## 0.45.0 + +### Features +* Make the split-PDF `httpx.AsyncClient` connection-pool limits configurable via env vars: `UNSTRUCTURED_CLIENT_MAX_CONNECTIONS` (default `100`), `UNSTRUCTURED_CLIENT_MAX_KEEPALIVE_CONNECTIONS` (default `20`), and `UNSTRUCTURED_CLIENT_KEEPALIVE_EXPIRY` (default `5.0`s). Defaults match httpx, so behavior is unchanged unless set. Useful when deploying behind a connect-time-only load balancer (e.g. Kubernetes ClusterIP without a mesh) where shorter keepalives force connections to redistribute across backend pods. +* Honor the standard `SSL_CERT_FILE` / `REQUESTS_CA_BUNDLE` env vars to point the split-PDF `httpx.AsyncClient` at a custom trust store, so a single env-var setting applies uniformly across Python tooling. +* Add `UNSTRUCTURED_CLIENT_TLS_CLIENT_CERT` and `UNSTRUCTURED_CLIENT_TLS_CLIENT_KEY` env vars to wire an mTLS client certificate into the split-PDF `httpx.AsyncClient` (single PEM, or split cert + key files). +* Extend the split-PDF `event=plan_created` log to include the resolved pool limits and trust-store / mTLS mode so the active config is visible in production logs. + ## 0.44.1 ### Features diff --git a/RELEASES.md b/RELEASES.md index 10855b06..51643612 100644 --- a/RELEASES.md +++ b/RELEASES.md @@ -1231,3 +1231,13 @@ Based on: - [python v0.44.1] . ### Releases - [PyPI v0.44.1] https://pypi.org/project/unstructured-client/0.44.1 - . + +## 2026-06-05 00:00:00 +### Changes +Based on: +- OpenAPI Doc +- Speakeasy CLI 1.601.0 (2.680.0) https://github.com/speakeasy-api/speakeasy +### Generated +- [python v0.45.0] . +### Releases +- [PyPI v0.45.0] https://pypi.org/project/unstructured-client/0.45.0 - . diff --git a/_test_unstructured_client/unit/test_split_pdf_hook.py b/_test_unstructured_client/unit/test_split_pdf_hook.py index bf0a83da..9a69d9a8 100644 --- a/_test_unstructured_client/unit/test_split_pdf_hook.py +++ b/_test_unstructured_client/unit/test_split_pdf_hook.py @@ -482,6 +482,184 @@ async def test_remaining_tasks_cancelled_when_fails_disallowed(): assert len(tasks) > cancelled_counter["cancelled"] > 0 +@pytest.mark.asyncio +async def test_unit_run_tasks_pool_limits_configurable_via_env( + monkeypatch: pytest.MonkeyPatch, +): + """Env vars override the httpx.AsyncClient connection-pool limits. + + Operators running the SDK in a Kubernetes Deployment with + connect-time-only load balancing (kube-proxy ClusterIP, meshless) + need to be able to shrink the keepalive pool so connections recycle + frequently and redistribute across backend pods. + """ + monkeypatch.setenv("UNSTRUCTURED_CLIENT_MAX_CONNECTIONS", "7") + monkeypatch.setenv("UNSTRUCTURED_CLIENT_MAX_KEEPALIVE_CONNECTIONS", "1") + monkeypatch.setenv("UNSTRUCTURED_CLIENT_KEEPALIVE_EXPIRY", "30.0") + + captured: dict[str, httpx.Limits] = {} + real_async_client = httpx.AsyncClient + + def _capturing_async_client(*args, **kwargs): + captured["limits"] = kwargs.get("limits") + return real_async_client(*args, **kwargs) + + with patch( + "unstructured_client._hooks.custom.split_pdf_hook.httpx.AsyncClient", + side_effect=_capturing_async_client, + ): + await run_tasks( + [partial(_request_mock, fails=False, content="ok")], + allow_failed=True, + ) + + limits = captured["limits"] + assert isinstance(limits, httpx.Limits) + assert limits.max_connections == 7 + assert limits.max_keepalive_connections == 1 + assert limits.keepalive_expiry == 30.0 + + +_TLS_ENV_VARS = ( + "SSL_CERT_FILE", + "REQUESTS_CA_BUNDLE", + "UNSTRUCTURED_CLIENT_TLS_CLIENT_CERT", + "UNSTRUCTURED_CLIENT_TLS_CLIENT_KEY", +) + + +def test_unit_resolve_tls_config_defaults_unchanged_without_env( + monkeypatch: pytest.MonkeyPatch, +): + """No env vars set → verify=True, cert=None (httpx defaults). Backward + compatibility check — callers that don't opt into TLS config see no + behavior change.""" + from unstructured_client._hooks.custom.split_pdf_hook import _resolve_tls_config + + for var in _TLS_ENV_VARS: + monkeypatch.delenv(var, raising=False) + + verify, cert = _resolve_tls_config() + assert verify is True + assert cert is None + + +def test_unit_resolve_tls_config_ssl_cert_file_from_env( + monkeypatch: pytest.MonkeyPatch, tmp_path +): + """SSL_CERT_FILE (stdlib ssl convention) → verify=. Use case: + internal CA bundle shared with other Python tooling.""" + from unstructured_client._hooks.custom.split_pdf_hook import _resolve_tls_config + + for var in _TLS_ENV_VARS: + monkeypatch.delenv(var, raising=False) + + ca_bundle = tmp_path / "custom-ca.pem" + ca_bundle.write_text("-----BEGIN CERTIFICATE-----\nfake\n-----END CERTIFICATE-----\n") + monkeypatch.setenv("SSL_CERT_FILE", str(ca_bundle)) + + verify, cert = _resolve_tls_config() + assert verify == str(ca_bundle) + assert cert is None + + +def test_unit_resolve_tls_config_requests_ca_bundle_from_env( + monkeypatch: pytest.MonkeyPatch, tmp_path +): + """REQUESTS_CA_BUNDLE (requests/httpx-ecosystem convention) → verify=.""" + from unstructured_client._hooks.custom.split_pdf_hook import _resolve_tls_config + + for var in _TLS_ENV_VARS: + monkeypatch.delenv(var, raising=False) + + ca_bundle = tmp_path / "custom-ca.pem" + ca_bundle.write_text("-----BEGIN CERTIFICATE-----\nfake\n-----END CERTIFICATE-----\n") + monkeypatch.setenv("REQUESTS_CA_BUNDLE", str(ca_bundle)) + + verify, cert = _resolve_tls_config() + assert verify == str(ca_bundle) + assert cert is None + + +def test_unit_resolve_tls_config_ssl_cert_file_wins_over_requests_ca_bundle( + monkeypatch: pytest.MonkeyPatch, +): + """If both standard env vars are set, SSL_CERT_FILE takes precedence — + it's the lower-level stdlib convention.""" + from unstructured_client._hooks.custom.split_pdf_hook import _resolve_tls_config + + monkeypatch.setenv("SSL_CERT_FILE", "/etc/ssl/stdlib-ca.pem") + monkeypatch.setenv("REQUESTS_CA_BUNDLE", "/etc/ssl/requests-ca.pem") + + verify, _ = _resolve_tls_config() + assert verify == "/etc/ssl/stdlib-ca.pem" + + +def test_unit_resolve_tls_config_client_cert_only(monkeypatch: pytest.MonkeyPatch): + """UNSTRUCTURED_CLIENT_TLS_CLIENT_CERT alone → cert=. httpx will + read the private key from the same PEM file.""" + from unstructured_client._hooks.custom.split_pdf_hook import _resolve_tls_config + + monkeypatch.setenv("UNSTRUCTURED_CLIENT_TLS_CLIENT_CERT", "/etc/ssl/client.pem") + monkeypatch.delenv("UNSTRUCTURED_CLIENT_TLS_CLIENT_KEY", raising=False) + + verify, cert = _resolve_tls_config() + assert cert == "/etc/ssl/client.pem" + + +def test_unit_resolve_tls_config_client_cert_and_key_split( + monkeypatch: pytest.MonkeyPatch, +): + """Both _CLIENT_CERT and _CLIENT_KEY → cert=(cert_path, key_path). For + PKI setups where cert and key live in separate files.""" + from unstructured_client._hooks.custom.split_pdf_hook import _resolve_tls_config + + monkeypatch.setenv("UNSTRUCTURED_CLIENT_TLS_CLIENT_CERT", "/etc/ssl/client.crt") + monkeypatch.setenv("UNSTRUCTURED_CLIENT_TLS_CLIENT_KEY", "/etc/ssl/client.key") + + verify, cert = _resolve_tls_config() + assert cert == ("/etc/ssl/client.crt", "/etc/ssl/client.key") + + +@pytest.mark.asyncio +async def test_unit_run_tasks_forwards_tls_config_to_httpx_async_client( + monkeypatch: pytest.MonkeyPatch, tmp_path +): + """run_tasks() actually wires verify+cert into httpx.AsyncClient(). End-to- + end check that _resolve_tls_config is called and its result reaches the + client construction. AsyncClient is fully mocked so we don't have to feed + httpx a real cert chain.""" + ca_bundle = tmp_path / "ca.pem" + ca_bundle.write_text("-----BEGIN CERTIFICATE-----\nfake\n-----END CERTIFICATE-----\n") + monkeypatch.setenv("SSL_CERT_FILE", str(ca_bundle)) + monkeypatch.setenv("UNSTRUCTURED_CLIENT_TLS_CLIENT_CERT", "/etc/ssl/client.pem") + + captured: dict = {} + + class _MockAsyncClient: + def __init__(self, *args, **kwargs): + captured["verify"] = kwargs.get("verify") + captured["cert"] = kwargs.get("cert") + + async def __aenter__(self): + return self + + async def __aexit__(self, *exc): + return False + + with patch( + "unstructured_client._hooks.custom.split_pdf_hook.httpx.AsyncClient", + new=_MockAsyncClient, + ): + await run_tasks( + [partial(_request_mock, fails=False, content="ok")], + allow_failed=True, + ) + + assert captured["verify"] == str(ca_bundle) + assert captured["cert"] == "/etc/ssl/client.pem" + + @patch("unstructured_client._hooks.custom.form_utils.Path") def test_unit_get_split_pdf_cache_tmp_data_dir_uses_dir_from_form_data(mock_path: MagicMock): """Test get_split_pdf_cache_tmp_data_dir uses the directory from the form data.""" diff --git a/src/unstructured_client/_hooks/custom/split_pdf_hook.py b/src/unstructured_client/_hooks/custom/split_pdf_hook.py index 7a6b3489..912da332 100644 --- a/src/unstructured_client/_hooks/custom/split_pdf_hook.py +++ b/src/unstructured_client/_hooks/custom/split_pdf_hook.py @@ -177,6 +177,84 @@ async def _order_keeper(index: int, coro: Awaitable) -> Tuple[int, httpx.Respons return index, response +def _resolve_pool_limits() -> httpx.Limits: + """Resolve httpx connection-pool limits from environment variables. + + Defaults match httpx's built-in defaults (max_connections=100, + max_keepalive_connections=20, keepalive_expiry=5.0) so behavior is + unchanged for callers that do not set the env vars. Operators running + the SDK inside a Kubernetes Deployment that load-balances only at + TCP-connect time (e.g. kube-proxy ClusterIP, no service mesh) can + lower these values to force frequent reconnects and redistribute + traffic across backend pods. + """ + return httpx.Limits( + max_connections=int(os.getenv("UNSTRUCTURED_CLIENT_MAX_CONNECTIONS", "100")), + max_keepalive_connections=int( + os.getenv("UNSTRUCTURED_CLIENT_MAX_KEEPALIVE_CONNECTIONS", "20") + ), + keepalive_expiry=float( + os.getenv("UNSTRUCTURED_CLIENT_KEEPALIVE_EXPIRY", "5.0") + ), + ) + + +def _resolve_tls_config() -> tuple[Union[bool, str], Optional[Union[str, tuple[str, str]]]]: + """Resolve httpx TLS trust-store and mTLS client-certificate config + from environment variables. + + Returns a (verify, cert) tuple suitable for `httpx.AsyncClient(verify=..., cert=...)`. + + Trust store (`verify`) — honors the same standard env vars other + libraries use, so a single env-var setting applies uniformly across + tools: + - `SSL_CERT_FILE` (path): stdlib `ssl` convention. + - `REQUESTS_CA_BUNDLE` (path): `requests` / `httpx`-ecosystem + convention. Checked if `SSL_CERT_FILE` is unset. + - Otherwise: `True` (httpx default — use system trust store). + + mTLS client certificate (`cert`): + - `UNSTRUCTURED_CLIENT_TLS_CLIENT_CERT` (path): PEM file. By default + httpx will read the private key from the same file. + - `UNSTRUCTURED_CLIENT_TLS_CLIENT_KEY` (path, optional): use this + separate key file. Required only if cert and key are split. + - Otherwise: `None`. + + Defaults match httpx's built-in defaults so behavior is unchanged for + callers that don't set any of these variables. + """ + verify: Union[bool, str] = ( + os.getenv("SSL_CERT_FILE") or os.getenv("REQUESTS_CA_BUNDLE") or True + ) + + cert: Optional[Union[str, tuple[str, str]]] = None + if client_cert := os.getenv("UNSTRUCTURED_CLIENT_TLS_CLIENT_CERT"): + if client_key := os.getenv("UNSTRUCTURED_CLIENT_TLS_CLIENT_KEY"): + cert = (client_cert, client_key) + else: + cert = client_cert + + return verify, cert + + +def _describe_tls_config( + verify: Union[bool, str], cert: Optional[Union[str, tuple[str, str]]] +) -> str: + """Short human-readable summary of the TLS config, safe for log output. + Emits "system-trust" / "custom-ca-bundle" rather than the actual file + path, so logs don't leak filesystem layout.""" + verify_desc = "system-trust" if verify is True else "custom-ca-bundle" + + if cert is None: + cert_desc = "none" + elif isinstance(cert, tuple): + cert_desc = "cert+key" + else: + cert_desc = "cert-only" + + return f"trust_store={verify_desc} mtls_cert={cert_desc}" + + async def run_tasks( coroutines: list[partial[Coroutine[Any, Any, httpx.Response]]], allow_failed: bool = False, @@ -205,16 +283,25 @@ async def run_tasks( client_timeout_minutes = int(timeout_var) client_timeout = httpx.Timeout(60 * client_timeout_minutes) + limits = _resolve_pool_limits() + verify, cert = _resolve_tls_config() + logger.debug( - "split_pdf event=batch_async_start operation_id=%s chunk_count=%d concurrency=%d client_timeout=%s allow_failed=%s", + "split_pdf event=batch_async_start operation_id=%s chunk_count=%d concurrency=%d client_timeout=%s allow_failed=%s pool_max_connections=%s pool_max_keepalive=%s pool_keepalive_expiry=%s tls=%s", operation_id, len(coroutines), concurrency_level, client_timeout, allow_failed, + limits.max_connections, + limits.max_keepalive_connections, + limits.keepalive_expiry, + _describe_tls_config(verify, cert), ) - async with httpx.AsyncClient(timeout=client_timeout) as client: + async with httpx.AsyncClient( + timeout=client_timeout, limits=limits, verify=verify, cert=cert + ) as client: armed_coroutines = [coro(async_client=client, limiter=limiter) for coro in coroutines] # type: ignore tasks = [ asyncio.create_task(_order_keeper(index, coro)) @@ -770,8 +857,10 @@ def _before_request_unlocked( ) self.coroutines_to_execute[operation_id].append(coroutine) + plan_limits = _resolve_pool_limits() + plan_verify, plan_cert = _resolve_tls_config() logger.info( - "split_pdf event=plan_created operation_id=%s filename=%s strategy=%s page_range=%s-%s page_count=%d split_size=%d chunk_count=%d concurrency=%d allow_failed=%s cache_mode=%s timeout_seconds=%s retry_config_mode=%s", + "split_pdf event=plan_created operation_id=%s filename=%s strategy=%s page_range=%s-%s page_count=%d split_size=%d chunk_count=%d concurrency=%d allow_failed=%s cache_mode=%s timeout_seconds=%s retry_config_mode=%s pool_max_connections=%d pool_max_keepalive=%d pool_keepalive_expiry=%.1fs tls=%s", operation_id, Path(pdf_file_meta["filename"]).name, form_data.get("strategy"), @@ -790,6 +879,10 @@ def _before_request_unlocked( self._retry_config_observability_mode( self.operation_retry_configs.get(operation_id), ), + plan_limits.max_connections, + plan_limits.max_keepalive_connections, + plan_limits.keepalive_expiry, + _describe_tls_config(plan_verify, plan_cert), ) self.pending_operation_ids[operation_id] = operation_id diff --git a/src/unstructured_client/_version.py b/src/unstructured_client/_version.py index cffd0888..8a14e983 100644 --- a/src/unstructured_client/_version.py +++ b/src/unstructured_client/_version.py @@ -3,10 +3,10 @@ import importlib.metadata __title__: str = "unstructured-client" -__version__: str = "0.44.1" +__version__: str = "0.45.0" __openapi_doc_version__: str = "1.2.31" __gen_version__: str = "2.680.0" -__user_agent__: str = "speakeasy-sdk/python 0.44.1 2.680.0 1.2.31 unstructured-client" +__user_agent__: str = "speakeasy-sdk/python 0.45.0 2.680.0 1.2.31 unstructured-client" try: if __package__ is not None: