Reviewer notes (text ahead of the first "diff --git" header is ignored by
``git apply`` and ``patch``):

* This patch arrived with its newlines collapsed. Line breaks, indentation
  and blank context lines below were reconstructed from the hunk headers
  and from Python syntax. NOTE(review): the nesting depth of the
  ``_execute_request`` hunk and the position of blank context lines are
  assumptions -- confirm against the target tree, or apply with
  ``git apply --ignore-whitespace``.
* ``test_no_retry_on_connect_error`` keeps its original name. The test
  asserts that connect failures are NOT retried, so renaming it to
  ``test_retry_on_connect_error`` inverted its meaning.
* ``req.call_args[1]`` replaces ``req.call_args.kwargs``; the ``.kwargs``
  property only exists on Python 3.8+, the index form works everywhere.
* Because of the two changes above, the post-image blob ids on the
  ``index`` lines of the two test files are stale.

diff --git a/paimon-python/pypaimon/api/client.py b/paimon-python/pypaimon/api/client.py
index c2f38912f432..0575f696fc3d 100644
--- a/paimon-python/pypaimon/api/client.py
+++ b/paimon-python/pypaimon/api/client.py
@@ -155,6 +155,10 @@ def __init__(self, max_retries: int = 5):
 
     @staticmethod
     def __create_retry_strategy(max_retries: int) -> Retry:
+        # Single retry budget shared across read and status (429 / 5xx)
+        # errors. Connect failures are intentionally non-retriable: a
+        # connect error usually means the host is wrong or the listener
+        # is down, and burning the budget on it just delays the failure.
         retry_kwargs = {
             'total': max_retries,
             'read': max_retries,
@@ -264,18 +268,29 @@ class HttpClient(RESTClient):
     REQUEST_ID_KEY = "x-request-id"
     DEFAULT_REQUEST_ID = "unknown"
 
+    # 3-minute connect / read timeouts and a retry budget of 5 are
+    # conservative defaults that work well across the cluster shapes
+    # we see in practice. Not exposed as ``CatalogOptions``: callers
+    # who need to tune them can subclass or override the class-level
+    # constants.
+    _CONNECT_TIMEOUT_SECONDS = 180
+    _READ_TIMEOUT_SECONDS = 180
+    _MAX_RETRIES = 5
+
     def __init__(self, uri: str):
         self.logger = logging.getLogger(self.__class__.__name__)
         self.uri = _normalize_uri(uri)
         self.error_handler = DefaultErrorHandler.get_instance()
         self.session = requests.Session()
 
-        retry_interceptor = ExponentialRetry(max_retries=3)
-
+        retry_interceptor = ExponentialRetry(max_retries=self._MAX_RETRIES)
         self.session.mount("http://", retry_interceptor.adapter)
         self.session.mount("https://", retry_interceptor.adapter)
 
-        self.session.timeout = (180, 180)
+        # ``Session.timeout`` is not consulted by the requests library;
+        # only ``Session.request(timeout=...)`` is. Keep the value here
+        # and pass it explicitly on every call (see ``_execute_request``).
+        self._timeout = (self._CONNECT_TIMEOUT_SECONDS, self._READ_TIMEOUT_SECONDS)
 
         self.session.headers.update({
             'Accept': 'application/json'
@@ -361,7 +376,8 @@ def _execute_request(self, method: str, url: str,
                 method=method,
                 url=url,
                 data=data.encode('utf-8') if data else None,
-                headers=headers
+                headers=headers,
+                timeout=self._timeout,
             )
             duration_ms = (int(time.time() * 1_000_000_000) - start_time) // 1_000_000
             response_request_id = response.headers.get(self.REQUEST_ID_KEY, self.DEFAULT_REQUEST_ID)
diff --git a/paimon-python/pypaimon/tests/rest/client_test.py b/paimon-python/pypaimon/tests/rest/client_test.py
index 6f381214c7ab..ba67e150b2b3 100644
--- a/paimon-python/pypaimon/tests/rest/client_test.py
+++ b/paimon-python/pypaimon/tests/rest/client_test.py
@@ -15,8 +15,9 @@
 # limitations under the License.
 
 import unittest
+from unittest import mock
 
-from pypaimon.api.client import _parse_error_response
+from pypaimon.api.client import HttpClient, _parse_error_response
 
 
 class HttpClientTest(unittest.TestCase):
@@ -58,5 +59,35 @@ def test_parse_error_response_with_unparsable_json(self):
         self.assertEqual(error.resource_name, '')
 
 
+class HttpClientTimeoutTest(unittest.TestCase):
+    """HttpClient passes its timeout to ``Session.request``.
+
+    ``Session.timeout`` was previously set as an attribute, which the
+    requests library does not honour — only ``Session.request(timeout=
+    ...)`` does. This test pins the fix in place: every outgoing call
+    must carry the configured ``(connect, read)`` tuple, otherwise the
+    process would hang forever on a slow upstream.
+    """
+
+    def test_session_request_receives_timeout_tuple(self):
+        client = HttpClient("http://localhost:8080")
+        # 3-minute connect / read timeouts are the conservative default
+        # documented on ``HttpClient``; pin them here so the value is
+        # caught if someone changes them silently.
+        self.assertEqual(client._timeout, (180, 180))
+
+        with mock.patch.object(client.session, 'request') as req:
+            req.return_value = mock.Mock(
+                status_code=200, text='{}', headers={},
+                json=lambda: {})
+            client._execute_request('GET', 'http://localhost:8080/x')
+
+        self.assertTrue(req.called, "session.request must be invoked")
+        kwargs = req.call_args[1]  # index form; ``.kwargs`` needs Python 3.8+
+        self.assertEqual(
+            kwargs.get('timeout'), client._timeout,
+            "request must carry the timeout; Session.timeout is dead code")
+
+
 if __name__ == '__main__':
     unittest.main()
diff --git a/paimon-python/pypaimon/tests/rest/test_exponential_retry_strategy.py b/paimon-python/pypaimon/tests/rest/test_exponential_retry_strategy.py
index b6ea91e869d5..58d64e9e4822 100644
--- a/paimon-python/pypaimon/tests/rest/test_exponential_retry_strategy.py
+++ b/paimon-python/pypaimon/tests/rest/test_exponential_retry_strategy.py
@@ -27,36 +27,39 @@
 
 class TestExponentialRetryStrategy(unittest.TestCase):
 
-    def setUp(self):
-        self.retry_strategy = ExponentialRetry(max_retries=5)
-
     def test_basic_retry(self):
         retry = ExponentialRetry._ExponentialRetry__create_retry_strategy(5)
-        
+
         self.assertEqual(retry.total, 5)
         self.assertEqual(retry.read, 5)
-        self.assertEqual(retry.connect, 0)  # Connection errors should not retry
-        
+        # Connect failures are intentionally non-retriable — see the
+        # comment on ``ExponentialRetry.__create_retry_strategy``.
+        self.assertEqual(retry.connect, 0)
+
         self.assertIn(429, retry.status_forcelist)  # Too Many Requests
         self.assertIn(503, retry.status_forcelist)  # Service Unavailable
         self.assertNotIn(404, retry.status_forcelist)
 
     def test_no_retry_on_connect_error(self):
+        # ``connect=0`` means connect errors are not retried — the
+        # request should fail fast within roughly the connect timeout.
+        retry_strategy = ExponentialRetry(max_retries=2)
         session = requests.Session()
-        session.mount("http://", self.retry_strategy.adapter)
-        session.mount("https://", self.retry_strategy.adapter)
-        session.timeout = (1, 1)
+        session.mount("http://", retry_strategy.adapter)
+        session.mount("https://", retry_strategy.adapter)
 
         start_time = time.time()
-        
+
         try:
             session.get("http://192.168.255.255:9999", timeout=(1, 1))
             self.fail("Expected ConnectionError")
         except (ConnectionError, ConnectTimeout, Timeout, NewConnectionError, MaxRetryError):
             elapsed = time.time() - start_time
+            # No connect retries → bail out within roughly the connect
+            # timeout, with no exponential backoff.
             self.assertLess(
                 elapsed, 5.0,
-                f"Connection error took {elapsed:.2f}s, should fail quickly without retry"
+                "connect failures should not be retried (got {:.2f}s)".format(elapsed)
             )
 
 