Source code for banterbot.utils.indexed_event

import threading


[docs] class IndexedEvent(threading.Event): """ A thread synchronization event that uses a counter to manage iterations in a producer-consumer scenario. This class is ideal for situations where a consumer thread processes data chunks provided by a producer thread. The counter ensures that the consumer processes each chunk of data exactly once and waits when no more data is available. This class extends threading.Event, adding a counter to control the number of times the event allows passage before resetting. It is useful for controlled processing of data chunks in multi-threaded applications, preventing the consumer from proceeding until new data is available. """ def __init__(self, initial_counter: int = 0) -> None: """ Initializes the IndexedEvent with an optional initial counter value, which represents the number of items initially available for processing. Args: initial_counter (int): The initial count of items available for processing. Must be non-negative. Raises: ValueError: If the initial counter is set to a negative value. """ super().__init__() self._lock: threading.Lock = threading.Lock() self._counter: int = initial_counter @property def counter(self) -> int: """ Retrieves the current value of the counter, indicating the number of data chunks available for processing. Returns: int: The current number of unprocessed data chunks. """ return self._counter @counter.setter def counter(self, N: int) -> None: """ Wrapper for method `set`. Args: N (int): The number of data chunks available. Must be non-negative. Raises: ValueError: If N is less than 1 or N is not a number. """ self.set(N=N)
[docs] def clear(self) -> None: """ Resets the event and the counter, typically used to signify that no data is currently available for processing. """ # with self._lock: super().clear() self._counter = 0
[docs] def increment(self, N: int = 1) -> None: """ Increments the counter by a specified amount, indicating that new data chunks are available. It also sets the event, allowing the consumer to resume processing. Args: N (int): The number of new data chunks added. Must be non-negative. Raises: ValueError: If N is less than 1 or N is not a number. """ # with self._lock: self._counter += N if self._counter > 0: super().set()
[docs] def decrement(self, N: int = 1) -> None: """ Decrements the counter by a specified amount. It also clears the event if zero is reached, blocking the consumer. Args: N (int): The amount to decrement the counter by. Must be non-negative. """ # with self._lock: self._counter -= N if self._counter > 0: super().set() else: super().clear()
[docs] def is_set(self) -> bool: """ Checks if the event is set, meaning data is available for processing. Returns: bool: True if data is available, False otherwise. """ return super().is_set()
[docs] def set(self, N: int = 1) -> None: """ Directly sets the counter to a specified value, indicating the exact number of data chunks available. Args: N (int): The number of data chunks available. Must be non-negative. Raises: ValueError: If N is less than 1 or N is not a number. """ # with self._lock: if N > 0: super().set() else: super().clear() self._counter = N