Anycast

A channel that delivers each message to exactly one receiver.

Description

Anycast channels support multiple senders and multiple receivers. Each message sent through any of the senders will be received by exactly one receiver.

[Diagram: two senders send msg1 and msg2 into the channel; each message is delivered to one of the two receivers.]

Characteristics

  • Buffered: Yes, with a global channel buffer
  • Buffer full policy: Block senders
  • Multiple receivers: Yes
  • Multiple senders: Yes
  • Thread-safe: No

This channel is buffered, and if the senders are faster than the receivers, then the channel's buffer will fill up. In that case, the senders will block at the send() method until the receivers consume the messages in the channel's buffer. The channel's buffer size can be configured at creation time via the limit argument.
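The blocking behaviour described above can be illustrated with a standard-library analogy. The sketch below uses asyncio.Queue(maxsize=2) as a stand-in for an Anycast channel created with limit=2. This is an assumption made for illustration only, not how frequenz.channels is implemented, and the names buffer and events are hypothetical.

```python
import asyncio


async def main() -> list[str]:
    events: list[str] = []
    # Stand-in for a channel with limit=2 (NOT the library's implementation).
    buffer: asyncio.Queue[int] = asyncio.Queue(maxsize=2)

    async def sender() -> None:
        for msg in range(3):
            await buffer.put(msg)  # waits while the buffer already holds 2 messages
            events.append(f"sent {msg}")

    task = asyncio.create_task(sender())
    await asyncio.sleep(0.05)  # give the sender time to fill the buffer
    events.append(f"sender blocked: {not task.done()}")

    events.append(f"received {await buffer.get()}")  # consuming frees one slot
    await task  # the third send can now complete
    return events


if __name__ == "__main__":
    for line in asyncio.run(main()):
        print(line)
```

The third message is only accepted after one message has been consumed, which mirrors the "Block senders" policy listed under Characteristics.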

To create new senders and receivers, use the new_sender() and new_receiver() methods, respectively.

When the channel is not needed anymore, it should be closed with the close() method. This will prevent further attempts to send() data. Receivers will still be able to drain the pending values on the channel, but after that, subsequent receive() calls will raise a ReceiverStoppedError exception.
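The closing rules above (no new sends, pending values can still be drained, then receiving fails) can be modelled with a small synchronous toy. Everything in this sketch is hypothetical: ToyChannel and ToyStoppedError are illustration-only names, and the real library is asynchronous and uses its own exception types such as ReceiverStoppedError.

```python
from collections import deque


class ToyStoppedError(Exception):
    """Hypothetical stand-in for the library's error types."""


class ToyChannel:
    """Synchronous toy model of the close() semantics (illustration only)."""

    def __init__(self) -> None:
        self._buffer: deque[int] = deque()
        self._closed = False

    def send(self, msg: int) -> None:
        if self._closed:
            raise ToyStoppedError("channel is closed, cannot send")
        self._buffer.append(msg)

    def receive(self) -> int:
        if self._buffer:
            # Pending values can still be drained after close().
            return self._buffer.popleft()
        if self._closed:
            raise ToyStoppedError("channel is closed and drained")
        raise LookupError("buffer is empty (a real receiver would wait here)")

    def close(self) -> None:
        self._closed = True


def demo() -> list[str]:
    channel = ToyChannel()
    channel.send(1)
    channel.send(2)
    channel.close()
    log = []
    try:
        channel.send(3)  # rejected: the channel is already closed
    except ToyStoppedError:
        log.append("send rejected")
    log.append(f"drained {channel.receive()}")
    log.append(f"drained {channel.receive()}")
    try:
        channel.receive()  # nothing left to drain
    except ToyStoppedError:
        log.append("receive stopped")
    return log


if __name__ == "__main__":
    print(demo())
```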

This channel is useful, for example, to distribute work across multiple workers.

In cases where each message needs to be received by every receiver, a broadcast channel may be used.
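The difference between the two delivery modes can be sketched with standard-library queues. This is an analogy under stated assumptions, not the library's implementation: a single shared asyncio.Queue stands in for anycast delivery, and one queue per receiver stands in for broadcast delivery. The names worker, shared, and own are hypothetical.

```python
import asyncio


async def worker(queue: asyncio.Queue, got: list) -> None:
    while True:
        got.append(await queue.get())
        queue.task_done()
        await asyncio.sleep(0)  # yield so the other worker can take a turn


async def main() -> dict:
    messages = list(range(6))

    # Anycast-like: two workers pull from ONE shared queue,
    # so each message ends up with exactly one of them.
    shared: asyncio.Queue = asyncio.Queue()
    got_a: list = []
    got_b: list = []
    workers = [asyncio.create_task(worker(shared, got)) for got in (got_a, got_b)]
    for msg in messages:
        shared.put_nowait(msg)
    await shared.join()  # wait until every message has been handled
    for task in workers:
        task.cancel()
    await asyncio.gather(*workers, return_exceptions=True)

    # Broadcast-like: each receiver has its OWN queue and gets every message.
    own = [asyncio.Queue(), asyncio.Queue()]
    for msg in messages:
        for queue in own:
            queue.put_nowait(msg)
    copies = [[queue.get_nowait() for _ in messages] for queue in own]

    return {"anycast": (got_a, got_b), "broadcast": copies}


if __name__ == "__main__":
    print(asyncio.run(main()))
```

In the anycast-like half the two workers together see each message once, while in the broadcast-like half every receiver sees the full sequence. How the messages are split between the two workers is not guaranteed.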

Examples

Send a few numbers to a receiver

This is a very simple example that sends a few numbers from a single sender to a single receiver.

import asyncio

from frequenz.channels import Anycast, Sender


async def send(sender: Sender[int]) -> None:
    for msg in range(3):
        print(f"sending {msg}")
        await sender.send(msg)


async def main() -> None:
    channel = Anycast[int](name="numbers")

    sender = channel.new_sender()
    receiver = channel.new_receiver()

    async with asyncio.TaskGroup() as task_group:
        task_group.create_task(send(sender))
        for _ in range(3):
            msg = await receiver.receive()
            print(f"received {msg}")
            await asyncio.sleep(0.1)  # sleep (or work) with the data


asyncio.run(main())

The output should look something like this (although the sending and receiving lines might appear more interleaved):

sending 0
sending 1
sending 2
received 0
received 1
received 2

Send a few numbers from multiple senders to multiple receivers

This is a more complex example that sends a few numbers from multiple senders to multiple receivers, using a small buffer to force the senders to block.

import asyncio

from frequenz.channels import Anycast, Receiver, ReceiverStoppedError, Sender


async def send(name: str, sender: Sender[int], start: int, stop: int) -> None:
    for msg in range(start, stop):
        print(f"{name} sending {msg}")
        await sender.send(msg)


async def recv(name: str, receiver: Receiver[int]) -> None:
    try:
        async for msg in receiver:
            print(f"{name} received {msg}")
            await asyncio.sleep(0.1)  # sleep (or work) with the data
    except ReceiverStoppedError:
        pass


async def main() -> None:
    acast = Anycast[int](name="numbers", limit=2)

    async with asyncio.TaskGroup() as task_group:
        task_group.create_task(send("sender_1", acast.new_sender(), 10, 13))
        task_group.create_task(send("sender_2", acast.new_sender(), 20, 22))
        task_group.create_task(recv("receiver_1", acast.new_receiver()))
        task_group.create_task(recv("receiver_2", acast.new_receiver()))


asyncio.run(main())

The output should look something like this (although the sending and receiving lines might be interleaved differently, and the messages might be split differently between the receivers):

sender_1 sending 10
sender_1 sending 11
sender_1 sending 12
Anycast channel [Anycast:numbers:_Sender] is full, blocking sender until a receiver
consumes a value
sender_2 sending 20
Anycast channel [Anycast:numbers:_Sender] is full, blocking sender until a receiver
consumes a value
receiver_1 received 10
receiver_1 received 11
sender_2 sending 21
Anycast channel [Anycast:numbers:_Sender] is full, blocking sender until a receiver
consumes a value
receiver_1 received 12
receiver_1 received 20
receiver_1 received 21