Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 30 additions & 12 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,21 +151,22 @@


class FutureCancelledError(CancelledError):
key: str
key: Key
reason: str
msg: str | None

def __init__(self, key: str, reason: str | None, msg: str | None = None):
def __init__(self, key: Key, reason: str | None, msg: str | None = None):
self.key = key
self.reason = reason if reason else "unknown"
self.msg = msg

def __str__(self) -> str:
result = f"{self.key} cancelled for reason: {self.reason}."
if self.msg:
result = "\n".join([result, self.msg])
result += f"\n{self.msg}"
return result

# Workaround to tblib <3.2.1
def __reduce__(self):
return self.__class__, (self.key, self.reason, self.msg)

Expand Down Expand Up @@ -295,21 +296,35 @@ class Future(TaskRef, Generic[_T]):
Client: Creates futures
"""

key: Key
_client: Client | None
_input_state: FutureState | None
_state: FutureState | None
_id: tuple[str, int]
_cleared: bool

_is_finalizing: staticmethod[[], bool] = staticmethod(sys.is_finalizing)
_cb_executor: ClassVar[ThreadPoolExecutor | None] = None
_cb_executor_pid: ClassVar[int | None] = None

_cb_executor = None
_cb_executor_pid = None
_counter = itertools.count()
# Make sure this stays unique even across multiple processes or hosts
_uid = uuid.uuid4().hex
# Generate _id of Future instances as (_uid, next(_counter))
# Make sure _uid stays unique even across multiple processes or hosts
_uid: ClassVar[str] = uuid.uuid4().hex
_counter: ClassVar[itertools.count[int]] = itertools.count()

def __init__(self, key, client=None, state=None, _id=None):
def __init__(
self,
key: Key,
client: Client | None = None,
state: FutureState | None = None,
_id: tuple[str, int] | None = None,
):
self.key = key
self._cleared = False
self._client = client
self._id = _id or (Future._uid, next(Future._counter))
self._input_state = state
self._state = None
self._id = _id or (Future._uid, next(Future._counter))
self._cleared = False
self._bind_late()

@property
Expand Down Expand Up @@ -651,7 +666,10 @@ class FutureState:

__slots__ = ("_event", "key", "status", "type", "exception", "traceback")

def __init__(self, key: str):
key: Key
status: Literal["pending", "cancelled", "finished", "lost", "error"]

def __init__(self, key: Key):
self._event = None
self.key = key
self.exception = None
Expand Down
Loading