Skip to content

Index

frequenz.channels ¤

Frequenz Channels.

This package contains channel implementations.

Base classes:

  • Receiver: An object that can wait for and consume messages from a channel.

  • Sender: An object that can send messages to a channel.

Channels:

  • Anycast: A channel that supports multiple senders and multiple receivers. A message sent through a sender will be received by exactly one receiver.

  • Broadcast: A channel to broadcast messages from multiple senders to multiple receivers. Each message sent through any of the senders is received by all of the receivers.

Utilities to work with channels:

  • merge: Merge messages coming from multiple receivers into a single stream.

  • select: Iterate over the values of all receivers as new values become available.

Exception classes:

Extra utility receivers:

  • Event: A receiver that generates a message when an event is set.

  • FileWatcher: A receiver that generates a message when a file is added, modified or deleted.

  • Timer: A receiver that generates a message after a given amount of time.

Classes¤

frequenz.channels.Anycast ¤

Bases: Generic[_T]

A channel that delivers each message to exactly one receiver.

Description¤

Anycast channels support multiple senders and multiple receivers. Each message sent through any of the senders will be received by exactly one receiver.

Receiver Receiver msg1 msg1 Sender Channel Sender msg2 msg2

Characteristics

  • Buffered: Yes, with a global channel buffer
  • Buffer full policy: Block senders
  • Multiple receivers: Yes
  • Multiple senders: Yes
  • Thread-safe: No

This channel is buffered, and if the senders are faster than the receivers, then the channel's buffer will fill up. In that case, the senders will block at the send() method until the receivers consume the messages in the channel's buffer. The channel's buffer size can be configured at creation time via the limit argument.

To create a new senders and receivers you can use the new_sender() and new_receiver() methods respectively.

When the channel is not needed anymore, it should be closed with the close() method. This will prevent further attempts to send() data. Receivers will still be able to drain the pending values on the channel, but after that, subsequent receive() calls will raise a ReceiverStoppedError exception.

This channel is useful, for example, to distribute work across multiple workers.

In cases where each message need to be received by every receiver, a broadcast channel may be used.

Examples¤
Send a few numbers to a receiver

This is a very simple example that sends a few numbers from a single sender to a single receiver.

import asyncio

from frequenz.channels import Anycast, Sender


async def send(sender: Sender[int]) -> None:
    for msg in range(3):
        print(f"sending {msg}")
        await sender.send(msg)


async def main() -> None:
    channel = Anycast[int](name="numbers")

    sender = channel.new_sender()
    receiver = channel.new_receiver()

    async with asyncio.TaskGroup() as task_group:
        task_group.create_task(send(sender))
        for _ in range(3):
            msg = await receiver.receive()
            print(f"received {msg}")
            await asyncio.sleep(0.1)  # sleep (or work) with the data


asyncio.run(main())

The output should look something like (although the sending and received might appear more interleaved):

sending 0
sending 1
sending 2
received 0
received 1
received 2
Send a few number from multiple senders to multiple receivers

This is a more complex example that sends a few numbers from multiple senders to multiple receivers, using a small buffer to force the senders to block.

import asyncio

from frequenz.channels import Anycast, Receiver, ReceiverStoppedError, Sender


async def send(name: str, sender: Sender[int], start: int, stop: int) -> None:
    for msg in range(start, stop):
        print(f"{name} sending {msg}")
        await sender.send(msg)


async def recv(name: str, receiver: Receiver[int]) -> None:
    try:
        async for msg in receiver:
            print(f"{name} received {msg}")
        await asyncio.sleep(0.1)  # sleep (or work) with the data
    except ReceiverStoppedError:
        pass


async def main() -> None:
    acast = Anycast[int](name="numbers", limit=2)

    async with asyncio.TaskGroup() as task_group:
        task_group.create_task(send("sender_1", acast.new_sender(), 10, 13))
        task_group.create_task(send("sender_2", acast.new_sender(), 20, 22))
        task_group.create_task(recv("receiver_1", acast.new_receiver()))
        task_group.create_task(recv("receiver_2", acast.new_receiver()))


asyncio.run(main())

The output should look something like this(although the sending and received might appear interleaved in a different way):

sender_1 sending 10
sender_1 sending 11
sender_1 sending 12
Anycast channel [Anycast:numbers:_Sender] is full, blocking sender until a receiver
consumes a value
sender_2 sending 20
Anycast channel [Anycast:numbers:_Sender] is full, blocking sender until a receiver
consumes a value
receiver_1 received 10
receiver_1 received 11
sender_2 sending 21
Anycast channel [Anycast:numbers:_Sender] is full, blocking sender until a receiver
consumes a value
receiver_1 received 12
receiver_1 received 20
receiver_1 received 21
Source code in src/frequenz/channels/_anycast.py
class Anycast(Generic[_T]):
    """A channel that delivers each message to exactly one receiver.

    # Description

    [Anycast][frequenz.channels.Anycast] channels support multiple
    [senders][frequenz.channels.Sender] and multiple
    [receivers][frequenz.channels.Receiver]. Each message sent through any of the
    senders will be received by exactly one receiver.

    <center>
    ```bob
    .---------. msg1                           msg1  .-----------.
    | Sender  +------.                       .------>| Receiver  |
    '---------'      |      .----------.     |       '-----------'
                     +----->| Channel  +-----+
    .---------.      |      '----------'     |       .-----------.
    | Sender  +------'                       '------>| Receiver  |
    '---------' msg2                           msg2  '-----------'
    ```
    </center>

    !!! Note inline end "Characteristics"

        * **Buffered:** Yes, with a global channel buffer
        * **Buffer full policy:** Block senders
        * **Multiple receivers:** Yes
        * **Multiple senders:** Yes
        * **Thread-safe:** No

    This channel is buffered, and if the senders are faster than the receivers, then the
    channel's buffer will fill up. In that case, the senders will block at the
    [`send()`][frequenz.channels.Sender.send] method until the receivers consume the
    messages in the channel's buffer. The channel's buffer size can be configured at
    creation time via the `limit` argument.

    To create a new [senders][frequenz.channels.Sender] and
    [receivers][frequenz.channels.Receiver] you can use the
    [`new_sender()`][frequenz.channels.Broadcast.new_sender] and
    [`new_receiver()`][frequenz.channels.Broadcast.new_receiver] methods
    respectively.

    When the channel is not needed anymore, it should be closed with the
    [`close()`][frequenz.channels.Anycast.close] method. This will prevent further
    attempts to [`send()`][frequenz.channels.Sender.send] data. Receivers will still be
    able to drain the pending values on the channel, but after that, subsequent
    [`receive()`][frequenz.channels.Receiver.receive] calls will raise a
    [`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError] exception.

    This channel is useful, for example, to distribute work across multiple workers.

    In cases where each message need to be received by every receiver, a
    [broadcast][frequenz.channels.Broadcast] channel may be used.

    # Examples

    Example: Send a few numbers to a receiver
        This is a very simple example that sends a few numbers from a single sender to
        a single receiver.

        ```python
        import asyncio

        from frequenz.channels import Anycast, Sender


        async def send(sender: Sender[int]) -> None:
            for msg in range(3):
                print(f"sending {msg}")
                await sender.send(msg)


        async def main() -> None:
            channel = Anycast[int](name="numbers")

            sender = channel.new_sender()
            receiver = channel.new_receiver()

            async with asyncio.TaskGroup() as task_group:
                task_group.create_task(send(sender))
                for _ in range(3):
                    msg = await receiver.receive()
                    print(f"received {msg}")
                    await asyncio.sleep(0.1)  # sleep (or work) with the data


        asyncio.run(main())
        ```

        The output should look something like (although the sending and received might
        appear more interleaved):

        ```
        sending 0
        sending 1
        sending 2
        received 0
        received 1
        received 2
        ```

    Example: Send a few number from multiple senders to multiple receivers
        This is a more complex example that sends a few numbers from multiple senders to
        multiple receivers, using a small buffer to force the senders to block.

        ```python
        import asyncio

        from frequenz.channels import Anycast, Receiver, ReceiverStoppedError, Sender


        async def send(name: str, sender: Sender[int], start: int, stop: int) -> None:
            for msg in range(start, stop):
                print(f"{name} sending {msg}")
                await sender.send(msg)


        async def recv(name: str, receiver: Receiver[int]) -> None:
            try:
                async for msg in receiver:
                    print(f"{name} received {msg}")
                await asyncio.sleep(0.1)  # sleep (or work) with the data
            except ReceiverStoppedError:
                pass


        async def main() -> None:
            acast = Anycast[int](name="numbers", limit=2)

            async with asyncio.TaskGroup() as task_group:
                task_group.create_task(send("sender_1", acast.new_sender(), 10, 13))
                task_group.create_task(send("sender_2", acast.new_sender(), 20, 22))
                task_group.create_task(recv("receiver_1", acast.new_receiver()))
                task_group.create_task(recv("receiver_2", acast.new_receiver()))


        asyncio.run(main())
        ```

        The output should look something like this(although the sending and received
        might appear interleaved in a different way):

        ```
        sender_1 sending 10
        sender_1 sending 11
        sender_1 sending 12
        Anycast channel [Anycast:numbers:_Sender] is full, blocking sender until a receiver
        consumes a value
        sender_2 sending 20
        Anycast channel [Anycast:numbers:_Sender] is full, blocking sender until a receiver
        consumes a value
        receiver_1 received 10
        receiver_1 received 11
        sender_2 sending 21
        Anycast channel [Anycast:numbers:_Sender] is full, blocking sender until a receiver
        consumes a value
        receiver_1 received 12
        receiver_1 received 20
        receiver_1 received 21
        ```
    """

    def __init__(self, *, name: str, limit: int = 10) -> None:
        """Create an Anycast channel.

        Args:
            name: The name of the channel. This is for logging purposes, and it will be
                shown in the string representation of the channel.
            limit: The size of the internal buffer in number of messages.  If the buffer
                is full, then the senders will block until the receivers consume the
                messages in the buffer.
        """
        self._name: str = name
        """The name of the channel.

        This is for logging purposes, and it will be shown in the string representation
        of the channel.
        """

        self._deque: deque[_T] = deque(maxlen=limit)
        """The channel's buffer."""

        self._send_cv: Condition = Condition()
        """The condition to wait for free space in the channel's buffer.

        If the channel's buffer is full, then the sender waits for values to
        get consumed using this condition until there's some free space
        available in the channel's buffer.
        """

        self._recv_cv: Condition = Condition()
        """The condition to wait for values in the channel's buffer.

        If the channel's buffer is empty, then the receiver waits for values
        using this condition until there's a value available in the channel's
        buffer.
        """

        self._closed: bool = False
        """Whether the channel is closed."""

    @property
    def name(self) -> str:
        """The name of this channel.

        This is for debugging purposes, it will be shown in the string representation
        of this channel.
        """
        return self._name

    @property
    def is_closed(self) -> bool:
        """Whether this channel is closed.

        Any further attempts to use this channel after it is closed will result in an
        exception.
        """
        return self._closed

    @property
    def limit(self) -> int:
        """The maximum number of values that can be stored in the channel's buffer.

        If the length of channel's buffer reaches the limit, then the sender
        blocks at the [send()][frequenz.channels.Sender.send] method until
        a value is consumed.
        """
        maxlen = self._deque.maxlen
        assert maxlen is not None
        return maxlen

    async def close(self) -> None:
        """Close the channel.

        Any further attempts to [send()][frequenz.channels.Sender.send] data
        will return `False`.

        Receivers will still be able to drain the pending values on the channel,
        but after that, subsequent
        [receive()][frequenz.channels.Receiver.receive] calls will return `None`
        immediately.

        """
        self._closed = True
        async with self._send_cv:
            self._send_cv.notify_all()
        async with self._recv_cv:
            self._recv_cv.notify_all()

    def new_sender(self) -> Sender[_T]:
        """Create a new sender.

        Returns:
            A Sender instance attached to the Anycast channel.
        """
        return _Sender(self)

    def new_receiver(self) -> Receiver[_T]:
        """Create a new receiver.

        Returns:
            A Receiver instance attached to the Anycast channel.
        """
        return _Receiver(self)

    def __str__(self) -> str:
        """Return a string representation of this channel."""
        return f"{type(self).__name__}:{self._name}"

    def __repr__(self) -> str:
        """Return a string representation of this channel."""
        return (
            f"{type(self).__name__}(name={self._name!r}, limit={self.limit!r}):<"
            f"current={len(self._deque)!r}, closed={self._closed!r}>"
        )
