timer

frequenz.channels.timer ¤

A receiver that sends a message regularly.

Quick Start¤

If you need to do something as periodically as possible (avoiding drifts), you can use a periodic() timer.

Periodic Timer
import asyncio
from datetime import datetime, timedelta

from frequenz.channels.timer import Timer


async def main() -> None:
    async for drift in Timer.periodic(timedelta(seconds=1.0)):
        print(f"The timer has triggered at {datetime.now()} with a drift of {drift}")


asyncio.run(main())

If, instead, you need a timeout, for example to abort waiting for other receivers after a certain amount of time, you can use a timeout() timer.

Timeout
import asyncio
from datetime import timedelta

from frequenz.channels import Anycast, select, selected_from
from frequenz.channels.timer import Timer


async def main() -> None:
    channel = Anycast[int](name="data-channel")
    data_receiver = channel.new_receiver()

    timer = Timer.timeout(timedelta(seconds=1.0))

    async for selected in select(data_receiver, timer):
        if selected_from(selected, data_receiver):
            print(f"Received data: {selected.value}")
            timer.reset()
        elif selected_from(selected, timer):
            drift = selected.value
            print(f"No data received for {timer.interval + drift} seconds, giving up")
            break


asyncio.run(main())

This timer will rearm itself automatically after it was triggered, so it will trigger again after the selected interval, no matter what the current drift was.

Tip

It is extremely important to understand how timers behave when they are delayed. We strongly recommend reading about missed ticks and drifting before using timers in production.

Missed Ticks And Drifting¤

Timers can be used to send messages at regular time intervals, but there is one fundamental issue with timers in the asyncio world: the event loop could give control to another task at any time, and that task can take a long time to finish, so the next timer message may be received later than the desired interval.
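The effect can be reproduced with plain asyncio, without this library. The following is a minimal sketch, where `measure_delay` and `hog` are hypothetical names introduced only for illustration: one task blocks the event loop, so a sleep that should have woken up after `interval` seconds wakes up late.

```python
import asyncio
import time


async def measure_delay(interval: float, busy: float) -> float:
    """Return how late (in seconds) a sleep of `interval` seconds woke up."""
    loop = asyncio.get_running_loop()
    scheduled = loop.time() + interval  # when we would like to wake up

    async def hog() -> None:
        # A blocking call: the event loop cannot run anything else meanwhile.
        time.sleep(busy)

    task = asyncio.create_task(hog())
    await asyncio.sleep(interval)  # control goes to hog(), which blocks the loop
    await task
    return loop.time() - scheduled


delay = asyncio.run(measure_delay(interval=0.05, busy=0.2))
print(f"Woke up {delay:.3f} seconds late")
```

The reported delay is roughly `busy - interval`, which is the kind of drift a timer has to deal with.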

Because of this, it is very important for users to be able to understand and control how timers behave when they are delayed. Timers will handle missed ticks according to a missed tick policy.

The following built-in policies are available:

  • SkipMissedAndDrift: A policy that drops all the missed ticks, triggers immediately and resets.
  • SkipMissedAndResync: A policy that drops all the missed ticks, triggers immediately and resyncs.
  • TriggerAllMissed: A policy that triggers all the missed ticks immediately until it catches up.
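To make the "catches up" behavior of TriggerAllMissed concrete, here is a small standalone sketch. It assumes the policy always schedules the next tick exactly one interval after the previous scheduled time, which is what the one-line description implies; the real implementation is not shown in this section. The function names are hypothetical, and times are integer microseconds.

```python
def next_tick_trigger_all(*, interval: int, scheduled_tick_time: int, now: int) -> int:
    """Assumed rule: always advance by exactly one interval, ignoring `now`."""
    return scheduled_tick_time + interval


def ticks_fired_immediately(*, interval: int, scheduled_tick_time: int, now: int) -> int:
    """Count the ticks that are already due at time `now`."""
    fired = 0
    while scheduled_tick_time <= now:
        fired += 1
        scheduled_tick_time = next_tick_trigger_all(
            interval=interval, scheduled_tick_time=scheduled_tick_time, now=now
        )
    return fired


SECOND = 1_000_000  # microseconds

# A tick scheduled at 1.0 s is only handled at 3.5 s: the ticks for 1.0, 2.0
# and 3.0 s are all overdue and fire in a burst; the tick for 4.0 s is not due.
burst = ticks_fired_immediately(
    interval=SECOND, scheduled_tick_time=1 * SECOND, now=3_500_000
)
print(burst)  # 3
```

The two "skip" policies would deliver a single tick in the same situation and differ only in when the next tick is scheduled.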

Classes¤

frequenz.channels.timer.MissedTickPolicy ¤

Bases: ABC

A policy to handle timer missed ticks.

To implement a custom policy you need to subclass MissedTickPolicy and implement the calculate_next_tick_time method.

Example

This policy will just wait one more second than the original interval if a tick is missed:

import asyncio
from datetime import timedelta

from frequenz.channels.timer import MissedTickPolicy, Timer


class WaitOneMoreSecond(MissedTickPolicy):
    def calculate_next_tick_time(
        self, *, interval: int, scheduled_tick_time: int, now: int
    ) -> int:
        # All times are in microseconds, so one second is 1_000_000.
        return scheduled_tick_time + interval + 1_000_000


async def main() -> None:
    # Both arguments are positional-only in the Timer constructor.
    timer = Timer(timedelta(seconds=1), WaitOneMoreSecond())

    async for drift in timer:
        print(f"The timer has triggered with a drift of {drift}")


asyncio.run(main())
Source code in src/frequenz/channels/timer.py
class MissedTickPolicy(abc.ABC):
    """A policy to handle timer missed ticks.

    To implement a custom policy you need to subclass
    [`MissedTickPolicy`][frequenz.channels.timer.MissedTickPolicy] and implement the
    [`calculate_next_tick_time`][frequenz.channels.timer.MissedTickPolicy.calculate_next_tick_time]
    method.

    Example:
        This policy will just wait one more second than the original interval if a
        tick is missed:

        ```python
        class WaitOneMoreSecond(MissedTickPolicy):
            def calculate_next_tick_time(
                self, *, interval: int, scheduled_tick_time: int, now: int
            ) -> int:
                return scheduled_tick_time + interval + 1_000_000


        async def main() -> None:
            timer = Timer(
                interval=timedelta(seconds=1),
                missed_tick_policy=WaitOneMoreSecond(),
            )

            async for drift in timer:
                print(f"The timer has triggered with a drift of {drift}")

        asyncio.run(main())
        ```
    """

    @abc.abstractmethod
    def calculate_next_tick_time(
        self, *, interval: int, scheduled_tick_time: int, now: int
    ) -> int:
        """Calculate the next tick time according to `missed_tick_policy`.

        This method is called by `ready()` after it has determined that the
        timer has triggered.  It will check if the timer has missed any ticks
        and handle them according to `missed_tick_policy`.

        Args:
            interval: The interval between ticks (in microseconds).
            scheduled_tick_time: The time the current tick was scheduled to
                trigger (in microseconds).
            now: The current loop time (in microseconds).

        Returns:
            The next tick time (in microseconds) according to
                `missed_tick_policy`.
        """

    def __repr__(self) -> str:
        """Return a string representation of the instance.

        Returns:
            The string representation of the instance.
        """
        return f"{type(self).__name__}()"
Functions¤
__repr__ ¤
__repr__() -> str

Return a string representation of the instance.

RETURNS DESCRIPTION
str

The string representation of the instance.

Source code in src/frequenz/channels/timer.py
def __repr__(self) -> str:
    """Return a string representation of the instance.

    Returns:
        The string representation of the instance.
    """
    return f"{type(self).__name__}()"
calculate_next_tick_time abstractmethod ¤
calculate_next_tick_time(
    *, interval: int, scheduled_tick_time: int, now: int
) -> int

Calculate the next tick time according to missed_tick_policy.

This method is called by ready() after it has determined that the timer has triggered. It will check if the timer has missed any ticks and handle them according to missed_tick_policy.

PARAMETER DESCRIPTION
interval

The interval between ticks (in microseconds).

TYPE: int

scheduled_tick_time

The time the current tick was scheduled to trigger (in microseconds).

TYPE: int

now

The current loop time (in microseconds).

TYPE: int

RETURNS DESCRIPTION
int

The next tick time (in microseconds) according to missed_tick_policy.
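Since every argument is an integer number of microseconds, it can help to see how such values relate to the usual `timedelta` and float-seconds representations. This is only a sketch: `to_microseconds` and `from_loop_time` are hypothetical helpers written for illustration, not the library's own conversion code.

```python
from datetime import timedelta


def to_microseconds(delta: timedelta) -> int:
    """Convert a timedelta to a whole number of microseconds."""
    # Floor division of two timedeltas yields an exact int, no floats involved.
    return delta // timedelta(microseconds=1)


def from_loop_time(seconds: float) -> int:
    """Convert a float time in seconds to integer microseconds."""
    return round(seconds * 1_000_000)


interval = to_microseconds(timedelta(seconds=1))
scheduled_tick_time = from_loop_time(12.0)
now = from_loop_time(12.25)

print(interval)                   # 1000000
print(now - scheduled_tick_time)  # 250000 (the tick is 0.25 seconds late)
```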

Source code in src/frequenz/channels/timer.py
@abc.abstractmethod
def calculate_next_tick_time(
    self, *, interval: int, scheduled_tick_time: int, now: int
) -> int:
    """Calculate the next tick time according to `missed_tick_policy`.

    This method is called by `ready()` after it has determined that the
    timer has triggered.  It will check if the timer has missed any ticks
    and handle them according to `missed_tick_policy`.

    Args:
        interval: The interval between ticks (in microseconds).
        scheduled_tick_time: The time the current tick was scheduled to
            trigger (in microseconds).
        now: The current loop time (in microseconds).

    Returns:
        The next tick time (in microseconds) according to
            `missed_tick_policy`.
    """

frequenz.channels.timer.SkipMissedAndDrift ¤

Bases: MissedTickPolicy

A policy that drops all the missed ticks, triggers immediately and resets.

The SkipMissedAndDrift policy will behave effectively as if the timer was reset every time it is triggered. This means the start time will change and the drift will be accumulated each time a tick is delayed. Only the relative drift will be returned on each tick.

The reset happens only if the delay is larger than the delay tolerance, so it is possible to ignore small delays and not drift in those cases.

Example

This example represents a timer with interval 1 second and delay tolerance of 0.1 seconds.

  1. The first tick, T0, happens exactly at time 0.

  2. The second tick, T1.2, happens at time 1.2 (0.2 seconds late), so the timer triggers immediately but drifts a bit (0.2 seconds), so the next tick is expected at 2.2 seconds.

  3. The third tick, T2.2, happens at 2.3 seconds (0.1 seconds late), so it also triggers immediately but it doesn't drift because the delay does not exceed the delay_tolerance. The next tick is expected at 3.2 seconds.

  4. The fourth tick, T4.2, triggers at 4.3 seconds (1.1 seconds late), so it also triggers immediately but the timer has drifted by 1.1 seconds, so a potential tick T3.2 is skipped (not triggered).

  5. The fifth tick, T5.3, triggers at 5.3 seconds so it is right on time (no drift) and the same happens for tick T6.3, which triggers at 6.3 seconds.

[Timeline diagram, 0 to 6 seconds, of ticks T0, T1.2, T2.2, T3.2, T4.2, T5.3 and T6.3, marking expected ticks, delivered ticks and undelivered (skipped) ticks.]
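The numbers in this example can be checked with a standalone re-implementation of the rule (drift only when the delay exceeds the tolerance). The function below mirrors the arithmetic of calculate_next_tick_time but is a sketch with a hypothetical name and an explicit `tolerance` argument; all times are integer microseconds.

```python
def next_tick_skip_and_drift(
    *, now: int, scheduled_tick_time: int, interval: int, tolerance: int
) -> int:
    """Return the next tick time, drifting only if the delay exceeds `tolerance`."""
    drift = now - scheduled_tick_time
    if drift > tolerance:
        return now + interval  # restart counting from "now": the timer drifts
    return scheduled_tick_time + interval  # small delay: keep the old schedule


INTERVAL = 1_000_000   # 1 second
TOLERANCE = 100_000    # 0.1 seconds

# Times at which the ticks scheduled after T0 are actually handled.
arrivals = [1_200_000, 2_300_000, 4_300_000, 5_300_000]

scheduled = 1_000_000  # the first tick after T0 is expected at 1.0 s
schedule = []
for now in arrivals:
    scheduled = next_tick_skip_and_drift(
        now=now, scheduled_tick_time=scheduled, interval=INTERVAL, tolerance=TOLERANCE
    )
    schedule.append(scheduled)

print(schedule)  # [2200000, 3200000, 5300000, 6300000]
```

The second arrival is exactly 0.1 seconds late, which is not larger than the tolerance, so the schedule stays at 3.2 seconds instead of moving to 3.3 seconds.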

Source code in src/frequenz/channels/timer.py
class SkipMissedAndDrift(MissedTickPolicy):
    """A policy that drops all the missed ticks, triggers immediately and resets.

    The [`SkipMissedAndDrift`][frequenz.channels.timer.SkipMissedAndDrift] policy will
    behave effectively as if the timer was
    [reset][frequenz.channels.timer.Timer.reset] every time it is triggered. This means
    the start time will change and the drift will be accumulated each time a tick is
    delayed. Only the relative drift will be returned on each tick.

    The reset happens only if the delay is larger than the
    [delay tolerance][frequenz.channels.timer.SkipMissedAndDrift.delay_tolerance], so it
    is possible to ignore small delays and not drift in those cases.

    Example:
        This example represents a timer with interval 1 second and delay tolerance of
        0.1 seconds.

        1. The first tick, `T0`, happens exactly at time 0.

        2. The second tick, `T1.2`, happens at time 1.2 (0.2 seconds late), so the timer
            triggers immediately but drifts a bit (0.2 seconds), so the next tick is
            expected at 2.2 seconds.

        3. The third tick, `T2.2`, happens at 2.3 seconds (0.1 seconds late), so it also
            triggers immediately but **it doesn't drift** because the delay is **under
            the `delay_tolerance`**. The next tick is expected at 3.2 seconds.

        4. The fourth tick, `T4.2`, triggers at 4.3 seconds (1.1 seconds late), so it
            also triggers immediately but the timer has drifted by 1.1 seconds, so a
            potential tick `T3.2` is skipped (not triggered).

        5. The fifth tick, `T5.3`, triggers at 5.3 seconds so it is right on time (no
            drift) and the same happens for tick `T6.3`, which triggers at 6.3 seconds.

        <center>
        ```bob
        0         1         2         3         4         5         6
        *---------o-*-------|-o*------|-O-------|-o*------|--*------|--*--> time
        T0          T1.2       T2.2     T3.2       T4.2      T5.3      T6.3

        -o- "Expected ticks"
        -*- "Delivered ticks"
        -O- "Undelivered ticks (skipped)"
        ```
        </center>
    """

    def __init__(self, *, delay_tolerance: timedelta = timedelta(0)):
        """
        Create an instance.

        See the class documentation for more details.

        Args:
            delay_tolerance: The maximum delay that is tolerated before
                starting to drift.  If a tick is delayed less than this, then
                it is not considered a missed tick and the timer doesn't
                accumulate this drift.

        Raises:
            ValueError: If `delay_tolerance` is negative.
        """
        self._tolerance: int = _to_microseconds(delay_tolerance)
        """The maximum allowed delay before starting to drift."""

        if self._tolerance < 0:
            raise ValueError("delay_tolerance must be positive")

    @property
    def delay_tolerance(self) -> timedelta:
        """Return the maximum delay that is tolerated before starting to drift.

        Returns:
            The maximum delay that is tolerated before starting to drift.
        """
        return timedelta(microseconds=self._tolerance)

    def calculate_next_tick_time(
        self, *, now: int, scheduled_tick_time: int, interval: int
    ) -> int:
        """Calculate the next tick time.

        If the drift is larger than `delay_tolerance`, then it returns `now +
        interval` (so the timer drifts), otherwise it returns
        `scheduled_tick_time + interval` (we consider the delay too small and
        avoid small drifts).

        Args:
            now: The current loop time (in microseconds).
            scheduled_tick_time: The time the current tick was scheduled to
                trigger (in microseconds).
            interval: The interval between ticks (in microseconds).

        Returns:
            The next tick time (in microseconds).
        """
        drift = now - scheduled_tick_time
        if drift > self._tolerance:
            return now + interval
        return scheduled_tick_time + interval

    def __str__(self) -> str:
        """Return a string representation of the instance.

        Returns:
            The string representation of the instance.
        """
        return f"{type(self).__name__}({self.delay_tolerance})"

    def __repr__(self) -> str:
        """Return a string representation of the instance.

        Returns:
            The string representation of the instance.
        """
        return f"{type(self).__name__}({self.delay_tolerance=})"
Attributes¤
delay_tolerance property ¤
delay_tolerance: timedelta

Return the maximum delay that is tolerated before starting to drift.

RETURNS DESCRIPTION
timedelta

The maximum delay that is tolerated before starting to drift.

Functions¤
__init__ ¤
__init__(*, delay_tolerance: timedelta = timedelta(0))

Create an instance.

See the class documentation for more details.

PARAMETER DESCRIPTION
delay_tolerance

The maximum delay that is tolerated before starting to drift. If a tick is delayed less than this, then it is not considered a missed tick and the timer doesn't accumulate this drift.

TYPE: timedelta DEFAULT: timedelta(0)

RAISES DESCRIPTION
ValueError

If delay_tolerance is negative.

Source code in src/frequenz/channels/timer.py
def __init__(self, *, delay_tolerance: timedelta = timedelta(0)):
    """
    Create an instance.

    See the class documentation for more details.

    Args:
        delay_tolerance: The maximum delay that is tolerated before
            starting to drift.  If a tick is delayed less than this, then
            it is not considered a missed tick and the timer doesn't
            accumulate this drift.

    Raises:
        ValueError: If `delay_tolerance` is negative.
    """
    self._tolerance: int = _to_microseconds(delay_tolerance)
    """The maximum allowed delay before starting to drift."""

    if self._tolerance < 0:
        raise ValueError("delay_tolerance must be positive")
__repr__ ¤
__repr__() -> str

Return a string representation of the instance.

RETURNS DESCRIPTION
str

The string representation of the instance.

Source code in src/frequenz/channels/timer.py
def __repr__(self) -> str:
    """Return a string representation of the instance.

    Returns:
        The string representation of the instance.
    """
    return f"{type(self).__name__}({self.delay_tolerance=})"
__str__ ¤
__str__() -> str

Return a string representation of the instance.

RETURNS DESCRIPTION
str

The string representation of the instance.

Source code in src/frequenz/channels/timer.py
def __str__(self) -> str:
    """Return a string representation of the instance.

    Returns:
        The string representation of the instance.
    """
    return f"{type(self).__name__}({self.delay_tolerance})"
calculate_next_tick_time ¤
calculate_next_tick_time(
    *, now: int, scheduled_tick_time: int, interval: int
) -> int

Calculate the next tick time.

If the drift is larger than delay_tolerance, then it returns now + interval (so the timer drifts), otherwise it returns scheduled_tick_time + interval (we consider the delay too small and avoid small drifts).

PARAMETER DESCRIPTION
now

The current loop time (in microseconds).

TYPE: int

scheduled_tick_time

The time the current tick was scheduled to trigger (in microseconds).

TYPE: int

interval

The interval between ticks (in microseconds).

TYPE: int

RETURNS DESCRIPTION
int

The next tick time (in microseconds).

Source code in src/frequenz/channels/timer.py
def calculate_next_tick_time(
    self, *, now: int, scheduled_tick_time: int, interval: int
) -> int:
    """Calculate the next tick time.

    If the drift is larger than `delay_tolerance`, then it returns `now +
    interval` (so the timer drifts), otherwise it returns
    `scheduled_tick_time + interval` (we consider the delay too small and
    avoid small drifts).

    Args:
        now: The current loop time (in microseconds).
        scheduled_tick_time: The time the current tick was scheduled to
            trigger (in microseconds).
        interval: The interval between ticks (in microseconds).

    Returns:
        The next tick time (in microseconds).
    """
    drift = now - scheduled_tick_time
    if drift > self._tolerance:
        return now + interval
    return scheduled_tick_time + interval

frequenz.channels.timer.SkipMissedAndResync ¤

Bases: MissedTickPolicy

A policy that drops all the missed ticks, triggers immediately and resyncs.

If ticks are missed, the SkipMissedAndResync policy will make the Timer trigger immediately and it will schedule to trigger again on the next multiple of the interval, effectively skipping any missed ticks, but re-syncing with the original start time.

Example

This example represents a timer with interval 1 second.

  1. The first tick T0 happens exactly at time 0.

  2. The second tick, T1, happens at time 1.2 (0.2 seconds late), so it triggers immediately. But it re-syncs, so the next tick is still expected at 2 seconds. This re-sync happens on every tick, so all ticks are expected at multiples of 1 second, no matter how delayed they were.

  3. The third tick, T2, happens at time 2.3 (0.3 seconds late), so it also triggers immediately.

  4. The fourth tick, T3, happens at time 4.3 (1.3 seconds late), so it also triggers immediately, but there was also a tick expected at 4 seconds, T4, which is skipped according to this policy to avoid bursts of ticks.

  5. The sixth tick, T5, happens at 5.1 (0.1 seconds late), so it triggers immediately again.

  6. The seventh tick, T6, happens at 6.0, right on time.

[Timeline diagram, 0 to 6 seconds, of ticks T0, T1, T2, T3, T4, T5 and T6, marking expected ticks, delivered ticks and undelivered (skipped) ticks.]
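The re-sync arithmetic can be traced with a standalone copy of the formula used by calculate_next_tick_time. This is a sketch with a hypothetical function name; times are integer microseconds and the arrival times are the ones from the example above.

```python
def next_tick_skip_and_resync(
    *, now: int, scheduled_tick_time: int, interval: int
) -> int:
    """Return the next multiple of `interval` (relative to the schedule) after `now`."""
    drift = now - scheduled_tick_time
    delta_to_next_tick = interval - (drift % interval)
    return now + delta_to_next_tick


INTERVAL = 1_000_000  # 1 second

# Times at which T1, T2, T3, T5 and T6 are actually handled.
arrivals = [1_200_000, 2_300_000, 4_300_000, 5_100_000, 6_000_000]

scheduled = 1_000_000  # T1 is expected at 1.0 s
schedule = []
for now in arrivals:
    scheduled = next_tick_skip_and_resync(
        now=now, scheduled_tick_time=scheduled, interval=INTERVAL
    )
    schedule.append(scheduled)

print(schedule)  # [2000000, 3000000, 5000000, 6000000, 7000000]
```

After the tick handled at 4.3 seconds the next scheduled time jumps from 3.0 to 5.0 seconds, so the tick expected at 4.0 seconds (T4) is never delivered.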

Source code in src/frequenz/channels/timer.py
class SkipMissedAndResync(MissedTickPolicy):
    """A policy that drops all the missed ticks, triggers immediately and resyncs.

    If ticks are missed, the
    [`SkipMissedAndResync`][frequenz.channels.timer.SkipMissedAndResync] policy will
    make the [`Timer`][frequenz.channels.timer.Timer] trigger immediately and it will
    schedule to trigger again on the next multiple of the
    [interval][frequenz.channels.timer.Timer.interval], effectively skipping any missed
    ticks, but re-syncing with the original start time.

    Example:
        This example represents a timer with interval 1 second.

        1. The first tick `T0` happens exactly at time 0.

        2. The second tick, `T1`, happens at time 1.2 (0.2 seconds late), so it triggers
            immediately. But it re-syncs, so the next tick is still expected at
            2 seconds. This re-sync happens on every tick, so all ticks are expected at
            multiples of 1 second, no matter how delayed they were.

        3. The third tick, `T2`, happens at time 2.3 (0.3 seconds late), so it also
            triggers immediately.

        4. The fourth tick, `T3`, happens at time 4.3 (1.3 seconds late), so it also
            triggers immediately, but there was also a tick expected at 4 seconds, `T4`,
            which is skipped according to this policy to avoid bursts of ticks.

        6. The sixth tick, `T5`, happens at 5.1 (0.1 seconds late), so it triggers
            immediately again.

        7. The seventh tick, `T6`, happens at 6.0, right on time.

        <center>
        ```bob
        0         1         2         3         4   T4    5         6
        *---------o-*-------o--*------o---------o--*O-----o-*-------*-----> time
        T0          T1         T2                  T3       T5      T6

        -o- "Expected ticks"
        -*- "Delivered ticks"
        -O- "Undelivered ticks (skipped)"
        ```
        </center>
    """

    def calculate_next_tick_time(
        self, *, now: int, scheduled_tick_time: int, interval: int
    ) -> int:
        """Calculate the next tick time.

        Calculate the next multiple of `interval` after `scheduled_tick_time`.

        Args:
            now: The current loop time (in microseconds).
            scheduled_tick_time: The time the current tick was scheduled to
                trigger (in microseconds).
            interval: The interval between ticks (in microseconds).

        Returns:
            The next tick time (in microseconds).
        """
        # We need to resync (align) the next tick time to the current time
        drift = now - scheduled_tick_time
        delta_to_next_tick = interval - (drift % interval)
        return now + delta_to_next_tick
Functions¤
__repr__ ¤
__repr__() -> str

Return a string representation of the instance.

RETURNS DESCRIPTION
str

The string representation of the instance.

Source code in src/frequenz/channels/timer.py
def __repr__(self) -> str:
    """Return a string representation of the instance.

    Returns:
        The string representation of the instance.
    """
    return f"{type(self).__name__}()"
calculate_next_tick_time ¤
calculate_next_tick_time(
    *, now: int, scheduled_tick_time: int, interval: int
) -> int

Calculate the next tick time.

Calculate the next multiple of interval after scheduled_tick_time.

PARAMETER DESCRIPTION
now

The current loop time (in microseconds).

TYPE: int

scheduled_tick_time

The time the current tick was scheduled to trigger (in microseconds).

TYPE: int

interval

The interval between ticks (in microseconds).

TYPE: int

RETURNS DESCRIPTION
int

The next tick time (in microseconds).

Source code in src/frequenz/channels/timer.py
def calculate_next_tick_time(
    self, *, now: int, scheduled_tick_time: int, interval: int
) -> int:
    """Calculate the next tick time.

    Calculate the next multiple of `interval` after `scheduled_tick_time`.

    Args:
        now: The current loop time (in microseconds).
        scheduled_tick_time: The time the current tick was scheduled to
            trigger (in microseconds).
        interval: The interval between ticks (in microseconds).

    Returns:
        The next tick time (in microseconds).
    """
    # We need to resync (align) the next tick time to the current time
    drift = now - scheduled_tick_time
    delta_to_next_tick = interval - (drift % interval)
    return now + delta_to_next_tick

frequenz.channels.timer.Timer ¤

Bases: Receiver[timedelta]

A receiver that sends a message regularly.

Timers are started by default after they are created. This can be disabled by using the auto_start=False option when creating them. In this case, the timer will not be started until reset() is called. Receiving from the timer (either using receive() or using the async iterator interface) will also start the timer at that point.

Timers need to be created in a context where an asyncio loop is already running. If no loop is specified, the current running loop is used.

Timers can be stopped by calling stop(). A stopped timer will raise a ReceiverStoppedError or stop the async iteration as usual.

Once a timer is explicitly stopped, it can only be started again by explicitly calling reset() (trying to receive from it or using the async iterator interface will keep failing).

Timer messages are timedeltas containing the drift of the timer, i.e. the difference between when the timer should have triggered and the time when it actually triggered.

This drift will likely never be 0, because if there is a task that is running when it should trigger, the timer will be delayed. In this case the drift will be positive. A negative drift should be technically impossible, as the timer uses the asyncio loop's monotonic clock.

Warning

Even when the asyncio loop's monotonic clock is a float, timers use ints to represent time internally. The internal time is tracked in microseconds, so the timer resolution is 1 microsecond (interval must be at least 1 microsecond).

This is to avoid floating point errors when performing calculations with time, which can lead to issues that are very hard to reproduce and debug.
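A short standalone illustration of the kind of error this avoids (plain Python, no timer involved): repeatedly adding a float interval accumulates rounding error, while the same sum in integer microseconds is exact.

```python
# Ten ticks of 0.1 seconds, tracked as float seconds.
float_time = 0.0
for _ in range(10):
    float_time += 0.1

print(float_time)         # 0.9999999999999999
print(float_time == 1.0)  # False

# The same ten ticks, tracked as integer microseconds.
int_time = 0
for _ in range(10):
    int_time += 100_000

print(int_time == 1_000_000)  # True
```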

If the timer is delayed too much, then it will behave according to the missed_tick_policy. Missing ticks might or might not trigger a message and the drift could be accumulated or not depending on the chosen policy.

For the most common cases, specialized constructors are provided: periodic() and timeout().

Source code in src/frequenz/channels/timer.py
class Timer(Receiver[timedelta]):
    """A receiver that sends a message regularly.

    [`Timer`][frequenz.channels.timer.Timer]s are started by default after they are
    created. This can be disabled by using `auto_start=False` option when creating
    them. In this case, the timer will not be started until
    [`reset()`][frequenz.channels.timer.Timer.reset] is called. Receiving from the timer
    (either using [`receive()`][frequenz.channels.timer.Timer.receive] or using the
    async iterator interface) will also start the timer at that point.

    Timers need to be created in a context where
    an [`asyncio`][] loop is already running. If no
    [`loop`][frequenz.channels.timer.Timer.loop] is specified, the current running loop
    is used.

    Timers can be stopped by calling [`stop()`][frequenz.channels.timer.Timer.stop].
    A stopped timer will raise
    a [`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError] or stop the async
    iteration as usual.

    Once a timer is explicitly stopped, it can only be started again by explicitly
    calling [`reset()`][frequenz.channels.timer.Timer.reset] (trying to receive from it
    or using the async iterator interface will keep failing).

    Timer messages are [`timedelta`][datetime.timedelta]s containing the drift of the
    timer, i.e. the difference between when the timer should have triggered and the time
    when it actually triggered.

    This drift will likely never be `0`, because if there is a task that is
    running when it should trigger, the timer will be delayed. In this case the
    drift will be positive. A negative drift should be technically impossible,
    as the timer uses the [`asyncio`][] loop's monotonic clock.

    Warning:
        Even when the [`asyncio`][] loop's monotonic clock is a [`float`][], timers use
        `int`s to represent time internally. The internal time is tracked in
        microseconds, so the timer resolution is 1 microsecond
        ([`interval`][frequenz.channels.timer.Timer.interval] must be at least
         1 microsecond).

        This is to avoid floating point errors when performing calculations with time,
        which can lead to very hard to reproduce, and debug, issues.

    If the timer is delayed too much, then it will behave according to the
    [`missed_tick_policy`][frequenz.channels.timer.Timer.missed_tick_policy]. Missing
    ticks might or might not trigger a message and the drift could be accumulated or not
    depending on the chosen policy.

    For the most common cases, a specialized constructor is provided:

    * [`periodic()`][frequenz.channels.timer.Timer.periodic]:
        {{docstring_summary("frequenz.channels.timer.Timer.periodic")}}

    * [`timeout()`][frequenz.channels.timer.Timer.timeout]:
        {{docstring_summary("frequenz.channels.timer.Timer.timeout")}}
    """

    def __init__(  # pylint: disable=too-many-arguments
        self,
        interval: timedelta,
        missed_tick_policy: MissedTickPolicy,
        /,
        *,
        auto_start: bool = True,
        start_delay: timedelta = timedelta(0),
        loop: asyncio.AbstractEventLoop | None = None,
    ) -> None:
        """Create an instance.

        See the class documentation for details.

        Args:
            interval: The time between timer ticks. Must be at least
                1 microsecond.
            missed_tick_policy: The policy of the timer when it misses a tick.
                Commonly one of `TriggerAllMissed`, `SkipMissedAndResync`, `SkipMissedAndDrift`
                or a custom class deriving from `MissedTickPolicy`. See the
                documentation of each class for more details.
            auto_start: Whether the timer should be started when the
                instance is created. This can only be `True` if there is
                already a running loop or an explicit `loop` that is running
                was passed.
            start_delay: The delay before the timer should start. If `auto_start` is
                `False`, an exception is raised. This has microseconds resolution,
                anything smaller than a microsecond means no delay.
            loop: The event loop to use to track time. If `None`,
                `asyncio.get_running_loop()` will be used.

        Raises:
            RuntimeError: if it was called without a loop and there is no
                running loop.
            ValueError: if `interval` is not positive or is smaller than 1
                microsecond; if `start_delay` is negative or `start_delay` was specified
                but `auto_start` is `False`.
        """
        if interval < timedelta(microseconds=1):
            raise ValueError(
                f"The `interval` must be positive and at least 1 microsecond, not {interval}"
            )

        if start_delay > timedelta(0) and auto_start is False:
            raise ValueError(
                "`auto_start` must be `True` if a `start_delay` is specified"
            )

        self._interval: int = _to_microseconds(interval)
        """The time between timer ticks, in microseconds."""

        self._missed_tick_policy: MissedTickPolicy = missed_tick_policy
        """The policy of the timer when it misses a tick.

        See the documentation of `MissedTickPolicy` for details.
        """

        self._loop: asyncio.AbstractEventLoop = (
            loop if loop is not None else asyncio.get_running_loop()
        )
        """The event loop to use to track time."""

        self._stopped: bool = True
        """Whether the timer was requested to stop.

        If this is `False`, then the timer is running.

        If this is `True`, then it is stopped or there is a request to stop it
        or it was not started yet:

        * If `_next_tick_time` is `None`, it means it wasn't started yet (it was
          created with `auto_start=False`).  Any receiving method will start
          it by calling `reset()` in this case.

        * If `_next_tick_time` is not `None`, it means there was a request to
          stop it.  In this case receiving methods will raise
          a `ReceiverStoppedError`.
        """

        self._next_tick_time: int | None = None
        """The absolute (monotonic) time when the timer should trigger.

        If this is `None`, it means the timer didn't start yet, but it should
        be started as soon as it is used.
        """

        self._current_drift: timedelta | None = None
        """The difference between `_next_tick_time` and the triggered time.

        This is calculated by `ready()` but is returned by `consume()`. If
        `None` it means `ready()` wasn't called and `consume()` will assert.
        `consume()` will set it back to `None` to tell `ready()` that it needs
        to wait again.
        """

        if auto_start:
            self.reset(start_delay=start_delay)

    # We need a noqa here because the docs have a Raises section but the documented
    # exceptions are raised indirectly.
    @classmethod
    def timeout(  # noqa: DOC502
        cls,
        delay: timedelta,
        /,
        *,
        auto_start: bool = True,
        start_delay: timedelta = timedelta(0),
        loop: asyncio.AbstractEventLoop | None = None,
    ) -> Timer:
        """Create a timer useful for tracking timeouts.

        A [timeout][frequenz.channels.timer.Timer.timeout] is
        a [`Timer`][frequenz.channels.timer.Timer] that
        [resets][frequenz.channels.timer.Timer.reset] automatically after it triggers,
        so it will trigger again after the selected interval, no matter what the current
        drift was. This means timeout timers will accumulate drift.

        Tip:
            Timeouts are a shortcut to create
            a [`Timer`][frequenz.channels.timer.Timer] with the
            [`SkipMissedAndDrift`][frequenz.channels.timer.SkipMissedAndDrift] policy.

        Example: Timeout example
            ```python
            import asyncio
            from datetime import timedelta

            from frequenz.channels import Anycast, select, selected_from
            from frequenz.channels.timer import Timer


            async def main() -> None:
                channel = Anycast[int](name="data-channel")
                data_receiver = channel.new_receiver()

                timer = Timer.timeout(timedelta(seconds=1.0))

                async for selected in select(data_receiver, timer):
                    if selected_from(selected, data_receiver):
                        print(f"Received data: {selected.value}")
                        # Restart the timeout, otherwise it fires even while data arrives.
                        timer.reset()
                    elif selected_from(selected, timer):
                        drift = selected.value
                        print(f"No data received for {timer.interval + drift} seconds, giving up")
                        break


            asyncio.run(main())
            ```

        Args:
            delay: The time until the timer ticks. Must be at least
                1 microsecond.
            auto_start: Whether the timer should be started when the
                instance is created. This can only be `True` if there is
                already a running loop or an explicit `loop` that is running
                was passed.
            start_delay: The delay before the timer should start. If a delay is given
                while `auto_start` is `False`, an exception is raised. This has
                microsecond resolution; anything smaller than a microsecond means
                no delay.
            loop: The event loop to use to track time. If `None`,
                `asyncio.get_running_loop()` will be used.

        Returns:
            The timer instance.

        Raises:
            RuntimeError: if it was called without a loop and there is no
                running loop.
            ValueError: if `delay` is not positive or is smaller than 1
                microsecond; if `start_delay` is negative or `start_delay` was specified
                but `auto_start` is `False`.
        """
        return Timer(
            delay,
            SkipMissedAndDrift(delay_tolerance=timedelta(0)),
            auto_start=auto_start,
            start_delay=start_delay,
            loop=loop,
        )

    # We need a noqa here because the docs have a Raises section but the documented
    # exceptions are raised indirectly.
    @classmethod
    def periodic(  # noqa: DOC502 pylint: disable=too-many-arguments
        cls,
        period: timedelta,
        /,
        *,
        skip_missed_ticks: bool = False,
        auto_start: bool = True,
        start_delay: timedelta = timedelta(0),
        loop: asyncio.AbstractEventLoop | None = None,
    ) -> Timer:
        """Create a periodic timer.

        A [periodic timer][frequenz.channels.timer.Timer.periodic] is
        a [`Timer`][frequenz.channels.timer.Timer] that tries as hard as possible to
        trigger at regular intervals. This means that if the timer is delayed for any
        reason, it will trigger immediately and then try to catch up with the original
        schedule.

        Optionally, a periodic timer can be configured to skip missed ticks and re-sync
        with the original schedule (`skip_missed_ticks` argument). This could be useful
        if you want the timer to be as periodic as possible but, when there are big
        delays, you don't want to end up with big bursts.

        Tip:
            Periodic timers are a shortcut to create
            a [`Timer`][frequenz.channels.timer.Timer] with either the
            [`TriggerAllMissed`][frequenz.channels.timer.TriggerAllMissed] policy (when
            `skip_missed_ticks` is `False`) or
            [`SkipMissedAndResync`][frequenz.channels.timer.SkipMissedAndResync]
            otherwise.

        Example:
            ```python
            import asyncio
            from datetime import datetime, timedelta

            from frequenz.channels.timer import Timer


            async def main() -> None:
                async for drift in Timer.periodic(timedelta(seconds=1.0)):
                    print(f"The timer has triggered at {datetime.now()} with a drift of {drift}")


            asyncio.run(main())
            ```

        Args:
            period: The time between timer ticks. Must be at least
                1 microsecond.
            skip_missed_ticks: Whether to skip missed ticks or trigger them
                all until it catches up.
            auto_start: Whether the timer should be started when the
                instance is created. This can only be `True` if there is
                already a running loop or an explicit `loop` that is running
                was passed.
            start_delay: The delay before the timer should start. If a delay is given
                while `auto_start` is `False`, an exception is raised. This has
                microsecond resolution; anything smaller than a microsecond means
                no delay.
            loop: The event loop to use to track time. If `None`,
                `asyncio.get_running_loop()` will be used.

        Returns:
            The timer instance.

        Raises:
            RuntimeError: if it was called without a loop and there is no
                running loop.
            ValueError: if `period` is not positive or is smaller than 1
                microsecond; if `start_delay` is negative or `start_delay` was specified
                but `auto_start` is `False`.
        """
        missed_tick_policy = (
            SkipMissedAndResync() if skip_missed_ticks else TriggerAllMissed()
        )
        return Timer(
            period,
            missed_tick_policy,
            auto_start=auto_start,
            start_delay=start_delay,
            loop=loop,
        )

    @property
    def interval(self) -> timedelta:
        """The interval between timer ticks.

        Returns:
            The interval between timer ticks.
        """
        return timedelta(microseconds=self._interval)

    @property
    def missed_tick_policy(self) -> MissedTickPolicy:
        """The policy of the timer when it misses a tick.

        Returns:
            The policy of the timer when it misses a tick.
        """
        return self._missed_tick_policy

    @property
    def loop(self) -> asyncio.AbstractEventLoop:
        """The event loop used by the timer to track time.

        Returns:
            The event loop used by the timer to track time.
        """
        return self._loop

    @property
    def is_running(self) -> bool:
        """Whether the timer is running.

        This will be `False` if the timer was stopped, or not started yet.

        Returns:
            Whether the timer is running.
        """
        return not self._stopped

    def reset(self, *, start_delay: timedelta = timedelta(0)) -> None:
        """Reset the timer to start timing from now (plus an optional delay).

        If the timer was stopped, or not started yet, it will be started.

        This can only be called with a running loop, see the class documentation for
        more details.

        Args:
            start_delay: The delay before the timer should start. This has microsecond
                resolution; anything smaller than a microsecond means no delay.

        Raises:
            RuntimeError: if it was called without a running loop.
            ValueError: if `start_delay` is negative.
        """
        start_delay_ms = _to_microseconds(start_delay)

        if start_delay_ms < 0:
            raise ValueError(f"`start_delay` can't be negative, got {start_delay}")
        self._stopped = False
        self._next_tick_time = self._now() + start_delay_ms + self._interval
        self._current_drift = None

    def stop(self) -> None:
        """Stop the timer.

        Once `stop` has been called, all subsequent calls to `ready()` will
        immediately return False and calls to `consume()` / `receive()` or any
        use of the async iterator interface will raise
        a `ReceiverStoppedError`.

        You can restart the timer with `reset()`.
        """
        self._stopped = True
        # We need to make sure it's not None, otherwise `ready()` will start it
        self._next_tick_time = self._now()

    # We need a noqa here because the docs have a Raises section but the documented
    # exceptions are raised indirectly.
    async def ready(self) -> bool:  # noqa: DOC502
        """Wait until the timer `interval` has passed.

        Once a call to `ready()` has finished, the resulting tick information
        must be read with a call to `consume()` (`receive()` or iterated over)
        to tell the timer it should wait for the next interval.

        The timer will remain ready (this method will return immediately)
        until it is consumed.

        Returns:
            Whether the timer was started and it is still running.

        Raises:
            RuntimeError: if it was called without a running loop.
        """
        # If there are messages waiting to be consumed, return immediately.
        if self._current_drift is not None:
            return True

        # If `_next_tick_time` is `None`, it means it was created with
        # `auto_start=False` and should be started.
        if self._next_tick_time is None:
            self.reset()
            assert (
                self._next_tick_time is not None
            ), "This should be assigned by reset()"

        # If a stop was explicitly requested, we bail out.
        if self._stopped:
            return False

        now = self._now()
        time_to_next_tick = self._next_tick_time - now

        # If we didn't reach the tick yet, sleep until we do.
        # We need to do this in a loop also reacting to the reset event, as the timer
        # could be reset while we are sleeping, in which case we need to recalculate
        # the time to the next tick and try again.
        while time_to_next_tick > 0:
            await asyncio.sleep(time_to_next_tick / 1_000_000)
            now = self._now()
            time_to_next_tick = self._next_tick_time - now

        # If a stop was explicitly requested during the sleep, we bail out.
        if self._stopped:
            return False

        self._current_drift = timedelta(microseconds=now - self._next_tick_time)
        self._next_tick_time = self._missed_tick_policy.calculate_next_tick_time(
            now=now,
            scheduled_tick_time=self._next_tick_time,
            interval=self._interval,
        )

        return True

    def consume(self) -> timedelta:
        """Return the latest drift once `ready()` is complete.

        Once the timer has triggered (`ready()` is done), this method returns the
        difference between when the timer should have triggered and the time when
        it actually triggered. See the class documentation for more details.

        Returns:
            The difference between when the timer should have triggered and the
                time when it actually did.

        Raises:
            ReceiverStoppedError: if the timer was stopped via `stop()`.
        """
        # If it was stopped and there is no pending result, we raise
        # (if there is a pending result, then we still want to return it first)
        if self._stopped and self._current_drift is None:
            raise ReceiverStoppedError(self)

        assert (
            self._current_drift is not None
        ), "calls to `consume()` must follow a call to `ready()`"
        drift = self._current_drift
        self._current_drift = None
        return drift

    def _now(self) -> int:
        """Return the current monotonic clock time in microseconds.

        Returns:
            The current monotonic clock time in microseconds.
        """
        return _to_microseconds(self._loop.time())

    def __str__(self) -> str:
        """Return a string representation of the timer.

        Returns:
            The string representation of the timer.
        """
        return f"{type(self).__name__}({self.interval})"

    def __repr__(self) -> str:
        """Return a string representation of the timer.

        Returns:
            The string representation of the timer.
        """
        return (
            f"{type(self).__name__}<{self.interval=}, {self.missed_tick_policy=}, "
            f"{self.loop=}, {self.is_running=}>"
        )
Attributes¤
interval property ¤
interval: timedelta

The interval between timer ticks.

RETURNS DESCRIPTION
timedelta

The interval between timer ticks.
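
The source shown on this page stores the interval as an integer number of microseconds and converts it back to a `timedelta` in this property. A minimal sketch of that round trip follows. The body of the private `_to_microseconds()` helper is not shown here, so `to_microseconds()` and `round_trip()` below are hypothetical stand-ins written only for illustration.

```python
from datetime import timedelta
from typing import Union


def to_microseconds(time: Union[float, timedelta]) -> int:
    """Convert seconds or a timedelta to whole microseconds (hypothetical helper)."""
    if isinstance(time, timedelta):
        time = time.total_seconds()
    return round(time * 1_000_000)


def round_trip(interval: timedelta) -> timedelta:
    """Store an interval as microseconds and read it back as a timedelta."""
    stored = to_microseconds(interval)  # what `_interval` would hold
    return timedelta(microseconds=stored)  # what the `interval` property returns


print(round_trip(timedelta(seconds=1.5)))
```

Keeping the value as an integer avoids accumulating floating point error when tick times are added up repeatedly.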

is_running property ¤
is_running: bool

Whether the timer is running.

This will be False if the timer was stopped, or not started yet.

RETURNS DESCRIPTION
bool

Whether the timer is running.
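
Since `is_running` is `False` both before the first start and after a stop, it can help to see how the two cases differ internally. The sketch below follows the description of the `_stopped` flag and the `_next_tick_time` value in the source on this page. `TimerState` and `classify()` are hypothetical names used only for illustration; the timer does not expose them.

```python
from enum import Enum
from typing import Optional


class TimerState(Enum):
    """Hypothetical labels for the three situations described in the source."""

    NOT_STARTED = "not started"
    RUNNING = "running"
    STOPPED = "stopped"


def classify(stopped: bool, next_tick_time: Optional[int]) -> TimerState:
    """Derive the state from the `_stopped` flag and the `_next_tick_time` value."""
    if not stopped:
        return TimerState.RUNNING
    if next_tick_time is None:
        # Created with `auto_start=False` and never used.
        return TimerState.NOT_STARTED
    # `stop()` sets `_next_tick_time` to a non-None value on purpose.
    return TimerState.STOPPED


def is_running(stopped: bool) -> bool:
    """Mirror of the property body: `not self._stopped`."""
    return not stopped


print(classify(True, None), classify(False, 1_000_000), classify(True, 1_000_000))
```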

loop property ¤
loop: asyncio.AbstractEventLoop

The event loop used by the timer to track time.

RETURNS DESCRIPTION
AbstractEventLoop

The event loop used by the timer to track time.

missed_tick_policy property ¤
missed_tick_policy: MissedTickPolicy

The policy of the timer when it misses a tick.

RETURNS DESCRIPTION
MissedTickPolicy

The policy of the timer when it misses a tick.

Functions¤
__aiter__ ¤
__aiter__() -> Self

Initialize the async iterator over received values.

RETURNS DESCRIPTION
Self

self, since no extra setup is needed for the iterator.

Source code in src/frequenz/channels/_receiver.py
def __aiter__(self) -> Self:
    """Initialize the async iterator over received values.

    Returns:
        `self`, since no extra setup is needed for the iterator.
    """
    return self
__anext__ async ¤
__anext__() -> _T

Await the next value in the async iteration over received values.

RETURNS DESCRIPTION
_T

The next value received.

RAISES DESCRIPTION
StopAsyncIteration

if the receiver stopped producing messages.

ReceiverError

if there is some problem with the receiver.

Source code in src/frequenz/channels/_receiver.py
async def __anext__(self) -> _T:
    """Await the next value in the async iteration over received values.

    Returns:
        The next value received.

    Raises:
        StopAsyncIteration: if the receiver stopped producing messages.
        ReceiverError: if there is some problem with the receiver.
    """
    try:
        await self.ready()
        return self.consume()
    except ReceiverStoppedError as exc:
        raise StopAsyncIteration() from exc
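
The following stdlib-only sketch shows how `async for` drives this method: each iteration awaits `ready()`, then calls `consume()`, and a `ReceiverStoppedError` is translated into `StopAsyncIteration`. `CountdownReceiver` and the local `ReceiverStoppedError` class are hypothetical stand-ins, not part of the library.

```python
import asyncio
from typing import List


class ReceiverStoppedError(Exception):
    """Local stand-in for the library's exception of the same name."""


class CountdownReceiver:
    """Toy receiver that yields `count - 1` down to `0` and then stops."""

    def __init__(self, count: int) -> None:
        self._remaining = count

    async def ready(self) -> bool:
        await asyncio.sleep(0)  # give control back to the event loop
        return self._remaining > 0

    def consume(self) -> int:
        if self._remaining <= 0:
            raise ReceiverStoppedError()
        self._remaining -= 1
        return self._remaining

    def __aiter__(self) -> "CountdownReceiver":
        return self

    async def __anext__(self) -> int:
        try:
            await self.ready()
            return self.consume()
        except ReceiverStoppedError as exc:
            raise StopAsyncIteration() from exc


async def collect(count: int) -> List[int]:
    """Gather everything the receiver produces using `async for`."""
    return [value async for value in CountdownReceiver(count)]


print(asyncio.run(collect(3)))
```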
__init__ ¤
__init__(
    interval: timedelta,
    missed_tick_policy: MissedTickPolicy,
    /,
    *,
    auto_start: bool = True,
    start_delay: timedelta = timedelta(0),
    loop: asyncio.AbstractEventLoop | None = None,
) -> None

Create an instance.

See the class documentation for details.

PARAMETER DESCRIPTION
interval

The time between timer ticks. Must be at least 1 microsecond.

TYPE: timedelta

missed_tick_policy

The policy of the timer when it misses a tick. Commonly one of TriggerAllMissed, SkipMissedAndResync, SkipMissedAndDrift or a custom class deriving from MissedTickPolicy. See the documentation of each class for more details.

TYPE: MissedTickPolicy

auto_start

Whether the timer should be started when the instance is created. This can only be True if there is already a running loop or an explicit loop that is running was passed.

TYPE: bool DEFAULT: True

start_delay

The delay before the timer should start. If a delay is given while auto_start is False, an exception is raised. This has microsecond resolution; anything smaller than a microsecond means no delay.

TYPE: timedelta DEFAULT: timedelta(0)

loop

The event loop to use to track time. If None, asyncio.get_running_loop() will be used.

TYPE: AbstractEventLoop | None DEFAULT: None

RAISES DESCRIPTION
RuntimeError

if it was called without a loop and there is no running loop.

ValueError

if interval is not positive or is smaller than 1 microsecond; if start_delay is negative or start_delay was specified but auto_start is False.
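
The argument checks listed above can be sketched without the library as follows. `check_timer_args()` is a hypothetical function that follows the order of the checks in the source on this page; it is not part of the API. Note that in that source the negative `start_delay` check lives in `reset()`, which the constructor only calls when `auto_start` is true.

```python
from datetime import timedelta


def check_timer_args(
    interval: timedelta,
    *,
    auto_start: bool = True,
    start_delay: timedelta = timedelta(0),
) -> None:
    """Raise ValueError for the argument combinations the constructor rejects."""
    if interval < timedelta(microseconds=1):
        raise ValueError(f"`interval` must be at least 1 microsecond, not {interval}")
    if start_delay > timedelta(0) and not auto_start:
        raise ValueError("`auto_start` must be `True` if a `start_delay` is specified")
    # This check is performed by `reset()`, which only runs on auto start.
    if auto_start and start_delay < timedelta(0):
        raise ValueError(f"`start_delay` can't be negative, got {start_delay}")


def is_rejected(interval: timedelta, **kwargs: object) -> bool:
    """Return True if the arguments would be rejected with a ValueError."""
    try:
        check_timer_args(interval, **kwargs)  # type: ignore[arg-type]
    except ValueError:
        return True
    return False


print(is_rejected(timedelta(0)))
print(is_rejected(timedelta(seconds=1), auto_start=False, start_delay=timedelta(seconds=2)))
```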

Source code in src/frequenz/channels/timer.py
def __init__(  # pylint: disable=too-many-arguments
    self,
    interval: timedelta,
    missed_tick_policy: MissedTickPolicy,
    /,
    *,
    auto_start: bool = True,
    start_delay: timedelta = timedelta(0),
    loop: asyncio.AbstractEventLoop | None = None,
) -> None:
    """Create an instance.

    See the class documentation for details.

    Args:
        interval: The time between timer ticks. Must be at least
            1 microsecond.
        missed_tick_policy: The policy of the timer when it misses a tick.
            Commonly one of `TriggerAllMissed`, `SkipMissedAndResync`, `SkipMissedAndDrift`
            or a custom class deriving from `MissedTickPolicy`. See the
            documentation of each class for more details.
        auto_start: Whether the timer should be started when the
            instance is created. This can only be `True` if there is
            already a running loop or an explicit `loop` that is running
            was passed.
        start_delay: The delay before the timer should start. If a delay is given
            while `auto_start` is `False`, an exception is raised. This has
            microsecond resolution; anything smaller than a microsecond means
            no delay.
        loop: The event loop to use to track time. If `None`,
            `asyncio.get_running_loop()` will be used.

    Raises:
        RuntimeError: if it was called without a loop and there is no
            running loop.
        ValueError: if `interval` is not positive or is smaller than 1
            microsecond; if `start_delay` is negative or `start_delay` was specified
            but `auto_start` is `False`.
    """
    if interval < timedelta(microseconds=1):
        raise ValueError(
            f"The `interval` must be positive and at least 1 microsecond, not {interval}"
        )

    if start_delay > timedelta(0) and auto_start is False:
        raise ValueError(
            "`auto_start` must be `True` if a `start_delay` is specified"
        )

    self._interval: int = _to_microseconds(interval)
    """The time between timer ticks, in microseconds."""

    self._missed_tick_policy: MissedTickPolicy = missed_tick_policy
    """The policy of the timer when it misses a tick.

    See the documentation of `MissedTickPolicy` for details.
    """

    self._loop: asyncio.AbstractEventLoop = (
        loop if loop is not None else asyncio.get_running_loop()
    )
    """The event loop to use to track time."""

    self._stopped: bool = True
    """Whether the timer was requested to stop.

    If this is `False`, then the timer is running.

    If this is `True`, then it is stopped or there is a request to stop it
    or it was not started yet:

    * If `_next_tick_time` is `None`, it means it wasn't started yet (it was
      created with `auto_start=False`).  Any receiving method will start
      it by calling `reset()` in this case.

    * If `_next_tick_time` is not `None`, it means there was a request to
      stop it.  In this case receiving methods will raise
      a `ReceiverStoppedError`.
    """

    self._next_tick_time: int | None = None
    """The absolute (monotonic) time when the timer should trigger.

    If this is `None`, it means the timer didn't start yet, but it should
    be started as soon as it is used.
    """

    self._current_drift: timedelta | None = None
    """The difference between `_next_tick_time` and the triggered time.

    This is calculated by `ready()` but is returned by `consume()`. If
    `None` it means `ready()` wasn't called and `consume()` will assert.
    `consume()` will set it back to `None` to tell `ready()` that it needs
    to wait again.
    """

    if auto_start:
        self.reset(start_delay=start_delay)
__repr__ ¤
__repr__() -> str

Return a string representation of the timer.

RETURNS DESCRIPTION
str

The string representation of the timer.

Source code in src/frequenz/channels/timer.py
def __repr__(self) -> str:
    """Return a string representation of the timer.

    Returns:
        The string representation of the timer.
    """
    return (
        f"{type(self).__name__}<{self.interval=}, {self.missed_tick_policy=}, "
        f"{self.loop=}, {self.is_running=}>"
    )
__str__ ¤
__str__() -> str

Return a string representation of the timer.

RETURNS DESCRIPTION
str

The string representation of the timer.

Source code in src/frequenz/channels/timer.py
def __str__(self) -> str:
    """Return a string representation of the timer.

    Returns:
        The string representation of the timer.
    """
    return f"{type(self).__name__}({self.interval})"
consume ¤
consume() -> timedelta

Return the latest drift once ready() is complete.

Once the timer has triggered (ready() is done), this method returns the difference between when the timer should have triggered and the time when it actually triggered. See the class documentation for more details.

RETURNS DESCRIPTION
timedelta

The difference between when the timer should have triggered and the time when it actually did.

RAISES DESCRIPTION
ReceiverStoppedError

if the timer was stopped via stop().
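
One detail that is easy to miss: a drift that is already pending is still delivered after the timer was stopped, and the error is raised only on the following call. The toy model below reproduces just that rule from the source on this page. `PendingDrift` and the local `ReceiverStoppedError` are hypothetical stand-ins, not library classes.

```python
from datetime import timedelta
from typing import Optional


class ReceiverStoppedError(Exception):
    """Local stand-in for the library's exception of the same name."""


class PendingDrift:
    """Toy model of the `_stopped` / `_current_drift` pair used by `consume()`."""

    def __init__(self) -> None:
        self.stopped = False
        self.current_drift: Optional[timedelta] = None

    def consume(self) -> timedelta:
        # Raise only when stopped AND nothing is pending.
        if self.stopped and self.current_drift is None:
            raise ReceiverStoppedError()
        assert self.current_drift is not None, "`ready()` must be awaited first"
        drift, self.current_drift = self.current_drift, None
        return drift


slot = PendingDrift()
slot.current_drift = timedelta(milliseconds=3)  # as if `ready()` had completed
slot.stopped = True  # as if `stop()` was called before consuming
print(slot.consume())  # the pending drift is still delivered
```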

Source code in src/frequenz/channels/timer.py
def consume(self) -> timedelta:
    """Return the latest drift once `ready()` is complete.

    Once the timer has triggered (`ready()` is done), this method returns the
    difference between when the timer should have triggered and the time when
    it actually triggered. See the class documentation for more details.

    Returns:
        The difference between when the timer should have triggered and the
            time when it actually did.

    Raises:
        ReceiverStoppedError: if the timer was stopped via `stop()`.
    """
    # If it was stopped and there is no pending result, we raise
    # (if there is a pending result, then we still want to return it first)
    if self._stopped and self._current_drift is None:
        raise ReceiverStoppedError(self)

    assert (
        self._current_drift is not None
    ), "calls to `consume()` must follow a call to `ready()`"
    drift = self._current_drift
    self._current_drift = None
    return drift
map ¤
map(call: Callable[[_T], _U]) -> Receiver[_U]

Return a receiver with call applied on incoming messages.

PARAMETER DESCRIPTION
call

function to apply on incoming messages.

TYPE: Callable[[_T], _U]

RETURNS DESCRIPTION
Receiver[_U]

A Receiver to read results of the given function from.

Source code in src/frequenz/channels/_receiver.py
def map(self, call: Callable[[_T], _U]) -> Receiver[_U]:
    """Return a receiver with `call` applied on incoming messages.

    Args:
        call: function to apply on incoming messages.

    Returns:
        A `Receiver` to read results of the given function from.
    """
    return _Map(self, call)
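
For a timer, a typical use of `map()` would be turning the `timedelta` drift into a plain number. The `_Map` class is not shown on this page, so the sketch below only illustrates the idea under the assumption that a mapped receiver forwards `ready()` and applies `call` inside `consume()`. `ListReceiver` and `MappedReceiver` are hypothetical stand-ins.

```python
import asyncio
from datetime import timedelta
from typing import Callable, Generic, List, TypeVar

T = TypeVar("T")
U = TypeVar("U")


class ListReceiver(Generic[T]):
    """Toy receiver that hands out the items of a list, one per `consume()`."""

    def __init__(self, items: List[T]) -> None:
        self._items = list(items)

    async def ready(self) -> bool:
        await asyncio.sleep(0)
        return bool(self._items)

    def consume(self) -> T:
        return self._items.pop(0)


class MappedReceiver(Generic[T, U]):
    """Hypothetical stand-in for the receiver returned by `map()`."""

    def __init__(self, source: ListReceiver[T], call: Callable[[T], U]) -> None:
        self._source = source
        self._call = call

    async def ready(self) -> bool:
        return await self._source.ready()

    def consume(self) -> U:
        return self._call(self._source.consume())


async def drifts_in_seconds() -> List[float]:
    """Convert a couple of made-up drifts to float seconds."""
    source = ListReceiver([timedelta(milliseconds=5), timedelta(milliseconds=20)])
    mapped = MappedReceiver(source, lambda drift: drift.total_seconds())
    result: List[float] = []
    while await mapped.ready():
        result.append(mapped.consume())
    return result


print(asyncio.run(drifts_in_seconds()))
```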
periodic classmethod ¤
periodic(
    period: timedelta,
    /,
    *,
    skip_missed_ticks: bool = False,
    auto_start: bool = True,
    start_delay: timedelta = timedelta(0),
    loop: asyncio.AbstractEventLoop | None = None,
) -> Timer

Create a periodic timer.

A periodic timer is a Timer that tries as hard as possible to trigger at regular intervals. This means that if the timer is delayed for any reason, it will trigger immediately and then try to catch up with the original schedule.

Optionally, a periodic timer can be configured to skip missed ticks and re-sync with the original schedule (skip_missed_ticks argument). This could be useful if you want the timer to be as periodic as possible but, when there are big delays, you don't want to end up with big bursts.

Tip

Periodic timers are a shortcut to create a Timer with either the TriggerAllMissed policy (when skip_missed_ticks is False) or SkipMissedAndResync otherwise.

Example
import asyncio
from datetime import datetime, timedelta

from frequenz.channels.timer import Timer


async def main() -> None:
    async for drift in Timer.periodic(timedelta(seconds=1.0)):
        print(f"The timer has triggered at {datetime.now()} with a drift of {drift}")


asyncio.run(main())
PARAMETER DESCRIPTION
period

The time between timer ticks. Must be at least 1 microsecond.

TYPE: timedelta

skip_missed_ticks

Whether to skip missed ticks or trigger them all until it catches up.

TYPE: bool DEFAULT: False

auto_start

Whether the timer should be started when the instance is created. This can only be True if there is already a running loop or an explicit loop that is running was passed.

TYPE: bool DEFAULT: True

start_delay

The delay before the timer should start. If a delay is given while auto_start is False, an exception is raised. This has microsecond resolution; anything smaller than a microsecond means no delay.

TYPE: timedelta DEFAULT: timedelta(0)

loop

The event loop to use to track time. If None, asyncio.get_running_loop() will be used.

TYPE: AbstractEventLoop | None DEFAULT: None

RETURNS DESCRIPTION
Timer

The timer instance.

RAISES DESCRIPTION
RuntimeError

if it was called without a loop and there is no running loop.

ValueError

if period is not positive or is smaller than 1 microsecond; if start_delay is negative or start_delay was specified but auto_start is False.
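
To see what `skip_missed_ticks` changes, the two behaviours can be sketched as plain functions that compute the next tick time in microseconds. These bodies are a reading of the behaviour described above (catch up tick by tick versus jump to the next slot on the original schedule). They are assumptions for illustration, not the library's policy classes.

```python
def trigger_all_missed(now: int, scheduled_tick_time: int, interval: int) -> int:
    """Always schedule one interval after the previous scheduled tick.

    If the timer is late, the result can be in the past, so the next tick
    fires immediately until the schedule is caught up.
    """
    return scheduled_tick_time + interval


def skip_missed_and_resync(now: int, scheduled_tick_time: int, interval: int) -> int:
    """Skip ticks that were missed and land on the next slot of the schedule."""
    drift = now - scheduled_tick_time
    return now + (interval - drift % interval)


# A 1 second timer whose tick scheduled at t=1s is only handled at t=3.2s.
INTERVAL = 1_000_000
print(trigger_all_missed(3_200_000, 1_000_000, INTERVAL))
print(skip_missed_and_resync(3_200_000, 1_000_000, INTERVAL))
```

In this scenario the first function returns a time that is already in the past (two more ticks fire back to back), while the second returns the next future multiple of the period.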

Source code in src/frequenz/channels/timer.py
@classmethod
def periodic(  # noqa: DOC502 pylint: disable=too-many-arguments
    cls,
    period: timedelta,
    /,
    *,
    skip_missed_ticks: bool = False,
    auto_start: bool = True,
    start_delay: timedelta = timedelta(0),
    loop: asyncio.AbstractEventLoop | None = None,
) -> Timer:
    """Create a periodic timer.

    A [periodic timer][frequenz.channels.timer.Timer.periodic] is
    a [`Timer`][frequenz.channels.timer.Timer] that tries as hard as possible to
    trigger at regular intervals. This means that if the timer is delayed for any
    reason, it will trigger immediately and then try to catch up with the original
    schedule.

    Optionally, a periodic timer can be configured to skip missed ticks and re-sync
    with the original schedule (`skip_missed_ticks` argument). This could be useful
    if you want the timer to be as periodic as possible but, when there are big
    delays, you don't want to end up with big bursts.

    Tip:
        Periodic timers are a shortcut to create
        a [`Timer`][frequenz.channels.timer.Timer] with either the
        [`TriggerAllMissed`][frequenz.channels.timer.TriggerAllMissed] policy (when
        `skip_missed_ticks` is `False`) or
        [`SkipMissedAndResync`][frequenz.channels.timer.SkipMissedAndResync]
        otherwise.

    Example:
        ```python
        import asyncio
        from datetime import datetime, timedelta

        from frequenz.channels.timer import Timer


        async def main() -> None:
            async for drift in Timer.periodic(timedelta(seconds=1.0)):
                print(f"The timer has triggered at {datetime.now()} with a drift of {drift}")


        asyncio.run(main())
        ```

    Args:
        period: The time between timer ticks. Must be at least
            1 microsecond.
        skip_missed_ticks: Whether to skip missed ticks or trigger them
            all until it catches up.
        auto_start: Whether the timer should be started when the
            instance is created. This can only be `True` if there is
            already a running loop or an explicit `loop` that is running
            was passed.
        start_delay: The delay before the timer should start. If a delay is given
            while `auto_start` is `False`, an exception is raised. This has
            microsecond resolution; anything smaller than a microsecond means
            no delay.
        loop: The event loop to use to track time. If `None`,
            `asyncio.get_running_loop()` will be used.

    Returns:
        The timer instance.

    Raises:
        RuntimeError: if it was called without a loop and there is no
            running loop.
        ValueError: if `period` is not positive or is smaller than 1
            microsecond; if `start_delay` is negative or `start_delay` was specified
            but `auto_start` is `False`.
    """
    missed_tick_policy = (
        SkipMissedAndResync() if skip_missed_ticks else TriggerAllMissed()
    )
    return Timer(
        period,
        missed_tick_policy,
        auto_start=auto_start,
        start_delay=start_delay,
        loop=loop,
    )
ready async ¤
ready() -> bool

Wait until the timer interval has passed.

Once a call to ready() has finished, the resulting tick information must be read with a call to consume() (receive() or iterated over) to tell the timer it should wait for the next interval.

The timer will remain ready (this method will return immediately) until it is consumed.

RETURNS DESCRIPTION
bool

Whether the timer was started and it is still running.

RAISES DESCRIPTION
RuntimeError

if it was called without a running loop.

Source code in src/frequenz/channels/timer.py
async def ready(self) -> bool:  # noqa: DOC502
    """Wait until the timer `interval` passed.

    Once a call to `ready()` has finished, the resulting tick information
    must be read by calling `consume()` or `receive()`, or by iterating over
    the timer, to tell the timer it should wait for the next interval.

    The timer will remain ready (this method will return immediately)
    until it is consumed.

    Returns:
        Whether the timer was started and it is still running.

    Raises:
        RuntimeError: if it was called without a running loop.
    """
    # If there are messages waiting to be consumed, return immediately.
    if self._current_drift is not None:
        return True

    # If `_next_tick_time` is `None`, it means it was created with
    # `auto_start=False` and should be started.
    if self._next_tick_time is None:
        self.reset()
        assert (
            self._next_tick_time is not None
        ), "This should be assigned by reset()"

    # If a stop was explicitly requested, we bail out.
    if self._stopped:
        return False

    now = self._now()
    time_to_next_tick = self._next_tick_time - now

    # If we didn't reach the tick yet, sleep until we do.
    # We need to do this in a loop also reacting to the reset event, as the timer
    # could be reset while we are sleeping, in which case we need to recalculate
    # the time to the next tick and try again.
    while time_to_next_tick > 0:
        await asyncio.sleep(time_to_next_tick / 1_000_000)
        now = self._now()
        time_to_next_tick = self._next_tick_time - now

    # If a stop was explicitly requested during the sleep, we bail out.
    if self._stopped:
        return False

    self._current_drift = timedelta(microseconds=now - self._next_tick_time)
    self._next_tick_time = self._missed_tick_policy.calculate_next_tick_time(
        now=now,
        scheduled_tick_time=self._next_tick_time,
        interval=self._interval,
    )

    return True
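The reason `ready()` sleeps in a loop, rather than once, can be seen in a reduced model. The sketch below is not the library code: `DeadlineWaiter`, `postpone()` and `wait()` are hypothetical names, and only the idea of re-reading the deadline after every sleep is taken from the listing above.

```python
import asyncio


class DeadlineWaiter:
    """Hypothetical stand-in for the sleep loop in `ready()`."""

    def __init__(self, delay_us: int) -> None:
        self._loop = asyncio.get_running_loop()
        self.deadline_us = self._now() + delay_us

    def _now(self) -> int:
        # Loop time as integer microseconds (the unit used in the listing above).
        return round(self._loop.time() * 1_000_000)

    def postpone(self, delay_us: int) -> None:
        """Move the deadline, as a reset would do while `wait()` is sleeping."""
        self.deadline_us = self._now() + delay_us

    async def wait(self) -> int:
        """Sleep until the deadline and return how late we woke up (microseconds)."""
        now = self._now()
        remaining = self.deadline_us - now
        while remaining > 0:
            await asyncio.sleep(remaining / 1_000_000)
            # The deadline may have moved during the sleep, so read it again.
            now = self._now()
            remaining = self.deadline_us - now
        return now - self.deadline_us


async def demo() -> tuple[int, float]:
    loop = asyncio.get_running_loop()
    start = loop.time()
    waiter = DeadlineWaiter(50_000)  # 50 ms
    # After 20 ms, push the deadline 50 ms further into the future.
    loop.call_later(0.02, waiter.postpone, 50_000)
    late_us = await waiter.wait()
    return late_us, loop.time() - start


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With a single sleep, `wait()` would return after about 50 ms even though the deadline had been moved to about 70 ms; the loop notices the new deadline and sleeps for the remainder.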
receive async ¤
receive() -> _T

Receive a message from the channel.

RETURNS DESCRIPTION
_T

The received message.

RAISES DESCRIPTION
ReceiverStoppedError

if the receiver stopped producing messages.

ReceiverError

if there is some problem with the receiver.

Source code in src/frequenz/channels/_receiver.py
async def receive(self) -> _T:
    """Receive a message from the channel.

    Returns:
        The received message.

    Raises:
        ReceiverStoppedError: if the receiver stopped producing messages.
        ReceiverError: if there is some problem with the receiver.
    """
    try:
        received = await self.__anext__()  # pylint: disable=unnecessary-dunder-call
    except StopAsyncIteration as exc:
        # If we already had a cause and it was that the receiver was stopped,
        # then reuse that error, as StopAsyncIteration is just an artifact
        # introduced by __anext__.
        if (
            isinstance(exc.__cause__, ReceiverStoppedError)
            # pylint is not smart enough to figure out we checked above
            # this is a ReceiverStoppedError and thus it does have
            # a receiver member
            and exc.__cause__.receiver is self  # pylint: disable=no-member
        ):
            raise exc.__cause__
        raise ReceiverStoppedError(self) from exc
    return received
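The exception handling in `receive()` relies on Python's exception chaining: an error raised with `raise ... from ...` keeps the original error in `__cause__`, which can later be re-raised directly. The following sketch shows the pattern in isolation. `StoppedError` and `OneShot` are hypothetical stand-ins invented for this example, not library classes.

```python
import asyncio


class StoppedError(Exception):
    """Hypothetical stand-in for `ReceiverStoppedError`."""

    def __init__(self, receiver: object) -> None:
        super().__init__("the receiver was stopped")
        self.receiver = receiver


class OneShot:
    """Hypothetical receiver that produces one message and then stops."""

    def __init__(self) -> None:
        self._done = False

    def __aiter__(self) -> "OneShot":
        return self

    async def __anext__(self) -> int:
        if self._done:
            # The iterator protocol requires StopAsyncIteration; the domain
            # error travels along as its cause.
            raise StopAsyncIteration() from StoppedError(self)
        self._done = True
        return 42

    async def receive(self) -> int:
        try:
            return await self.__anext__()
        except StopAsyncIteration as exc:
            cause = exc.__cause__
            # Reuse the original error if it is ours, otherwise create one.
            if isinstance(cause, StoppedError) and cause.receiver is self:
                raise cause
            raise StoppedError(self) from exc


async def demo() -> bool:
    receiver = OneShot()
    assert await receiver.receive() == 42
    try:
        await receiver.receive()
    except StoppedError as err:
        return err.receiver is receiver
    return False


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Callers of `receive()` therefore only ever see the domain error, while `async for` loops over the same object end normally through `StopAsyncIteration`.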
reset ¤
reset(*, start_delay: timedelta = timedelta(0)) -> None

Reset the timer to start timing from now (plus an optional delay).

If the timer was stopped, or not started yet, it will be started.

This can only be called with a running loop; see the class documentation for more details.

PARAMETER DESCRIPTION
start_delay

The delay before the timer should start. This has microsecond resolution; anything smaller than a microsecond means no delay.

TYPE: timedelta DEFAULT: timedelta(0)

RAISES DESCRIPTION
RuntimeError

if it was called without a running loop.

ValueError

if start_delay is negative.

Source code in src/frequenz/channels/timer.py
def reset(self, *, start_delay: timedelta = timedelta(0)) -> None:
    """Reset the timer to start timing from now (plus an optional delay).

    If the timer was stopped, or not started yet, it will be started.

    This can only be called with a running loop; see the class documentation for
    more details.

    Args:
        start_delay: The delay before the timer should start. This has microsecond
            resolution; anything smaller than a microsecond means no delay.

    Raises:
        RuntimeError: if it was called without a running loop.
        ValueError: if `start_delay` is negative.
    """
    start_delay_ms = _to_microseconds(start_delay)

    if start_delay_ms < 0:
        raise ValueError(f"`start_delay` can't be negative, got {start_delay}")
    self._stopped = False
    self._next_tick_time = self._now() + start_delay_ms + self._interval
    self._current_drift = None
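The arithmetic performed by `reset()` can be checked with a small stand-alone calculation. This is a sketch under assumptions: `to_microseconds()` is a hypothetical helper (the real `_to_microseconds` is not shown here and may differ), and `next_tick_after_reset()` only mirrors the expression `now + start_delay + interval` from the listing above.

```python
from datetime import timedelta


def to_microseconds(value: timedelta) -> int:
    """Convert a timedelta to whole microseconds (hypothetical helper)."""
    return value // timedelta(microseconds=1)


def next_tick_after_reset(
    now: int, interval: timedelta, start_delay: timedelta = timedelta(0)
) -> int:
    """Return the first tick time, in microseconds, after a reset at `now`."""
    start_delay_us = to_microseconds(start_delay)
    if start_delay_us < 0:
        raise ValueError(f"`start_delay` can't be negative, got {start_delay}")
    return now + start_delay_us + to_microseconds(interval)


# Reset at loop time 10 s, with a 1 s interval and a 250 ms start delay.
first_tick = next_tick_after_reset(
    10_000_000, timedelta(seconds=1), timedelta(milliseconds=250)
)
print(first_tick)  # 11250000
```

Note that the start delay is added on top of the interval, so the first tick after a reset arrives one full interval after the delay has elapsed.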
stop ¤
stop() -> None

Stop the timer.

Once stop has been called, all subsequent calls to ready() will immediately return False and calls to consume() / receive() or any use of the async iterator interface will raise a ReceiverStoppedError.

You can restart the timer with reset().

Source code in src/frequenz/channels/timer.py
def stop(self) -> None:
    """Stop the timer.

    Once `stop` has been called, all subsequent calls to `ready()` will
    immediately return False and calls to `consume()` / `receive()` or any
    use of the async iterator interface will raise
    a `ReceiverStoppedError`.

    You can restart the timer with `reset()`.
    """
    self._stopped = True
    # We need to make sure it's not None, otherwise `ready()` will start it
    self._next_tick_time = self._now()
timeout classmethod ¤
timeout(
    delay: timedelta,
    /,
    *,
    auto_start: bool = True,
    start_delay: timedelta = timedelta(0),
    loop: asyncio.AbstractEventLoop | None = None,
) -> Timer

Create a timer useful for tracking timeouts.

A timeout is a Timer that resets automatically after it triggers, so it will trigger again after the selected interval, no matter what the current drift was. This means timeout timers will accumulate drift.

Tip

Timeouts are a shortcut to create a Timer with the SkipMissedAndDrift policy.

Timeout example
import asyncio
from datetime import timedelta

from frequenz.channels import Anycast, select, selected_from
from frequenz.channels.timer import Timer


async def main() -> None:
    channel = Anycast[int](name="data-channel")
    data_receiver = channel.new_receiver()

    timer = Timer.timeout(timedelta(seconds=1.0))

    async for selected in select(data_receiver, timer):
        if selected_from(selected, data_receiver):
            print(f"Received data: {selected.value}")
            # Data arrived in time, so restart the timeout.
            timer.reset()
        elif selected_from(selected, timer):
            drift = selected.value
            print(f"No data received for {timer.interval + drift} seconds, giving up")
            break


asyncio.run(main())
PARAMETER DESCRIPTION
delay

The time until the timer ticks. Must be at least 1 microsecond.

TYPE: timedelta

auto_start

Whether the timer should be started when the instance is created. This can only be True if there is already a running loop or an explicit loop that is running was passed.

TYPE: bool DEFAULT: True

start_delay

The delay before the timer should start. If a delay is specified while auto_start is False, an exception is raised. This has microsecond resolution; anything smaller than a microsecond means no delay.

TYPE: timedelta DEFAULT: timedelta(0)

loop

The event loop to use to track time. If None, asyncio.get_running_loop() will be used.

TYPE: AbstractEventLoop | None DEFAULT: None

RETURNS DESCRIPTION
Timer

The timer instance.

RAISES DESCRIPTION
RuntimeError

if it was called without a loop and there is no running loop.

ValueError

if delay is not positive or is smaller than 1 microsecond; if start_delay is negative or start_delay was specified but auto_start is False.

Source code in src/frequenz/channels/timer.py
@classmethod
def timeout(  # noqa: DOC502
    cls,
    delay: timedelta,
    /,
    *,
    auto_start: bool = True,
    start_delay: timedelta = timedelta(0),
    loop: asyncio.AbstractEventLoop | None = None,
) -> Timer:
    """Create a timer useful for tracking timeouts.

    A [timeout][frequenz.channels.timer.Timer.timeout] is
    a [`Timer`][frequenz.channels.timer.Timer] that
    [resets][frequenz.channels.timer.Timer.reset] automatically after it triggers,
    so it will trigger again after the selected interval, no matter what the current
    drift was. This means timeout timers will accumulate drift.

    Tip:
        Timeouts are a shortcut to create
        a [`Timer`][frequenz.channels.timer.Timer] with the
        [`SkipMissedAndDrift`][frequenz.channels.timer.SkipMissedAndDrift] policy.

    Example: Timeout example
        ```python
        import asyncio
        from datetime import timedelta

        from frequenz.channels import Anycast, select, selected_from
        from frequenz.channels.timer import Timer


        async def main() -> None:
            channel = Anycast[int](name="data-channel")
            data_receiver = channel.new_receiver()

            timer = Timer.timeout(timedelta(seconds=1.0))

            async for selected in select(data_receiver, timer):
                if selected_from(selected, data_receiver):
                    print(f"Received data: {selected.value}")
                    # Data arrived in time, so restart the timeout.
                    timer.reset()
                elif selected_from(selected, timer):
                    drift = selected.value
                    print(f"No data received for {timer.interval + drift} seconds, giving up")
                    break


        asyncio.run(main())
        ```

    Args:
        delay: The time until the timer ticks. Must be at least
            1 microsecond.
        auto_start: Whether the timer should be started when the
            instance is created. This can only be `True` if there is
            already a running loop or an explicit `loop` that is running
            was passed.
        start_delay: The delay before the timer should start. If a delay is
            specified while `auto_start` is `False`, an exception is raised.
            This has microsecond resolution; anything smaller than a microsecond
            means no delay.
        loop: The event loop to use to track time. If `None`,
            `asyncio.get_running_loop()` will be used.

    Returns:
        The timer instance.

    Raises:
        RuntimeError: if it was called without a loop and there is no
            running loop.
        ValueError: if `delay` is not positive or is smaller than 1
            microsecond; if `start_delay` is negative or `start_delay` was specified
            but `auto_start` is `False`.
    """
    return Timer(
        delay,
        SkipMissedAndDrift(delay_tolerance=timedelta(0)),
        auto_start=auto_start,
        start_delay=start_delay,
        loop=loop,
    )
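The statement that timeout timers accumulate drift can be made concrete with a short simulation. This is a sketch, not the library implementation: `simulate_timeout()` is a hypothetical function that assumes the next tick is always scheduled one interval after the moment the previous tick was delivered, as the description of `timeout()` suggests. All times are integer microseconds.

```python
def simulate_timeout(interval: int, lateness: list[int]) -> list[int]:
    """Return the times at which each tick is delivered.

    `lateness[i]` is how late the i-th tick is handled relative to its schedule.
    """
    delivered = []
    scheduled = interval  # the timer is started at time 0
    for late in lateness:
        now = scheduled + late
        delivered.append(now)
        # The next tick is measured from `now`, so the delay is never recovered.
        scheduled = now + interval
    return delivered


# Three ticks of a 1 s timeout, handled 0.2 s late, 0.3 s late, and on time.
ticks = simulate_timeout(1_000_000, [200_000, 300_000, 0])
print(ticks)  # [1200000, 2500000, 3500000]
```

The third tick is handled exactly when scheduled, yet it lands at 3.5 s rather than 3 s: the 0.5 s of earlier delay stays in the schedule, which is what is wanted when measuring "time since the last event" rather than wall-clock periods.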

frequenz.channels.timer.TriggerAllMissed ¤

Bases: MissedTickPolicy

A policy that triggers all the missed ticks immediately until it catches up.

The TriggerAllMissed policy will trigger all missed ticks immediately until it catches up with the current time. This means that if the timer is delayed for any reason, when it finally gets some time to run, it will trigger all the missed ticks in a burst. The drift is not accumulated and the next tick will be scheduled according to the original start time.

Example

This example represents a timer with interval 1 second.

  1. The first tick, T0, happens exactly at time 0.

  2. The second tick, T1, happens at time 1.2 (0.2 seconds late), so it triggers immediately. But it re-syncs, so the next tick is still expected at 2 seconds. This re-sync happens on every tick, so all ticks are expected at multiples of 1 second, no matter how delayed they were.

  3. The third tick, T2, happens at time 2.3 (0.3 seconds late), so it also triggers immediately.

  4. The fourth tick, T3, happens at time 4.3 (1.3 seconds late), so it also triggers immediately.

  5. The fifth tick, T4, which was also already delayed (by 0.3 seconds), triggers immediately too, resulting in a small catch-up burst.

  6. The sixth tick, T5, happens at 5.1 (0.1 seconds late), so it triggers immediately again.

  7. The seventh tick, T6, happens at 6.0, right on time.

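[Diagram: a timeline from 0 to 6 seconds. Expected ticks fall on every whole second; delivered ticks are T0 at 0, T1 at 1.2, T2 at 2.3, T3 and T4 both at 4.3, T5 at 5.1, and T6 at 6.0.]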

Source code in src/frequenz/channels/timer.py
class TriggerAllMissed(MissedTickPolicy):
    """A policy that triggers all the missed ticks immediately until it catches up.

    The [`TriggerAllMissed`][frequenz.channels.timer.TriggerAllMissed] policy will
    trigger all missed ticks immediately until it catches up with the current time.
    This means that if the timer is delayed for any reason, when it finally gets some
    time to run, it will trigger all the missed ticks in a burst. The drift is not
    accumulated and the next tick will be scheduled according to the original start
    time.

    Example:
        This example represents a timer with interval 1 second.

        1. The first tick, `T0`, happens exactly at time 0.

        2. The second tick, `T1`, happens at time 1.2 (0.2 seconds late), so it triggers
            immediately. But it re-syncs, so the next tick is still expected at
            2 seconds. This re-sync happens on every tick, so all ticks are expected at
            multiples of 1 second, no matter how delayed they were.

        3. The third tick, `T2`, happens at time 2.3 (0.3 seconds late), so it also
            triggers immediately.

        4. The fourth tick, `T3`, happens at time 4.3 (1.3 seconds late), so it also
            triggers immediately.

        5. The fifth tick, `T4`, which was also already delayed (by 0.3 seconds),
            triggers immediately too, resulting in a small *catch-up* burst.

        6. The sixth tick, `T5`, happens at 5.1 (0.1 seconds late), so it triggers
            immediately again.

        7. The seventh tick, `T6`, happens at 6.0, right on time.

        <center>
        ```bob
        0         1         2         3         4   T4    5         6
        *---------o-*-------o--*------o---------o--**-----o*--------*-----> time
        T0          T1         T2                  T3      T5       T6

        -o- "Expected ticks"
        -*- "Delivered ticks"
        ```
        </center>
    """

    def calculate_next_tick_time(
        self, *, now: int, scheduled_tick_time: int, interval: int
    ) -> int:
        """Calculate the next tick time.

        This method always returns `scheduled_tick_time + interval`, as all
        ticks need to produce a trigger event.

        Args:
            now: The current loop time (in microseconds).
            scheduled_tick_time: The time the current tick was scheduled to
                trigger (in microseconds).
            interval: The interval between ticks (in microseconds).

        Returns:
            The next tick time (in microseconds).
        """
        return scheduled_tick_time + interval
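The seven ticks of the example can be replayed with the same rule, `scheduled_tick_time + interval`, to see where each drift value comes from. This is a minimal sketch: `replay()` is a hypothetical helper, and the delivery times are the ones stated in the example, expressed in microseconds.

```python
def replay(handled_at: list[int], interval: int) -> list[int]:
    """Return the drift of each tick, given when each tick was actually handled."""
    drifts = []
    scheduled = 0  # T0 is scheduled at time 0
    for now in handled_at:
        drifts.append(now - scheduled)
        # Same rule as `TriggerAllMissed.calculate_next_tick_time()`:
        # the schedule ignores `now` and stays on the original grid.
        scheduled = scheduled + interval
    return drifts


# Times at which T0..T6 are delivered in the example.
handled_at = [0, 1_200_000, 2_300_000, 4_300_000, 4_300_000, 5_100_000, 6_000_000]
drifts = replay(handled_at, 1_000_000)
print(drifts)  # [0, 200000, 300000, 1300000, 300000, 100000, 0]
```

T3 and T4 are both handled at 4.3 s, which is the catch-up burst: T3 is 1.3 s late and T4, scheduled at 4 s, is 0.3 s late.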
Functions¤
__repr__ ¤
__repr__() -> str

Return a string representation of the instance.

RETURNS DESCRIPTION
str

The string representation of the instance.

Source code in src/frequenz/channels/timer.py
def __repr__(self) -> str:
    """Return a string representation of the instance.

    Returns:
        The string representation of the instance.
    """
    return f"{type(self).__name__}()"
calculate_next_tick_time ¤
calculate_next_tick_time(
    *, now: int, scheduled_tick_time: int, interval: int
) -> int

Calculate the next tick time.

This method always returns scheduled_tick_time + interval, as all ticks need to produce a trigger event.

PARAMETER DESCRIPTION
now

The current loop time (in microseconds).

TYPE: int

scheduled_tick_time

The time the current tick was scheduled to trigger (in microseconds).

TYPE: int

interval

The interval between ticks (in microseconds).

TYPE: int

RETURNS DESCRIPTION
int

The next tick time (in microseconds).

Source code in src/frequenz/channels/timer.py
def calculate_next_tick_time(
    self, *, now: int, scheduled_tick_time: int, interval: int
) -> int:
    """Calculate the next tick time.

    This method always returns `scheduled_tick_time + interval`, as all
    ticks need to produce a trigger event.

    Args:
        now: The current loop time (in microseconds).
        scheduled_tick_time: The time the current tick was scheduled to
            trigger (in microseconds).
        interval: The interval between ticks (in microseconds).

    Returns:
        The next tick time (in microseconds).
    """
    return scheduled_tick_time + interval