Skip to content

file_watcher

frequenz.channels.file_watcher ¤

A receiver for watching for new, modified or deleted files.

Tip

Read the FileWatcher documentation for more information.

This module contains the following:

  • FileWatcher: A receiver that watches for file events.
  • Event: A file change event.
  • EventType: The types of changes that can be observed.

Classes¤

frequenz.channels.file_watcher.Event dataclass ¤

A file change event.

Source code in src/frequenz/channels/file_watcher.py
@dataclass(frozen=True)
class Event:
    """A file change event."""

    type: EventType
    """The type of change that was observed."""

    path: pathlib.Path
    """The path where the change was observed."""
Attributes¤
path instance-attribute ¤
path: pathlib.Path

The path where the change was observed.

type instance-attribute ¤
type: EventType

The type of change that was observed.

frequenz.channels.file_watcher.EventType ¤

Bases: Enum

The types of changes that can be observed.

Source code in src/frequenz/channels/file_watcher.py
class EventType(Enum):
    """The types of changes that can be observed."""

    CREATE = Change.added
    """The file was created."""

    MODIFY = Change.modified
    """The file was modified."""

    DELETE = Change.deleted
    """The file was deleted."""
Attributes¤
CREATE class-attribute instance-attribute ¤
CREATE = Change.added

The file was created.

DELETE class-attribute instance-attribute ¤
DELETE = Change.deleted

The file was deleted.

MODIFY class-attribute instance-attribute ¤
MODIFY = Change.modified

The file was modified.

frequenz.channels.file_watcher.FileWatcher ¤

Bases: Receiver[Event]

A receiver that watches for file events.

Usage¤

A FileWatcher receiver can be used to watch for changes in a set of files. It will generate an Event message every time a file is created, modified or deleted, depending on the type of events that it is configured to watch for.

The event message contains the type of change that was observed and the path where the change was observed.

Event Types¤

The following event types are available:

  • CREATE: The file was created.
  • MODIFY: The file was modified.
  • DELETE: The file was deleted.
Example¤
Watch for changes and exit after the file is modified
import asyncio

from frequenz.channels import select, selected_from
from frequenz.channels.file_watcher import EventType, FileWatcher

PATH = "/tmp/test.txt"
file_watcher = FileWatcher(paths=[PATH], event_types=[EventType.MODIFY])


async def update_file() -> None:
    await asyncio.sleep(1)
    with open(PATH, "w", encoding="utf-8") as file:
        file.write("Hello, world!")


async def main() -> None:
    # Create file
    with open(PATH, "w", encoding="utf-8") as file:
        file.write("Hello, world!")
    async with asyncio.TaskGroup() as group:
        group.create_task(update_file())
        async for selected in select(file_watcher):
            if selected_from(selected, file_watcher):
                event = selected.value
                print(f"File {event.path}: {event.type.name}")
                break


