Source code for banterbot.utils.thread_queue

import logging
import threading


class ThreadQueue:
    """
    A class for managing and executing tasks in separate threads.

    This class maintains a queue of tasks to be executed. Each task is a Thread object, which is executed in its own
    thread. If there is a task in the queue that hasn't started executing yet, it will be prevented from running when
    a new task is added unless it is declared unskippable.

    Internally, every task added gets one `threading.Event` stored in `_event_queue` at the task's index. The event is
    set once that task has finished running, has been skipped, or has failed to start, which releases the wrapper of
    the task at the next index. Events are never removed from the list, so indices stay valid for the lifetime of the
    instance.
    """

    def __init__(self):
        logging.debug("ThreadQueue initialized")
        # Guards index allocation in `add_task`, so that each task's index matches the position of its event.
        self._lock = threading.Lock()
        # One completion event per task ever added, in insertion order.
        self._event_queue: list[threading.Event] = []

    def add_task(self, thread: threading.Thread, unskippable: bool = False) -> None:
        """
        Add a new task to the queue.

        This method adds a new task to the queue and starts a wrapper thread to manage its execution. The wrapper
        thread is responsible for waiting for the previous task to complete, executing the current task if it is
        unskippable or the last task in the queue, and setting the event for the next task.

        Args:
            thread (threading.Thread): The thread to be added to the queue. It must not have been started yet.
            unskippable (bool, optional): Whether the thread should be executed even if a new task is queued.
        """
        with self._lock:
            index = len(self._event_queue)
            self._event_queue.append(threading.Event())
            # The wrapper inherits the daemon flag of the task it manages.
            wrapper_thread = threading.Thread(
                target=self._thread_wrapper,
                args=(thread, index, unskippable),
                daemon=thread.daemon,
            )
            wrapper_thread.start()

    def is_alive(self) -> bool:
        """
        Check if the last task in the queue is still running.

        Returns:
            bool: True if the completion event of the most recently added task has not been set yet (the task is
                still waiting or running), False if it has been set or if no task was ever added.
        """
        if not self._event_queue:
            return False
        return not self._event_queue[-1].is_set()

    def _thread_wrapper(self, thread: threading.Thread, index: int, unskippable: bool) -> None:
        """
        A wrapper function for executing threads in the queue.

        This function is responsible for waiting for the previous task to complete before starting the current task,
        and setting the event for the next task in the queue. If the current task is skippable and not the last task
        in the queue, it will not be executed.

        Args:
            thread (threading.Thread): The thread to be executed.
            index (int): The index of the thread in the event queue.
            unskippable (bool): Whether the thread should be executed even if a new task is added to the queue.
        """
        # Block until the task directly ahead of this one has finished or been skipped.
        if index > 0:
            self._event_queue[index - 1].wait()

        try:
            # A skippable task only runs if nothing was queued behind it while it was waiting.
            if unskippable or index == len(self._event_queue) - 1:
                logging.debug("ThreadQueue thread %s started", index)
                thread.start()
                thread.join()
            else:
                logging.debug("ThreadQueue thread %s skipped", index)
        finally:
            # Always release the next task, even if `thread.start()` raised (e.g. the thread had already been
            # started); otherwise every later task would wait forever and `is_alive` would never become False.
            self._event_queue[index].set()