Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
## 0.45.0

### Features
* Make the split-PDF `httpx.AsyncClient` connection-pool limits configurable via env vars: `UNSTRUCTURED_CLIENT_MAX_CONNECTIONS` (default `100`), `UNSTRUCTURED_CLIENT_MAX_KEEPALIVE_CONNECTIONS` (default `20`), and `UNSTRUCTURED_CLIENT_KEEPALIVE_EXPIRY` (default `5.0`s). Defaults match httpx, so behavior is unchanged unless set. Useful when deploying behind a connect-time-only load balancer (e.g. Kubernetes ClusterIP without a mesh) where shorter keepalives force connections to redistribute across backend pods.
* Honor the standard `SSL_CERT_FILE` / `REQUESTS_CA_BUNDLE` env vars to point the split-PDF `httpx.AsyncClient` at a custom trust store, so a single env-var setting applies uniformly across Python tooling.
* Add `UNSTRUCTURED_CLIENT_TLS_CLIENT_CERT` and `UNSTRUCTURED_CLIENT_TLS_CLIENT_KEY` env vars to wire an mTLS client certificate into the split-PDF `httpx.AsyncClient` (single PEM, or split cert + key files).
* Extend the split-PDF `event=plan_created` log (and the `event=batch_async_start` debug log) to include the resolved pool limits and trust-store / mTLS mode so the active config is visible in production logs.

## 0.44.1

### Features
Expand Down
10 changes: 10 additions & 0 deletions RELEASES.md
Original file line number Diff line number Diff line change
Expand Up @@ -1231,3 +1231,13 @@ Based on:
- [python v0.44.1] .
### Releases
- [PyPI v0.44.1] https://pypi.org/project/unstructured-client/0.44.1 - .

## 2026-06-05 00:00:00
### Changes
Based on:
- OpenAPI Doc
- Speakeasy CLI 1.601.0 (2.680.0) https://github.com/speakeasy-api/speakeasy
### Generated
- [python v0.45.0] .
### Releases
- [PyPI v0.45.0] https://pypi.org/project/unstructured-client/0.45.0 - .
178 changes: 178 additions & 0 deletions _test_unstructured_client/unit/test_split_pdf_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,184 @@ async def test_remaining_tasks_cancelled_when_fails_disallowed():
assert len(tasks) > cancelled_counter["cancelled"] > 0


@pytest.mark.asyncio
async def test_unit_run_tasks_pool_limits_configurable_via_env(
    monkeypatch: pytest.MonkeyPatch,
):
    """The pool-limit env vars end up in the ``limits=`` kwarg of the client.

    Sets all three ``UNSTRUCTURED_CLIENT_*`` pool variables, intercepts the
    ``httpx.AsyncClient`` construction performed by ``run_tasks`` and checks
    that the ``httpx.Limits`` object it received carries the overridden
    values. The motivating deployment is a Kubernetes service that only
    load-balances at connect time, where a small, short-lived keepalive pool
    makes connections recycle and spread across backend pods.
    """
    env_overrides = {
        "UNSTRUCTURED_CLIENT_MAX_CONNECTIONS": "7",
        "UNSTRUCTURED_CLIENT_MAX_KEEPALIVE_CONNECTIONS": "1",
        "UNSTRUCTURED_CLIENT_KEEPALIVE_EXPIRY": "30.0",
    }
    for env_name, env_value in env_overrides.items():
        monkeypatch.setenv(env_name, env_value)

    seen: dict[str, httpx.Limits] = {}
    # Keep a handle on the genuine class before it gets patched out.
    original_client_cls = httpx.AsyncClient

    def _record_then_build(*args, **kwargs):
        # Remember what run_tasks asked for, then build a real client.
        seen["limits"] = kwargs.get("limits")
        return original_client_cls(*args, **kwargs)

    patch_target = "unstructured_client._hooks.custom.split_pdf_hook.httpx.AsyncClient"
    single_ok_task = [partial(_request_mock, fails=False, content="ok")]
    with patch(patch_target, side_effect=_record_then_build):
        await run_tasks(single_ok_task, allow_failed=True)

    pool = seen["limits"]
    assert isinstance(pool, httpx.Limits)
    assert pool.max_connections == 7
    assert pool.max_keepalive_connections == 1
    assert pool.keepalive_expiry == 30.0


