Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@

<!-- Here goes the main new features and examples or instructions on how to use them -->

## Enhancements

- Improved docstring documentation across the project.

## Bug Fixes

<!-- Here goes notable bug fixes that are worth a special mention or explanation -->
Expand Down
1 change: 1 addition & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ plugins:
show_source: true
show_symbol_type_toc: true
signature_crossrefs: true
relative_crossrefs: true
inventories:
# See https://mkdocstrings.github.io/python/usage/#import for details
- https://docs.python.org/3/objects.inv
Expand Down
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ check-yield-types = false
arg-type-hints-in-docstring = false
arg-type-hints-in-signature = true
allow-init-docstring = true
check-class-attributes = true
check-style-mismatch = true
require-inline-class-var-docs = true

[tool.pylint.similarities]
ignore-comments = ['yes']
Expand Down
14 changes: 7 additions & 7 deletions src/frequenz/dispatch/__init__.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
# License: MIT
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH

"""A highlevel interface for the dispatch API.
"""A high-level interface for the dispatch API.

A small overview of the most important classes in this module:

* [Dispatcher][frequenz.dispatch.Dispatcher]: The entry point for the API.
* [Dispatch][frequenz.dispatch.Dispatch]: A dispatch type with lots of useful extra functionality.
* [ActorDispatcher][frequenz.dispatch.ActorDispatcher]: A service to manage other actors based on
* [`Dispatcher`][.Dispatcher]: The entry point for the API.
* [`Dispatch`][.Dispatch]: A dispatch type with lots of useful extra functionality.
* [`ActorDispatcher`][.ActorDispatcher]: A service to manage other actors based on
incoming dispatches.
* [Created][frequenz.dispatch.Created],
[Updated][frequenz.dispatch.Updated],
[Deleted][frequenz.dispatch.Deleted]: Dispatch event types.
* [`Created`][.Created],
[`Updated`][.Updated],
[`Deleted`][.Deleted]: Dispatch event types.

"""

Expand Down
222 changes: 113 additions & 109 deletions src/frequenz/dispatch/_actor_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@


class DispatchActorId(BaseId, str_prefix="DA"):
"""ID for a dispatch actor."""
"""An ID for a dispatch actor."""

def __init__(self, dispatch_id: DispatchId | int) -> None:
"""Initialize the DispatchActorId.
Expand All @@ -37,26 +37,26 @@ def __init__(self, dispatch_id: DispatchId | int) -> None:

@dataclass(frozen=True, kw_only=True)
class DispatchInfo:
"""Event emitted when the dispatch changes."""
"""An event emitted when the dispatch changes."""

@property
@deprecated("'components' is deprecated, use 'target' instead.")
def components(self) -> TargetComponents:
"""Get the target components.
"""The target components.

Deprecation: Deprecated in v0.10.3
Use [`target`][frequenz.dispatch.DispatchInfo.target] instead.
Warning: Deprecated in v0.10.3
Use [`target`][..target] instead.
"""
return self.target

target: TargetComponents
"""Target components to be used."""
"""The target components."""

dry_run: bool
"""Whether this is a dry run."""

options: dict[str, Any]
"""Additional options."""
"""The additional options."""

_src: Dispatch
"""The dispatch that triggered this update."""
Expand All @@ -70,13 +70,13 @@ def __init__(
options: dict[str, Any],
_src: Dispatch,
) -> None:
"""Initialize the DispatchInfo.
"""Initialize a new instance.

Args:
target: Target components to be used.
target: The target components to be used.
components: Deprecated alias for `target`.
dry_run: Whether this is a dry run.
options: Additional options.
options: The additional options.
_src: The dispatch that triggered this update.

Raises:
Expand All @@ -103,109 +103,109 @@ def __init__(


class ActorDispatcher(BackgroundService):
"""Helper class to manage actors based on dispatches.

Example usage:

```python
import os
import asyncio
from typing import override
from frequenz.dispatch import Dispatcher, ActorDispatcher, DispatchInfo
from frequenz.client.common.microgrid.components import ComponentCategory
from frequenz.channels import Receiver, Broadcast, select, selected_from
from frequenz.sdk.actor import Actor, run

class MyActor(Actor):
def __init__(
self,
*,
name: str | None = None,
) -> None:
super().__init__(name=name)
self._dispatch_updates_receiver: Receiver[DispatchInfo] | None = None
self._dry_run: bool = False
self._options: dict[str, Any] = {}

@classmethod
def new_with_dispatch(
cls,
initial_dispatch: DispatchInfo,
dispatch_updates_receiver: Receiver[DispatchInfo],
*,
name: str | None = None,
) -> "Self":
self = cls(name=name)
self._dispatch_updates_receiver = dispatch_updates_receiver
self._update_dispatch_information(initial_dispatch)
return self

@override
async def _run(self) -> None:
other_recv: Receiver[Any] = ...

if self._dispatch_updates_receiver is None:
async for msg in other_recv:
# do stuff
...
else:
await self._run_with_dispatch(other_recv)

async def _run_with_dispatch(self, other_recv: Receiver[Any]) -> None:
async for selected in select(self._dispatch_updates_receiver, other_recv):
if selected_from(selected, self._dispatch_updates_receiver):
self._update_dispatch_information(selected.message)
elif selected_from(selected, other_recv):
# do stuff
...
"""A helper class to manage actors based on dispatches.

Example:
```python
import os
import asyncio
from typing import Any, Self
from typing import override
from frequenz.dispatch import Dispatcher, ActorDispatcher, DispatchInfo
from frequenz.client.common.microgrid.components import ComponentCategory
from frequenz.channels import Receiver, Broadcast, select, selected_from
from frequenz.sdk.actor import Actor, run

