Index
frequenz.channels ¤
Frequenz Channels.
This package contains channel implementations.
Base classes:
-
Receiver: An object that can wait for and consume messages from a channel.
-
Sender: An object that can send messages to a channel.
Channels:
-
Anycast: A channel that supports multiple senders and multiple receivers. A message sent through a sender will be received by exactly one receiver.
-
Broadcast: A channel to broadcast messages from multiple senders to multiple receivers. Each message sent through any of the senders is received by all of the receivers.
Utilities to work with channels:
-
merge: Merge messages coming from multiple receivers into a single stream.
-
select: Iterate over the values of all receivers as new values become available.
Exception classes:
-
Error: Base class for all errors in this library.
-
ChannelError: Base class for all errors related to channels.
-
ChannelClosedError: Error raised when trying to operate (send, receive, etc.) through a closed channel.
-
SenderError: Base class for all errors related to senders.
-
ReceiverError: Base class for all errors related to receivers.
-
ReceiverStoppedError: A receiver stopped producing messages.
-
SelectError: Base class for all errors related to select.
-
SelectErrorGroup: A group of errors raised by select.
-
UnhandledSelectedError: An error raised by select that was not handled by the user.
Extra utility receivers:
-
Event: A receiver that generates a message when an event is set.
-
FileWatcher: A receiver that generates a message when a file is added, modified or deleted.
-
Timer: A receiver that generates a message after a given amount of time.
Classes¤
frequenz.channels.Anycast ¤
Bases: Generic[_T]
A channel that delivers each message to exactly one receiver.
Description¤
Anycast channels support multiple senders and multiple receivers. Each message sent through any of the senders will be received by exactly one receiver.
Characteristics
- Buffered: Yes, with a global channel buffer
- Buffer full policy: Block senders
- Multiple receivers: Yes
- Multiple senders: Yes
- Thread-safe: No
This channel is buffered, and if the senders are faster than the receivers, then the
channel's buffer will fill up. In that case, the senders will block at the
send()
method until the receivers consume the
messages in the channel's buffer. The channel's buffer size can be configured at
creation time via the limit
argument.
To create a new senders and
receivers you can use the
new_sender()
and
new_receiver()
methods
respectively.
When the channel is not needed anymore, it should be closed with the
close()
method. This will prevent further
attempts to send()
data. Receivers will still be
able to drain the pending values on the channel, but after that, subsequent
receive()
calls will raise a
ReceiverStoppedError
exception.
This channel is useful, for example, to distribute work across multiple workers.
In cases where each message need to be received by every receiver, a broadcast channel may be used.
Examples¤
Send a few numbers to a receiver
This is a very simple example that sends a few numbers from a single sender to a single receiver.
import asyncio
from frequenz.channels import Anycast, Sender
async def send(sender: Sender[int]) -> None:
for msg in range(3):
print(f"sending {msg}")
await sender.send(msg)
async def main() -> None:
channel = Anycast[int](name="numbers")
sender = channel.new_sender()
receiver = channel.new_receiver()
async with asyncio.TaskGroup() as task_group:
task_group.create_task(send(sender))
for _ in range(3):
msg = await receiver.receive()
print(f"received {msg}")
await asyncio.sleep(0.1) # sleep (or work) with the data
asyncio.run(main())
The output should look something like (although the sending and received might appear more interleaved):
Send a few number from multiple senders to multiple receivers
This is a more complex example that sends a few numbers from multiple senders to multiple receivers, using a small buffer to force the senders to block.
import asyncio
from frequenz.channels import Anycast, Receiver, ReceiverStoppedError, Sender
async def send(name: str, sender: Sender[int], start: int, stop: int) -> None:
for msg in range(start, stop):
print(f"{name} sending {msg}")
await sender.send(msg)
async def recv(name: str, receiver: Receiver[int]) -> None:
try:
async for msg in receiver:
print(f"{name} received {msg}")
await asyncio.sleep(0.1) # sleep (or work) with the data
except ReceiverStoppedError:
pass
async def main() -> None:
acast = Anycast[int](name="numbers", limit=2)
async with asyncio.TaskGroup() as task_group:
task_group.create_task(send("sender_1", acast.new_sender(), 10, 13))
task_group.create_task(send("sender_2", acast.new_sender(), 20, 22))
task_group.create_task(recv("receiver_1", acast.new_receiver()))
task_group.create_task(recv("receiver_2", acast.new_receiver()))
asyncio.run(main())
The output should look something like this(although the sending and received might appear interleaved in a different way):
sender_1 sending 10
sender_1 sending 11
sender_1 sending 12
Anycast channel [Anycast:numbers:_Sender] is full, blocking sender until a receiver
consumes a value
sender_2 sending 20
Anycast channel [Anycast:numbers:_Sender] is full, blocking sender until a receiver
consumes a value
receiver_1 received 10
receiver_1 received 11
sender_2 sending 21
Anycast channel [Anycast:numbers:_Sender] is full, blocking sender until a receiver
consumes a value
receiver_1 received 12
receiver_1 received 20
receiver_1 received 21
Source code in src/frequenz/channels/_anycast.py
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 |
|
Attributes¤
is_closed
property
¤
Whether this channel is closed.
Any further attempts to use this channel after it is closed will result in an exception.
limit
property
¤
The maximum number of values that can be stored in the channel's buffer.
If the length of channel's buffer reaches the limit, then the sender blocks at the send() method until a value is consumed.
name
property
¤
The name of this channel.
This is for debugging purposes, it will be shown in the string representation of this channel.
Functions¤
__init__ ¤
Create an Anycast channel.
PARAMETER | DESCRIPTION |
---|---|
name |
The name of the channel. This is for logging purposes, and it will be shown in the string representation of the channel.
TYPE:
|
limit |
The size of the internal buffer in number of messages. If the buffer is full, then the senders will block until the receivers consume the messages in the buffer.
TYPE:
|
Source code in src/frequenz/channels/_anycast.py
__repr__ ¤
Return a string representation of this channel.
__str__ ¤
close
async
¤
Close the channel.
Any further attempts to send() data
will return False
.
Receivers will still be able to drain the pending values on the channel,
but after that, subsequent
receive() calls will return None
immediately.
Source code in src/frequenz/channels/_anycast.py
new_receiver ¤
Create a new receiver.
RETURNS | DESCRIPTION |
---|---|
Receiver[_T]
|
A Receiver instance attached to the Anycast channel. |
frequenz.channels.Broadcast ¤
Bases: Generic[_T]
A channel that deliver all messages to all receivers.
Description¤
Broadcast channels can have multiple senders and multiple receivers. Each message sent through any of the senders will be received by all receivers.
Characteristics
- Buffered: Yes, with one buffer per receiver
- Buffer full policy: Drop oldest message
- Multiple receivers: Yes
- Multiple senders: Yes
- Thread-safe: No
This channel is buffered, and when messages are not being consumed fast enough and the buffer fills up, old messages will get dropped.
Each receiver has its own buffer, so messages will only be dropped for receivers that can't keep up with the senders, and not for the whole channel.
To create a new senders and
receivers you can use the
new_sender()
and
new_receiver()
methods
respectively.
When a channel is not needed anymore, it should be closed with
close()
. This will prevent further
attempts to send()
data, and will allow
receivers to drain the pending items on their queues, but after that,
subsequent receive() calls will
raise a ReceiverStoppedError
.
This channel is useful, for example, to implement a pub/sub pattern, where multiple receivers can subscribe to a channel to receive all messages.
In cases where each message needs to be delivered only to one receiver, an anycast channel may be used.
Examples¤
Send a few numbers to a receiver
This is a very simple example that sends a few numbers from a single sender to a single receiver.
import asyncio
from frequenz.channels import Broadcast, Sender
async def send(sender: Sender[int]) -> None:
for msg in range(3):
print(f"sending {msg}")
await sender.send(msg)
async def main() -> None:
channel = Broadcast[int](name="numbers")
sender = channel.new_sender()
receiver = channel.new_receiver()
async with asyncio.TaskGroup() as task_group:
task_group.create_task(send(sender))
for _ in range(3):
msg = await receiver.receive()
print(f"received {msg}")
await asyncio.sleep(0.1) # sleep (or work) with the data
asyncio.run(main())
The output should look something like (although the sending and received might appear more interleaved):
Send a few number from multiple senders to multiple receivers
This is a more complex example that sends a few numbers from multiple senders to multiple receivers, using a small buffer to force the senders to block.
import asyncio
from frequenz.channels import Broadcast, Receiver, ReceiverStoppedError, Sender
async def send(name: str, sender: Sender[int], start: int, stop: int) -> None:
for msg in range(start, stop):
print(f"{name} sending {msg}")
await sender.send(msg)
async def recv(name: str, receiver: Receiver[int]) -> None:
try:
async for msg in receiver:
print(f"{name} received {msg}")
await asyncio.sleep(0.1) # sleep (or work) with the data
except ReceiverStoppedError:
pass
async def main() -> None:
acast = Broadcast[int](name="numbers")
async with asyncio.TaskGroup() as task_group:
task_group.create_task(send("sender_1", acast.new_sender(), 10, 13))
task_group.create_task(send("sender_2", acast.new_sender(), 20, 22))
task_group.create_task(recv("receiver_1", acast.new_receiver()))
task_group.create_task(recv("receiver_2", acast.new_receiver()))
asyncio.run(main())
The output should look something like this(although the sending and received might appear interleaved in a different way):
sender_1 sending 10
sender_1 sending 11
sender_1 sending 12
sender_2 sending 20
sender_2 sending 21
receiver_1 received 10
receiver_1 received 11
receiver_1 received 12
receiver_1 received 20
receiver_1 received 21
receiver_2 received 10
receiver_2 received 11
receiver_2 received 12
receiver_2 received 20
receiver_2 received 21
Source code in src/frequenz/channels/_broadcast.py
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 |
|
Attributes¤
is_closed
property
¤
Whether this channel is closed.
Any further attempts to use this channel after it is closed will result in an exception.
name
property
¤
The name of this channel.
This is for logging purposes, and it will be shown in the string representation of this channel.
resend_latest
instance-attribute
¤
Whether to resend the latest value to new receivers.
When True
, every time a new receiver is created with new_receiver
, it will
automatically get sent the latest value on the channel. This allows new
receivers on slow streams to get the latest value as soon as they are created,
without having to wait for the next message on the channel to arrive.
It is safe to be set in data/reporting channels, but is not recommended for use in channels that stream control instructions.
Functions¤
__init__ ¤
Create a Broadcast channel.
PARAMETER | DESCRIPTION |
---|---|
name |
The name of the channel. This is for logging purposes, and it will be shown in the string representation of the channel.
TYPE:
|
resend_latest |
When True, every time a new receiver is created with
TYPE:
|
Source code in src/frequenz/channels/_broadcast.py
__repr__ ¤
Return a string representation of this channel.
Source code in src/frequenz/channels/_broadcast.py
__str__ ¤
close
async
¤
Close the Broadcast channel.
Any further attempts to send() data
will return False
.
Receivers will still be able to drain the pending items on their queues,
but after that, subsequent
receive() calls will return None
immediately.
Source code in src/frequenz/channels/_broadcast.py
new_receiver ¤
Create a new broadcast receiver.
Broadcast receivers have their own buffer, and when messages are not being consumed fast enough and the buffer fills up, old messages will get dropped just in this receiver.
PARAMETER | DESCRIPTION |
---|---|
name |
A name to identify the receiver in the logs.
TYPE:
|
limit |
Number of messages the receiver can hold in its buffer.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Receiver[_T]
|
A Receiver instance attached to the broadcast channel. |
Source code in src/frequenz/channels/_broadcast.py
frequenz.channels.ChannelClosedError ¤
Bases: ChannelError
Error raised when trying to operate on a closed channel.
Source code in src/frequenz/channels/_exceptions.py
frequenz.channels.ChannelError ¤
Bases: Error
An error produced in a channel.
All exceptions generated by channels inherit from this exception.
Source code in src/frequenz/channels/_exceptions.py
frequenz.channels.Error ¤
Bases: RuntimeError
Base error.
All exceptions generated by this library inherit from this exception.
Source code in src/frequenz/channels/_exceptions.py
frequenz.channels.Merger ¤
Bases: Receiver[_T]
A receiver that merges messages coming from multiple receivers into a single stream.
Tip
Please consider using the more idiomatic merge()
function instead of creating a Merger
instance directly.
Source code in src/frequenz/channels/_merge.py
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 |
|
Functions¤
__aiter__ ¤
Initialize the async iterator over received values.
RETURNS | DESCRIPTION |
---|---|
Self
|
|
__anext__
async
¤
Await the next value in the async iteration over received values.
RETURNS | DESCRIPTION |
---|---|
_T
|
The next value received. |
RAISES | DESCRIPTION |
---|---|
StopAsyncIteration
|
if the receiver stopped producing messages. |
ReceiverError
|
if there is some problem with the receiver. |
Source code in src/frequenz/channels/_receiver.py
__del__ ¤
__init__ ¤
Create a Merger
instance.
PARAMETER | DESCRIPTION |
---|---|
*receivers |
The receivers to merge.
TYPE:
|
name |
The name of the receiver. Used to create the string representation of the receiver.
TYPE:
|
Source code in src/frequenz/channels/_merge.py
__repr__ ¤
__str__ ¤
Return a string representation of this receiver.
Source code in src/frequenz/channels/_merge.py
consume ¤
Return the latest value once ready
is complete.
RETURNS | DESCRIPTION |
---|---|
_T
|
The next value that was received. |
RAISES | DESCRIPTION |
---|---|
ReceiverStoppedError
|
if the receiver stopped producing messages. |
ReceiverError
|
if there is some problem with the receiver. |
Source code in src/frequenz/channels/_merge.py
map ¤
ready
async
¤
Wait until the receiver is ready with a value or an error.
Once a call to ready()
has finished, the value should be read with
a call to consume()
(receive()
or iterated over). The receiver will
remain ready (this method will return immediately) until it is
consumed.
RETURNS | DESCRIPTION |
---|---|
bool
|
Whether the receiver is still active. |
Source code in src/frequenz/channels/_merge.py
receive
async
¤
Receive a message from the channel.
RETURNS | DESCRIPTION |
---|---|
_T
|
The received message. |
RAISES | DESCRIPTION |
---|---|
ReceiverStoppedError
|
if there is some problem with the receiver. |
ReceiverError
|
if there is some problem with the receiver. |
Source code in src/frequenz/channels/_receiver.py
stop
async
¤
Stop the Merger
instance and cleanup any pending tasks.
frequenz.channels.Receiver ¤
An entity that receives values from a channel.
Source code in src/frequenz/channels/_receiver.py
147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 |
|
Functions¤
__aiter__ ¤
Initialize the async iterator over received values.
RETURNS | DESCRIPTION |
---|---|
Self
|
|
__anext__
async
¤
Await the next value in the async iteration over received values.
RETURNS | DESCRIPTION |
---|---|
_T
|
The next value received. |
RAISES | DESCRIPTION |
---|---|
StopAsyncIteration
|
if the receiver stopped producing messages. |
ReceiverError
|
if there is some problem with the receiver. |
Source code in src/frequenz/channels/_receiver.py
consume
abstractmethod
¤
Return the latest value once ready()
is complete.
ready()
must be called before each call to consume()
.
RETURNS | DESCRIPTION |
---|---|
_T
|
The next value received. |
RAISES | DESCRIPTION |
---|---|
ReceiverStoppedError
|
if the receiver stopped producing messages. |
ReceiverError
|
if there is some problem with the receiver. |
Source code in src/frequenz/channels/_receiver.py
map ¤
ready
abstractmethod
async
¤
Wait until the receiver is ready with a value or an error.
Once a call to ready()
has finished, the value should be read with
a call to consume()
(receive()
or iterated over). The receiver will
remain ready (this method will return immediately) until it is
consumed.
RETURNS | DESCRIPTION |
---|---|
bool
|
Whether the receiver is still active. |
Source code in src/frequenz/channels/_receiver.py
receive
async
¤
Receive a message from the channel.
RETURNS | DESCRIPTION |
---|---|
_T
|
The received message. |
RAISES | DESCRIPTION |
---|---|
ReceiverStoppedError
|
if there is some problem with the receiver. |
ReceiverError
|
if there is some problem with the receiver. |
Source code in src/frequenz/channels/_receiver.py
frequenz.channels.ReceiverError ¤
An error produced in a Receiver.
All exceptions generated by receivers inherit from this exception.
Source code in src/frequenz/channels/_receiver.py
Attributes¤
receiver
instance-attribute
¤
The receiver where the error happened.
Functions¤
__init__ ¤
Create an instance.
PARAMETER | DESCRIPTION |
---|---|
message |
An error message.
TYPE:
|
receiver |
The Receiver where the error happened.
TYPE:
|
Source code in src/frequenz/channels/_receiver.py
frequenz.channels.ReceiverStoppedError ¤
Bases: ReceiverError[_T]
The Receiver stopped producing messages.
Source code in src/frequenz/channels/_receiver.py
frequenz.channels.SelectError ¤
Bases: BaseException
A base exception for select()
.
This exception is raised when a select()
iteration fails. It is raised as
a single exception when one receiver fails during normal operation (while calling
ready()
for example). It is raised as a group exception
(SelectErrorGroup
) when a select
loop
is cleaning up after it's done.
Source code in src/frequenz/channels/_select.py
frequenz.channels.SelectErrorGroup ¤
Bases: BaseExceptionGroup[BaseException]
, SelectError
An exception group for select()
operation.
This exception group is raised when a select()
loops fails while cleaning up
running tests to check for ready receivers.
Source code in src/frequenz/channels/_select.py
frequenz.channels.Selected ¤
Bases: Generic[_T]
A result of a select()
iteration.
The selected receiver is consumed immediately and the received value is stored in the instance, unless there was an exception while receiving the value, in which case the exception is stored instead.
Selected
instances should be used in conjunction with the
selected_from()
function to determine
which receiver was selected.
Please see select()
for an example.
Source code in src/frequenz/channels/_select.py
158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 |
|
Attributes¤
exception
property
¤
The exception that was raised while receiving the value (if any).
RETURNS | DESCRIPTION |
---|---|
Exception | None
|
The exception that was raised while receiving the value (if any). |
value
property
¤
The value that was received, if any.
RETURNS | DESCRIPTION |
---|---|
_T
|
The value that was received. |
RAISES | DESCRIPTION |
---|---|
Exception
|
If there was an exception while receiving the value. Normally
this should be an |
Functions¤
__init__ ¤
Create a new instance.
The receiver is consumed immediately when creating the instance and the received
value is stored in the instance for later use as
value
. If there was an exception
while receiving the value, then the exception is stored in the instance instead
(as exception
).
PARAMETER | DESCRIPTION |
---|---|
receiver |
The receiver that was selected.
TYPE:
|
Source code in src/frequenz/channels/_select.py
__repr__ ¤
Return a the internal representation of this instance.
RETURNS | DESCRIPTION |
---|---|
str
|
A string representation of this instance. |
Source code in src/frequenz/channels/_select.py
__str__ ¤
Return a string representation of this instance.
RETURNS | DESCRIPTION |
---|---|
str
|
A string representation of this instance. |
Source code in src/frequenz/channels/_select.py
was_stopped ¤
Check if the selected receiver was stopped.
Check if the selected receiver raised
a ReceiverStoppedError
while
consuming a value.
RETURNS | DESCRIPTION |
---|---|
bool
|
Whether the receiver was stopped. |
Source code in src/frequenz/channels/_select.py
frequenz.channels.Sender ¤
An entity that sends values to a channel.
Source code in src/frequenz/channels/_sender.py
Functions¤
send
abstractmethod
async
¤
Send a message to the channel.
PARAMETER | DESCRIPTION |
---|---|
msg |
The message to be sent.
TYPE:
|
RAISES | DESCRIPTION |
---|---|
SenderError
|
if there was an error sending the message. |
frequenz.channels.SenderError ¤
An error produced in a Sender.
All exceptions generated by senders inherit from this exception.
Source code in src/frequenz/channels/_sender.py
frequenz.channels.UnhandledSelectedError ¤
Bases: SelectError
, Generic[_T]
A receiver was not handled in a select()
loop.
This exception is raised when a select()
iteration finishes without a call to
selected_from()
for the selected receiver.
Source code in src/frequenz/channels/_select.py
Attributes¤
selected
instance-attribute
¤
The selected receiver that was not handled.
Functions¤
__init__ ¤
Create a new instance.
PARAMETER | DESCRIPTION |
---|---|
selected |
The selected receiver that was not handled.
TYPE:
|
Source code in src/frequenz/channels/_select.py
Functions¤
frequenz.channels.merge ¤
Merge messages coming from multiple receivers into a single stream.
Example
For example, if there are two channel receivers with the same type, they can be awaited together, and their results merged into a single stream like this:
PARAMETER | DESCRIPTION |
---|---|
*receivers |
The receivers to merge.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
Merger[_T]
|
A receiver that merges the messages coming from multiple receivers into a single stream. |
RAISES | DESCRIPTION |
---|---|
ValueError
|
if no receivers are provided. |
Source code in src/frequenz/channels/_merge.py
frequenz.channels.select
async
¤
Iterate over the values of all receivers as they receive new values.
This function is used to iterate over the values of all receivers as they receive
new values. It is used in conjunction with the
Selected
class and the
selected_from()
function to determine
which function to determine which receiver was selected in a select operation.
An exhaustiveness check is performed at runtime to make sure all selected receivers
are handled in the if-chain, so you should call selected_from()
with all the
receivers passed to select()
inside the select loop, even if you plan to ignore
a value, to signal select()
that you are purposefully ignoring the value.
Note
The select()
function is intended to be used in cases where the set of
receivers is static and known beforehand. If you need to dynamically add/remove
receivers from a select loop, there are a few alternatives. Depending on your
use case, one or the other could work better for you:
- Use
merge()
: this is useful when you have an unknown number of receivers of the same type that can be handled as a group. - Use tasks to manage each receiver individually: this is better if there are no relationships between the receivers.
- Break the
select()
loop and start a new one with the new set of receivers (this should be the last resort, as it has some performance implications because the loop needs to be restarted).
Example
import datetime
from typing import assert_never
from frequenz.channels import ReceiverStoppedError, select, selected_from
from frequenz.channels.timer import Timer
timer1 = Timer.periodic(datetime.timedelta(seconds=1))
timer2 = Timer.timeout(datetime.timedelta(seconds=0.5))
async for selected in select(timer1, timer2):
if selected_from(selected, timer1):
# Beware: `selected.value` might raise an exception, you can always
# check for exceptions with `selected.exception` first or use
# a try-except block. You can also quickly check if the receiver was
# stopped and let any other unexpected exceptions bubble up.
if selected.was_stopped:
print("timer1 was stopped")
continue
print(f"timer1: now={datetime.datetime.now()} drift={selected.value}")
timer2.stop()
elif selected_from(selected, timer2):
# Explicitly handling of exceptions
match selected.exception:
case ReceiverStoppedError():
print("timer2 was stopped")
case Exception() as exception:
print(f"timer2: exception={exception}")
case None:
# All good, no exception, we can use `selected.value` safely
print(f"timer2: now={datetime.datetime.now()} drift={selected.value}")
case _ as unhanded:
assert_never(unhanded)
else:
# This is not necessary, as select() will check for exhaustiveness, but
# it is good practice to have it in case you forgot to handle a new
# receiver added to `select()` at a later point in time.
assert False
PARAMETER | DESCRIPTION |
---|---|
*receivers |
The receivers to select from. |
YIELDS | DESCRIPTION |
---|---|
AsyncIterator[Selected[Any]]
|
The currently selected item. |
RAISES | DESCRIPTION |
---|---|
UnhandledSelectedError
|
If a selected receiver was not handled in the if-chain. |
SelectErrorGroup
|
If there is an error while finishing the select operation and receivers fail while cleaning up. |
SelectError
|
If there is an error while selecting receivers during normal
operation. For example if a receiver raises an exception in the |
Source code in src/frequenz/channels/_select.py
369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 |
|
frequenz.channels.selected_from ¤
Check if the given receiver was selected by select()
.
This function is used in conjunction with the
Selected
class to determine which receiver was
selected in select()
iteration.
It also works as a type guard to narrow the type of the
Selected
instance to the type of the receiver.
Please see select()
for an example.
PARAMETER | DESCRIPTION |
---|---|
selected |
The result of a |
receiver |
The receiver to check if it was the source of a select operation.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
TypeGuard[Selected[_T]]
|
Whether the given receiver was selected. |