Attributes¤
is_closed property ¤
is_closed: bool

Whether this channel is closed.

Any further attempts to use this channel after it is closed will result in an exception.

limit property ¤
limit: int

The maximum number of values that can be stored in the channel's buffer.

If the length of channel's buffer reaches the limit, then the sender blocks at the send() method until a value is consumed.

name property ¤
name: str

The name of this channel.

This is for debugging purposes, it will be shown in the string representation of this channel.

Functions¤
__init__ ¤
__init__(*, name: str, limit: int = 10) -> None

Create an Anycast channel.

PARAMETER DESCRIPTION
name

The name of the channel. This is for logging purposes, and it will be shown in the string representation of the channel.

TYPE: str

limit

The size of the internal buffer in number of messages. If the buffer is full, then the senders will block until the receivers consume the messages in the buffer.

TYPE: int DEFAULT: 10

Source code in src/frequenz/channels/_anycast.py
def __init__(self, *, name: str, limit: int = 10) -> None:
    """Create an Anycast channel.

    Args:
        name: The name of the channel. This is for logging purposes, and it will be
            shown in the string representation of the channel.
        limit: The size of the internal buffer in number of messages.  If the buffer
            is full, then the senders will block until the receivers consume the
            messages in the buffer.
    """
    self._name: str = name
    """The name of the channel.

    This is for logging purposes, and it will be shown in the string representation
    of the channel.
    """

    self._deque: deque[_T] = deque(maxlen=limit)
    """The channel's buffer."""

    self._send_cv: Condition = Condition()
    """The condition to wait for free space in the channel's buffer.

    If the channel's buffer is full, then the sender waits for values to
    get consumed using this condition until there's some free space
    available in the channel's buffer.
    """

    self._recv_cv: Condition = Condition()
    """The condition to wait for values in the channel's buffer.

    If the channel's buffer is empty, then the receiver waits for values
    using this condition until there's a value available in the channel's
    buffer.
    """

    self._closed: bool = False
    """Whether the channel is closed."""
__repr__ ¤
__repr__() -> str

Return a string representation of this channel.

Source code in src/frequenz/channels/_anycast.py
def __repr__(self) -> str:
    """Return a string representation of this channel."""
    return (
        f"{type(self).__name__}(name={self._name!r}, limit={self.limit!r}):<"
        f"current={len(self._deque)!r}, closed={self._closed!r}>"
    )
__str__ ¤
__str__() -> str

Return a string representation of this channel.

Source code in src/frequenz/channels/_anycast.py
def __str__(self) -> str:
    """Return a string representation of this channel."""
    return f"{type(self).__name__}:{self._name}"
close async ¤
close() -> None

Close the channel.

Any further attempts to send() data will return False.

Receivers will still be able to drain the pending values on the channel, but after that, subsequent receive() calls will return None immediately.

Source code in src/frequenz/channels/_anycast.py
async def close(self) -> None:
    """Close the channel.

    Any further attempts to [send()][frequenz.channels.Sender.send] data
    will return `False`.

    Receivers will still be able to drain the pending values on the channel,
    but after that, subsequent
    [receive()][frequenz.channels.Receiver.receive] calls will return `None`
    immediately.

    """
    self._closed = True
    async with self._send_cv:
        self._send_cv.notify_all()
    async with self._recv_cv:
        self._recv_cv.notify_all()
new_receiver ¤
new_receiver() -> Receiver[_T]

Create a new receiver.

RETURNS DESCRIPTION
Receiver[_T]

A Receiver instance attached to the Anycast channel.

Source code in src/frequenz/channels/_anycast.py
def new_receiver(self) -> Receiver[_T]:
    """Create a new receiver.

    Returns:
        A Receiver instance attached to the Anycast channel.
    """
    return _Receiver(self)
new_sender ¤
new_sender() -> Sender[_T]

Create a new sender.

RETURNS DESCRIPTION
Sender[_T]

A Sender instance attached to the Anycast channel.

Source code in src/frequenz/channels/_anycast.py
def new_sender(self) -> Sender[_T]:
    """Create a new sender.

    Returns:
        A Sender instance attached to the Anycast channel.
    """
    return _Sender(self)

frequenz.channels.Broadcast ¤

Bases: Generic[_T]

A channel that deliver all messages to all receivers.

Description¤

Broadcast channels can have multiple senders and multiple receivers. Each message sent through any of the senders will be received by all receivers.

Receiver Receiver msg1 msg1,msg2 Sender Channel Sender msg2 msg1,msg2

Characteristics

  • Buffered: Yes, with one buffer per receiver
  • Buffer full policy: Drop oldest message
  • Multiple receivers: Yes
  • Multiple senders: Yes
  • Thread-safe: No

This channel is buffered, and when messages are not being consumed fast enough and the buffer fills up, old messages will get dropped.

Each receiver has its own buffer, so messages will only be dropped for receivers that can't keep up with the senders, and not for the whole channel.

To create a new senders and receivers you can use the new_sender() and new_receiver() methods respectively.

When a channel is not needed anymore, it should be closed with close(). This will prevent further attempts to send() data, and will allow receivers to drain the pending items on their queues, but after that, subsequent receive() calls will raise a ReceiverStoppedError.

This channel is useful, for example, to implement a pub/sub pattern, where multiple receivers can subscribe to a channel to receive all messages.

In cases where each message needs to be delivered only to one receiver, an anycast channel may be used.

Examples¤
Send a few numbers to a receiver

This is a very simple example that sends a few numbers from a single sender to a single receiver.

import asyncio

from frequenz.channels import Broadcast, Sender


async def send(sender: Sender[int]) -> None:
    for msg in range(3):
        print(f"sending {msg}")
        await sender.send(msg)


async def main() -> None:
    channel = Broadcast[int](name="numbers")

    sender = channel.new_sender()
    receiver = channel.new_receiver()

    async with asyncio.TaskGroup() as task_group:
        task_group.create_task(send(sender))
        for _ in range(3):
            msg = await receiver.receive()
            print(f"received {msg}")
            await asyncio.sleep(0.1)  # sleep (or work) with the data


asyncio.run(main())

The output should look something like (although the sending and received might appear more interleaved):

sending 0
sending 1
sending 2
received 0
received 1
received 2
Send a few number from multiple senders to multiple receivers

This is a more complex example that sends a few numbers from multiple senders to multiple receivers, using a small buffer to force the senders to block.

import asyncio

from frequenz.channels import Broadcast, Receiver, ReceiverStoppedError, Sender


async def send(name: str, sender: Sender[int], start: int, stop: int) -> None:
    for msg in range(start, stop):
        print(f"{name} sending {msg}")
        await sender.send(msg)


async def recv(name: str, receiver: Receiver[int]) -> None:
    try:
        async for msg in receiver:
            print(f"{name} received {msg}")
        await asyncio.sleep(0.1)  # sleep (or work) with the data
    except ReceiverStoppedError:
        pass


async def main() -> None:
    acast = Broadcast[int](name="numbers")

    async with asyncio.TaskGroup() as task_group:
        task_group.create_task(send("sender_1", acast.new_sender(), 10, 13))
        task_group.create_task(send("sender_2", acast.new_sender(), 20, 22))
        task_group.create_task(recv("receiver_1", acast.new_receiver()))
        task_group.create_task(recv("receiver_2", acast.new_receiver()))


asyncio.run(main())

The output should look something like this(although the sending and received might appear interleaved in a different way):