# The environment variables consulted by `_resolve_tls_config`. Tests that need
# a clean slate loop over this tuple and delete each one, so values inherited
# from the developer's shell or CI runner cannot influence the result.
_TLS_ENV_VARS = (
    "SSL_CERT_FILE",
    "REQUESTS_CA_BUNDLE",
    "UNSTRUCTURED_CLIENT_TLS_CLIENT_CERT",
    "UNSTRUCTURED_CLIENT_TLS_CLIENT_KEY",
)


def test_unit_resolve_tls_config_defaults_unchanged_without_env(
    monkeypatch: pytest.MonkeyPatch,
):
    """With every TLS env var removed the resolver returns ``(True, None)``.

    This is the backward-compatibility guard: a caller that never opts into
    TLS configuration must get ``verify=True`` and ``cert=None``.
    """
    # Scrub the environment first so nothing ambient leaks into the result.
    for env_name in _TLS_ENV_VARS:
        monkeypatch.delenv(env_name, raising=False)

    from unstructured_client._hooks.custom.split_pdf_hook import _resolve_tls_config

    trust_store, client_cert = _resolve_tls_config()
    assert trust_store is True
    assert client_cert is None


def test_unit_resolve_tls_config_ssl_cert_file_from_env(
    monkeypatch: pytest.MonkeyPatch, tmp_path
):
    """``SSL_CERT_FILE`` alone makes ``verify`` equal to that path.

    Models an internal CA bundle that is shared with other Python tooling
    through the stdlib ``ssl`` environment-variable convention.
    """
    for env_name in _TLS_ENV_VARS:
        monkeypatch.delenv(env_name, raising=False)

    from unstructured_client._hooks.custom.split_pdf_hook import _resolve_tls_config

    # The file content is a placeholder; the resolver only passes the path on.
    pem_path = tmp_path / "custom-ca.pem"
    pem_path.write_text("-----BEGIN CERTIFICATE-----\nfake\n-----END CERTIFICATE-----\n")
    expected_verify = str(pem_path)
    monkeypatch.setenv("SSL_CERT_FILE", expected_verify)

    trust_store, client_cert = _resolve_tls_config()
    assert trust_store == expected_verify
    assert client_cert is None


def test_unit_resolve_tls_config_requests_ca_bundle_from_env(
    monkeypatch: pytest.MonkeyPatch, tmp_path
):
    """``REQUESTS_CA_BUNDLE`` alone makes ``verify`` equal to that path."""
    for env_name in _TLS_ENV_VARS:
        monkeypatch.delenv(env_name, raising=False)

    from unstructured_client._hooks.custom.split_pdf_hook import _resolve_tls_config

    # The file content is a placeholder; the resolver only passes the path on.
    pem_path = tmp_path / "custom-ca.pem"
    pem_path.write_text("-----BEGIN CERTIFICATE-----\nfake\n-----END CERTIFICATE-----\n")
    expected_verify = str(pem_path)
    monkeypatch.setenv("REQUESTS_CA_BUNDLE", expected_verify)

    trust_store, client_cert = _resolve_tls_config()
    assert trust_store == expected_verify
    assert client_cert is None


def test_unit_resolve_tls_config_ssl_cert_file_wins_over_requests_ca_bundle(
    monkeypatch: pytest.MonkeyPatch,
):
    """When both trust-store variables are set, ``SSL_CERT_FILE`` is used.

    ``SSL_CERT_FILE`` is the lower-level stdlib convention, so it takes
    precedence over ``REQUESTS_CA_BUNDLE``.
    """
    from unstructured_client._hooks.custom.split_pdf_hook import _resolve_tls_config

    stdlib_bundle = "/etc/ssl/stdlib-ca.pem"
    monkeypatch.setenv("SSL_CERT_FILE", stdlib_bundle)
    monkeypatch.setenv("REQUESTS_CA_BUNDLE", "/etc/ssl/requests-ca.pem")

    # Only the trust-store half of the result matters for this check.
    trust_store = _resolve_tls_config()[0]
    assert trust_store == stdlib_bundle


