diff --git a/distributed/client.py b/distributed/client.py index f017612bd5..3667bb46fb 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -151,11 +151,11 @@ class FutureCancelledError(CancelledError): - key: str + key: Key reason: str msg: str | None - def __init__(self, key: str, reason: str | None, msg: str | None = None): + def __init__(self, key: Key, reason: str | None, msg: str | None = None): self.key = key self.reason = reason if reason else "unknown" self.msg = msg @@ -163,9 +163,10 @@ def __init__(self, key: str, reason: str | None, msg: str | None = None): def __str__(self) -> str: result = f"{self.key} cancelled for reason: {self.reason}." if self.msg: - result = "\n".join([result, self.msg]) + result += f"\n{self.msg}" return result + # Workaround to tblib <3.2.1 def __reduce__(self): return self.__class__, (self.key, self.reason, self.msg) @@ -295,21 +296,35 @@ class Future(TaskRef, Generic[_T]): Client: Creates futures """ + key: Key + _client: Client | None + _input_state: FutureState | None + _state: FutureState | None + _id: tuple[str, int] + _cleared: bool + _is_finalizing: staticmethod[[], bool] = staticmethod(sys.is_finalizing) + _cb_executor: ClassVar[ThreadPoolExecutor | None] = None + _cb_executor_pid: ClassVar[int | None] = None - _cb_executor = None - _cb_executor_pid = None - _counter = itertools.count() - # Make sure this stays unique even across multiple processes or hosts - _uid = uuid.uuid4().hex + # Generate _id of Future instances as (_uid, next(_counter)) + # Make sure _uid stays unique even across multiple processes or hosts + _uid: ClassVar[str] = uuid.uuid4().hex + _counter: ClassVar[itertools.count[int]] = itertools.count() - def __init__(self, key, client=None, state=None, _id=None): + def __init__( + self, + key: Key, + client: Client | None = None, + state: FutureState | None = None, + _id: tuple[str, int] | None = None, + ): self.key = key - self._cleared = False self._client = client - self._id = _id or (Future._uid, next(Future._counter)) self._input_state = state self._state = None + self._id = _id or (Future._uid, next(Future._counter)) + self._cleared = False self._bind_late() @property @@ -651,7 +666,10 @@ class FutureState: __slots__ = ("_event", "key", "status", "type", "exception", "traceback") - def __init__(self, key: str): + key: Key + status: Literal["pending", "cancelled", "finished", "lost", "error"] + + def __init__(self, key: Key): self._event = None self.key = key self.exception = None