sender_1 sending 10
sender_1 sending 11
sender_1 sending 12
sender_2 sending 20
sender_2 sending 21
receiver_1 received 10
receiver_1 received 11
receiver_1 received 12
receiver_1 received 20
receiver_1 received 21
receiver_2 received 10
receiver_2 received 11
receiver_2 received 12
receiver_2 received 20
receiver_2 received 21
Source code in src/frequenz/channels/_broadcast.py
class Broadcast(Generic[_T]):
    """A channel that deliver all messages to all receivers.

    # Description

    [Broadcast][frequenz.channels.Broadcast] channels can have multiple
    [senders][frequenz.channels.Sender] and multiple
    [receivers][frequenz.channels.Receiver]. Each message sent through any of the
    senders will be received by all receivers.

    <center>
    ```bob
    .---------. msg1                           msg1,msg2  .-----------.
    | Sender  +------.                        .---------->| Receiver  |
    '---------'      |      .----------.     |            '-----------'
                     +----->| Channel  +-----+
    .---------.      |      '----------'     |            .-----------.
    | Sender  +------'                       '----------->| Receiver  |
    '---------' msg2                           msg1,msg2  '-----------'
    ```
    </center>

    !!! Note inline end "Characteristics"

        * **Buffered:** Yes, with one buffer per receiver
        * **Buffer full policy:** Drop oldest message
        * **Multiple receivers:** Yes
        * **Multiple senders:** Yes
        * **Thread-safe:** No

    This channel is buffered, and when messages are not being consumed fast
    enough and the buffer fills up, old messages will get dropped.

    Each receiver has its own buffer, so messages will only be dropped for
    receivers that can't keep up with the senders, and not for the whole
    channel.

    To create a new [senders][frequenz.channels.Sender] and
    [receivers][frequenz.channels.Receiver] you can use the
    [`new_sender()`][frequenz.channels.Broadcast.new_sender] and
    [`new_receiver()`][frequenz.channels.Broadcast.new_receiver] methods
    respectively.

    When a channel is not needed anymore, it should be closed with
    [`close()`][frequenz.channels.Broadcast.close]. This will prevent further
    attempts to [`send()`][frequenz.channels.Sender.send] data, and will allow
    receivers to drain the pending items on their queues, but after that,
    subsequent [receive()][frequenz.channels.Receiver.receive] calls will
    raise a [`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError].

    This channel is useful, for example, to implement a pub/sub pattern, where
    multiple receivers can subscribe to a channel to receive all messages.

    In cases where each message needs to be delivered only to one receiver, an
    [anycast][frequenz.channels.Anycast] channel may be used.

    # Examples

    Example: Send a few numbers to a receiver
        This is a very simple example that sends a few numbers from a single sender to
        a single receiver.

        ```python
        import asyncio

        from frequenz.channels import Broadcast, Sender


        async def send(sender: Sender[int]) -> None:
            for msg in range(3):
                print(f"sending {msg}")
                await sender.send(msg)


        async def main() -> None:
            channel = Broadcast[int](name="numbers")

            sender = channel.new_sender()
            receiver = channel.new_receiver()

            async with asyncio.TaskGroup() as task_group:
                task_group.create_task(send(sender))
                for _ in range(3):
                    msg = await receiver.receive()
                    print(f"received {msg}")
                    await asyncio.sleep(0.1)  # sleep (or work) with the data


        asyncio.run(main())
        ```

        The output should look something like (although the sending and received might
        appear more interleaved):

        ```
        sending 0
        sending 1
        sending 2
        received 0
        received 1
        received 2
        ```

    Example: Send a few number from multiple senders to multiple receivers
        This is a more complex example that sends a few numbers from multiple senders to
        multiple receivers, using a small buffer to force the senders to block.

        ```python
        import asyncio

        from frequenz.channels import Broadcast, Receiver, ReceiverStoppedError, Sender


        async def send(name: str, sender: Sender[int], start: int, stop: int) -> None:
            for msg in range(start, stop):
                print(f"{name} sending {msg}")
                await sender.send(msg)


        async def recv(name: str, receiver: Receiver[int]) -> None:
            try:
                async for msg in receiver:
                    print(f"{name} received {msg}")
                await asyncio.sleep(0.1)  # sleep (or work) with the data
            except ReceiverStoppedError:
                pass


        async def main() -> None:
            acast = Broadcast[int](name="numbers")

            async with asyncio.TaskGroup() as task_group:
                task_group.create_task(send("sender_1", acast.new_sender(), 10, 13))
                task_group.create_task(send("sender_2", acast.new_sender(), 20, 22))
                task_group.create_task(recv("receiver_1", acast.new_receiver()))
                task_group.create_task(recv("receiver_2", acast.new_receiver()))


        asyncio.run(main())
        ```

        The output should look something like this(although the sending and received
        might appear interleaved in a different way):

        ```
        sender_1 sending 10
        sender_1 sending 11
        sender_1 sending 12
        sender_2 sending 20
        sender_2 sending 21
        receiver_1 received 10
        receiver_1 received 11
        receiver_1 received 12
        receiver_1 received 20
        receiver_1 received 21
        receiver_2 received 10
        receiver_2 received 11
        receiver_2 received 12
        receiver_2 received 20
        receiver_2 received 21
        ```
    """

    def __init__(self, *, name: str, resend_latest: bool = False) -> None:
        """Create a Broadcast channel.

        Args:
            name: The name of the channel. This is for logging purposes, and it will be
                shown in the string representation of the channel.
            resend_latest: When True, every time a new receiver is created with
                `new_receiver`, it will automatically get sent the latest value on the
                channel.  This allows new receivers on slow streams to get the latest
                value as soon as they are created, without having to wait for the next
                message on the channel to arrive.  It is safe to be set in
                data/reporting channels, but is not recommended for use in channels that
                stream control instructions.
        """
        self._name: str = name
        """The name of the broadcast channel.

        Only used for debugging purposes.
        """

        self._recv_cv: Condition = Condition()
        """The condition to wait for data in the channel's buffer."""

        self._receivers: dict[int, weakref.ReferenceType[_Receiver[_T]]] = {}
        """The receivers attached to the channel, indexed by their hash()."""

        self._closed: bool = False
        """Whether the channel is closed."""

        self._latest: _T | None = None
        """The latest value sent to the channel."""

        self.resend_latest: bool = resend_latest
        """Whether to resend the latest value to new receivers.

        When `True`, every time a new receiver is created with `new_receiver`, it will
        automatically get sent the latest value on the channel.  This allows new
        receivers on slow streams to get the latest value as soon as they are created,
        without having to wait for the next message on the channel to arrive.

        It is safe to be set in data/reporting channels, but is not recommended for use
        in channels that stream control instructions.
        """

    @property
    def name(self) -> str:
        """The name of this channel.

        This is for logging purposes, and it will be shown in the string representation
        of this channel.
        """
        return self._name

    @property
    def is_closed(self) -> bool:
        """Whether this channel is closed.

        Any further attempts to use this channel after it is closed will result in an
        exception.
        """
        return self._closed

    async def close(self) -> None:
        """Close the Broadcast channel.

        Any further attempts to [send()][frequenz.channels.Sender.send] data
        will return `False`.

        Receivers will still be able to drain the pending items on their queues,
        but after that, subsequent
        [receive()][frequenz.channels.Receiver.receive] calls will return `None`
        immediately.
        """
        self._latest = None
        self._closed = True
        async with self._recv_cv:
            self._recv_cv.notify_all()

    def new_sender(self) -> Sender[_T]:
        """Create a new broadcast sender.

        Returns:
            A Sender instance attached to the broadcast channel.
        """
        return _Sender(self)

    def new_receiver(self, *, name: str | None = None, limit: int = 50) -> Receiver[_T]:
        """Create a new broadcast receiver.

        Broadcast receivers have their own buffer, and when messages are not
        being consumed fast enough and the buffer fills up, old messages will
        get dropped just in this receiver.

        Args:
            name: A name to identify the receiver in the logs.
            limit: Number of messages the receiver can hold in its buffer.

        Returns:
            A Receiver instance attached to the broadcast channel.
        """
        recv: _Receiver[_T] = _Receiver(name, limit, self)
        self._receivers[hash(recv)] = weakref.ref(recv)
        if self.resend_latest and self._latest is not None:
            recv.enqueue(self._latest)
        return recv

    def __str__(self) -> str:
        """Return a string representation of this receiver."""
        return f"{type(self).__name__}:{self._name}"

    def __repr__(self) -> str:
        """Return a string representation of this channel."""
        return (
            f"{type(self).__name__}(name={self._name!r}, "
            f"resend_latest={self.resend_latest!r}):<"
            f"latest={self._latest!r}, "
            f"receivers={len(self._receivers)!r}, "
            f"closed={self._closed!r}>"
        )
Attributes¤
is_closed property ¤
is_closed: bool

Whether this channel is closed.

Any further attempts to use this channel after it is closed will result in an exception.

name property ¤
name: str

The name of this channel.

This is for logging purposes, and it will be shown in the string representation of this channel.

resend_latest instance-attribute ¤
resend_latest: bool = resend_latest

Whether to resend the latest value to new receivers.

When True, every time a new receiver is created with new_receiver, it will automatically get sent the latest value on the channel. This allows new receivers on slow streams to get the latest value as soon as they are created, without having to wait for the next message on the channel to arrive.

It is safe to be set in data/reporting channels, but is not recommended for use in channels that stream control instructions.

Functions¤
__init__ ¤
__init__(*, name: str, resend_latest: bool = False) -> None

Create a Broadcast channel.

PARAMETER DESCRIPTION
name

The name of the channel. This is for logging purposes, and it will be shown in the string representation of the channel.

TYPE: str

resend_latest

When True, every time a new receiver is created with new_receiver, it will automatically get sent the latest value on the channel. This allows new receivers on slow streams to get the latest value as soon as they are created, without having to wait for the next message on the channel to arrive. It is safe to be set in data/reporting channels, but is not recommended for use in channels that stream control instructions.

TYPE: bool DEFAULT: False