def test_unit_resolve_tls_config_client_cert_only(monkeypatch: pytest.MonkeyPatch):
    """A client cert without a key variable resolves to a bare path string.

    This is the single-PEM case, where the certificate file is expected to
    hold the private key as well.
    """
    from unstructured_client._hooks.custom.split_pdf_hook import _resolve_tls_config

    pem_path = "/etc/ssl/client.pem"
    # Make sure no separate key is configured, otherwise a tuple is returned.
    monkeypatch.delenv("UNSTRUCTURED_CLIENT_TLS_CLIENT_KEY", raising=False)
    monkeypatch.setenv("UNSTRUCTURED_CLIENT_TLS_CLIENT_CERT", pem_path)

    _, client_cert = _resolve_tls_config()
    assert client_cert == pem_path


def test_unit_resolve_tls_config_client_cert_and_key_split(
    monkeypatch: pytest.MonkeyPatch,
):
    """Cert and key variables together resolve to a ``(cert, key)`` tuple.

    Covers PKI layouts that keep the certificate and its private key in
    separate files.
    """
    from unstructured_client._hooks.custom.split_pdf_hook import _resolve_tls_config

    expected_pair = ("/etc/ssl/client.crt", "/etc/ssl/client.key")
    monkeypatch.setenv("UNSTRUCTURED_CLIENT_TLS_CLIENT_CERT", expected_pair[0])
    monkeypatch.setenv("UNSTRUCTURED_CLIENT_TLS_CLIENT_KEY", expected_pair[1])

    _, client_cert = _resolve_tls_config()
    assert client_cert == expected_pair


@pytest.mark.asyncio
async def test_unit_run_tasks_forwards_tls_config_to_httpx_async_client(
    monkeypatch: pytest.MonkeyPatch, tmp_path
):
    """run_tasks() passes the resolved ``verify`` and ``cert`` to the client.

    End-to-end check that the environment-derived TLS settings reach the
    ``httpx.AsyncClient`` constructor. The client class is replaced with a
    minimal stand-in so no real certificate chain has to be loaded.
    """
    ca_bundle = tmp_path / "ca.pem"
    ca_bundle.write_text("-----BEGIN CERTIFICATE-----\nfake\n-----END CERTIFICATE-----\n")
    monkeypatch.setenv("SSL_CERT_FILE", str(ca_bundle))
    monkeypatch.setenv("UNSTRUCTURED_CLIENT_TLS_CLIENT_CERT", "/etc/ssl/client.pem")
    # Keep the test hermetic: an ambient key variable would turn `cert` into a
    # (cert, key) tuple and break the single-path assertion at the end.
    monkeypatch.delenv("UNSTRUCTURED_CLIENT_TLS_CLIENT_KEY", raising=False)

    captured: dict = {}

    class _MockAsyncClient:
        """Stand-in that records the TLS kwargs and acts as an async context manager."""

        def __init__(self, *args, **kwargs):
            captured["verify"] = kwargs.get("verify")
            captured["cert"] = kwargs.get("cert")

        async def __aenter__(self):
            return self

        async def __aexit__(self, *exc):
            # Returning False lets any exception from the body propagate.
            return False

    with patch(
        "unstructured_client._hooks.custom.split_pdf_hook.httpx.AsyncClient",
        new=_MockAsyncClient,
    ):
        await run_tasks(
            [partial(_request_mock, fails=False, content="ok")],
            allow_failed=True,
        )

    assert captured["verify"] == str(ca_bundle)
    assert captured["cert"] == "/etc/ssl/client.pem"


