Skip to content

event

frequenz.channels.event ¤

A receiver that can be made ready directly.

Tip

Read the Event documentation for more information.

This module contains the following:

  • Event: A receiver that can be made ready directly.

Classes¤

frequenz.channels.event.Event ¤

Bases: Receiver[None]

A receiver that can be made ready directly.

Usage¤

There are cases where it is useful to be able to send a signal to a select() loop, for example, to stop a loop from outside the loop itself.

To do that, you can use an Event receiver and call set() on it when you want to make it ready.

Stopping¤

The receiver will be re-activated (will keep blocking) after the current set event is received. To stop the receiver completely, you can call stop().

Example¤
Exit after printing the first 5 numbers
import asyncio

from frequenz.channels import Anycast, select, selected_from
from frequenz.channels.event import Event

channel: Anycast[int] = Anycast(name="channel")
receiver = channel.new_receiver()
sender = channel.new_sender()
stop_event = Event(name="stop")


async def do_work() -> None:
    async for selected in select(receiver, stop_event):
        if selected_from(selected, receiver):
            print(selected.value)
        elif selected_from(selected, stop_event):
            print("Stop event triggered")
            stop_event.stop()
            break


async def send_stuff() -> None:
    for i in range(10):
        if stop_event.is_stopped:
            break
        await asyncio.sleep(1)
        await sender.send(i)


async def main() -> None:
    async with asyncio.TaskGroup() as task_group:
        task_group.create_task(do_work(), name="do_work")
        task_group.create_task(send_stuff(), name="send_stuff")
        await asyncio.sleep(5.5)
        stop_event.set()


asyncio.run(main())
Source code in src/frequenz/channels/event.py
class Event(_receiver.Receiver[None]):
    """A receiver that can be made ready directly.

    # Usage

    There are cases where it is useful to be able to send a signal to
    a [`select()`][frequenz.channels.select] loop, for example, to stop a loop from
    outside the loop itself.

    To do that, you can use an [`Event`][frequenz.channels.event.Event] receiver and
    call [`set()`][frequenz.channels.event.Event.set] on it when you want to make it
    ready.

    # Stopping

    The receiver will be re-activated (will keep blocking) after the current set
    event is received. To stop the receiver completely, you can call
    [`stop()`][frequenz.channels.event.Event.stop].

    # Example

    Example: Exit after printing the first 5 numbers
        ```python
        import asyncio

        from frequenz.channels import Anycast, select, selected_from
        from frequenz.channels.event import Event

        channel: Anycast[int] = Anycast(name="channel")
        receiver = channel.new_receiver()
        sender = channel.new_sender()
        stop_event = Event(name="stop")


        async def do_work() -> None:
            async for selected in select(receiver, stop_event):
                if selected_from(selected, receiver):
                    print(selected.value)
                elif selected_from(selected, stop_event):
                    print("Stop event triggered")
                    stop_event.stop()
                    break


        async def send_stuff() -> None:
            for i in range(10):
                if stop_event.is_stopped:
                    break
                await asyncio.sleep(1)
                await sender.send(i)


        async def main() -> None:
            async with asyncio.TaskGroup() as task_group:
                task_group.create_task(do_work(), name="do_work")
                task_group.create_task(send_stuff(), name="send_stuff")
                await asyncio.sleep(5.5)
                stop_event.set()


        asyncio.run(main())
        ```
    """

    def __init__(self, *, name: str | None = None) -> None:
        """Create a new instance.

        Args:
            name: The name of the receiver.  If `None` an `id(self)`-based name will be
                used. This is only for debugging purposes, it will be shown in the
                string representation of the receiver.
        """
        self._event: _asyncio.Event = _asyncio.Event()
        """The event that is set when the receiver is ready."""

        self._name: str = f"{id(self):_}" if name is None else name
        """The name of the receiver.

        This is for debugging purposes, it will be shown in the string representation
        of the receiver.
        """

        self._is_set: bool = False
        """Whether the receiver is ready to be consumed.

        This is used to differentiate between when the receiver was stopped (the event
        is triggered too) but still there is an event to be consumed and when it was
        stopped but was not explicitly set().
        """

        self._is_stopped: bool = False
        """Whether the receiver is stopped."""

    @property
    def name(self) -> str:
        """The name of this receiver.

        This is for debugging purposes, it will be shown in the string representation
        of this receiver.

        Returns:
            The name of this receiver.
        """
        return self._name

    @property
    def is_set(self) -> bool:
        """Whether this receiver is set (ready).

        Returns:
            Whether this receiver is set (ready).
        """
        return self._is_set

    @property
    def is_stopped(self) -> bool:
        """Whether this receiver is stopped.

        Returns:
            Whether this receiver is stopped.
        """
        return self._is_stopped

    def stop(self) -> None:
        """Stop this receiver."""
        self._is_stopped = True
        self._event.set()

    def set(self) -> None:
        """Trigger the event (make the receiver ready)."""
        self._is_set = True
        self._event.set()

    async def ready(self) -> bool:
        """Wait until this receiver is ready.

        Returns:
            Whether this receiver is still running.
        """
        if self._is_stopped:
            return False
        await self._event.wait()
        return not self._is_stopped

    def consume(self) -> None:
        """Consume the event.

        This makes this receiver wait again until the event is set again.

        Raises:
            ReceiverStoppedError: If this receiver is stopped.
        """
        if not self._is_set and self._is_stopped:
            raise _receiver.ReceiverStoppedError(self)

        assert self._is_set, "calls to `consume()` must be follow a call to `ready()`"

        self._is_set = False
        self._event.clear()

    def __str__(self) -> str:
        """Return a string representation of this receiver.

        Returns:
            A string representation of this receiver.
        """
        return f"{type(self).__name__}({self._name!r})"

    def __repr__(self) -> str:
        """Return a string representation of this receiver.

        Returns:
            A string representation of this receiver.
        """
        return (
            f"<{type(self).__name__} name={self._name!r} is_set={self.is_set!r} "
            f"is_stopped={self.is_stopped!r}>"
        )