Source code in src/frequenz/channels/_broadcast.py
def __init__(self, *, name: str, resend_latest: bool = False) -> None:
    """Create a Broadcast channel.

    Args:
        name: The name of the channel. This is for logging purposes, and it will be
            shown in the string representation of the channel.
        resend_latest: When True, every time a new receiver is created with
            `new_receiver`, it will automatically get sent the latest value on the
            channel.  This allows new receivers on slow streams to get the latest
            value as soon as they are created, without having to wait for the next
            message on the channel to arrive.  It is safe to be set in
            data/reporting channels, but is not recommended for use in channels that
            stream control instructions.
    """
    self._name: str = name
    """The name of the broadcast channel.

    Only used for debugging purposes.
    """

    self._recv_cv: Condition = Condition()
    """The condition to wait for data in the channel's buffer."""

    self._receivers: dict[int, weakref.ReferenceType[_Receiver[_T]]] = {}
    """The receivers attached to the channel, indexed by their hash()."""

    self._closed: bool = False
    """Whether the channel is closed."""

    self._latest: _T | None = None
    """The latest value sent to the channel."""

    self.resend_latest: bool = resend_latest
    """Whether to resend the latest value to new receivers.

    When `True`, every time a new receiver is created with `new_receiver`, it will
    automatically get sent the latest value on the channel.  This allows new
    receivers on slow streams to get the latest value as soon as they are created,
    without having to wait for the next message on the channel to arrive.

    It is safe to be set in data/reporting channels, but is not recommended for use
    in channels that stream control instructions.
    """
__repr__ ¤
__repr__() -> str

Return a string representation of this channel.

Source code in src/frequenz/channels/_broadcast.py
def __repr__(self) -> str:
    """Return a string representation of this channel."""
    return (
        f"{type(self).__name__}(name={self._name!r}, "
        f"resend_latest={self.resend_latest!r}):<"
        f"latest={self._latest!r}, "
        f"receivers={len(self._receivers)!r}, "
        f"closed={self._closed!r}>"
    )
__str__ ¤
__str__() -> str

Return a string representation of this receiver.

Source code in src/frequenz/channels/_broadcast.py
def __str__(self) -> str:
    """Return a string representation of this receiver."""
    return f"{type(self).__name__}:{self._name}"
close async ¤
close() -> None

Close the Broadcast channel.

Any further attempts to send() data will return False.

Receivers will still be able to drain the pending items on their queues, but after that, subsequent receive() calls will return None immediately.

Source code in src/frequenz/channels/_broadcast.py
async def close(self) -> None:
    """Close the Broadcast channel.

    Any further attempts to [send()][frequenz.channels.Sender.send] data
    will return `False`.

    Receivers will still be able to drain the pending items on their queues,
    but after that, subsequent
    [receive()][frequenz.channels.Receiver.receive] calls will return `None`
    immediately.
    """
    self._latest = None
    self._closed = True
    async with self._recv_cv:
        self._recv_cv.notify_all()
new_receiver ¤
new_receiver(
    *, name: str | None = None, limit: int = 50
) -> Receiver[_T]

Create a new broadcast receiver.

Broadcast receivers have their own buffer, and when messages are not being consumed fast enough and the buffer fills up, old messages will get dropped just in this receiver.

PARAMETER DESCRIPTION
name

A name to identify the receiver in the logs.

TYPE: str | None DEFAULT: None

limit

Number of messages the receiver can hold in its buffer.

TYPE: int DEFAULT: 50

RETURNS DESCRIPTION
Receiver[_T]

A Receiver instance attached to the broadcast channel.

Source code in src/frequenz/channels/_broadcast.py
def new_receiver(self, *, name: str | None = None, limit: int = 50) -> Receiver[_T]:
    """Create a new broadcast receiver.

    Broadcast receivers have their own buffer, and when messages are not
    being consumed fast enough and the buffer fills up, old messages will
    get dropped just in this receiver.

    Args:
        name: A name to identify the receiver in the logs.
        limit: Number of messages the receiver can hold in its buffer.

    Returns:
        A Receiver instance attached to the broadcast channel.
    """
    recv: _Receiver[_T] = _Receiver(name, limit, self)
    self._receivers[hash(recv)] = weakref.ref(recv)
    if self.resend_latest and self._latest is not None:
        recv.enqueue(self._latest)
    return recv
new_sender ¤
new_sender() -> Sender[_T]

Create a new broadcast sender.

RETURNS DESCRIPTION
Sender[_T]

A Sender instance attached to the broadcast channel.

Source code in src/frequenz/channels/_broadcast.py
def new_sender(self) -> Sender[_T]:
    """Create a new broadcast sender.

    Returns:
        A Sender instance attached to the broadcast channel.
    """
    return _Sender(self)

frequenz.channels.ChannelClosedError ¤

Bases: ChannelError

Error raised when trying to operate on a closed channel.

Source code in src/frequenz/channels/_exceptions.py
class ChannelClosedError(ChannelError):
    """Error raised when trying to operate on a closed channel."""

    def __init__(self, channel: Any):
        """Create a `ChannelClosedError` instance.

        Args:
            channel: A reference to the channel that was closed.
        """
        super().__init__(f"Channel {channel} was closed", channel)
Attributes¤
channel instance-attribute ¤
channel: Any = channel

The channel where the error happened.

Functions¤
__init__ ¤
__init__(channel: Any)

Create a ChannelClosedError instance.

PARAMETER DESCRIPTION
channel

A reference to the channel that was closed.

TYPE: Any

Source code in src/frequenz/channels/_exceptions.py
def __init__(self, channel: Any):
    """Create a `ChannelClosedError` instance.

    Args:
        channel: A reference to the channel that was closed.
    """
    super().__init__(f"Channel {channel} was closed", channel)

frequenz.channels.ChannelError ¤

Bases: Error

An error produced in a channel.

All exceptions generated by channels inherit from this exception.

Source code in src/frequenz/channels/_exceptions.py
class ChannelError(Error):
    """An error produced in a channel.

    All exceptions generated by channels inherit from this exception.
    """

    def __init__(self, message: str, channel: Any):
        """Create a ChannelError instance.

        Args:
            message: An error message.
            channel: A reference to the channel that encountered the error.
        """
        super().__init__(message)
        self.channel: Any = channel
        """The channel where the error happened."""
Attributes¤
channel instance-attribute ¤
channel: Any = channel

The channel where the error happened.

Functions¤
__init__ ¤
__init__(message: str, channel: Any)

Create a ChannelError instance.

PARAMETER DESCRIPTION
message

An error message.

TYPE: str

channel

A reference to the channel that encountered the error.

TYPE: Any

Source code in src/frequenz/channels/_exceptions.py
def __init__(self, message: str, channel: Any):
    """Create a ChannelError instance.

    Args:
        message: An error message.
        channel: A reference to the channel that encountered the error.
    """
    super().__init__(message)
    self.channel: Any = channel
    """The channel where the error happened."""

frequenz.channels.Error ¤

Bases: RuntimeError

Base error.

All exceptions generated by this library inherit from this exception.

Source code in src/frequenz/channels/_exceptions.py
class Error(RuntimeError):
    """Base error.

    All exceptions generated by this library inherit from this exception.
    """

    def __init__(self, message: str):
        """Create a ChannelError instance.

        Args:
            message: An error message.
        """
        super().__init__(message)
Functions¤
__init__ ¤
__init__(message: str)

Create a ChannelError instance.

PARAMETER DESCRIPTION
message

An error message.

TYPE: str

Source code in src/frequenz/channels/_exceptions.py
def __init__(self, message: str):
    """Create a ChannelError instance.

    Args:
        message: An error message.
    """
    super().__init__(message)

frequenz.channels.Merger ¤

Bases: Receiver[_T]

A receiver that merges messages coming from multiple receivers into a single stream.

Tip

Please consider using the more idiomatic merge() function instead of creating a Merger instance directly.

Source code in src/frequenz/channels/_merge.py
class Merger(Receiver[_T]):
    """A receiver that merges messages coming from multiple receivers into a single stream.

    Tip:
        Please consider using the more idiomatic [`merge()`][frequenz.channels.merge]
        function instead of creating a `Merger` instance directly.
    """

    def __init__(self, *receivers: Receiver[_T], name: str | None) -> None:
        """Create a `Merger` instance.

        Args:
            *receivers: The receivers to merge.
            name: The name of the receiver. Used to create the string representation
                of the receiver.
        """
        self._receivers: dict[str, Receiver[_T]] = {
            str(id): recv for id, recv in enumerate(receivers)
        }
        self._name: str = name if name is not None else type(self).__name__
        self._pending: set[asyncio.Task[Any]] = {
            asyncio.create_task(anext(recv), name=name)
            for name, recv in self._receivers.items()
        }
        self._results: deque[_T] = deque(maxlen=len(self._receivers))

    def __del__(self) -> None:
        """Cleanup any pending tasks."""
        for task in self._pending:
            if not task.done() and task.get_loop().is_running():
                task.cancel()

    async def stop(self) -> None:
        """Stop the `Merger` instance and cleanup any pending tasks."""
        for task in self._pending:
            task.cancel()
        await asyncio.gather(*self._pending, return_exceptions=True)
        self._pending = set()

    async def ready(self) -> bool:
        """Wait until the receiver is ready with a value or an error.

        Once a call to `ready()` has finished, the value should be read with
        a call to `consume()` (`receive()` or iterated over). The receiver will
        remain ready (this method will return immediately) until it is
        consumed.

        Returns:
            Whether the receiver is still active.
        """
        # we use a while loop to continue to wait for new data, in case the
        # previous `wait` completed because a channel was closed.
        while True:
            # if there are messages waiting to be consumed, return immediately.
            if len(self._results) > 0:
                return True

            # if there are no more pending receivers, we return immediately.
            if len(self._pending) == 0:
                return False

            done, self._pending = await asyncio.wait(
                self._pending, return_when=asyncio.FIRST_COMPLETED
            )
            for item in done:
                name = item.get_name()
                # if channel is closed, don't add a task for it again.
                if isinstance(item.exception(), StopAsyncIteration):
                    continue
                result = item.result()
                self._results.append(result)
                self._pending.add(
                    asyncio.create_task(anext(self._receivers[name]), name=name)
                )

    def consume(self) -> _T:
        """Return the latest value once `ready` is complete.

        Returns:
            The next value that was received.

        Raises:
            ReceiverStoppedError: if the receiver stopped producing messages.
            ReceiverError: if there is some problem with the receiver.
        """
        if not self._results and not self._pending:
            raise ReceiverStoppedError(self)

        assert self._results, "`consume()` must be preceded by a call to `ready()`"

        return self._results.popleft()

    def __str__(self) -> str:
        """Return a string representation of this receiver."""
        if len(self._receivers) > 3:
            receivers = [str(p) for p in itertools.islice(self._receivers.values(), 3)]
            receivers.append("…")
        else:
            receivers = [str(p) for p in self._receivers.values()]
        return f"{self._name}:{','.join(receivers)}"

    def __repr__(self) -> str:
        """Return a string representation of this receiver."""
        return (
            f"{self._name}("
            f"{', '.join(f'{k}={v!r}' for k, v in self._receivers.items())})"
        )