asyncio.run(main())
Source code in src/frequenz/channels/file_watcher.py
class FileWatcher(Receiver[Event]):
    """A receiver that watches for file events.

    # Usage

    A [`FileWatcher`][frequenz.channels.file_watcher.FileWatcher] receiver can be used
    to watch for changes in a set of files. It will generate an
    [`Event`][frequenz.channels.file_watcher.Event] message every time a file is
    created, modified or deleted, depending on the type of events that it is configured
    to watch for.

    The [event][frequenz.channels.file_watcher.EventType] message contains the
    [`type`][frequenz.channels.file_watcher.Event.type] of change that was observed and
    the [`path`][frequenz.channels.file_watcher.Event.path] where the change was
    observed.

    # Event Types

    The following event types are available:

    * [`CREATE`][frequenz.channels.file_watcher.EventType.CREATE]:
        {{docstring_summary("frequenz.channels.file_watcher.EventType.CREATE")}}
    * [`MODIFY`][frequenz.channels.file_watcher.EventType.MODIFY]:
        {{docstring_summary("frequenz.channels.file_watcher.EventType.MODIFY")}}
    * [`DELETE`][frequenz.channels.file_watcher.EventType.DELETE]:
        {{docstring_summary("frequenz.channels.file_watcher.EventType.DELETE")}}

    # Example

    Example: Watch for changes and exit after the file is modified
        ```python
        import asyncio

        from frequenz.channels import select, selected_from
        from frequenz.channels.file_watcher import EventType, FileWatcher

        PATH = "/tmp/test.txt"
        file_watcher = FileWatcher(paths=[PATH], event_types=[EventType.MODIFY])


        async def update_file() -> None:
            await asyncio.sleep(1)
            with open(PATH, "w", encoding="utf-8") as file:
                file.write("Hello, world!")


        async def main() -> None:
            # Create file
            with open(PATH, "w", encoding="utf-8") as file:
                file.write("Hello, world!")
            async with asyncio.TaskGroup() as group:
                group.create_task(update_file())
                async for selected in select(file_watcher):
                    if selected_from(selected, file_watcher):
                        event = selected.value
                        print(f"File {event.path}: {event.type.name}")
                        break


        asyncio.run(main())
        ```
    """

    def __init__(
        self,
        paths: list[pathlib.Path | str],
        event_types: abc.Iterable[EventType] = frozenset(EventType),
    ) -> None:
        """Create a `FileWatcher` instance.

        Args:
            paths: Paths to watch for changes.
            event_types: Types of events to watch for. Defaults to watch for
                all event types.
        """
        self.event_types: frozenset[EventType] = frozenset(event_types)
        """The types of events to watch for."""

        self._stop_event: asyncio.Event = asyncio.Event()
        self._paths: list[pathlib.Path] = [
            path if isinstance(path, pathlib.Path) else pathlib.Path(path)
            for path in paths
        ]
        self._awatch: abc.AsyncGenerator[set[FileChange], None] = awatch(
            *self._paths, stop_event=self._stop_event, watch_filter=self._filter_events
        )
        self._awatch_stopped_exc: Exception | None = None
        self._changes: set[FileChange] = set()

    def _filter_events(
        self,
        change: Change,
        path: str,  # pylint: disable=unused-argument
    ) -> bool:
        """Filter events based on the event type and path.

        Args:
            change: The type of change to be notified.
            path: The path of the file that changed.

        Returns:
            Whether the event should be notified.
        """
        return change in [event_type.value for event_type in self.event_types]

    def __del__(self) -> None:
        """Cleanup registered watches.

        `awatch` passes the `stop_event` to a separate task/thread. This way
        `awatch` getting destroyed properly. The background task will continue
        until the signal is received.
        """
        self._stop_event.set()

    async def ready(self) -> bool:
        """Wait until the receiver is ready with a value or an error.

        Once a call to `ready()` has finished, the value should be read with
        a call to `consume()` (`receive()` or iterated over). The receiver will
        remain ready (this method will return immediately) until it is
        consumed.

        Returns:
            Whether the receiver is still active.
        """
        # if there are messages waiting to be consumed, return immediately.
        if self._changes:
            return True

        # if it was already stopped, return immediately.
        if self._awatch_stopped_exc is not None:
            return False

        try:
            self._changes = await anext(self._awatch)
        except StopAsyncIteration as err:
            self._awatch_stopped_exc = err

        return True

    def consume(self) -> Event:
        """Return the latest event once `ready` is complete.

        Returns:
            The next event that was received.

        Raises:
            ReceiverStoppedError: if there is some problem with the receiver.
        """
        if not self._changes and self._awatch_stopped_exc is not None:
            raise ReceiverStoppedError(self) from self._awatch_stopped_exc

        assert self._changes, "`consume()` must be preceded by a call to `ready()`"
        # Tuple of (Change, path) returned by watchfiles
        change, path_str = self._changes.pop()
        return Event(type=EventType(change), path=pathlib.Path(path_str))

    def __str__(self) -> str:
        """Return a string representation of this receiver."""
        if len(self._paths) > 3:
            paths = [str(p) for p in self._paths[:3]]
            paths.append("…")
        else:
            paths = [str(p) for p in self._paths]
        event_types = [event_type.name for event_type in self.event_types]
        return f"{type(self).__name__}:{','.join(event_types)}:{','.join(paths)}"

    def __repr__(self) -> str:
        """Return a string representation of this receiver."""
        return f"{type(self).__name__}({self._paths!r}, {self.event_types!r})"
