Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions paimon-python/pypaimon/api/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,10 @@ def __init__(self, max_retries: int = 5):

@staticmethod
def __create_retry_strategy(max_retries: int) -> Retry:
# Single retry budget shared across read and status (429 / 5xx)
# errors. Connect failures are intentionally non-retriable: a
# connect error usually means the host is wrong or the listener
# is down, and burning the budget on it just delays the failure.
retry_kwargs = {
'total': max_retries,
'read': max_retries,
Expand Down Expand Up @@ -264,18 +268,29 @@ class HttpClient(RESTClient):
REQUEST_ID_KEY = "x-request-id"
DEFAULT_REQUEST_ID = "unknown"

# 3-minute connect / read timeouts and a retry budget of 5 are
# conservative defaults that work well across the cluster shapes
# we see in practice. Not exposed as ``CatalogOptions``: callers
# who need to tune them can subclass or override the class-level
# constants.
_CONNECT_TIMEOUT_SECONDS = 180
_READ_TIMEOUT_SECONDS = 180
_MAX_RETRIES = 5

def __init__(self, uri: str):
self.logger = logging.getLogger(self.__class__.__name__)
self.uri = _normalize_uri(uri)
self.error_handler = DefaultErrorHandler.get_instance()
self.session = requests.Session()

retry_interceptor = ExponentialRetry(max_retries=3)

retry_interceptor = ExponentialRetry(max_retries=self._MAX_RETRIES)
self.session.mount("http://", retry_interceptor.adapter)
self.session.mount("https://", retry_interceptor.adapter)

self.session.timeout = (180, 180)
# ``Session.timeout`` is not consulted by the requests library;
# only ``Session.request(timeout=...)`` is. Keep the value here
# and pass it explicitly on every call (see ``_execute_request``).
self._timeout = (self._CONNECT_TIMEOUT_SECONDS, self._READ_TIMEOUT_SECONDS)

self.session.headers.update({
'Accept': 'application/json'
Expand Down Expand Up @@ -361,7 +376,8 @@ def _execute_request(self, method: str, url: str,
method=method,
url=url,
data=data.encode('utf-8') if data else None,
headers=headers
headers=headers,
timeout=self._timeout,
)
duration_ms = (int(time.time() * 1_000_000_000) - start_time) // 1_000_000
response_request_id = response.headers.get(self.REQUEST_ID_KEY, self.DEFAULT_REQUEST_ID)
Expand Down
33 changes: 32 additions & 1 deletion paimon-python/pypaimon/tests/rest/client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
# limitations under the License.

import unittest
from unittest import mock

from pypaimon.api.client import _parse_error_response
from pypaimon.api.client import HttpClient, _parse_error_response


class HttpClientTest(unittest.TestCase):
Expand Down Expand Up @@ -58,5 +59,35 @@ def test_parse_error_response_with_unparsable_json(self):
self.assertEqual(error.resource_name, '')


class HttpClientTimeoutTest(unittest.TestCase):
"""HttpClient passes its timeout to ``Session.request``.

``Session.timeout`` was previously set as an attribute, which the
requests library does not honour — only ``Session.request(timeout=
...)`` does. This test pins the fix in place: every outgoing call
must carry the configured ``(connect, read)`` tuple, otherwise the
process would hang forever on a slow upstream.
"""

def test_session_request_receives_timeout_tuple(self):
client = HttpClient("http://localhost:8080")
# 3-minute connect / read timeouts are the conservative default
# documented on ``HttpClient``; pin them here so the value is
# caught if someone changes them silently.
self.assertEqual(client._timeout, (180, 180))

with mock.patch.object(client.session, 'request') as req:
req.return_value = mock.Mock(
status_code=200, text='{}', headers={},
json=lambda: {})
client._execute_request('GET', 'http://localhost:8080/x')

self.assertTrue(req.called, "session.request must be invoked")
kwargs = req.call_args.kwargs
self.assertEqual(
kwargs.get('timeout'), client._timeout,
"request must carry the timeout; Session.timeout is dead code")


if __name__ == '__main__':
unittest.main()
Original file line number Diff line number Diff line change
Expand Up @@ -27,36 +27,39 @@

class TestExponentialRetryStrategy(unittest.TestCase):

def setUp(self):
self.retry_strategy = ExponentialRetry(max_retries=5)

def test_basic_retry(self):
retry = ExponentialRetry._ExponentialRetry__create_retry_strategy(5)

self.assertEqual(retry.total, 5)
self.assertEqual(retry.read, 5)
self.assertEqual(retry.connect, 0) # Connection errors should not retry

# Connect failures are intentionally non-retriable — see the
# comment on ``ExponentialRetry.__create_retry_strategy``.
self.assertEqual(retry.connect, 0)

self.assertIn(429, retry.status_forcelist) # Too Many Requests
self.assertIn(503, retry.status_forcelist) # Service Unavailable
self.assertNotIn(404, retry.status_forcelist)

def test_no_retry_on_connect_error(self):
def test_retry_on_connect_error(self):
# ``connect=0`` means connect errors are not retried — the
# request should fail fast within roughly the connect timeout.
retry_strategy = ExponentialRetry(max_retries=2)
session = requests.Session()
session.mount("http://", self.retry_strategy.adapter)
session.mount("https://", self.retry_strategy.adapter)
session.timeout = (1, 1)
session.mount("http://", retry_strategy.adapter)
session.mount("https://", retry_strategy.adapter)

start_time = time.time()

try:
session.get("http://192.168.255.255:9999", timeout=(1, 1))
self.fail("Expected ConnectionError")
except (ConnectionError, ConnectTimeout, Timeout, NewConnectionError, MaxRetryError):
elapsed = time.time() - start_time
# No connect retries → bail out within roughly the connect
# timeout, with no exponential backoff.
self.assertLess(
elapsed, 5.0,
f"Connection error took {elapsed:.2f}s, should fail quickly without retry"
"connect failures should not be retried (got {:.2f}s)".format(elapsed)
)


Expand Down
Loading