Functions¤
__aiter__ ¤
__aiter__() -> Self

Initialize the async iterator over received values.

RETURNS DESCRIPTION
Self

self, since no extra setup is needed for the iterator.

Source code in src/frequenz/channels/_receiver.py
def __aiter__(self) -> Self:
    """Initialize the async iterator over received values.

    Returns:
        `self`, since no extra setup is needed for the iterator.
    """
    return self
__anext__ async ¤
__anext__() -> _T

Await the next value in the async iteration over received values.

RETURNS DESCRIPTION
_T

The next value received.

RAISES DESCRIPTION
StopAsyncIteration

if the receiver stopped producing messages.

ReceiverError

if there is some problem with the receiver.

Source code in src/frequenz/channels/_receiver.py
async def __anext__(self) -> _T:
    """Await the next value in the async iteration over received values.

    Returns:
        The next value received.

    Raises:
        StopAsyncIteration: if the receiver stopped producing messages.
        ReceiverError: if there is some problem with the receiver.
    """
    try:
        await self.ready()
        return self.consume()
    except ReceiverStoppedError as exc:
        raise StopAsyncIteration() from exc
__del__ ¤
__del__() -> None

Cleanup any pending tasks.

Source code in src/frequenz/channels/_merge.py
def __del__(self) -> None:
    """Cleanup any pending tasks."""
    for task in self._pending:
        if not task.done() and task.get_loop().is_running():
            task.cancel()
__init__ ¤
__init__(
    *receivers: Receiver[_T], name: str | None
) -> None

Create a Merger instance.

PARAMETER DESCRIPTION
*receivers

The receivers to merge.

TYPE: Receiver[_T] DEFAULT: ()

name

The name of the receiver. Used to create the string representation of the receiver.

TYPE: str | None

Source code in src/frequenz/channels/_merge.py
def __init__(self, *receivers: Receiver[_T], name: str | None) -> None:
    """Create a `Merger` instance.

    Args:
        *receivers: The receivers to merge.
        name: The name of the receiver. Used to create the string representation
            of the receiver.
    """
    self._receivers: dict[str, Receiver[_T]] = {
        str(id): recv for id, recv in enumerate(receivers)
    }
    self._name: str = name if name is not None else type(self).__name__
    self._pending: set[asyncio.Task[Any]] = {
        asyncio.create_task(anext(recv), name=name)
        for name, recv in self._receivers.items()
    }
    self._results: deque[_T] = deque(maxlen=len(self._receivers))
__repr__ ¤
__repr__() -> str

Return a string representation of this receiver.

Source code in src/frequenz/channels/_merge.py
def __repr__(self) -> str:
    """Return a string representation of this receiver."""
    return (
        f"{self._name}("
        f"{', '.join(f'{k}={v!r}' for k, v in self._receivers.items())})"
    )
__str__ ¤
__str__() -> str

Return a string representation of this receiver.

Source code in src/frequenz/channels/_merge.py
def __str__(self) -> str:
    """Return a string representation of this receiver."""
    if len(self._receivers) > 3:
        receivers = [str(p) for p in itertools.islice(self._receivers.values(), 3)]
        receivers.append("…")
    else:
        receivers = [str(p) for p in self._receivers.values()]
    return f"{self._name}:{','.join(receivers)}"
consume ¤
consume() -> _T

Return the latest value once ready is complete.

RETURNS DESCRIPTION
_T

The next value that was received.

RAISES DESCRIPTION
ReceiverStoppedError

if the receiver stopped producing messages.

ReceiverError

if there is some problem with the receiver.

Source code in src/frequenz/channels/_merge.py
def consume(self) -> _T:
    """Return the latest value once `ready` is complete.

    Returns:
        The next value that was received.

    Raises:
        ReceiverStoppedError: if the receiver stopped producing messages.
        ReceiverError: if there is some problem with the receiver.
    """
    if not self._results and not self._pending:
        raise ReceiverStoppedError(self)

    assert self._results, "`consume()` must be preceded by a call to `ready()`"

    return self._results.popleft()
map ¤
map(call: Callable[[_T], _U]) -> Receiver[_U]

Return a receiver with call applied on incoming messages.

PARAMETER DESCRIPTION
call

function to apply on incoming messages.

TYPE: Callable[[_T], _U]

RETURNS DESCRIPTION
Receiver[_U]

A Receiver to read results of the given function from.

Source code in src/frequenz/channels/_receiver.py
def map(self, call: Callable[[_T], _U]) -> Receiver[_U]:
    """Return a receiver with `call` applied on incoming messages.

    Args:
        call: function to apply on incoming messages.

    Returns:
        A `Receiver` to read results of the given function from.
    """
    return _Map(self, call)
ready async ¤
ready() -> bool

Wait until the receiver is ready with a value or an error.

Once a call to ready() has finished, the value should be read with a call to consume() (receive() or iterated over). The receiver will remain ready (this method will return immediately) until it is consumed.

RETURNS DESCRIPTION
bool

Whether the receiver is still active.

Source code in src/frequenz/channels/_merge.py
async def ready(self) -> bool:
    """Wait until the receiver is ready with a value or an error.

    Once a call to `ready()` has finished, the value should be read with
    a call to `consume()` (`receive()` or iterated over). The receiver will
    remain ready (this method will return immediately) until it is
    consumed.

    Returns:
        Whether the receiver is still active.
    """
    # we use a while loop to continue to wait for new data, in case the
    # previous `wait` completed because a channel was closed.
    while True:
        # if there are messages waiting to be consumed, return immediately.
        if len(self._results) > 0:
            return True

        # if there are no more pending receivers, we return immediately.
        if len(self._pending) == 0:
            return False

        done, self._pending = await asyncio.wait(
            self._pending, return_when=asyncio.FIRST_COMPLETED
        )
        for item in done:
            name = item.get_name()
            # if channel is closed, don't add a task for it again.
            if isinstance(item.exception(), StopAsyncIteration):
                continue
            result = item.result()
            self._results.append(result)
            self._pending.add(
                asyncio.create_task(anext(self._receivers[name]), name=name)
            )
receive async ¤
receive() -> _T

Receive a message from the channel.

RETURNS DESCRIPTION
_T

The received message.

RAISES DESCRIPTION
ReceiverStoppedError

if there is some problem with the receiver.

ReceiverError

if there is some problem with the receiver.

Source code in src/frequenz/channels/_receiver.py
async def receive(self) -> _T:
    """Receive a message from the channel.

    Returns:
        The received message.

    Raises:
        ReceiverStoppedError: if there is some problem with the receiver.
        ReceiverError: if there is some problem with the receiver.
    """
    try:
        received = await self.__anext__()  # pylint: disable=unnecessary-dunder-call
    except StopAsyncIteration as exc:
        # If we already had a cause and it was the receiver was stopped,
        # then reuse that error, as StopAsyncIteration is just an artifact
        # introduced by __anext__.
        if (
            isinstance(exc.__cause__, ReceiverStoppedError)
            # pylint is not smart enough to figure out we checked above
            # this is a ReceiverStoppedError and thus it does have
            # a receiver member
            and exc.__cause__.receiver is self  # pylint: disable=no-member
        ):
            raise exc.__cause__
        raise ReceiverStoppedError(self) from exc
    return received
stop async ¤
stop() -> None

Stop the Merger instance and cleanup any pending tasks.

Source code in src/frequenz/channels/_merge.py
async def stop(self) -> None:
    """Stop the `Merger` instance and cleanup any pending tasks."""
    for task in self._pending:
        task.cancel()
    await asyncio.gather(*self._pending, return_exceptions=True)
    self._pending = set()

frequenz.channels.Receiver ¤

Bases: ABC, Generic[_T]

An entity that receives values from a channel.

Source code in src/frequenz/channels/_receiver.py
class Receiver(ABC, Generic[_T]):
    """An entity that receives values from a channel."""

    async def __anext__(self) -> _T:
        """Await the next value in the async iteration over received values.

        Returns:
            The next value received.

        Raises:
            StopAsyncIteration: if the receiver stopped producing messages.
            ReceiverError: if there is some problem with the receiver.
        """
        try:
            await self.ready()
            return self.consume()
        except ReceiverStoppedError as exc:
            raise StopAsyncIteration() from exc

    @abstractmethod
    async def ready(self) -> bool:
        """Wait until the receiver is ready with a value or an error.

        Once a call to `ready()` has finished, the value should be read with
        a call to `consume()` (`receive()` or iterated over). The receiver will
        remain ready (this method will return immediately) until it is
        consumed.

        Returns:
            Whether the receiver is still active.
        """

    @abstractmethod
    def consume(self) -> _T:
        """Return the latest value once `ready()` is complete.

        `ready()` must be called before each call to `consume()`.

        Returns:
            The next value received.

        Raises:
            ReceiverStoppedError: if the receiver stopped producing messages.
            ReceiverError: if there is some problem with the receiver.
        """

    def __aiter__(self) -> Self:
        """Initialize the async iterator over received values.

        Returns:
            `self`, since no extra setup is needed for the iterator.
        """
        return self

    async def receive(self) -> _T:
        """Receive a message from the channel.

        Returns:
            The received message.

        Raises:
            ReceiverStoppedError: if there is some problem with the receiver.
            ReceiverError: if there is some problem with the receiver.
        """
        try:
            received = await self.__anext__()  # pylint: disable=unnecessary-dunder-call
        except StopAsyncIteration as exc:
            # If we already had a cause and it was the receiver was stopped,
            # then reuse that error, as StopAsyncIteration is just an artifact
            # introduced by __anext__.
            if (
                isinstance(exc.__cause__, ReceiverStoppedError)
                # pylint is not smart enough to figure out we checked above
                # this is a ReceiverStoppedError and thus it does have
                # a receiver member
                and exc.__cause__.receiver is self  # pylint: disable=no-member
            ):
                raise exc.__cause__
            raise ReceiverStoppedError(self) from exc
        return received

    def map(self, call: Callable[[_T], _U]) -> Receiver[_U]:
        """Return a receiver with `call` applied on incoming messages.

        Args:
            call: function to apply on incoming messages.

        Returns:
            A `Receiver` to read results of the given function from.
        """
        return _Map(self, call)