Attributes¤
event_types instance-attribute ¤
event_types: frozenset[EventType] = frozenset(event_types)

The types of events to watch for.

Functions¤
__aiter__ ¤
__aiter__() -> Self

Initialize the async iterator over received values.

RETURNS DESCRIPTION
Self

self, since no extra setup is needed for the iterator.

Source code in src/frequenz/channels/_receiver.py
def __aiter__(self) -> Self:
    """Initialize the async iterator over received values.

    Returns:
        `self`, since no extra setup is needed for the iterator.
    """
    return self
__anext__ async ¤
__anext__() -> _T

Await the next value in the async iteration over received values.

RETURNS DESCRIPTION
_T

The next value received.

RAISES DESCRIPTION
StopAsyncIteration

if the receiver stopped producing messages.

ReceiverError

if there is some problem with the receiver.

Source code in src/frequenz/channels/_receiver.py
async def __anext__(self) -> _T:
    """Await the next value in the async iteration over received values.

    Returns:
        The next value received.

    Raises:
        StopAsyncIteration: if the receiver stopped producing messages.
        ReceiverError: if there is some problem with the receiver.
    """
    try:
        await self.ready()
        return self.consume()
    except ReceiverStoppedError as exc:
        raise StopAsyncIteration() from exc
__del__ ¤
__del__() -> None

Cleanup registered watches.

awatch passes the stop_event to a separate task/thread. This way awatch getting destroyed properly. The background task will continue until the signal is received.

Source code in src/frequenz/channels/file_watcher.py
def __del__(self) -> None:
    """Cleanup registered watches.

    `awatch` passes the `stop_event` to a separate task/thread. This way
    `awatch` getting destroyed properly. The background task will continue
    until the signal is received.
    """
    self._stop_event.set()
__init__ ¤
__init__(
    paths: list[pathlib.Path | str],
    event_types: abc.Iterable[EventType] = frozenset(
        EventType
    ),
) -> None

Create a FileWatcher instance.

PARAMETER DESCRIPTION
paths

Paths to watch for changes.

TYPE: list[Path | str]

event_types

Types of events to watch for. Defaults to watch for all event types.

TYPE: Iterable[EventType] DEFAULT: frozenset(EventType)

Source code in src/frequenz/channels/file_watcher.py
def __init__(
    self,
    paths: list[pathlib.Path | str],
    event_types: abc.Iterable[EventType] = frozenset(EventType),
) -> None:
    """Create a `FileWatcher` instance.

    Args:
        paths: Paths to watch for changes.
        event_types: Types of events to watch for. Defaults to watch for
            all event types.
    """
    self.event_types: frozenset[EventType] = frozenset(event_types)
    """The types of events to watch for."""

    self._stop_event: asyncio.Event = asyncio.Event()
    self._paths: list[pathlib.Path] = [
        path if isinstance(path, pathlib.Path) else pathlib.Path(path)
        for path in paths
    ]
    self._awatch: abc.AsyncGenerator[set[FileChange], None] = awatch(
        *self._paths, stop_event=self._stop_event, watch_filter=self._filter_events
    )
    self._awatch_stopped_exc: Exception | None = None
    self._changes: set[FileChange] = set()
__repr__ ¤
__repr__() -> str

Return a string representation of this receiver.

Source code in src/frequenz/channels/file_watcher.py
def __repr__(self) -> str:
    """Return a string representation of this receiver."""
    return f"{type(self).__name__}({self._paths!r}, {self.event_types!r})"
__str__ ¤
__str__() -> str

Return a string representation of this receiver.