Attributes¤
is_set property ¤
is_set: bool

Whether this receiver is set (ready).

RETURNS DESCRIPTION
bool

Whether this receiver is set (ready).

is_stopped property ¤
is_stopped: bool

Whether this receiver is stopped.

RETURNS DESCRIPTION
bool

Whether this receiver is stopped.

name property ¤
name: str

The name of this receiver.

This is for debugging purposes, it will be shown in the string representation of this receiver.

RETURNS DESCRIPTION
str

The name of this receiver.

Functions¤
__aiter__ ¤
__aiter__() -> Self

Initialize the async iterator over received values.

RETURNS DESCRIPTION
Self

self, since no extra setup is needed for the iterator.

Source code in src/frequenz/channels/_receiver.py
def __aiter__(self) -> Self:
    """Initialize the async iterator over received values.

    Returns:
        `self`, since no extra setup is needed for the iterator.
    """
    return self
__anext__ async ¤
__anext__() -> _T

Await the next value in the async iteration over received values.

RETURNS DESCRIPTION
_T

The next value received.

RAISES DESCRIPTION
StopAsyncIteration

if the receiver stopped producing messages.

ReceiverError

if there is some problem with the receiver.

Source code in src/frequenz/channels/_receiver.py
async def __anext__(self) -> _T:
    """Await the next value in the async iteration over received values.

    Returns:
        The next value received.

    Raises:
        StopAsyncIteration: if the receiver stopped producing messages.
        ReceiverError: if there is some problem with the receiver.
    """
    try:
        await self.ready()
        return self.consume()
    except ReceiverStoppedError as exc:
        raise StopAsyncIteration() from exc
__init__ ¤
__init__(*, name: str | None = None) -> None

Create a new instance.

PARAMETER DESCRIPTION
name

The name of the receiver. If None an id(self)-based name will be used. This is only for debugging purposes, it will be shown in the string representation of the receiver.

TYPE: str | None DEFAULT: None

Source code in src/frequenz/channels/event.py
def __init__(self, *, name: str | None = None) -> None:
    """Create a new instance.

    Args:
        name: The name of the receiver.  If `None` an `id(self)`-based name will be
            used. This is only for debugging purposes, it will be shown in the
            string representation of the receiver.
    """
    self._event: _asyncio.Event = _asyncio.Event()
    """The event that is set when the receiver is ready."""

    self._name: str = f"{id(self):_}" if name is None else name
    """The name of the receiver.

    This is for debugging purposes, it will be shown in the string representation
    of the receiver.
    """

    self._is_set: bool = False
    """Whether the receiver is ready to be consumed.

    This is used to differentiate between when the receiver was stopped (the event
    is triggered too) but still there is an event to be consumed and when it was
    stopped but was not explicitly set().
    """

    self._is_stopped: bool = False
    """Whether the receiver is stopped."""