Functions¤
__aiter__ ¤
__aiter__() -> Self

Initialize the async iterator over received values.

RETURNS DESCRIPTION
Self

self, since no extra setup is needed for the iterator.

Source code in src/frequenz/channels/_receiver.py
def __aiter__(self) -> Self:
    """Initialize the async iterator over received values.

    Returns:
        `self`, since no extra setup is needed for the iterator.
    """
    return self
__anext__ async ¤
__anext__() -> _T

Await the next value in the async iteration over received values.

RETURNS DESCRIPTION
_T

The next value received.

RAISES DESCRIPTION
StopAsyncIteration

if the receiver stopped producing messages.

ReceiverError

if there is some problem with the receiver.

Source code in src/frequenz/channels/_receiver.py
async def __anext__(self) -> _T:
    """Await the next value in the async iteration over received values.

    Returns:
        The next value received.

    Raises:
        StopAsyncIteration: if the receiver stopped producing messages.
        ReceiverError: if there is some problem with the receiver.
    """
    try:
        await self.ready()
        return self.consume()
    except ReceiverStoppedError as exc:
        raise StopAsyncIteration() from exc
consume abstractmethod ¤
consume() -> _T

Return the latest value once ready() is complete.

ready() must be called before each call to consume().

RETURNS DESCRIPTION
_T

The next value received.

RAISES DESCRIPTION
ReceiverStoppedError

if the receiver stopped producing messages.

ReceiverError

if there is some problem with the receiver.

Source code in src/frequenz/channels/_receiver.py
@abstractmethod
def consume(self) -> _T:
    """Return the latest value once `ready()` is complete.

    `ready()` must be called before each call to `consume()`.

    Returns:
        The next value received.

    Raises:
        ReceiverStoppedError: if the receiver stopped producing messages.
        ReceiverError: if there is some problem with the receiver.
    """
map ¤
map(call: Callable[[_T], _U]) -> Receiver[_U]

Return a receiver with call applied on incoming messages.

PARAMETER DESCRIPTION
call

function to apply on incoming messages.

TYPE: Callable[[_T], _U]

RETURNS DESCRIPTION
Receiver[_U]

A Receiver to read results of the given function from.

Source code in src/frequenz/channels/_receiver.py
def map(self, call: Callable[[_T], _U]) -> Receiver[_U]:
    """Return a receiver with `call` applied on incoming messages.

    Args:
        call: function to apply on incoming messages.

    Returns:
        A `Receiver` to read results of the given function from.
    """
    return _Map(self, call)
ready abstractmethod async ¤
ready() -> bool

Wait until the receiver is ready with a value or an error.

Once a call to ready() has finished, the value should be read with a call to consume() (receive() or iterated over). The receiver will remain ready (this method will return immediately) until it is consumed.

RETURNS DESCRIPTION
bool

Whether the receiver is still active.

Source code in src/frequenz/channels/_receiver.py
@abstractmethod
async def ready(self) -> bool:
    """Wait until the receiver is ready with a value or an error.

    Once a call to `ready()` has finished, the value should be read with
    a call to `consume()` (`receive()` or iterated over). The receiver will
    remain ready (this method will return immediately) until it is
    consumed.

    Returns:
        Whether the receiver is still active.
    """
receive async ¤
receive() -> _T

Receive a message from the channel.

RETURNS DESCRIPTION
_T

The received message.

RAISES DESCRIPTION
ReceiverStoppedError

if there is some problem with the receiver.

ReceiverError

if there is some problem with the receiver.

Source code in src/frequenz/channels/_receiver.py
async def receive(self) -> _T:
    """Receive a message from the channel.

    Returns:
        The received message.

    Raises:
        ReceiverStoppedError: if there is some problem with the receiver.
        ReceiverError: if there is some problem with the receiver.
    """
    try:
        received = await self.__anext__()  # pylint: disable=unnecessary-dunder-call
    except StopAsyncIteration as exc:
        # If we already had a cause and it was the receiver was stopped,
        # then reuse that error, as StopAsyncIteration is just an artifact
        # introduced by __anext__.
        if (
            isinstance(exc.__cause__, ReceiverStoppedError)
            # pylint is not smart enough to figure out we checked above
            # this is a ReceiverStoppedError and thus it does have
            # a receiver member
            and exc.__cause__.receiver is self  # pylint: disable=no-member
        ):
            raise exc.__cause__
        raise ReceiverStoppedError(self) from exc
    return received

frequenz.channels.ReceiverError ¤

Bases: Error, Generic[_T]

An error produced in a Receiver.

All exceptions generated by receivers inherit from this exception.

Source code in src/frequenz/channels/_receiver.py
class ReceiverError(Error, Generic[_T]):
    """An error produced in a [Receiver][frequenz.channels.Receiver].

    All exceptions generated by receivers inherit from this exception.
    """

    def __init__(self, message: str, receiver: Receiver[_T]):
        """Create an instance.

        Args:
            message: An error message.
            receiver: The [Receiver][frequenz.channels.Receiver] where the
                error happened.
        """
        super().__init__(message)
        self.receiver: Receiver[_T] = receiver
        """The receiver where the error happened."""
Attributes¤
receiver instance-attribute ¤
receiver: Receiver[_T] = receiver

The receiver where the error happened.

Functions¤
__init__ ¤
__init__(message: str, receiver: Receiver[_T])

Create an instance.

PARAMETER DESCRIPTION
message

An error message.

TYPE: str

receiver

The Receiver where the error happened.

TYPE: Receiver[_T]

Source code in src/frequenz/channels/_receiver.py
def __init__(self, message: str, receiver: Receiver[_T]):
    """Create an instance.

    Args:
        message: An error message.
        receiver: The [Receiver][frequenz.channels.Receiver] where the
            error happened.
    """
    super().__init__(message)
    self.receiver: Receiver[_T] = receiver
    """The receiver where the error happened."""

frequenz.channels.ReceiverStoppedError ¤

Bases: ReceiverError[_T]

The Receiver stopped producing messages.

Source code in src/frequenz/channels/_receiver.py
class ReceiverStoppedError(ReceiverError[_T]):
    """The [Receiver][frequenz.channels.Receiver] stopped producing messages."""

    def __init__(self, receiver: Receiver[_T]):
        """Create an instance.

        Args:
            receiver: The [Receiver][frequenz.channels.Receiver] where the
                error happened.
        """
        super().__init__(f"Receiver {receiver} was stopped", receiver)
Attributes¤
receiver instance-attribute ¤
receiver: Receiver[_T] = receiver

The receiver where the error happened.

Functions¤
__init__ ¤
__init__(receiver: Receiver[_T])

Create an instance.

PARAMETER DESCRIPTION
receiver

The Receiver where the error happened.

TYPE: Receiver[_T]

Source code in src/frequenz/channels/_receiver.py
def __init__(self, receiver: Receiver[_T]):
    """Create an instance.

    Args:
        receiver: The [Receiver][frequenz.channels.Receiver] where the
            error happened.
    """
    super().__init__(f"Receiver {receiver} was stopped", receiver)

frequenz.channels.SelectError ¤

Bases: BaseException

A base exception for select().

This exception is raised when a select() iteration fails. It is raised as a single exception when one receiver fails during normal operation (while calling ready() for example). It is raised as a group exception (SelectErrorGroup) when a select loop is cleaning up after it's done.

Source code in src/frequenz/channels/_select.py
class SelectError(BaseException):
    """A base exception for [`select()`][frequenz.channels.select].

    This exception is raised when a `select()` iteration fails.  It is raised as
    a single exception when one receiver fails during normal operation (while calling
    `ready()` for example).  It is raised as a group exception
    ([`SelectErrorGroup`][frequenz.channels.SelectErrorGroup]) when a `select` loop
    is cleaning up after it's done.
    """

frequenz.channels.SelectErrorGroup ¤

Bases: BaseExceptionGroup[BaseException], SelectError

An exception group for select() operation.

This exception group is raised when a select() loops fails while cleaning up running tests to check for ready receivers.

Source code in src/frequenz/channels/_select.py
class SelectErrorGroup(BaseExceptionGroup[BaseException], SelectError):
    """An exception group for [`select()`][frequenz.channels.select] operation.

    This exception group is raised when a `select()` loops fails while cleaning up
    running tests to check for ready receivers.
    """

frequenz.channels.Selected ¤

Bases: Generic[_T]

A result of a select() iteration.

The selected receiver is consumed immediately and the received value is stored in the instance, unless there was an exception while receiving the value, in which case the exception is stored instead.

Selected instances should be used in conjunction with the selected_from() function to determine which receiver was selected.

Please see select() for an example.

