diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 315ce78..d347a44 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -12,6 +12,10 @@ +## Enhancements + +- Improved docstring documentation across the project. + ## Bug Fixes diff --git a/mkdocs.yml b/mkdocs.yml index b4b95cb..8a598f8 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -112,6 +112,7 @@ plugins: show_source: true show_symbol_type_toc: true signature_crossrefs: true + relative_crossrefs: true inventories: # See https://mkdocstrings.github.io/python/usage/#import for details - https://docs.python.org/3/objects.inv diff --git a/pyproject.toml b/pyproject.toml index 0467ff3..0267bb2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -134,6 +134,9 @@ check-yield-types = false arg-type-hints-in-docstring = false arg-type-hints-in-signature = true allow-init-docstring = true +check-class-attributes = true +check-style-mismatch = true +require-inline-class-var-docs = true [tool.pylint.similarities] ignore-comments = ['yes'] diff --git a/src/frequenz/dispatch/__init__.py b/src/frequenz/dispatch/__init__.py index ac4bafb..d91f400 100644 --- a/src/frequenz/dispatch/__init__.py +++ b/src/frequenz/dispatch/__init__.py @@ -1,17 +1,17 @@ # License: MIT # Copyright © 2024 Frequenz Energy-as-a-Service GmbH -"""A highlevel interface for the dispatch API. +"""A high-level interface for the dispatch API. A small overview of the most important classes in this module: -* [Dispatcher][frequenz.dispatch.Dispatcher]: The entry point for the API. -* [Dispatch][frequenz.dispatch.Dispatch]: A dispatch type with lots of useful extra functionality. -* [ActorDispatcher][frequenz.dispatch.ActorDispatcher]: A service to manage other actors based on +* [`Dispatcher`][.Dispatcher]: The entry point for the API. +* [`Dispatch`][.Dispatch]: A dispatch type with lots of useful extra functionality. +* [`ActorDispatcher`][.ActorDispatcher]: A service to manage other actors based on incoming dispatches. -* [Created][frequenz.dispatch.Created], - [Updated][frequenz.dispatch.Updated], - [Deleted][frequenz.dispatch.Deleted]: Dispatch event types. +* [`Created`][.Created], + [`Updated`][.Updated], + [`Deleted`][.Deleted]: Dispatch event types. """ diff --git a/src/frequenz/dispatch/_actor_dispatcher.py b/src/frequenz/dispatch/_actor_dispatcher.py index 4419b5e..9c31b52 100644 --- a/src/frequenz/dispatch/_actor_dispatcher.py +++ b/src/frequenz/dispatch/_actor_dispatcher.py @@ -24,7 +24,7 @@ class DispatchActorId(BaseId, str_prefix="DA"): - """ID for a dispatch actor.""" + """An ID for a dispatch actor.""" def __init__(self, dispatch_id: DispatchId | int) -> None: """Initialize the DispatchActorId. @@ -37,26 +37,26 @@ def __init__(self, dispatch_id: DispatchId | int) -> None: @dataclass(frozen=True, kw_only=True) class DispatchInfo: - """Event emitted when the dispatch changes.""" + """An event emitted when the dispatch changes.""" @property @deprecated("'components' is deprecated, use 'target' instead.") def components(self) -> TargetComponents: - """Get the target components. + """The target components. - Deprecation: Deprecated in v0.10.3 - Use [`target`][frequenz.dispatch.DispatchInfo.target] instead. + Warning: Deprecated in v0.10.3 + Use [`target`][..target] instead. """ return self.target target: TargetComponents - """Target components to be used.""" + """The target components.""" dry_run: bool """Whether this is a dry run.""" options: dict[str, Any] - """Additional options.""" + """The additional options.""" _src: Dispatch """The dispatch that triggered this update.""" @@ -70,13 +70,13 @@ def __init__( options: dict[str, Any], _src: Dispatch, ) -> None: - """Initialize the DispatchInfo. + """Initialize a new instance. Args: - target: Target components to be used. + target: The target components to be used. components: Deprecated alias for `target`. dry_run: Whether this is a dry run. - options: Additional options. + options: The additional options. _src: The dispatch that triggered this update. Raises: @@ -103,109 +103,109 @@ def __init__( class ActorDispatcher(BackgroundService): - """Helper class to manage actors based on dispatches. - - Example usage: - - ```python - import os - import asyncio - from typing import override - from frequenz.dispatch import Dispatcher, ActorDispatcher, DispatchInfo - from frequenz.client.common.microgrid.components import ComponentCategory - from frequenz.channels import Receiver, Broadcast, select, selected_from - from frequenz.sdk.actor import Actor, run - - class MyActor(Actor): - def __init__( - self, - *, - name: str | None = None, - ) -> None: - super().__init__(name=name) - self._dispatch_updates_receiver: Receiver[DispatchInfo] | None = None - self._dry_run: bool = False - self._options: dict[str, Any] = {} - - @classmethod - def new_with_dispatch( - cls, - initial_dispatch: DispatchInfo, - dispatch_updates_receiver: Receiver[DispatchInfo], - *, - name: str | None = None, - ) -> "Self": - self = cls(name=name) - self._dispatch_updates_receiver = dispatch_updates_receiver - self._update_dispatch_information(initial_dispatch) - return self - - @override - async def _run(self) -> None: - other_recv: Receiver[Any] = ... - - if self._dispatch_updates_receiver is None: - async for msg in other_recv: - # do stuff - ... - else: - await self._run_with_dispatch(other_recv) - - async def _run_with_dispatch(self, other_recv: Receiver[Any]) -> None: - async for selected in select(self._dispatch_updates_receiver, other_recv): - if selected_from(selected, self._dispatch_updates_receiver): - self._update_dispatch_information(selected.message) - elif selected_from(selected, other_recv): - # do stuff - ... + """A helper class to manage actors based on dispatches. + + Example: + ```python + import os + import asyncio + from typing import Any, Self + from typing import override + from frequenz.dispatch import Dispatcher, ActorDispatcher, DispatchInfo + from frequenz.client.common.microgrid.components import ComponentCategory + from frequenz.channels import Receiver, Broadcast, select, selected_from + from frequenz.sdk.actor import Actor, run + + class MyActor(Actor): + def __init__( + self, + *, + name: str | None = None, + ) -> None: + super().__init__(name=name) + self._dispatch_updates_receiver: Receiver[DispatchInfo] | None = None + self._dry_run: bool = False + self._options: dict[str, Any] = {} + + @classmethod + def new_with_dispatch( + cls, + initial_dispatch: DispatchInfo, + dispatch_updates_receiver: Receiver[DispatchInfo], + *, + name: str | None = None, + ) -> Self: + self = cls(name=name) + self._dispatch_updates_receiver = dispatch_updates_receiver + self._update_dispatch_information(initial_dispatch) + return self + + @override + async def _run(self) -> None: + other_recv: Receiver[Any] = ... + + if self._dispatch_updates_receiver is None: + async for msg in other_recv: + # do stuff + ... else: - assert False, f"Unexpected selected receiver: {selected}" - - def _update_dispatch_information(self, dispatch_update: DispatchInfo) -> None: - print("Received update:", dispatch_update) - self._dry_run = dispatch_update.dry_run - self._options = dispatch_update.options - match dispatch_update.components: - case []: - print("Dispatch: Using all components") - case list() as ids if isinstance(ids[0], int): - component_ids = ids - case [ComponentCategory.BATTERY, *_]: - component_category = ComponentCategory.BATTERY - case unsupported: - print( - "Dispatch: Requested an unsupported selector %r, " - "but only component IDs or category BATTERY are supported.", - unsupported, - ) - - async def main(): - url = os.getenv("DISPATCH_API_URL", "grpc://dispatch.url.goes.here.example.com") - auth_key = os.getenv("DISPATCH_API_AUTH_KEY", "some-key") - sign_secret = os.getenv("DISPATCH_API_SIGN_SECRET") - - microgrid_id = 1 - - async with Dispatcher( - microgrid_id=microgrid_id, - server_url=url, - auth_key=auth_key, - sign_secret=sign_secret, - ) as dispatcher: - status_receiver = dispatcher.new_running_state_event_receiver("EXAMPLE_TYPE") - - managing_actor = ActorDispatcher( - actor_factory=MyActor.new_with_dispatch, - running_status_receiver=status_receiver, - ) + await self._run_with_dispatch(other_recv) + + async def _run_with_dispatch(self, other_recv: Receiver[Any]) -> None: + async for selected in select(self._dispatch_updates_receiver, other_recv): + if selected_from(selected, self._dispatch_updates_receiver): + self._update_dispatch_information(selected.message) + elif selected_from(selected, other_recv): + # do stuff + ... + else: + assert False, f"Unexpected selected receiver: {selected}" + + def _update_dispatch_information(self, dispatch_update: DispatchInfo) -> None: + print("Received update:", dispatch_update) + self._dry_run = dispatch_update.dry_run + self._options = dispatch_update.options + match dispatch_update.target: + case []: + print("Dispatch: Using all components") + case list() as ids if isinstance(ids[0], int): + component_ids = ids + case [ComponentCategory.BATTERY, *_]: + component_category = ComponentCategory.BATTERY + case unsupported: + print( + "Dispatch: Requested an unsupported selector %r, " + "but only component IDs or category BATTERY are supported.", + unsupported, + ) + + async def main(): + url = os.getenv("DISPATCH_API_URL", "grpc://dispatch.url.goes.here.example.com") + auth_key = os.getenv("DISPATCH_API_AUTH_KEY", "some-key") + sign_secret = os.getenv("DISPATCH_API_SIGN_SECRET") + + microgrid_id = 1 + + async with Dispatcher( + microgrid_id=microgrid_id, + server_url=url, + auth_key=auth_key, + sign_secret=sign_secret, + ) as dispatcher: + status_receiver = await dispatcher.new_running_state_event_receiver("EXAMPLE_TYPE") + + managing_actor = ActorDispatcher( + actor_factory=MyActor.new_with_dispatch, + running_status_receiver=status_receiver, + ) - await run(managing_actor) - ``` + await run(managing_actor) + ``` """ @dataclass(frozen=True, kw_only=True) class ActorAndChannel: - """Actor and its sender.""" + """An actor and its dispatch update sender.""" actor: Actor """The actor.""" @@ -252,7 +252,11 @@ def start(self) -> None: self._tasks.add(asyncio.create_task(self._run())) async def _start_actor(self, dispatch: Dispatch) -> None: - """Start the actor the given dispatch refers to.""" + """Start the actor the given dispatch refers to. + + Args: + dispatch: The dispatch to start the actor for. + """ dispatch_update = DispatchInfo( target=dispatch.target, dry_run=dispatch.dry_run, @@ -298,7 +302,7 @@ async def _start_actor(self, dispatch: Dispatch) -> None: ) async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None: - """Stop all actors. + """Stop the actor for the given dispatch. Args: stopping_dispatch: The dispatch that is stopping the actor. diff --git a/src/frequenz/dispatch/_bg_service.py b/src/frequenz/dispatch/_bg_service.py index 45fcce8..75e5e36 100644 --- a/src/frequenz/dispatch/_bg_service.py +++ b/src/frequenz/dispatch/_bg_service.py @@ -38,11 +38,18 @@ class MergeStrategy(ABC): - """Base class for strategies to merge running intervals.""" + """A base class for strategies to merge running intervals.""" @abstractmethod def identity(self, dispatch: Dispatch) -> DispatchActorId: - """Identity function for the merge criteria.""" + """Return the identity key for the merge criteria. + + Args: + dispatch: The dispatch to identify. + + Returns: + The actor ID that identifies this dispatch for merging purposes. + """ @abstractmethod def filter( @@ -61,7 +68,7 @@ def filter( # pylint: disable=too-many-instance-attributes class DispatchScheduler(BackgroundService): - """Dispatch background service. + """A dispatch background service. This service is responsible for managing dispatches and scheduling them based on their start and stop times. @@ -77,6 +84,7 @@ class QueueItem: """ time: datetime + """The scheduled time of the event.""" priority: int """Sort priority when the time is the same. @@ -86,12 +94,20 @@ class QueueItem: stop event. """ dispatch_id: DispatchId + """The ID of the associated dispatch.""" dispatch: Dispatch = field(compare=False) + """The dispatch associated with this event.""" def __init__( self, time: datetime, dispatch: Dispatch, stop_event: bool ) -> None: - """Initialize the queue item.""" + """Initialize the queue item. + + Args: + time: The scheduled time of the event. + dispatch: The dispatch associated with this event. + stop_event: Whether this is a stop event rather than a start event. + """ self.time = time self.dispatch_id = dispatch.id self.priority = int(stop_event) @@ -129,7 +145,7 @@ def __init__( self._scheduled_events: list["DispatchScheduler.QueueItem"] = [] """The scheduled events, sorted by time. - Each event is a tuple of the scheduled time and the dispatch. + Each item holds the scheduled time, priority, and dispatch. heapq is used to keep the list sorted by time, so the next event is always at index 0. """ @@ -164,18 +180,22 @@ async def new_running_state_event_receiver( """Create a new receiver for running state events of the specified type. `merge_strategy` is an instance of a class derived from - [`MergeStrategy`][frequenz.dispatch.MergeStrategy]. Available strategies + [`MergeStrategy`][....MergeStrategy]. Available strategies are: - * [`MergeByType`][frequenz.dispatch.MergeByType] — merges all dispatches - of the same type - * [`MergeByTypeTarget`][frequenz.dispatch.MergeByTypeTarget] — merges all - dispatches of the same type and target - * `None` — no merging, just send all events + * [`MergeByType`][....MergeByType] — merges all dispatches + of the same type. + * [`MergeByTypeTarget`][....MergeByTypeTarget] — merges all + dispatches of the same type and target. + * `None` — no merging, just send all events. - You can make your own identity-based strategy by subclassing `MergeByType` and overriding - the `identity()` method. If you require a more complex strategy, you can subclass - `MergeStrategy` directly and implement both the `identity()` and `filter()` methods. + You can make your own identity-based strategy by subclassing + [`MergeByType`][....MergeByType] and overriding + the [`identity()`][....MergeStrategy.identity] method. If + you require a more complex strategy, you can subclass + [`MergeStrategy`][....MergeStrategy] directly and implement + both the [`identity()`][....MergeStrategy.identity] and + [`filter()`][....MergeStrategy.filter] methods. Running intervals from multiple dispatches will be merged, according to the chosen strategy. @@ -337,7 +357,7 @@ async def _run(self) -> None: # pylint: disable=too-many-branches pass async def _execute_scheduled_event(self, dispatch: Dispatch, timer: Timer) -> None: - """Execute a scheduled event and schedules the next one if any. + """Execute a scheduled event and schedule the next one if any. Args: dispatch: The dispatch to execute. @@ -363,6 +383,9 @@ async def _fetch(self, timer: Timer) -> None: This is used for the initial fetch and for re-fetching all dispatches if the connection was lost. + + Args: + timer: The timer to update after fetching dispatches. """ self._initial_fetch_event.clear() # We fetch dispatches that would have ended 5 seconds ago to @@ -579,7 +602,7 @@ def _schedule_stop(self, dispatch: Dispatch) -> None: def _update_changed_running_state( self, updated_dispatch: Dispatch, previous_dispatch: Dispatch ) -> bool: - """Check if the running state of a dispatch has changed. + """Return whether the running state of a dispatch has changed. Checks if any of the running state changes to the dispatch require a new message to be sent to the actor so that it can potentially @@ -589,8 +612,8 @@ def _update_changed_running_state( in which case we need to send the message now. Args: - updated_dispatch: The new dispatch - previous_dispatch: The old dispatch + updated_dispatch: The new dispatch. + previous_dispatch: The old dispatch. Returns: True if the running state has changed, False otherwise. diff --git a/src/frequenz/dispatch/_dispatch.py b/src/frequenz/dispatch/_dispatch.py index bb4fbc8..d69e604 100644 --- a/src/frequenz/dispatch/_dispatch.py +++ b/src/frequenz/dispatch/_dispatch.py @@ -12,7 +12,7 @@ @dataclass(frozen=True) class Dispatch(BaseDispatch): - """Dispatch type with extra functionality.""" + """A dispatch type with extra functionality.""" deleted: bool = False """Whether the dispatch is deleted.""" @@ -38,16 +38,12 @@ def _set_deleted(self) -> None: @property def started(self) -> bool: - """Check if the dispatch is started. - - Returns: - True if the dispatch is started, False otherwise. - """ + """Whether the dispatch is currently started.""" now = datetime.now(tz=timezone.utc) return self.started_at(now) def started_at(self, now: datetime) -> bool: - """Check if the dispatch has started. + """Return whether the dispatch has started. A dispatch is considered started if the current time is after the start time but before the end time. @@ -57,31 +53,24 @@ def started_at(self, now: datetime) -> bool: last occurrence. Args: - now: time to use as now + now: The time to use as the current time. Returns: - True if the dispatch is started + True if the dispatch has started at `now`, False otherwise. """ if self.deleted: return False return super().started_at(now) - # noqa is needed because of a bug in pydoclint that makes it think a `return` without a return - # value needs documenting - def missed_runs(self, since: datetime) -> Iterator[datetime]: # noqa: DOC405 + def missed_runs(self, since: datetime) -> Iterator[datetime]: """Yield all missed runs of a dispatch. - Yields all missed runs of a dispatch. - Args: since: The time to start checking for missed runs. - Returns: - A generator that yields all missed runs of a dispatch. - Yields: - datetime: The missed run. + The datetime of each missed run. """ now = datetime.now(tz=timezone.utc) diff --git a/src/frequenz/dispatch/_dispatcher.py b/src/frequenz/dispatch/_dispatcher.py index d7a793e..77762c8 100644 --- a/src/frequenz/dispatch/_dispatcher.py +++ b/src/frequenz/dispatch/_dispatcher.py @@ -1,7 +1,7 @@ # License: MIT # Copyright © 2024 Frequenz Energy-as-a-Service GmbH -"""A highlevel interface for the dispatch API.""" +"""A high-level interface for the dispatch API.""" from __future__ import annotations @@ -27,18 +27,17 @@ class Dispatcher(BackgroundService): - """A highlevel interface for the dispatch API. + """A high-level interface for the dispatch API. - This class provides a highlevel interface to the dispatch API. + This class provides a high-level interface to the dispatch API. It provides receivers for various events and management of actors based on dispatches. The receivers shortly explained: - * [Lifecycle events receiver][frequenz.dispatch.Dispatcher.new_lifecycle_events_receiver]: + * [Lifecycle events receiver][.new_lifecycle_events_receiver]: Receives an event whenever a dispatch is created, updated or deleted. - * [Running status change - receiver][frequenz.dispatch.Dispatcher.new_running_state_event_receiver]: + * [Running status change receiver][.new_running_state_event_receiver]: Receives an event whenever the running status of a dispatch changes. The running status of a dispatch can change due to a variety of reasons, such as but not limited to the dispatch being started, stopped, modified @@ -50,7 +49,9 @@ class Dispatcher(BackgroundService): Example: Managing an actor ```python import os - from frequenz.dispatch import Dispatcher, MergeByType + from frequenz.channels import Receiver + from frequenz.dispatch import Dispatcher, MergeByType, DispatchInfo + from frequenz.sdk.actor import Actor from unittest.mock import MagicMock async def create_actor(dispatch: DispatchInfo, receiver: Receiver[DispatchInfo]) -> Actor: @@ -58,7 +59,7 @@ async def create_actor(dispatch: DispatchInfo, receiver: Receiver[DispatchInfo]) async def run(): url = os.getenv("DISPATCH_API_URL", "grpc://dispatch.url.goes.here.example.com") - key = os.getenv("DISPATCH_API_KEY", "some-key") + key = os.getenv("DISPATCH_API_KEY", "some-key") microgrid_id = 1 @@ -67,7 +68,7 @@ async def run(): server_url=url, auth_key=key ) as dispatcher: - dispatcher.start_managing( + await dispatcher.start_managing( dispatch_type="DISPATCH_TYPE", actor_factory=create_actor, merge_strategy=MergeByType(), @@ -84,7 +85,7 @@ async def run(): async def run(): url = os.getenv("DISPATCH_API_URL", "grpc://dispatch.url.goes.here.example.com") - key = os.getenv("DISPATCH_API_KEY", "some-key") + key = os.getenv("DISPATCH_API_KEY", "some-key") microgrid_id = 1 @@ -95,7 +96,7 @@ async def run(): ) as dispatcher: actor = MagicMock() # replace with your actor - rs_receiver = dispatcher.new_running_state_event_receiver("DISPATCH_TYPE") + rs_receiver = await dispatcher.new_running_state_event_receiver("DISPATCH_TYPE") async for dispatch in rs_receiver: if dispatch.started: @@ -129,7 +130,7 @@ async def run(): async def run(): url = os.getenv("DISPATCH_API_URL", "grpc://dispatch.url.goes.here.example.com") - key = os.getenv("DISPATCH_API_KEY", "some-key") + key = os.getenv("DISPATCH_API_KEY", "some-key") microgrid_id = 1 @@ -166,7 +167,7 @@ async def run(): async def run(): url = os.getenv("DISPATCH_API_URL", "grpc://dispatch.url.goes.here.example.com") - key = os.getenv("DISPATCH_API_KEY", "some-key") + key = os.getenv("DISPATCH_API_KEY", "some-key") microgrid_id = 1 @@ -215,16 +216,16 @@ def __init__( """Initialize the dispatcher. Args: - microgrid_id: The microgrid id. + microgrid_id: The microgrid ID. server_url: The URL of the dispatch service. key: The key to access the service, deprecated, use `auth_key` instead. auth_key: The authentication key to access the service. - sign_secret: The secret to sign the requests, optional + sign_secret: The optional secret to sign the requests. call_timeout: The timeout for API calls. stream_timeout: The timeout for streaming API calls. Raises: - ValueError: If both or neither `key` and `auth_key` are provided + ValueError: If both or neither `key` and `auth_key` are provided. """ super().__init__() @@ -283,7 +284,11 @@ async def wait(self) -> None: self._actor_dispatchers.clear() def cancel(self, msg: str | None = None) -> None: - """Stop the local dispatch service and initiate client disconnection.""" + """Stop the local dispatch service and initiate client disconnection. + + Args: + msg: An optional message to include in the cancellation. + """ self._bg_service.cancel(msg) for instance in self._actor_dispatchers.values(): @@ -297,7 +302,7 @@ async def wait_for_initialization(self) -> None: await self._bg_service.wait_for_initialization() def is_managed(self, dispatch_type: str) -> bool: - """Check if the dispatcher is managing actors for a given dispatch type. + """Return whether the dispatcher is managing actors for the given dispatch type. Args: dispatch_type: The type of the dispatch to check. @@ -320,21 +325,21 @@ async def start_managing( """Manage actors for a given dispatch type. Creates and manages an - [`ActorDispatcher`][frequenz.dispatch.ActorDispatcher] for the given type that will + [`ActorDispatcher`][...ActorDispatcher] for the given type that will start, stop and reconfigure actors based on received dispatches. - You can await the `Dispatcher` instance to block until all types - registered with `start_managing()` are stopped using - `stop_managing()` + You can await the [`Dispatcher`][...Dispatcher] instance to block + until all types registered with [`start_managing`][..start_managing] are stopped + using [`stop_managing`][..stop_managing]. "Merging" means that when multiple dispatches are active at the same time, the intervals are merged into one. This also decides how instances are mapped from dispatches to actors: - * [`MergeByType`][frequenz.dispatch.MergeByType] — All dispatches map to + * [`MergeByType`][...MergeByType] — All dispatches map to one single instance identified by the dispatch type and dry_run status. - * [`MergeByTypeTarget`][frequenz.dispatch.MergeByTypeTarget] — A + * [`MergeByTypeTarget`][...MergeByTypeTarget] — A dispatch maps to an instance identified by the dispatch type, dry_run status and target. So different dispatches with equal type and target will map to the same instance. @@ -344,7 +349,7 @@ async def start_managing( dispatch_type: The type of the dispatch to manage. actor_factory: The factory to create actors. merge_strategy: The strategy to merge running intervals. - retry_interval: Retry interval for when actor creation fails. + retry_interval: The retry interval to use when actor creation fails. """ dispatcher = self._actor_dispatchers.get(dispatch_type) @@ -389,7 +394,7 @@ async def stop_managing(self, dispatch_type: str) -> None: @property def client(self) -> DispatchApiClient: - """Return the client.""" + """The underlying dispatch API client.""" return self._client @override @@ -409,13 +414,13 @@ async def __aenter__(self) -> Self: def new_lifecycle_events_receiver( self, dispatch_type: str ) -> Receiver[DispatchEvent]: - """Return new, updated or deleted dispatches receiver. + """Return a receiver for dispatch lifecycle events. Args: dispatch_type: The type of the dispatch to listen for. Returns: - A new receiver for new dispatches. + A receiver for dispatch lifecycle events of the given type. """ return self._bg_service.new_lifecycle_events_receiver(dispatch_type) @@ -425,7 +430,7 @@ async def new_running_state_event_receiver( *, merge_strategy: MergeStrategy | None = None, ) -> Receiver[Dispatch]: - """Return running state event receiver. + """Return a running state event receiver. This receiver will receive a message whenever the current running status of a dispatch changes. @@ -452,14 +457,14 @@ async def new_running_state_event_receiver( - The dispatch was deleted `merge_strategy` is an instance of a class derived from - [`MergeStrategy`][frequenz.dispatch.MergeStrategy] Available strategies + [`MergeStrategy`][...MergeStrategy]. Available strategies are: - * [`MergeByType`][frequenz.dispatch.MergeByType] — merges all dispatches - of the same type - * [`MergeByTypeTarget`][frequenz.dispatch.MergeByTypeTarget] — merges all - dispatches of the same type and target - * `None` — no merging, just send all events (default) + * [`MergeByType`][...MergeByType] — merges all dispatches + of the same type. + * [`MergeByTypeTarget`][...MergeByTypeTarget] — merges all + dispatches of the same type and target. + * `None` — no merging, just send all events (default). Running intervals from multiple dispatches will be merged, according to the chosen strategy. @@ -469,7 +474,7 @@ async def new_running_state_event_receiver( Args: dispatch_type: The type of the dispatch to listen for. - merge_strategy: The type of the strategy to merge running intervals. + merge_strategy: The strategy to use for merging running intervals. Returns: A new receiver for dispatches whose running status changed. diff --git a/src/frequenz/dispatch/_event.py b/src/frequenz/dispatch/_event.py index f70d0dd..70d2e64 100644 --- a/src/frequenz/dispatch/_event.py +++ b/src/frequenz/dispatch/_event.py @@ -33,7 +33,7 @@ class Deleted: DispatchEvent = Created | Updated | Deleted -"""Type that is sent over the channel for dispatch updates. +"""The type representing dispatch lifecycle events. This type is used to send dispatches that were created, updated or deleted over the channel. diff --git a/src/frequenz/dispatch/_merge_strategies.py b/src/frequenz/dispatch/_merge_strategies.py index bb962f7..7c6b163 100644 --- a/src/frequenz/dispatch/_merge_strategies.py +++ b/src/frequenz/dispatch/_merge_strategies.py @@ -25,22 +25,35 @@ def _hash_positive(args: Any) -> int: class MergeByType(MergeStrategy): - """Merge running intervals based on the dispatch type.""" + """A merge strategy that combines running intervals based on dispatch type.""" @override def identity(self, dispatch: Dispatch) -> DispatchActorId: - """Identity function for the merge criteria.""" + """Return the actor identity for a dispatch based on its type. + + Args: + dispatch: The dispatch to compute the identity for. + + Returns: + An identity value grouping dispatches with the same type and dry-run flag. + """ return DispatchActorId(_hash_positive((dispatch.type, dispatch.dry_run))) @override def filter( self, dispatches: Mapping[DispatchId, Dispatch], dispatch: Dispatch ) -> bool: - """Filter dispatches based on the merge strategy. + """Return whether the dispatch event should be propagated. + + Start events are always propagated. Stop events are only propagated + if no other dispatch matching this strategy's criteria is still running. - Keeps start events. - Keeps stop events only if no other dispatches matching the - strategy's criteria are running. + Args: + dispatches: The currently known dispatches, keyed by their ID. + dispatch: The dispatch event to evaluate. + + Returns: + `True` if the event should be forwarded to consumers, `False` otherwise. """ now = datetime.now(tz=timezone.utc) @@ -83,11 +96,19 @@ def filter( class MergeByTypeTarget(MergeByType): - """Merge running intervals based on the dispatch type and target.""" + """A merge strategy that combines running intervals based on dispatch type and target.""" @override def identity(self, dispatch: Dispatch) -> DispatchActorId: - """Identity function for the merge criteria.""" + """Return the actor identity for a dispatch based on its type and target. + + Args: + dispatch: The dispatch to compute the identity for. + + Returns: + An identity value grouping dispatches with the same type, dry-run flag, + and target. + """ return DispatchActorId( _hash_positive((dispatch.type, dispatch.dry_run, tuple(dispatch.target))) )