Selecting

Select the first among multiple Receivers.

Usage

If you need to receive different types of messages from different receivers, you need to know the source of each received value in order to know its type.

select() allows you to do that. It is an async iterator that will iterate over the values of all receivers as they receive new values.

It yields a Selected object that tells you the source of the received message. To make sure the received value is cast to the correct type, use the selected_from() function to check the source of the message, and the value attribute to access the message:

from frequenz.channels import Anycast, select, selected_from

channel1: Anycast[int] = Anycast(name="channel1")
channel2: Anycast[str] = Anycast(name="channel2")
receiver1 = channel1.new_receiver()
receiver2 = channel2.new_receiver()

async for selected in select(receiver1, receiver2):
    if selected_from(selected, receiver1):
        print(f"Received from receiver1, next number: {selected.value + 1}")
    elif selected_from(selected, receiver2):
        print(f"Received from receiver2, length: {len(selected.value)}")
    else:
        assert False, "Unknown source, this should never happen"
Tip

To prevent common bugs, like when a new receiver is added to the select loop but the handling code is forgotten, select() will check that all the selected receivers are handled in the if-chain.

If a selected receiver is not handled, select() will raise an UnhandledSelectedError exception.

Not handling a receiver is considered a programming error. Because of this, the exception is a subclass of BaseException instead of Exception. This means that it will not be caught by except Exception blocks.
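The effect of deriving from BaseException can be seen in plain Python, without the library. The sketch below uses a hypothetical stand-in class, FakeUnhandledError, in place of UnhandledSelectedError; only the inheritance relationship is taken from the text above, and the function names are made up for illustration.

```python
class FakeUnhandledError(BaseException):
    """Hypothetical stand-in for UnhandledSelectedError (not the library class)."""


def run_loop_body() -> str:
    """Simulate a loop body that swallows every Exception."""
    try:
        raise FakeUnhandledError("receiver2 was not handled")
    except Exception:
        # Never reached: BaseException subclasses bypass `except Exception`.
        return "swallowed"


def main() -> str:
    """Show that the error escapes the broad handler in run_loop_body()."""
    try:
        return run_loop_body()
    except BaseException as error:
        return f"escaped: {type(error).__name__}"


print(main())
```

A broad except Exception block around the loop body therefore cannot hide a forgotten receiver by accident.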

If for some reason you want to ignore a received value, just add the receiver to the if-chain and do nothing with the value:

from frequenz.channels import Anycast, select, selected_from

channel1: Anycast[int] = Anycast(name="channel1")
channel2: Anycast[str] = Anycast(name="channel2")
receiver1 = channel1.new_receiver()
receiver2 = channel2.new_receiver()

async for selected in select(receiver1, receiver2):
    if selected_from(selected, receiver1):
        continue
    if selected_from(selected, receiver2):
        print(f"Received from receiver2, length: {len(selected.value)}")

Stopping

The select() async iterator will stop as soon as all the receivers are stopped. You can also end the iteration early by breaking out of the loop as normal.

When a single receiver is stopped, it will be reported via the Selected object. You can use the was_stopped() method to check if the selected receiver was stopped:

from frequenz.channels import Anycast, select, selected_from

channel1: Anycast[int] = Anycast(name="channel1")
channel2: Anycast[str] = Anycast(name="channel2")
receiver1 = channel1.new_receiver()
receiver2 = channel2.new_receiver()

async for selected in select(receiver1, receiver2):
    if selected_from(selected, receiver1):
        if selected.was_stopped():
            print("receiver1 was stopped")
            continue
        print(f"Received from receiver1, the next number is: {selected.value + 1}")
    # ...
Tip

The was_stopped() method is a convenience method that is equivalent to checking if the exception attribute is an instance of ReceiverStoppedError.

Error Handling

Tip

For more information about handling errors, please refer to the Error Handling section of the user guide.

If a receiver raises an exception while receiving a value, that exception is raised when you access the value attribute of the Selected object.

You can use a try-except block to handle exceptions as usual:

from frequenz.channels import Anycast, ReceiverStoppedError, select, selected_from

channel1: Anycast[int] = Anycast(name="channel1")
channel2: Anycast[str] = Anycast(name="channel2")
receiver1 = channel1.new_receiver()
receiver2 = channel2.new_receiver()

async for selected in select(receiver1, receiver2):
    if selected_from(selected, receiver1):
        try:
            print(f"Received from receiver1, next number: {selected.value + 1}")
        except ReceiverStoppedError:
            print("receiver1 was stopped")
        except ValueError as value_error:
            print(f"receiver1 raised a ValueError: {value_error}")
        # ...
    # ...

The Selected object also has an exception attribute that contains the exception raised by the receiver, if any.
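How value, exception, and was_stopped() relate to each other can be illustrated with a small stand-in, independent of the library. FakeSelected and FakeStoppedError below are hypothetical classes written only to mirror the behavior described in this section; they are not the library's implementation, and the helper names read_with_try and read_with_attribute are likewise assumptions.

```python
from typing import Optional


class FakeStoppedError(Exception):
    """Hypothetical stand-in for ReceiverStoppedError."""


class FakeSelected:
    """Hypothetical stand-in for Selected; not the library's implementation."""

    def __init__(
        self, value: Optional[int] = None, exception: Optional[Exception] = None
    ) -> None:
        self._value = value
        self.exception = exception

    @property
    def value(self) -> Optional[int]:
        # Accessing the value re-raises whatever the receiver raised.
        if self.exception is not None:
            raise self.exception
        return self._value

    def was_stopped(self) -> bool:
        # Equivalent to an isinstance() check on the exception attribute.
        return isinstance(self.exception, FakeStoppedError)


def read_with_try(selected: FakeSelected) -> str:
    """Handle errors by catching what the value attribute raises."""
    try:
        return f"value: {selected.value}"
    except FakeStoppedError:
        return "stopped"
    except ValueError as error:
        return f"error: {error}"


def read_with_attribute(selected: FakeSelected) -> str:
    """Handle the same cases by inspecting the exception attribute first."""
    if selected.was_stopped():
        return "stopped"
    if selected.exception is not None:
        return f"error: {selected.exception}"
    return f"value: {selected.value}"
```

Both helpers give the same answer for a normal value, a stopped receiver, and a ValueError; which style to use is a matter of taste.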