Source code in src/frequenz/channels/_select.py
class Selected(Generic[_T]):
    """A result of a [`select()`][frequenz.channels.select] iteration.

    The selected receiver is consumed immediately and the received value is stored in
    the instance, unless there was an exception while receiving the value, in which case
    the exception is stored instead.

    `Selected` instances should be used in conjunction with the
    [`selected_from()`][frequenz.channels.selected_from] function to determine
    which receiver was selected.

    Please see [`select()`][frequenz.channels.select] for an example.
    """

    def __init__(self, receiver: Receiver[_T]) -> None:
        """Create a new instance.

        The receiver is consumed immediately when creating the instance and the received
        value is stored in the instance for later use as
        [`value`][frequenz.channels.Selected.value].  If there was an exception
        while receiving the value, then the exception is stored in the instance instead
        (as [`exception`][frequenz.channels.Selected.exception]).

        Args:
            receiver: The receiver that was selected.
        """
        self._recv: Receiver[_T] = receiver
        """The receiver that was selected."""

        self._value: _T | _EmptyResult = _EmptyResult()
        """The value that was received.

        If there was an exception while receiving the value, then this will be `None`.
        """
        self._exception: Exception | None = None
        """The exception that was raised while receiving the value (if any)."""

        try:
            self._value = receiver.consume()
        except Exception as exc:  # pylint: disable=broad-except
            self._exception = exc

        self._handled: bool = False
        """Flag to indicate if this selected has been handled in the if-chain."""

    @property
    def value(self) -> _T:
        """The value that was received, if any.

        Returns:
            The value that was received.

        Raises:
            Exception: If there was an exception while receiving the value. Normally
                this should be an [`frequenz.channels.Error`][frequenz.channels.Error]
                instance, but catches all exceptions in case some receivers can raise
                anything else.
        """
        if self._exception is not None:
            raise self._exception
        assert not isinstance(self._value, _EmptyResult)
        return self._value

    @property
    def exception(self) -> Exception | None:
        """The exception that was raised while receiving the value (if any).

        Returns:
            The exception that was raised while receiving the value (if any).
        """
        return self._exception

    def was_stopped(self) -> bool:
        """Check if the selected receiver was stopped.

        Check if the selected receiver raised
        a [`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError] while
        consuming a value.

        Returns:
            Whether the receiver was stopped.
        """
        return isinstance(self._exception, ReceiverStoppedError)

    def __str__(self) -> str:
        """Return a string representation of this instance.

        Returns:
            A string representation of this instance.
        """
        return (
            f"{type(self).__name__}({self._recv}) -> "
            f"{self._exception or self._value})"
        )

    def __repr__(self) -> str:
        """Return a the internal representation of this instance.

        Returns:
            A string representation of this instance.
        """
        return (
            f"{type(self).__name__}({self._recv=}, {self._value=}, "
            f"{self._exception=}, {self._handled=})"
        )
Attributes¤
exception property ¤
exception: Exception | None

The exception that was raised while receiving the value (if any).

RETURNS DESCRIPTION
Exception | None

The exception that was raised while receiving the value (if any).

value property ¤
value: _T

The value that was received, if any.

RETURNS DESCRIPTION
_T

The value that was received.

RAISES DESCRIPTION
Exception

If there was an exception while receiving the value. Normally this should be an frequenz.channels.Error instance, but catches all exceptions in case some receivers can raise anything else.

Functions¤
__init__ ¤
__init__(receiver: Receiver[_T]) -> None

Create a new instance.

The receiver is consumed immediately when creating the instance and the received value is stored in the instance for later use as value. If there was an exception while receiving the value, then the exception is stored in the instance instead (as exception).

PARAMETER DESCRIPTION
receiver

The receiver that was selected.

TYPE: Receiver[_T]

Source code in src/frequenz/channels/_select.py
def __init__(self, receiver: Receiver[_T]) -> None:
    """Create a new instance.

    The receiver is consumed immediately when creating the instance and the received
    value is stored in the instance for later use as
    [`value`][frequenz.channels.Selected.value].  If there was an exception
    while receiving the value, then the exception is stored in the instance instead
    (as [`exception`][frequenz.channels.Selected.exception]).

    Args:
        receiver: The receiver that was selected.
    """
    self._recv: Receiver[_T] = receiver
    """The receiver that was selected."""

    self._value: _T | _EmptyResult = _EmptyResult()
    """The value that was received.

    If there was an exception while receiving the value, then this will be `None`.
    """
    self._exception: Exception | None = None
    """The exception that was raised while receiving the value (if any)."""

    try:
        self._value = receiver.consume()
    except Exception as exc:  # pylint: disable=broad-except
        self._exception = exc

    self._handled: bool = False
    """Flag to indicate if this selected has been handled in the if-chain."""
__repr__ ¤
__repr__() -> str

Return a the internal representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in src/frequenz/channels/_select.py
def __repr__(self) -> str:
    """Return a the internal representation of this instance.

    Returns:
        A string representation of this instance.
    """
    return (
        f"{type(self).__name__}({self._recv=}, {self._value=}, "
        f"{self._exception=}, {self._handled=})"
    )
__str__ ¤
__str__() -> str

Return a string representation of this instance.

RETURNS DESCRIPTION
str

A string representation of this instance.

Source code in src/frequenz/channels/_select.py
def __str__(self) -> str:
    """Return a string representation of this instance.

    Returns:
        A string representation of this instance.
    """
    return (
        f"{type(self).__name__}({self._recv}) -> "
        f"{self._exception or self._value})"
    )
was_stopped ¤
was_stopped() -> bool

Check if the selected receiver was stopped.

Check if the selected receiver raised a ReceiverStoppedError while consuming a value.

RETURNS DESCRIPTION
bool

Whether the receiver was stopped.

Source code in src/frequenz/channels/_select.py
def was_stopped(self) -> bool:
    """Check if the selected receiver was stopped.

    Check if the selected receiver raised
    a [`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError] while
    consuming a value.

    Returns:
        Whether the receiver was stopped.
    """
    return isinstance(self._exception, ReceiverStoppedError)

frequenz.channels.Sender ¤

Bases: ABC, Generic[_T]

An entity that sends values to a channel.

Source code in src/frequenz/channels/_sender.py
class Sender(ABC, Generic[_T]):
    """An entity that sends values to a *channel*."""

    @abstractmethod
    async def send(self, msg: _T) -> None:
        """Send a message to the channel.

        Args:
            msg: The message to be sent.

        Raises:
            SenderError: if there was an error sending the message.
        """
Functions¤
send abstractmethod async ¤
send(msg: _T) -> None

Send a message to the channel.

PARAMETER DESCRIPTION
msg

The message to be sent.

TYPE: _T

RAISES DESCRIPTION
SenderError

if there was an error sending the message.

Source code in src/frequenz/channels/_sender.py
@abstractmethod
async def send(self, msg: _T) -> None:
    """Send a message to the channel.

    Args:
        msg: The message to be sent.

    Raises:
        SenderError: if there was an error sending the message.
    """

frequenz.channels.SenderError ¤

Bases: Error, Generic[_T]

An error produced in a Sender.

All exceptions generated by senders inherit from this exception.

Source code in src/frequenz/channels/_sender.py
class SenderError(Error, Generic[_T]):
    """An error produced in a [Sender][frequenz.channels.Sender].

    All exceptions generated by senders inherit from this exception.
    """

    def __init__(self, message: str, sender: Sender[_T]):
        """Create an instance.

        Args:
            message: An error message.
            sender: The [Sender][frequenz.channels.Sender] where the error
                happened.
        """
        super().__init__(message)
        self.sender: Sender[_T] = sender
        """The sender where the error happened."""
Attributes¤
sender instance-attribute ¤
sender: Sender[_T] = sender

The sender where the error happened.

Functions¤
__init__ ¤
__init__(message: str, sender: Sender[_T])

Create an instance.

PARAMETER DESCRIPTION
message

An error message.

TYPE: str

sender

The Sender where the error happened.

TYPE: Sender[_T]

Source code in src/frequenz/channels/_sender.py
def __init__(self, message: str, sender: Sender[_T]):
    """Create an instance.

    Args:
        message: An error message.
        sender: The [Sender][frequenz.channels.Sender] where the error
            happened.
    """
    super().__init__(message)
    self.sender: Sender[_T] = sender
    """The sender where the error happened."""

frequenz.channels.UnhandledSelectedError ¤

Bases: SelectError, Generic[_T]

A receiver was not handled in a select() loop.

This exception is raised when a select() iteration finishes without a call to selected_from() for the selected receiver.

Source code in src/frequenz/channels/_select.py
class UnhandledSelectedError(SelectError, Generic[_T]):
    """A receiver was not handled in a [`select()`][frequenz.channels.select] loop.

    This exception is raised when a `select()` iteration finishes without a call to
    [`selected_from()`][frequenz.channels.selected_from] for the selected receiver.
    """

    def __init__(self, selected: Selected[_T]) -> None:
        """Create a new instance.

        Args:
            selected: The selected receiver that was not handled.
        """
        recv = selected._recv  # pylint: disable=protected-access
        super().__init__(f"Selected receiver {recv} was not handled in the if-chain")
        self.selected: Selected[_T] = selected
        """The selected receiver that was not handled."""
Attributes¤
selected instance-attribute ¤
selected: Selected[_T] = selected

The selected receiver that was not handled.

Functions¤
__init__ ¤
__init__(selected: Selected[_T]) -> None

Create a new instance.

PARAMETER DESCRIPTION
selected

The selected receiver that was not handled.

TYPE: Selected[_T]

Source code in src/frequenz/channels/_select.py
def __init__(self, selected: Selected[_T]) -> None:
    """Create a new instance.

    Args:
        selected: The selected receiver that was not handled.
    """
    recv = selected._recv  # pylint: disable=protected-access
    super().__init__(f"Selected receiver {recv} was not handled in the if-chain")
    self.selected: Selected[_T] = selected
    """The selected receiver that was not handled."""

Functions¤

frequenz.channels.merge ¤

merge(*receivers: Receiver[_T]) -> Merger[_T]

Merge messages coming from multiple receivers into a single stream.

Example

For example, if there are two channel receivers with the same type, they can be awaited together, and their results merged into a single stream like this:

from frequenz.channels import Broadcast

channel1 = Broadcast[int](name="input-chan-1")
channel2 = Broadcast[int](name="input-chan-2")
receiver1 = channel1.new_receiver()
receiver2 = channel2.new_receiver()

async for msg in merge(receiver1, receiver2):
    print(f"received {msg}")
PARAMETER DESCRIPTION
*receivers

The receivers to merge.

TYPE: Receiver[_T] DEFAULT: ()

RETURNS DESCRIPTION
Merger[_T]

A receiver that merges the messages coming from multiple receivers into a single stream.