class MyActor(Actor):
def __init__(
self,
*,
name: str | None = None,
) -> None:
super().__init__(name=name)
self._dispatch_updates_receiver: Receiver[DispatchInfo] | None = None
self._dry_run: bool = False
self._options: dict[str, Any] = {}

@classmethod
def new_with_dispatch(
cls,
initial_dispatch: DispatchInfo,
dispatch_updates_receiver: Receiver[DispatchInfo],
*,
name: str | None = None,
) -> Self:
self = cls(name=name)
self._dispatch_updates_receiver = dispatch_updates_receiver
self._update_dispatch_information(initial_dispatch)
return self

@override
async def _run(self) -> None:
other_recv: Receiver[Any] = ...

if self._dispatch_updates_receiver is None:
async for msg in other_recv:
# do stuff
...
else:
assert False, f"Unexpected selected receiver: {selected}"

def _update_dispatch_information(self, dispatch_update: DispatchInfo) -> None:
print("Received update:", dispatch_update)
self._dry_run = dispatch_update.dry_run
self._options = dispatch_update.options
match dispatch_update.components:
case []:
print("Dispatch: Using all components")
case list() as ids if isinstance(ids[0], int):
component_ids = ids
case [ComponentCategory.BATTERY, *_]:
component_category = ComponentCategory.BATTERY
case unsupported:
print(
"Dispatch: Requested an unsupported selector %r, "
"but only component IDs or category BATTERY are supported.",
unsupported,
)

async def main():
url = os.getenv("DISPATCH_API_URL", "grpc://dispatch.url.goes.here.example.com")
auth_key = os.getenv("DISPATCH_API_AUTH_KEY", "some-key")
sign_secret = os.getenv("DISPATCH_API_SIGN_SECRET")

microgrid_id = 1

async with Dispatcher(
microgrid_id=microgrid_id,
server_url=url,
auth_key=auth_key,
sign_secret=sign_secret,
) as dispatcher:
status_receiver = dispatcher.new_running_state_event_receiver("EXAMPLE_TYPE")

managing_actor = ActorDispatcher(
actor_factory=MyActor.new_with_dispatch,
running_status_receiver=status_receiver,
)
await self._run_with_dispatch(other_recv)

async def _run_with_dispatch(self, other_recv: Receiver[Any]) -> None:
async for selected in select(self._dispatch_updates_receiver, other_recv):
if selected_from(selected, self._dispatch_updates_receiver):
self._update_dispatch_information(selected.message)
elif selected_from(selected, other_recv):
# do stuff
...
else:
assert False, f"Unexpected selected receiver: {selected}"

def _update_dispatch_information(self, dispatch_update: DispatchInfo) -> None:
print("Received update:", dispatch_update)
self._dry_run = dispatch_update.dry_run
self._options = dispatch_update.options
match dispatch_update.target:
case []:
print("Dispatch: Using all components")
case list() as ids if isinstance(ids[0], int):
component_ids = ids
case [ComponentCategory.BATTERY, *_]:
component_category = ComponentCategory.BATTERY
case unsupported:
print(
"Dispatch: Requested an unsupported selector %r, "
"but only component IDs or category BATTERY are supported.",
unsupported,
)

async def main():
url = os.getenv("DISPATCH_API_URL", "grpc://dispatch.url.goes.here.example.com")
auth_key = os.getenv("DISPATCH_API_AUTH_KEY", "some-key")
sign_secret = os.getenv("DISPATCH_API_SIGN_SECRET")

microgrid_id = 1

async with Dispatcher(
microgrid_id=microgrid_id,
server_url=url,
auth_key=auth_key,
sign_secret=sign_secret,
) as dispatcher:
status_receiver = await dispatcher.new_running_state_event_receiver("EXAMPLE_TYPE")

managing_actor = ActorDispatcher(
actor_factory=MyActor.new_with_dispatch,
running_status_receiver=status_receiver,
)

await run(managing_actor)
```
await run(managing_actor)
```
"""

@dataclass(frozen=True, kw_only=True)
class ActorAndChannel:
"""Actor and its sender."""
"""An actor and its dispatch update sender."""

actor: Actor
"""The actor."""
Expand Down Expand Up @@ -252,7 +252,11 @@ def start(self) -> None:
self._tasks.add(asyncio.create_task(self._run()))

async def _start_actor(self, dispatch: Dispatch) -> None:
"""Start the actor the given dispatch refers to."""
"""Start the actor the given dispatch refers to.

Args:
dispatch: The dispatch to start the actor for.
"""
dispatch_update = DispatchInfo(
target=dispatch.target,
dry_run=dispatch.dry_run,
Expand Down Expand Up @@ -298,7 +302,7 @@ async def _start_actor(self, dispatch: Dispatch) -> None:
)

async def _stop_actor(self, stopping_dispatch: Dispatch, msg: str) -> None:
"""Stop all actors.
"""Stop the actor for the given dispatch.

Args:
stopping_dispatch: The dispatch that is stopping the actor.
Expand Down
Loading
Loading