banterbot.utils package
banterbot.utils.closeable_queue module
- class banterbot.utils.closeable_queue.CloseableQueue(maxsize: int = 0)[source]
Bases:
object
A queue that can be closed to prevent further puts. This is useful for when you have a producer thread that you want to stop once it has finished producing items, but you don’t want to stop the consumer thread from consuming items that have already been produced. Must be used as a context manager on the producer thread to ensure that the queue is closed when the producer thread exits.
The intended use case is that the producer thread will put items into the queue until it is finished, then close the queue and exit. The consumer thread will then consume the items in the queue until it is empty, ideally using a for loop to ensure that it exits when the queue is empty and closed.
If a for loop is not used by the consumer thread, then the consumer thread can also use a while loop to consume items from the queue. In this case, the while loop’s condition should be while not queue.closed() to ensure that the consumer thread exits when the queue is empty and closed.
banterbot.utils.indexed_event module
- class banterbot.utils.indexed_event.IndexedEvent(initial_counter: int = 0)[source]
Bases:
Event
A thread synchronization event that uses a counter to manage iterations in a producer-consumer scenario. This class is ideal for situations where a consumer thread processes data chunks provided by a producer thread. The counter ensures that the consumer processes each chunk of data exactly once and waits when no more data is available.
This class extends threading.Event, adding a counter to control the number of times the event allows passage before resetting. It is useful for controlled processing of data chunks in multi-threaded applications, preventing the consumer from proceeding until new data is available.
- clear() None [source]
Resets the event and the counter, typically used to signify that no data is currently available for processing.
- property counter: int
Retrieves the current value of the counter, indicating the number of data chunks available for processing.
- Returns:
The current number of unprocessed data chunks.
- Return type:
int
- decrement(N: int = 1) None [source]
Decrements the counter by a specified amount. It also clears the event if zero is reached, blocking the consumer.
- Parameters:
N (int) – The amount to decrement the counter by. Must be non-negative.
- increment(N: int = 1) None [source]
Increments the counter by a specified amount, indicating that new data chunks are available. It also sets the event, allowing the consumer to resume processing.
- Parameters:
N (int) – The number of new data chunks added. Must be non-negative.
- Raises:
ValueError – If N is less than 1 or N is not a number.
banterbot.utils.nlp module
- class banterbot.utils.nlp.NLP[source]
Bases:
object
A comprehensive toolkit that provides a set of Natural Language Processing utilities. It leverages the capabilities of the spaCy package. The toolkit is designed to automatically download the necessary models if they are not available.
One of the main features of this toolkit is the intelligent model selection mechanism. It is designed to select the most appropriate and lightweight model for each specific task, balancing between computational efficiency and task performance.
- classmethod extract_keywords(strings: list[str]) tuple[tuple[str, ...]] [source]
Extracts keywords from a list of text strings using the en_core_web_md spaCy model.
- Parameters:
strings (list[str]) – A list of strings.
- Returns:
A tuple of extracted keywords as strings.
- Return type:
tuple[str, …]
- classmethod install_upgrade_all_models() None [source]
Lazily checks if models are already installed, and installs any that are missing.
- classmethod model(name: str) Language [source]
Returns the specified spaCy model lazily by loading models the first time they are called, then storing them in the cls._models dictionary.
- Parameters:
name (str) – The name of the spaCy model to return.
- Returns:
The requested spaCy model.
- Return type:
spacy.language.Language
- classmethod segment_sentences(string: str, whitespace: bool = True) tuple[str, ...] [source]
Splits a text string into individual sentences using a specialized spaCy model. The model is a lightweight version of en_core_web_sm designed specifically for sentence segmentation.
- Parameters:
string (str) – The input text string.
whitespace (str) – If True, keep whitespace at the beginning/end of sentences; if False, strip it.
- Returns:
A tuple of individual sentences as strings.
- Return type:
tuple[str, …]
- classmethod segment_words(string: str, whitespace: bool = True) tuple[str, ...] [source]
Splits a text string into individual words using a specialized spaCy model. The model is customized version of en_core_web_md in which words are not split on apostrophes, in order to preserve contractions.
- Parameters:
string (str) – The input text string.
whitespace (str) – If True, include whitespace characters between words; if False, omit it.
- Returns:
A tuple of individual words as strings.
- Return type:
tuple[str, …]
- classmethod tokenize(strings: list[str]) Generator[Doc, None, None] [source]
Given a string or list of strings, returns tokenized versions of the strings as a generator.
- Parameters:
strings (list[str]) – A list of strings.
- Returns:
A stream of spacy.tokens.doc.Doc instances.
- Return type:
Generator[spacy.tokens.doc.Doc, None, None]
banterbot.utils.thread_queue module
- class banterbot.utils.thread_queue.ThreadQueue[source]
Bases:
object
A class for managing and executing tasks in separate threads.
This class maintains a queue of tasks to be executed. Each task is a Thread object, which is executed in its own thread. If there is a task in the queue that hasn’t started executing yet, it will be prevented from running when a new task is added unless it is declared unskippable.
- add_task(thread: Thread, unskippable: bool = False) None [source]
Add a new task to the queue.
This method adds a new task to the queue and starts a wrapper thread to manage its execution. The wrapper thread is responsible for waiting for the previous task to complete, executing the current task if it is unskippable or the last task in the queue, and setting the event for the next task.
- Parameters:
thread (threading.Thread) – The thread to be added to the queue.
unskippable (bool, optional) – Whether the thread should be executed even if a new task is queued.