RAISES DESCRIPTION
ValueError

if no receivers are provided.

Source code in src/frequenz/channels/_merge.py
def merge(*receivers: Receiver[_T]) -> Merger[_T]:
    """Merge messages coming from multiple receivers into a single stream.

    Example:
        For example, if there are two channel receivers with the same type,
        they can be awaited together, and their results merged into a single
        stream like this:

        ```python
        from frequenz.channels import Broadcast

        channel1 = Broadcast[int](name="input-chan-1")
        channel2 = Broadcast[int](name="input-chan-2")
        receiver1 = channel1.new_receiver()
        receiver2 = channel2.new_receiver()

        async for msg in merge(receiver1, receiver2):
            print(f"received {msg}")
        ```

    Args:
        *receivers: The receivers to merge.

    Returns:
        A receiver that merges the messages coming from multiple receivers into a
            single stream.

    Raises:
        ValueError: if no receivers are provided.
    """
    if not receivers:
        raise ValueError("At least one receiver must be provided")

    return Merger(*receivers, name="merge")

frequenz.channels.select async ¤

select(
    *receivers: Receiver[Any],
) -> AsyncIterator[Selected[Any]]

Iterate over the values of all receivers as they receive new values.

This function is used to iterate over the values of all receivers as they receive new values. It is used in conjunction with the Selected class and the selected_from() function to determine which function to determine which receiver was selected in a select operation.

An exhaustiveness check is performed at runtime to make sure all selected receivers are handled in the if-chain, so you should call selected_from() with all the receivers passed to select() inside the select loop, even if you plan to ignore a value, to signal select() that you are purposefully ignoring the value.

Note

The select() function is intended to be used in cases where the set of receivers is static and known beforehand. If you need to dynamically add/remove receivers from a select loop, there are a few alternatives. Depending on your use case, one or the other could work better for you:

  • Use merge(): this is useful when you have an unknown number of receivers of the same type that can be handled as a group.
  • Use tasks to manage each receiver individually: this is better if there are no relationships between the receivers.
  • Break the select() loop and start a new one with the new set of receivers (this should be the last resort, as it has some performance implications because the loop needs to be restarted).
Example
import datetime
from typing import assert_never

from frequenz.channels import ReceiverStoppedError, select, selected_from
from frequenz.channels.timer import Timer

timer1 = Timer.periodic(datetime.timedelta(seconds=1))
timer2 = Timer.timeout(datetime.timedelta(seconds=0.5))

async for selected in select(timer1, timer2):
    if selected_from(selected, timer1):
        # Beware: `selected.value` might raise an exception, you can always
        # check for exceptions with `selected.exception` first or use
        # a try-except block. You can also quickly check if the receiver was
        # stopped and let any other unexpected exceptions bubble up.
        if selected.was_stopped:
            print("timer1 was stopped")
            continue
        print(f"timer1: now={datetime.datetime.now()} drift={selected.value}")
        timer2.stop()
    elif selected_from(selected, timer2):
        # Explicitly handling of exceptions
        match selected.exception:
            case ReceiverStoppedError():
                print("timer2 was stopped")
            case Exception() as exception:
                print(f"timer2: exception={exception}")
            case None:
                # All good, no exception, we can use `selected.value` safely
                print(f"timer2: now={datetime.datetime.now()} drift={selected.value}")
            case _ as unhanded:
                assert_never(unhanded)
    else:
        # This is not necessary, as select() will check for exhaustiveness, but
        # it is good practice to have it in case you forgot to handle a new
        # receiver added to `select()` at a later point in time.
        assert False
PARAMETER DESCRIPTION
*receivers

The receivers to select from.

TYPE: Receiver[Any] DEFAULT: ()

YIELDS DESCRIPTION
AsyncIterator[Selected[Any]]

The currently selected item.

RAISES DESCRIPTION
UnhandledSelectedError

If a selected receiver was not handled in the if-chain.

SelectErrorGroup

If there is an error while finishing the select operation and receivers fail while cleaning up.

SelectError

If there is an error while selecting receivers during normal operation. For example if a receiver raises an exception in the ready() method. Normal errors while receiving values are not raised, but reported via the Selected instance.

Source code in src/frequenz/channels/_select.py
async def select(*receivers: Receiver[Any]) -> AsyncIterator[Selected[Any]]:
    """Iterate over the values of all receivers as they receive new values.

    This function is used to iterate over the values of all receivers as they receive
    new values.  It is used in conjunction with the
    [`Selected`][frequenz.channels.Selected] class and the
    [`selected_from()`][frequenz.channels.selected_from] function to determine
    which function to determine which receiver was selected in a select operation.

    An exhaustiveness check is performed at runtime to make sure all selected receivers
    are handled in the if-chain, so you should call `selected_from()` with all the
    receivers passed to `select()` inside the select loop, even if you plan to ignore
    a value, to signal `select()` that you are purposefully ignoring the value.

    Note:
        The `select()` function is intended to be used in cases where the set of
        receivers is static and known beforehand.  If you need to dynamically add/remove
        receivers from a select loop, there are a few alternatives.  Depending on your
        use case, one or the other could work better for you:

        * Use [`merge()`][frequenz.channels.merge]: this is useful when you have an
          unknown number of receivers of the same type that can be handled as a group.
        * Use tasks to manage each receiver individually: this is better if there are no
          relationships between the receivers.
        * Break the `select()` loop and start a new one with the new set of receivers
          (this should be the last resort, as it has some performance implications
           because the loop needs to be restarted).

    Example:
        ```python
        import datetime
        from typing import assert_never

        from frequenz.channels import ReceiverStoppedError, select, selected_from
        from frequenz.channels.timer import Timer

        timer1 = Timer.periodic(datetime.timedelta(seconds=1))
        timer2 = Timer.timeout(datetime.timedelta(seconds=0.5))

        async for selected in select(timer1, timer2):
            if selected_from(selected, timer1):
                # Beware: `selected.value` might raise an exception, you can always
                # check for exceptions with `selected.exception` first or use
                # a try-except block. You can also quickly check if the receiver was
                # stopped and let any other unexpected exceptions bubble up.
                if selected.was_stopped:
                    print("timer1 was stopped")
                    continue
                print(f"timer1: now={datetime.datetime.now()} drift={selected.value}")
                timer2.stop()
            elif selected_from(selected, timer2):
                # Explicitly handling of exceptions
                match selected.exception:
                    case ReceiverStoppedError():
                        print("timer2 was stopped")
                    case Exception() as exception:
                        print(f"timer2: exception={exception}")
                    case None:
                        # All good, no exception, we can use `selected.value` safely
                        print(f"timer2: now={datetime.datetime.now()} drift={selected.value}")
                    case _ as unhanded:
                        assert_never(unhanded)
            else:
                # This is not necessary, as select() will check for exhaustiveness, but
                # it is good practice to have it in case you forgot to handle a new
                # receiver added to `select()` at a later point in time.
                assert False
        ```

    Args:
        *receivers: The receivers to select from.

    Yields:
        The currently selected item.

    Raises:
        UnhandledSelectedError: If a selected receiver was not handled in the if-chain.
        SelectErrorGroup: If there is an error while finishing the select operation and
            receivers fail while cleaning up.
        SelectError: If there is an error while selecting receivers during normal
            operation.  For example if a receiver raises an exception in the `ready()`
            method.  Normal errors while receiving values are not raised, but reported
            via the `Selected` instance.
    """
    receivers_map: dict[str, Receiver[Any]] = {str(hash(r)): r for r in receivers}
    pending: set[asyncio.Task[bool]] = set()

    try:
        for name, recv in receivers_map.items():
            pending.add(asyncio.create_task(recv.ready(), name=name))

        while pending:
            done, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )

            for task in done:
                receiver_active: bool = True
                name = task.get_name()
                recv = receivers_map[name]
                if exception := task.exception():
                    match exception:
                        case asyncio.CancelledError():
                            # If the receiver was cancelled, then it means we want to
                            # exit the select loop, so we handle the receiver but we
                            # don't add it back to the pending list.
                            receiver_active = False
                        case _ as exc:
                            raise SelectError(f"Error while selecting {recv}") from exc

                selected = Selected(recv)
                yield selected
                if not selected._handled:  # pylint: disable=protected-access
                    raise UnhandledSelectedError(selected)

                receiver_active = task.result()
                if not receiver_active:
                    continue

                # Add back the receiver to the pending list
                name = task.get_name()
                recv = receivers_map[name]
                pending.add(asyncio.create_task(recv.ready(), name=name))
    finally:
        await _stop_pending_tasks(pending)

frequenz.channels.selected_from ¤

selected_from(
    selected: Selected[Any], receiver: Receiver[_T]
) -> TypeGuard[Selected[_T]]

Check if the given receiver was selected by select().

This function is used in conjunction with the Selected class to determine which receiver was selected in select() iteration.

It also works as a type guard to narrow the type of the Selected instance to the type of the receiver.

Please see select() for an example.

PARAMETER DESCRIPTION
selected

The result of a select() iteration.

TYPE: Selected[Any]

receiver

The receiver to check if it was the source of a select operation.

TYPE: Receiver[_T]

RETURNS DESCRIPTION
TypeGuard[Selected[_T]]

Whether the given receiver was selected.

Source code in src/frequenz/channels/_select.py
def selected_from(
    selected: Selected[Any], receiver: Receiver[_T]
) -> TypeGuard[Selected[_T]]:
    """Check if the given receiver was selected by [`select()`][frequenz.channels.select].

    This function is used in conjunction with the
    [`Selected`][frequenz.channels.Selected] class to determine which receiver was
    selected in `select()` iteration.

    It also works as a [type guard][typing.TypeGuard] to narrow the type of the
    `Selected` instance to the type of the receiver.

    Please see [`select()`][frequenz.channels.select] for an example.

    Args:
        selected: The result of a `select()` iteration.
        receiver: The receiver to check if it was the source of a select operation.

    Returns:
        Whether the given receiver was selected.
    """
    if handled := selected._recv is receiver:  # pylint: disable=protected-access
        selected._handled = True  # pylint: disable=protected-access
    return handled