Index
frequenz.sdk.actor
A base class for creating simple composable actors.
Classes
frequenz.sdk.actor.Actor
Bases: BackgroundService, ABC
A primitive unit of computation that runs autonomously.
From Wikipedia, an actor is:
[...] the basic building block of concurrent computation. In response to a message it receives, an actor can: make local decisions, create more actors, send more messages, and determine how to respond to the next message received. Actors may modify their own private state, but can only affect each other indirectly through messaging (removing the need for lock-based synchronization).
Channels can be used to implement communication between actors, as shown in the examples below.
To implement an actor, subclasses must implement the _run() method, which should run the actor's logic. The _run() method is called by the base class when the actor is started, and is expected to run until the actor is stopped.
If an unhandled exception is raised in the _run() method, the actor will be restarted automatically. Unhandled BaseExceptions will cause the actor to stop immediately and will be re-raised.
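The restart rule above can be sketched with plain asyncio. This is a minimal illustration and not the SDK's implementation: the helper run_with_restart and its restart_delay_s parameter are hypothetical names introduced here.

```python
import asyncio
from collections.abc import Awaitable, Callable


async def run_with_restart(
    run: Callable[[], Awaitable[None]],
    *,
    restart_delay_s: float = 0.0,  # hypothetical knob, not an SDK parameter
) -> int:
    """Call `run()` until it returns normally; return the number of attempts."""
    attempts = 0
    while True:
        attempts += 1
        try:
            await run()
            return attempts
        except Exception:  # ordinary errors only
            # An unhandled Exception leads to another attempt after the delay.
            await asyncio.sleep(restart_delay_s)
        # BaseException subclasses that are not Exceptions (for example
        # asyncio.CancelledError) are not caught, so they end the loop
        # immediately and propagate to the caller.
```

A coroutine function that fails twice with RuntimeError and then returns would be attempted three times, while one raising a bare BaseException subclass would stop on the first attempt.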
Warning
As actors manage asyncio.Task objects, a reference to them must be held for as long as the actor is expected to be running, otherwise its tasks will be cancelled and the actor will stop. For more information, please refer to the Python asyncio documentation.
Example: An actor receiving from two receivers
```python
from frequenz.channels import Broadcast, Receiver, Sender
from frequenz.channels.util import select, selected_from
from frequenz.sdk.actor import Actor


class EchoActor(Actor):
    def __init__(
        self,
        recv1: Receiver[bool],
        recv2: Receiver[bool],
        output: Sender[bool],
    ) -> None:
        super().__init__()
        self._recv1 = recv1
        self._recv2 = recv2
        self._output = output

    async def _run(self) -> None:
        # Forward every message received on either input to the output.
        async for selected in select(self._recv1, self._recv2):
            if selected_from(selected, self._recv1):
                await self._output.send(selected.value)
            elif selected_from(selected, self._recv2):
                await self._output.send(selected.value)
            else:
                assert False, "Unknown selected channel"


input_channel_1 = Broadcast[bool]("input_channel_1")
input_channel_2 = Broadcast[bool]("input_channel_2")
input_channel_2_sender = input_channel_2.new_sender()

echo_channel = Broadcast[bool]("EchoChannel")
echo_receiver = echo_channel.new_receiver()

# The statements below must run inside a coroutine.
async with EchoActor(
    input_channel_1.new_receiver(),
    input_channel_2.new_receiver(),
    echo_channel.new_sender(),
):
    await input_channel_2_sender.send(True)
    print(await echo_receiver.receive())
```
Example: Composing two actors
```python
from frequenz.channels import Broadcast, Receiver, Sender
from frequenz.sdk.actor import Actor


class Actor1(Actor):
    def __init__(
        self,
        recv: Receiver[bool],
        output: Sender[bool],
    ) -> None:
        super().__init__()
        self._recv = recv
        self._output = output

    async def _run(self) -> None:
        async for msg in self._recv:
            await self._output.send(msg)


class Actor2(Actor):
    def __init__(
        self,
        recv: Receiver[bool],
        output: Sender[bool],
    ) -> None:
        super().__init__()
        self._recv = recv
        self._output = output

    async def _run(self) -> None:
        async for msg in self._recv:
            await self._output.send(msg)


input_channel: Broadcast[bool] = Broadcast("Input to Actor1")
middle_channel: Broadcast[bool] = Broadcast("Actor1 -> Actor2 stream")
output_channel: Broadcast[bool] = Broadcast("Actor2 output")

input_sender = input_channel.new_sender()
output_receiver = output_channel.new_receiver()

# The statements below must run inside a coroutine.
async with (
    Actor1(input_channel.new_receiver(), middle_channel.new_sender()),
    Actor2(middle_channel.new_receiver(), output_channel.new_sender()),
):
    await input_sender.send(True)
    print(await output_receiver.receive())
```
Source code in /opt/hostedtoolcache/Python/3.11.5/x64/lib/python3.11/site-packages/frequenz/sdk/actor/_actor.py
frequenz.sdk.actor.BackgroundService
Bases: ABC
A background service that can be started and stopped.
A background service is a service that runs in the background spawning one or more tasks. The service can be started and stopped and can work as an async context manager to provide deterministic cleanup.
To implement a background service, subclasses must implement the start() method, which should start the background tasks needed by the service, and add them to the _tasks protected attribute.
If you need to collect results or handle exceptions of the tasks when stopping the service, then you need to also override the stop() method, as the base implementation does not collect any results and re-raises all exceptions.
Warning
As background services manage asyncio.Task objects, a reference to them must be held for as long as the background service is expected to be running, otherwise its tasks will be cancelled and the service will stop. For more information, please refer to the Python asyncio documentation.
Example
```python
import asyncio
import datetime

from frequenz.sdk.actor import BackgroundService


class Clock(BackgroundService):
    def __init__(self, resolution_s: float, *, name: str | None = None) -> None:
        super().__init__(name=name)
        self._resolution_s = resolution_s

    def start(self) -> None:
        self._tasks.add(asyncio.create_task(self._tick()))

    async def _tick(self) -> None:
        while True:
            await asyncio.sleep(self._resolution_s)
            print(datetime.datetime.now())


async def main() -> None:
    # As an async context manager
    async with Clock(resolution_s=1):
        await asyncio.sleep(5)

    # Manual start/stop (only use if necessary, as cleanup is more complicated)
    clock = Clock(resolution_s=1)
    clock.start()
    await asyncio.sleep(5)
    await clock.stop()


asyncio.run(main())
```
Source code in /opt/hostedtoolcache/Python/3.11.5/x64/lib/python3.11/site-packages/frequenz/sdk/actor/_background_service.py
Attributes
is_running: bool
property
Return whether this background service is running.
A service is considered running when at least one task is running.
RETURNS | DESCRIPTION |
---|---|
bool | Whether this background service is running. |
name: str
property
The name of this background service.
RETURNS | DESCRIPTION |
---|---|
str | The name of this background service. |
tasks: collections.abc.Set[asyncio.Task[Any]]
property
Return the set of running tasks spawned by this background service.
Users typically should not modify the tasks in the returned set and should only use them for informational purposes.
Danger
Changing the returned tasks may lead to unexpected behavior; don't do it unless the class explicitly documents that it is safe to do so.
RETURNS | DESCRIPTION |
---|---|
Set[Task[Any]] | The set of running tasks spawned by this background service. |
Functions
__aenter__()
async
Enter an async context.
Start this background service.
RETURNS | DESCRIPTION |
---|---|
Self | This background service. |
Source code in /opt/hostedtoolcache/Python/3.11.5/x64/lib/python3.11/site-packages/frequenz/sdk/actor/_background_service.py
__aexit__(exc_type, exc_val, exc_tb)
async
Exit an async context.
Stop this background service.
PARAMETER | DESCRIPTION |
---|---|
exc_type | The type of the exception raised, if any. |
exc_val | The exception raised, if any. |
exc_tb | The traceback of the exception raised, if any. |
Source code in /opt/hostedtoolcache/Python/3.11.5/x64/lib/python3.11/site-packages/frequenz/sdk/actor/_background_service.py
__await__()
Await this background service.
An awaited background service will wait for all its tasks to finish.
RETURNS | DESCRIPTION |
---|---|
Generator[None, None, None] | An implementation-specific generator for the awaitable. |
Source code in /opt/hostedtoolcache/Python/3.11.5/x64/lib/python3.11/site-packages/frequenz/sdk/actor/_background_service.py
__del__()
Destroy this instance.
Cancel all running tasks spawned by this background service.
Source code in /opt/hostedtoolcache/Python/3.11.5/x64/lib/python3.11/site-packages/frequenz/sdk/actor/_background_service.py
__init__(*, name=None)
Initialize this BackgroundService.
PARAMETER | DESCRIPTION |
---|---|
name | The name of this background service. If |
Source code in /opt/hostedtoolcache/Python/3.11.5/x64/lib/python3.11/site-packages/frequenz/sdk/actor/_background_service.py
__repr__()
Return a string representation of this instance.
RETURNS | DESCRIPTION |
---|---|
str | A string representation of this instance. |
Source code in /opt/hostedtoolcache/Python/3.11.5/x64/lib/python3.11/site-packages/frequenz/sdk/actor/_background_service.py
__str__()
Return a string representation of this instance.
RETURNS | DESCRIPTION |
---|---|
str | A string representation of this instance. |
Source code in /opt/hostedtoolcache/Python/3.11.5/x64/lib/python3.11/site-packages/frequenz/sdk/actor/_background_service.py
cancel(msg=None)
Cancel all running tasks spawned by this background service.
PARAMETER | DESCRIPTION |
---|---|
msg | The message to be passed to the tasks being cancelled. |
Source code in /opt/hostedtoolcache/Python/3.11.5/x64/lib/python3.11/site-packages/frequenz/sdk/actor/_background_service.py
start()
abstractmethod
stop(msg=None)
async
Stop this background service.
This method cancels all running tasks spawned by this service and waits for them to finish.
PARAMETER | DESCRIPTION |
---|---|
msg | The message to be passed to the tasks being cancelled. |

RAISES | DESCRIPTION |
---|---|
BaseExceptionGroup | If any of the tasks spawned by this service raised an exception. |
Source code in /opt/hostedtoolcache/Python/3.11.5/x64/lib/python3.11/site-packages/frequenz/sdk/actor/_background_service.py
wait()
async
Wait for this background service to finish.
Wait until all background service tasks are finished.
RAISES | DESCRIPTION |
---|---|
BaseExceptionGroup | If any of the tasks spawned by this service raised an exception ( |
Source code in /opt/hostedtoolcache/Python/3.11.5/x64/lib/python3.11/site-packages/frequenz/sdk/actor/_background_service.py
frequenz.sdk.actor.ChannelRegistry
Dynamically creates, owns and provides access to channels.
It can be used by actors to dynamically establish a communication channel between each other. Channels are identified by string names.
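The idea of creating a channel on first use and handing out the same channel for the same name can be shown with a toy model. ToyChannelRegistry and get_or_create are hypothetical names, and an asyncio.Queue stands in for a real broadcast channel, so this only illustrates the lookup-by-key behaviour and not the SDK class.

```python
import asyncio
from typing import Any


class ToyChannelRegistry:
    """Toy registry: lazily creates one asyncio.Queue per string key."""

    def __init__(self, *, name: str) -> None:
        self._name = name
        self._channels: dict[str, asyncio.Queue[Any]] = {}

    def get_or_create(self, key: str) -> asyncio.Queue[Any]:
        """Return the channel for `key`, creating it on first use."""
        if key not in self._channels:
            self._channels[key] = asyncio.Queue()
        return self._channels[key]
```

Two parties that agree on a key end up with the same underlying channel without having to pass channel objects to each other.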
Source code in /opt/hostedtoolcache/Python/3.11.5/x64/lib/python3.11/site-packages/frequenz/sdk/actor/_channel_registry.py
Functions
__init__(*, name)
Create a ChannelRegistry instance.
PARAMETER | DESCRIPTION |
---|---|
name | A unique name for the registry. |
Source code in /opt/hostedtoolcache/Python/3.11.5/x64/lib/python3.11/site-packages/frequenz/sdk/actor/_channel_registry.py
new_receiver(key)
Get a receiver to a dynamically created channel with the given key.
PARAMETER | DESCRIPTION |
---|---|
key | A key to identify the channel. |

RETURNS | DESCRIPTION |
---|---|
Receiver[Any] | A receiver for a dynamically created channel with the given key. |
Source code in /opt/hostedtoolcache/Python/3.11.5/x64/lib/python3.11/site-packages/frequenz/sdk/actor/_channel_registry.py
new_sender(key)
Get a sender to a dynamically created channel with the given key.
PARAMETER | DESCRIPTION |
---|---|
key | A key to identify the channel. |

RETURNS | DESCRIPTION |
---|---|
Sender[Any] | A sender to a dynamically created channel with the given key. |
Source code in /opt/hostedtoolcache/Python/3.11.5/x64/lib/python3.11/site-packages/frequenz/sdk/actor/_channel_registry.py
frequenz.sdk.actor.ComponentMetricRequest
dataclass
A request object to start streaming a metric for a component.
Source code in /opt/hostedtoolcache/Python/3.11.5/x64/lib/python3.11/site-packages/frequenz/sdk/actor/_data_sourcing/microgrid_api_source.py
Attributes
component_id: int
instance-attribute
The ID of the requested component.
metric_id: ComponentMetricId
instance-attribute
The ID of the requested component's metric.
namespace: str
instance-attribute
The namespace that this request belongs to.
Metric requests with a shared namespace enable the reuse of channels within that namespace.
If, for example, an actor making multiple requests uses its own name as the namespace, then requests from that actor will be reused when possible.
start_time: Optional[datetime]
instance-attribute
The start time from which data is required.
When None, we will stream only live data.
Functions
get_channel_name()
Return a channel name constructed from Self.
This channel name can be used by the sending and receiving sides to identify the right channel from the ChannelRegistry.
RETURNS | DESCRIPTION |
---|---|
str | A string denoting a channel name. |
Source code in /opt/hostedtoolcache/Python/3.11.5/x64/lib/python3.11/site-packages/frequenz/sdk/actor/_data_sourcing/microgrid_api_source.py
frequenz.sdk.actor.ComponentMetricsResamplingActor
Bases: Actor
An actor to resample microgrid component metrics.
Source code in /opt/hostedtoolcache/Python/3.11.5/x64/lib/python3.11/site-packages/frequenz/sdk/actor/_resampling.py
Functions
__init__(*, channel_registry, data_sourcing_request_sender, resampling_request_receiver, config, name=None)
Initialize an instance.
PARAMETER | DESCRIPTION |
---|---|
channel_registry | The channel registry used to get senders and receivers for data sourcing subscriptions. |
data_sourcing_request_sender | The sender used to send requests to the |
resampling_request_receiver | The receiver to use to receive new resampling subscription requests. |
config | The configuration for the resampler. |
name | The name of the actor. If |
Source code in /opt/hostedtoolcache/Python/3.11.5/x64/lib/python3.11/site-packages/frequenz/sdk/actor/_resampling.py
frequenz.sdk.actor.ConfigManagingActor
Bases: Actor
An actor that monitors a TOML configuration file for updates.
When the file is updated, the new configuration is sent, as a dict, to the output sender.
When the actor is started, if a configuration file already exists, then it will be read and sent to the output sender before the actor starts monitoring the file for updates. This way users can rely on the actor to do the initial configuration reading too.
Source code in /opt/hostedtoolcache/Python/3.11.5/x64/lib/python3.11/site-packages/frequenz/sdk/actor/_config_managing.py
Functions
__init__(config_path, output, event_types=frozenset(FileWatcher.EventType), *, name=None)
Initialize this instance.
PARAMETER | DESCRIPTION |
---|---|
config_path | The path to the TOML file with the configuration. |
output | The sender to send the config to. |
event_types | The set of event types to monitor. |
name | The name of the actor. If |
Source code in /opt/hostedtoolcache/Python/3.11.5/x64/lib/python3.11/site-packages/frequenz/sdk/actor/_config_managing.py
send_config()
async
Send the configuration to the output sender.
Source code in /opt/hostedtoolcache/Python/3.11.5/x64/lib/python3.11/site-packages/frequenz/sdk/actor/_config_managing.py
frequenz.sdk.actor.DataSourcingActor
Bases: Actor
An actor that provides data streams of metrics as time series.
Source code in /opt/hostedtoolcache/Python/3.11.5/x64/lib/python3.11/site-packages/frequenz/sdk/actor/_data_sourcing/data_sourcing.py
Functions
__init__(request_receiver, registry, *, name=None)
Create a DataSourcingActor instance.
PARAMETER | DESCRIPTION |
---|---|
request_receiver | A channel receiver to accept metric requests from. |
registry | A channel registry. To be replaced by a singleton instance. |
name | The name of the actor. If |
Source code in /opt/hostedtoolcache/Python/3.11.5/x64/lib/python3.11/site-packages/frequenz/sdk/actor/_data_sourcing/data_sourcing.py
frequenz.sdk.actor.ResamplerConfig
dataclass
Resampler configuration.
Source code in /opt/hostedtoolcache/Python/3.11.5/x64/lib/python3.11/site-packages/frequenz/sdk/timeseries/_resampling.py
Attributes
align_to: datetime | None = UNIX_EPOCH
class-attribute
instance-attribute
The time to align the resampling period to.
The resampling period will be aligned to this time, so the first resampled sample will be at the first multiple of resampling_period starting from align_to. It must be an aware datetime and can be in the future too.
If align_to is None, the resampling period will be aligned to the time the resampler is created.
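One way to read the alignment rule is as a grid of timestamps spaced resampling_period apart and anchored at align_to. The sketch below computes the first grid point at or after a given time. The function next_aligned_time is hypothetical. Treating align_to purely as a phase reference (so that grid points before a future align_to still count) and returning `now` itself when it already lies on the grid are both assumptions, and the SDK may behave differently.

```python
from datetime import datetime, timedelta, timezone

# Same meaning as the UNIX_EPOCH default named above; defined here so the
# sketch is self-contained.
UNIX_EPOCH = datetime.fromtimestamp(0, tz=timezone.utc)


def next_aligned_time(
    now: datetime,
    resampling_period: timedelta,
    align_to: datetime = UNIX_EPOCH,
) -> datetime:
    """Return the first timestamp >= `now` on the grid anchored at `align_to`."""
    # Python's modulo on timedeltas yields a non-negative remainder for a
    # positive period, even when `align_to` is later than `now`.
    elapsed = (now - align_to) % resampling_period
    if not elapsed:
        return now  # already on the grid
    return now + (resampling_period - elapsed)
```

With a 15 second period aligned to the Unix epoch, a call at 00:00:07 UTC gives 00:00:15 UTC.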
initial_buffer_len: int = DEFAULT_BUFFER_LEN_INIT
class-attribute
instance-attribute
The initial length of the resampling buffer.
The buffer could grow or shrink depending on the source properties, like sampling rate, to make sure all the requested past sampling periods can be stored.
It must be at least 1 and at most max_buffer_len.
max_buffer_len: int = DEFAULT_BUFFER_LEN_MAX
class-attribute
instance-attribute
The maximum length of the resampling buffer.
Buffers won't be allowed to grow beyond this point even if it would be needed to keep all the requested past sampling periods. An error will be emitted in the logs if the buffer length needs to be truncated to this value.
It must be bigger than warn_buffer_len.
max_data_age_in_periods: float = 3.0
class-attribute
instance-attribute
The maximum age a sample can have to be considered relevant for resampling.
Expressed in number of periods, where period is the resampling_period if we are downsampling (resampling period bigger than the input period) or the input sampling period if we are upsampling (input period bigger than the resampling period).
It must be bigger than 1.0.
Example
If resampling_period is 3 seconds, the input sampling period is 1 second and max_data_age_in_periods is 2, then data older than 3*2 = 6 seconds will be discarded when creating a new sample and never passed to the resampling function.
If resampling_period is 3 seconds, the input sampling period is 5 seconds and max_data_age_in_periods is 2, then data older than 5*2 = 10 seconds will be discarded when creating a new sample and never passed to the resampling function.
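The two cases in the example reduce to taking the larger of the two periods and multiplying it by max_data_age_in_periods. A small sketch of that arithmetic follows; the helper max_data_age is a hypothetical name and not part of the SDK.

```python
from datetime import timedelta


def max_data_age(
    resampling_period: timedelta,
    input_period: timedelta,
    max_data_age_in_periods: float,
) -> timedelta:
    """Return the age beyond which samples are considered irrelevant."""
    # Downsampling: the resampling period is the bigger one.
    # Upsampling: the input sampling period is the bigger one.
    period = max(resampling_period, input_period)
    return period * max_data_age_in_periods


# The two cases from the example above.
print(max_data_age(timedelta(seconds=3), timedelta(seconds=1), 2.0))  # 0:00:06
print(max_data_age(timedelta(seconds=3), timedelta(seconds=5), 2.0))  # 0:00:10
```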
resampling_function: ResamplingFunction = average
class-attribute
instance-attribute
The resampling function.
This function will be applied to the sequence of relevant samples at a given time. The result of the function is what is sent as the resampled value.
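As a rough illustration of what such a function does, the sketch below averages the values of the relevant samples. The exact ResamplingFunction signature is not shown on this page, so taking a plain sequence of (timestamp, value) pairs is an assumption, and simple_average is a hypothetical name rather than the SDK's average.

```python
from collections.abc import Sequence
from datetime import datetime


def simple_average(samples: Sequence[tuple[datetime, float]]) -> float:
    """Return the arithmetic mean of the sample values."""
    if not samples:
        raise ValueError("At least one sample is needed to compute an average")
    # Timestamps are ignored here; every relevant sample has equal weight.
    return sum(value for _, value in samples) / len(samples)
```

Other reductions, such as taking the latest value or the maximum, would follow the same shape: a sequence of samples in, one float out.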
resampling_period: timedelta
instance-attribute
The resampling period.
This is the time that passes between consecutive calculations of resampled data.
It must be a positive time span.
warn_buffer_len: int = DEFAULT_BUFFER_LEN_WARN
class-attribute
instance-attribute
The minimum length of the resampling buffer that will emit a warning.
If a buffer grows bigger than this value, it will emit a warning in the logs, so buffers don't grow too big inadvertently.
It must be at least 1 and at most max_buffer_len.
Functions
__post_init__()
Check that config values are valid.
RAISES | DESCRIPTION |
---|---|
ValueError | If any value is out of range. |
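The range checks described for the attributes above can be collected in a dataclass __post_init__. The sketch below is a stand-in written for illustration, not the SDK's ResamplerConfig: the class name SketchResamplerConfig and the three buffer length defaults are placeholders, and the handling of boundary values follows the wording on this page.

```python
from dataclasses import dataclass
from datetime import timedelta


@dataclass(frozen=True)
class SketchResamplerConfig:
    """Stand-in for ResamplerConfig covering only the range checks."""

    resampling_period: timedelta
    max_data_age_in_periods: float = 3.0
    # Placeholder defaults, not the SDK's DEFAULT_BUFFER_LEN_* values.
    initial_buffer_len: int = 16
    warn_buffer_len: int = 128
    max_buffer_len: int = 1024

    def __post_init__(self) -> None:
        if self.resampling_period <= timedelta(0):
            raise ValueError("resampling_period must be a positive time span")
        if self.max_data_age_in_periods <= 1.0:
            raise ValueError("max_data_age_in_periods must be bigger than 1.0")
        if not 1 <= self.initial_buffer_len <= self.max_buffer_len:
            raise ValueError("initial_buffer_len must be in [1, max_buffer_len]")
        # Strict upper bound: max_buffer_len must be bigger than warn_buffer_len.
        if not 1 <= self.warn_buffer_len < self.max_buffer_len:
            raise ValueError("warn_buffer_len must be in [1, max_buffer_len)")
```

Because the checks run at construction time, an invalid configuration fails early instead of surfacing later inside the resampler.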
Source code in /opt/hostedtoolcache/Python/3.11.5/x64/lib/python3.11/site-packages/frequenz/sdk/timeseries/_resampling.py
Functions
frequenz.sdk.actor.run(*actors)
async
Await the completion of all actors.
PARAMETER | DESCRIPTION |
---|---|
actors | The actors to be awaited. |