Source code in src/frequenz/channels/file_watcher.py
def __str__(self) -> str:
    """Return a string representation of this receiver."""
    if len(self._paths) > 3:
        paths = [str(p) for p in self._paths[:3]]
        paths.append("…")
    else:
        paths = [str(p) for p in self._paths]
    event_types = [event_type.name for event_type in self.event_types]
    return f"{type(self).__name__}:{','.join(event_types)}:{','.join(paths)}"
consume ¤
consume() -> Event

Return the latest event once ready is complete.

RETURNS DESCRIPTION
Event

The next event that was received.

RAISES DESCRIPTION
ReceiverStoppedError

if there is some problem with the receiver.

Source code in src/frequenz/channels/file_watcher.py
def consume(self) -> Event:
    """Return the latest event once `ready` is complete.

    Returns:
        The next event that was received.

    Raises:
        ReceiverStoppedError: if there is some problem with the receiver.
    """
    if not self._changes and self._awatch_stopped_exc is not None:
        raise ReceiverStoppedError(self) from self._awatch_stopped_exc

    assert self._changes, "`consume()` must be preceded by a call to `ready()`"
    # Tuple of (Change, path) returned by watchfiles
    change, path_str = self._changes.pop()
    return Event(type=EventType(change), path=pathlib.Path(path_str))
map ¤
map(call: Callable[[_T], _U]) -> Receiver[_U]

Return a receiver with call applied on incoming messages.

PARAMETER DESCRIPTION
call

function to apply on incoming messages.

TYPE: Callable[[_T], _U]

RETURNS DESCRIPTION
Receiver[_U]

A Receiver to read results of the given function from.

Source code in src/frequenz/channels/_receiver.py
def map(self, call: Callable[[_T], _U]) -> Receiver[_U]:
    """Return a receiver with `call` applied on incoming messages.

    Args:
        call: function to apply on incoming messages.

    Returns:
        A `Receiver` to read results of the given function from.
    """
    return _Map(self, call)
ready async ¤
ready() -> bool

Wait until the receiver is ready with a value or an error.

Once a call to ready() has finished, the value should be read with a call to consume() (receive() or iterated over). The receiver will remain ready (this method will return immediately) until it is consumed.

RETURNS DESCRIPTION
bool

Whether the receiver is still active.

Source code in src/frequenz/channels/file_watcher.py
async def ready(self) -> bool:
    """Wait until the receiver is ready with a value or an error.

    Once a call to `ready()` has finished, the value should be read with
    a call to `consume()` (`receive()` or iterated over). The receiver will
    remain ready (this method will return immediately) until it is
    consumed.

    Returns:
        Whether the receiver is still active.
    """
    # if there are messages waiting to be consumed, return immediately.
    if self._changes:
        return True

    # if it was already stopped, return immediately.
    if self._awatch_stopped_exc is not None:
        return False

    try:
        self._changes = await anext(self._awatch)
    except StopAsyncIteration as err:
        self._awatch_stopped_exc = err

    return True
receive async ¤
receive() -> _T

Receive a message from the channel.

RETURNS DESCRIPTION
_T

The received message.

RAISES DESCRIPTION
ReceiverStoppedError

if there is some problem with the receiver.

ReceiverError

if there is some problem with the receiver.

Source code in src/frequenz/channels/_receiver.py
async def receive(self) -> _T:
    """Receive a message from the channel.

    Returns:
        The received message.

    Raises:
        ReceiverStoppedError: if there is some problem with the receiver.
        ReceiverError: if there is some problem with the receiver.
    """
    try:
        received = await self.__anext__()  # pylint: disable=unnecessary-dunder-call
    except StopAsyncIteration as exc:
        # If we already had a cause and it was the receiver was stopped,
        # then reuse that error, as StopAsyncIteration is just an artifact
        # introduced by __anext__.
        if (
            isinstance(exc.__cause__, ReceiverStoppedError)
            # pylint is not smart enough to figure out we checked above
            # this is a ReceiverStoppedError and thus it does have
            # a receiver member
            and exc.__cause__.receiver is self  # pylint: disable=no-member
        ):
            raise exc.__cause__
        raise ReceiverStoppedError(self) from exc
    return received