@patch("unstructured_client._hooks.custom.form_utils.Path")
def test_unit_get_split_pdf_cache_tmp_data_dir_uses_dir_from_form_data(mock_path: MagicMock):
"""Test get_split_pdf_cache_tmp_data_dir uses the directory from the form data."""
Expand Down
99 changes: 96 additions & 3 deletions src/unstructured_client/_hooks/custom/split_pdf_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,84 @@ async def _order_keeper(index: int, coro: Awaitable) -> Tuple[int, httpx.Respons
return index, response


def _resolve_pool_limits() -> httpx.Limits:
    """Resolve httpx connection-pool limits from environment variables.

    Variables read (with the default used when unset or blank):
      - ``UNSTRUCTURED_CLIENT_MAX_CONNECTIONS`` (int, ``100``)
      - ``UNSTRUCTURED_CLIENT_MAX_KEEPALIVE_CONNECTIONS`` (int, ``20``)
      - ``UNSTRUCTURED_CLIENT_KEEPALIVE_EXPIRY`` (float seconds, ``5.0``)

    The defaults are intended to mirror httpx's own, so behavior is unchanged
    for callers that set nothing. Operators behind a connect-time-only load
    balancer (e.g. kube-proxy ClusterIP without a service mesh) can lower
    them to force frequent reconnects and redistribute traffic across
    backend pods.

    Returns:
        An ``httpx.Limits`` built from the resolved values.

    Raises:
        ValueError: if a variable holds a value that cannot be parsed as the
            expected number. The message names the offending variable.
    """

    def _from_env(name: str, default: str, parse):
        # A blank value (e.g. `FOO=` in a deployment manifest) is treated as
        # "unset", matching how the TLS variables are handled in this module.
        raw = (os.getenv(name) or "").strip() or default
        try:
            return parse(raw)
        except ValueError as err:
            raise ValueError(
                f"Invalid value {raw!r} for environment variable {name}"
            ) from err

    return httpx.Limits(
        max_connections=_from_env("UNSTRUCTURED_CLIENT_MAX_CONNECTIONS", "100", int),
        max_keepalive_connections=_from_env(
            "UNSTRUCTURED_CLIENT_MAX_KEEPALIVE_CONNECTIONS", "20", int
        ),
        keepalive_expiry=_from_env(
            "UNSTRUCTURED_CLIENT_KEEPALIVE_EXPIRY", "5.0", float
        ),
    )


def _resolve_tls_config() -> tuple[Union[bool, str], Optional[Union[str, tuple[str, str]]]]:
    """Build the ``(verify, cert)`` pair for ``httpx.AsyncClient`` from the environment.

    ``verify`` (trust store), first non-empty match wins:
      1. ``SSL_CERT_FILE`` - path, the stdlib ``ssl`` convention.
      2. ``REQUESTS_CA_BUNDLE`` - path, the ``requests``-ecosystem convention.
      3. ``True`` - no override, leave trust-store selection to httpx.

    ``cert`` (mTLS client certificate):
      - ``UNSTRUCTURED_CLIENT_TLS_CLIENT_CERT`` only: that path as a string
        (the PEM is then expected to contain the private key too).
      - ``UNSTRUCTURED_CLIENT_TLS_CLIENT_CERT`` together with
        ``UNSTRUCTURED_CLIENT_TLS_CLIENT_KEY``: a ``(cert_path, key_path)``
        tuple for split cert/key files.
      - No client cert variable: ``None``. The key variable is only read
        when a client cert is configured.

    Empty-string values count as unset. With none of the variables set the
    result is ``(True, None)``.
    """
    ca_bundle = os.getenv("SSL_CERT_FILE")
    if not ca_bundle:
        ca_bundle = os.getenv("REQUESTS_CA_BUNDLE")
    trust_store: Union[bool, str] = ca_bundle if ca_bundle else True

    cert_path = os.getenv("UNSTRUCTURED_CLIENT_TLS_CLIENT_CERT")
    if not cert_path:
        return trust_store, None

    key_path = os.getenv("UNSTRUCTURED_CLIENT_TLS_CLIENT_KEY")
    if key_path:
        return trust_store, (cert_path, key_path)
    return trust_store, cert_path


def _describe_tls_config(
    verify: Union[bool, str], cert: Optional[Union[str, tuple[str, str]]]
) -> str:
    """Return a short, log-safe summary of the TLS configuration.

    Only the *mode* is reported, never a file path, so log output does not
    leak filesystem layout.

    Args:
        verify: ``True`` for the default trust store, ``False`` when
            certificate verification is turned off, or a CA-bundle path.
        cert: ``None`` for no client certificate, a single path for a
            combined PEM, or a ``(cert_path, key_path)`` tuple.

    Returns:
        A string of the form ``"trust_store=<mode> mtls_cert=<mode>"`` where
        the trust-store mode is ``system-trust``, ``verification-disabled``
        or ``custom-ca-bundle`` and the cert mode is ``none``, ``cert+key``
        or ``cert-only``.
    """
    if verify is True:
        verify_desc = "system-trust"
    elif verify is False:
        # The signature admits False; reporting it as a custom bundle would
        # hide the fact that verification is off.
        verify_desc = "verification-disabled"
    else:
        verify_desc = "custom-ca-bundle"

    if cert is None:
        cert_desc = "none"
    elif isinstance(cert, tuple):
        cert_desc = "cert+key"
    else:
        cert_desc = "cert-only"

    return f"trust_store={verify_desc} mtls_cert={cert_desc}"


async def run_tasks(
coroutines: list[partial[Coroutine[Any, Any, httpx.Response]]],
allow_failed: bool = False,
Expand Down Expand Up @@ -205,16 +283,25 @@ async def run_tasks(
client_timeout_minutes = int(timeout_var)
client_timeout = httpx.Timeout(60 * client_timeout_minutes)

limits = _resolve_pool_limits()
verify, cert = _resolve_tls_config()

logger.debug(
"split_pdf event=batch_async_start operation_id=%s chunk_count=%d concurrency=%d client_timeout=%s allow_failed=%s",
"split_pdf event=batch_async_start operation_id=%s chunk_count=%d concurrency=%d client_timeout=%s allow_failed=%s pool_max_connections=%s pool_max_keepalive=%s pool_keepalive_expiry=%s tls=%s",
operation_id,
len(coroutines),
concurrency_level,
client_timeout,
allow_failed,
limits.max_connections,
limits.max_keepalive_connections,
limits.keepalive_expiry,
_describe_tls_config(verify, cert),
)

async with httpx.AsyncClient(timeout=client_timeout) as client:
async with httpx.AsyncClient(
timeout=client_timeout, limits=limits, verify=verify, cert=cert
) as client:
armed_coroutines = [coro(async_client=client, limiter=limiter) for coro in coroutines] # type: ignore
tasks = [
asyncio.create_task(_order_keeper(index, coro))
Expand Down Expand Up @@ -770,8 +857,10 @@ def _before_request_unlocked(
)
self.coroutines_to_execute[operation_id].append(coroutine)

plan_limits = _resolve_pool_limits()
plan_verify, plan_cert = _resolve_tls_config()
logger.info(
"split_pdf event=plan_created operation_id=%s filename=%s strategy=%s page_range=%s-%s page_count=%d split_size=%d chunk_count=%d concurrency=%d allow_failed=%s cache_mode=%s timeout_seconds=%s retry_config_mode=%s",
"split_pdf event=plan_created operation_id=%s filename=%s strategy=%s page_range=%s-%s page_count=%d split_size=%d chunk_count=%d concurrency=%d allow_failed=%s cache_mode=%s timeout_seconds=%s retry_config_mode=%s pool_max_connections=%d pool_max_keepalive=%d pool_keepalive_expiry=%.1fs tls=%s",
operation_id,
Path(pdf_file_meta["filename"]).name,
form_data.get("strategy"),
Expand All @@ -790,6 +879,10 @@ def _before_request_unlocked(
self._retry_config_observability_mode(
self.operation_retry_configs.get(operation_id),
),
plan_limits.max_connections,
plan_limits.max_keepalive_connections,
plan_limits.keepalive_expiry,
_describe_tls_config(plan_verify, plan_cert),
)

self.pending_operation_ids[operation_id] = operation_id
Expand Down
4 changes: 2 additions & 2 deletions src/unstructured_client/_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
import importlib.metadata

__title__: str = "unstructured-client"
__version__: str = "0.44.1"
__version__: str = "0.45.0"
__openapi_doc_version__: str = "1.2.31"
__gen_version__: str = "2.680.0"
__user_agent__: str = "speakeasy-sdk/python 0.44.1 2.680.0 1.2.31 unstructured-client"
__user_agent__: str = "speakeasy-sdk/python 0.45.0 2.680.0 1.2.31 unstructured-client"

try:
if __package__ is not None:
Expand Down