__repr__ ¤
__repr__() -> str

Return a string representation of this receiver.

RETURNS DESCRIPTION
str

A string representation of this receiver.

Source code in src/frequenz/channels/event.py
def __repr__(self) -> str:
    """Return a string representation of this receiver.

    Returns:
        A string representation of this receiver.
    """
    return (
        f"<{type(self).__name__} name={self._name!r} is_set={self.is_set!r} "
        f"is_stopped={self.is_stopped!r}>"
    )
__str__ ¤
__str__() -> str

Return a string representation of this receiver.

RETURNS DESCRIPTION
str

A string representation of this receiver.

Source code in src/frequenz/channels/event.py
def __str__(self) -> str:
    """Return a string representation of this receiver.

    Returns:
        A string representation of this receiver.
    """
    return f"{type(self).__name__}({self._name!r})"
consume ¤
consume() -> None

Consume the event.

This makes this receiver wait again until the event is set again.

RAISES DESCRIPTION
ReceiverStoppedError

If this receiver is stopped.

Source code in src/frequenz/channels/event.py
def consume(self) -> None:
    """Consume the event.

    This makes this receiver wait again until the event is set again.

    Raises:
        ReceiverStoppedError: If this receiver is stopped.
    """
    if not self._is_set and self._is_stopped:
        raise _receiver.ReceiverStoppedError(self)

    assert self._is_set, "calls to `consume()` must be follow a call to `ready()`"

    self._is_set = False
    self._event.clear()
map ¤
map(call: Callable[[_T], _U]) -> Receiver[_U]

Return a receiver with call applied on incoming messages.

PARAMETER DESCRIPTION
call

function to apply on incoming messages.

TYPE: Callable[[_T], _U]

RETURNS DESCRIPTION
Receiver[_U]

A Receiver to read results of the given function from.

Source code in src/frequenz/channels/_receiver.py
def map(self, call: Callable[[_T], _U]) -> Receiver[_U]:
    """Return a receiver with `call` applied on incoming messages.

    Args:
        call: function to apply on incoming messages.

    Returns:
        A `Receiver` to read results of the given function from.
    """
    return _Map(self, call)
ready async ¤
ready() -> bool

Wait until this receiver is ready.

RETURNS DESCRIPTION
bool

Whether this receiver is still running.

Source code in src/frequenz/channels/event.py
async def ready(self) -> bool:
    """Wait until this receiver is ready.

    Returns:
        Whether this receiver is still running.
    """
    if self._is_stopped:
        return False
    await self._event.wait()
    return not self._is_stopped
receive async ¤
receive() -> _T

Receive a message from the channel.

RETURNS DESCRIPTION
_T

The received message.

RAISES DESCRIPTION
ReceiverStoppedError

if there is some problem with the receiver.

ReceiverError

if there is some problem with the receiver.

Source code in src/frequenz/channels/_receiver.py
async def receive(self) -> _T:
    """Receive a message from the channel.

    Returns:
        The received message.

    Raises:
        ReceiverStoppedError: if there is some problem with the receiver.
        ReceiverError: if there is some problem with the receiver.
    """
    try:
        received = await self.__anext__()  # pylint: disable=unnecessary-dunder-call
    except StopAsyncIteration as exc:
        # If we already had a cause and it was the receiver was stopped,
        # then reuse that error, as StopAsyncIteration is just an artifact
        # introduced by __anext__.
        if (
            isinstance(exc.__cause__, ReceiverStoppedError)
            # pylint is not smart enough to figure out we checked above
            # this is a ReceiverStoppedError and thus it does have
            # a receiver member
            and exc.__cause__.receiver is self  # pylint: disable=no-member
        ):
            raise exc.__cause__
        raise ReceiverStoppedError(self) from exc
    return received
set ¤
set() -> None

Trigger the event (make the receiver ready).

Source code in src/frequenz/channels/event.py
def set(self) -> None:
    """Trigger the event (make the receiver ready)."""
    self._is_set = True
    self._event.set()
stop ¤
stop() -> None

Stop this receiver.

Source code in src/frequenz/channels/event.py
def stop(self) -> None:
    """Stop this receiver."""
    self._is_stopped = True
    self._event.set()