Anycast
A channel that delivers each message to exactly one receiver.
Description
Anycast channels support multiple senders and multiple receivers. Each message sent through any of the senders will be received by exactly one receiver.
Characteristics
- Buffered: Yes, with a global channel buffer
- Buffer full policy: Block senders
- Multiple receivers: Yes
- Multiple senders: Yes
- Thread-safe: No
This channel is buffered, and if the senders are faster than the receivers, then the channel's buffer will fill up. In that case, the senders will block at the send() method until the receivers consume the messages in the channel's buffer. The channel's buffer size can be configured at creation time via the limit argument.
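The blocking behaviour described above can be illustrated with a minimal sketch that uses only the standard library. It assumes that a bounded asyncio.Queue is a fair stand-in for the channel's buffer; it is not the Anycast implementation, and the names fast_sender, slow_receiver and run_demo are made up for this illustration.

```python
import asyncio


async def fast_sender(queue: asyncio.Queue, count: int, log: list) -> None:
    """Send `count` numbers as fast as the buffer allows."""
    for msg in range(count):
        if queue.full():
            log.append(f"buffer full, sender blocks before {msg}")
        await queue.put(msg)  # suspends here while the buffer is full
        log.append(f"sent {msg}")


async def slow_receiver(queue: asyncio.Queue, count: int, log: list) -> None:
    """Consume `count` numbers, pausing before each one."""
    for _ in range(count):
        await asyncio.sleep(0.01)  # simulate slow processing
        msg = await queue.get()
        log.append(f"received {msg}")


async def run_demo(limit: int, count: int) -> list:
    # maxsize plays the role of the channel's `limit` argument (assumption).
    queue: asyncio.Queue = asyncio.Queue(maxsize=limit)
    log: list = []
    await asyncio.gather(
        fast_sender(queue, count, log),
        slow_receiver(queue, count, log),
    )
    return log


if __name__ == "__main__":
    for line in asyncio.run(run_demo(limit=2, count=4)):
        print(line)
```

With a limit of 2, the sender fills the buffer with two values and then has to wait: the third value is only sent after the receiver has taken the first one.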
To create new senders and receivers, you can use the new_sender() and new_receiver() methods, respectively.
When the channel is not needed anymore, it should be closed with the close() method. This will prevent further attempts to send() data. Receivers will still be able to drain the pending values on the channel, but after that, subsequent receive() calls will raise a ReceiverStoppedError exception.
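The close-then-drain sequence can be sketched with a toy channel built on the standard library. This is an illustration only, not the library's code: ToyChannel and ChannelClosed are hypothetical names, and the sketch raises ChannelClosed where the real channel raises ReceiverStoppedError on the receiving side. How a rejected send is reported by the real channel is not stated here, so the sketch simply raises the same hypothetical error.

```python
import asyncio
from collections import deque


class ChannelClosed(Exception):
    """Hypothetical error used only in this sketch."""


class ToyChannel:
    """Illustrative stand-in for a closable, buffered channel."""

    def __init__(self) -> None:
        self._buffer = deque()
        self._closed = False
        self._changed = asyncio.Condition()

    async def send(self, msg: int) -> None:
        async with self._changed:
            if self._closed:
                raise ChannelClosed("cannot send on a closed channel")
            self._buffer.append(msg)
            self._changed.notify()

    async def receive(self) -> int:
        async with self._changed:
            # Wait until there is something to read or the channel is closed.
            await self._changed.wait_for(
                lambda: bool(self._buffer) or self._closed
            )
            if self._buffer:
                return self._buffer.popleft()  # pending values are still drained
            raise ChannelClosed("channel is closed and empty")

    async def close(self) -> None:
        async with self._changed:
            self._closed = True
            self._changed.notify_all()  # wake receivers waiting on an empty buffer


async def close_demo() -> list:
    chan = ToyChannel()  # created inside the running event loop
    events = []
    await chan.send(1)
    await chan.send(2)
    await chan.close()
    try:
        await chan.send(3)
    except ChannelClosed:
        events.append("send rejected")
    events.append(f"drained {await chan.receive()}")
    events.append(f"drained {await chan.receive()}")
    try:
        await chan.receive()
    except ChannelClosed:
        events.append("receive rejected")
    return events


if __name__ == "__main__":
    print(asyncio.run(close_demo()))
```

The two values sent before closing are still delivered; only the receive() call made after the buffer is empty fails.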
This channel is useful, for example, to distribute work across multiple workers.
In cases where each message needs to be received by every receiver, a broadcast channel may be used.
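The difference between the two delivery styles can be shown with a small standard-library sketch. It assumes that one shared queue approximates anycast delivery and that one queue per receiver approximates broadcast delivery; neither function uses or reproduces the library's channels, and the names anycast_style and broadcast_style are invented for this illustration.

```python
import asyncio


async def anycast_style(messages: list, n_receivers: int) -> list:
    """One shared queue: each message ends up with exactly one receiver."""
    shared: asyncio.Queue = asyncio.Queue()
    for msg in messages:
        shared.put_nowait(msg)
    inboxes = [[] for _ in range(n_receivers)]

    async def worker(inbox: list) -> None:
        while not shared.empty():
            inbox.append(shared.get_nowait())
            await asyncio.sleep(0)  # let the other workers take a turn

    await asyncio.gather(*(worker(inbox) for inbox in inboxes))
    return inboxes


async def broadcast_style(messages: list, n_receivers: int) -> list:
    """One queue per receiver: every message is copied to every receiver."""
    queues = [asyncio.Queue() for _ in range(n_receivers)]
    for msg in messages:
        for queue in queues:
            queue.put_nowait(msg)
    return [[q.get_nowait() for _ in range(q.qsize())] for q in queues]


if __name__ == "__main__":
    jobs = ["a", "b", "c", "d"]
    print("anycast-style:  ", asyncio.run(anycast_style(jobs, 2)))
    print("broadcast-style:", asyncio.run(broadcast_style(jobs, 2)))
```

In the anycast-style run the jobs are split between the workers, which is what makes this style suitable for distributing work. In the broadcast-style run every receiver sees the full list.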
Examples
Send a few numbers to a receiver
This is a very simple example that sends a few numbers from a single sender to a single receiver.
import asyncio

from frequenz.channels import Anycast, Sender


async def send(sender: Sender[int]) -> None:
    for msg in range(3):
        print(f"sending {msg}")
        await sender.send(msg)


async def main() -> None:
    channel = Anycast[int](name="numbers")

    sender = channel.new_sender()
    receiver = channel.new_receiver()

    async with asyncio.TaskGroup() as task_group:
        task_group.create_task(send(sender))
        for _ in range(3):
            msg = await receiver.receive()
            print(f"received {msg}")
            await asyncio.sleep(0.1)  # sleep (or work) with the data


asyncio.run(main())
The output should look something like this (although the sending and receiving lines might appear more interleaved):
Send a few numbers from multiple senders to multiple receivers
This is a more complex example that sends a few numbers from multiple senders to multiple receivers, using a small buffer to force the senders to block.
import asyncio

from frequenz.channels import Anycast, Receiver, ReceiverStoppedError, Sender


async def send(name: str, sender: Sender[int], start: int, stop: int) -> None:
    for msg in range(start, stop):
        print(f"{name} sending {msg}")
        await sender.send(msg)


async def recv(name: str, receiver: Receiver[int]) -> None:
    try:
        async for msg in receiver:
            print(f"{name} received {msg}")
            await asyncio.sleep(0.1)  # sleep (or work) with the data
    except ReceiverStoppedError:
        pass


async def main() -> None:
    acast = Anycast[int](name="numbers", limit=2)

    async with asyncio.TaskGroup() as task_group:
        task_group.create_task(send("sender_1", acast.new_sender(), 10, 13))
        task_group.create_task(send("sender_2", acast.new_sender(), 20, 22))
        task_group.create_task(recv("receiver_1", acast.new_receiver()))
        task_group.create_task(recv("receiver_2", acast.new_receiver()))


asyncio.run(main())
The output should look something like this (although the sending and receiving lines might appear interleaved in a different way):
sender_1 sending 10
sender_1 sending 11
sender_1 sending 12
Anycast channel [Anycast:numbers:_Sender] is full, blocking sender until a receiver
consumes a value
sender_2 sending 20
Anycast channel [Anycast:numbers:_Sender] is full, blocking sender until a receiver
consumes a value
receiver_1 received 10
receiver_1 received 11
sender_2 sending 21
Anycast channel [Anycast:numbers:_Sender] is full, blocking sender until a receiver
consumes a value
receiver_1 received 12
receiver_1 received 20
receiver_1 received 21