Source code for banterbot.services.speech_recognition_service

import datetime
import logging
import os
import threading
import time
from typing import Optional, Union

import azure.cognitiveservices.speech as speechsdk

from banterbot.config import DEFAULT_LANGUAGE, INTERRUPTION_DELAY
from banterbot.data.enums import EnvVar
from banterbot.handlers.speech_recognition_handler import SpeechRecognitionHandler
from banterbot.handlers.stream_handler import StreamHandler
from banterbot.managers.stream_manager import StreamManager
from banterbot.models.speech_recognition_input import SpeechRecognitionInput
from banterbot.models.stream_log_entry import StreamLogEntry
from banterbot.utils.closeable_queue import CloseableQueue


class SpeechRecognitionService:
    """
    The `SpeechRecognitionService` class provides an interface to convert spoken language into written text using
    Azure Cognitive Speech Services. It allows continuous speech recognition and provides real-time results as
    sentences are recognized.
    """

    # Create a lock that prevents race conditions when recognizing speech
    _recognition_lock = threading.Lock()

    def __init__(
        self,
        languages: Optional[Union[str, list[str]]] = None,
        phrase_list: Optional[list[str]] = None,
    ) -> None:
        """
        Initializes the `SpeechRecognitionService` instance by setting up the Azure Cognitive Services speech
        configuration and recognizer. Argument `languages` can take one or more values, each representing a language
        the recognizer can expect to receive as an input. The recognizer will attempt to auto-detect the language if
        multiple are provided.

        Args:
            languages (Union[str, list[str]], optional): The language(s) the speech-to-text recognizer expects to hear.
            phrase_list (list[str], optional): Optionally provide the recognizer with context to improve recognition.
        """
        # Initialize the `SpeechRecognizer`.
        self._init_recognizer(languages=languages, phrase_list=phrase_list)

        # Set the total offset to zero; it will increment as recognitions occur.
        self._total_offset = datetime.timedelta()
        self._last_total_offset = 0

        # Initialize the `StreamManager` for handling streaming processes.
        self._stream_manager = StreamManager()

        # The latest interruption time.
        self._interrupt = 0

        # A list of active stream handlers.
        self._stream_handlers = []
        self._stream_handlers_lock = threading.Lock()

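    # Illustrative usage sketch (not part of the class): constructing the service with language
    # auto-detection across two languages and a phrase list. Assumes the Azure Speech credentials
    # read by `_init_recognizer` are present in the environment; the language codes and phrase
    # shown are placeholders.
    #
    #     service = SpeechRecognitionService(
    #         languages=["en-US", "fr-FR"],
    #         phrase_list=["BanterBot"],
    #     )
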
    def interrupt(self, kill: bool = False) -> None:
        """
        Interrupts the current speech recognition process.

        Args:
            kill (bool): Whether the interruption should kill the queues or not.
        """
        self._interrupt = time.perf_counter_ns()
        with self._stream_handlers_lock:
            for handler in self._stream_handlers:
                handler.interrupt(kill=kill)
            self._stream_handlers.clear()
        logging.debug("SpeechRecognitionService interrupted")

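    # Illustrative sketch (assumed usage): `interrupt` only records a timestamp and signals the
    # active handlers under a lock, so it can reasonably be invoked from a thread other than the
    # one consuming the stream, e.g. on a timer:
    #
    #     timer = threading.Timer(5.0, service.interrupt, kwargs={"kill": True})
    #     timer.start()
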
    def phrases_add(self, phrases: list[str]) -> None:
        """
        Adds new phrases to the `PhraseListGrammar` instance, which implements a bias towards the specified
        words/phrases, helping to improve speech recognition in circumstances where there may be potential ambiguity.

        Args:
            phrases (list[str]): Provide the recognizer with additional text context to improve recognition.
        """
        for phrase in phrases:
            self._phrase_list_grammar.addPhrase(phrase)

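    # Illustrative sketch: biasing the recognizer towards domain vocabulary that is easily
    # misheard (the phrases shown are placeholders).
    #
    #     service.phrases_add(["Azure Cognitive Services", "BanterBot"])
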
    def phrases_clear(self) -> None:
        """
        Clears all phrases from the `PhraseListGrammar` instance.
        """
        self._phrase_list_grammar.clear()

    def recognize(self, init_time: Optional[int] = None) -> Union[StreamHandler, tuple[()]]:
        """
        Recognizes speech and returns a stream handler that yields the recognized sentences as they are processed.

        Args:
            init_time (Optional[int]): The time at which the recognition was initialized.

        Returns:
            Union[StreamHandler, tuple[()]]: A handler for the stream of recognized sentences, or an empty tuple if
            the recognition was interrupted.
        """
        # Record the time at which the recognition was initialized pre-lock, in order to account for future
        # interruptions.
        init_time = time.perf_counter_ns() if init_time is None else init_time

        # Only allow one listener to be active at once.
        with self.__class__._recognition_lock:
            if self._interrupt >= init_time:
                return tuple()
            else:
                self._queue = CloseableQueue()
                iterable = SpeechRecognitionHandler(recognizer=self._recognizer, queue=self._queue)
                handler = self._stream_manager.stream(
                    iterable=iterable, close_stream=iterable.close, init_shared_data={"init_time": init_time}
                )
                with self._stream_handlers_lock:
                    self._stream_handlers.append(handler)
                return handler

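    # Illustrative sketch (assumed usage): consuming the stream of recognized sentences. The
    # exact shape of the yielded items depends on `StreamHandler` and `SpeechRecognitionInput`;
    # iterating directly over the handler and its truthiness check are assumptions here.
    #
    #     handler = service.recognize()
    #     if handler:  # an empty tuple means the call was interrupted before it started
    #         for sentence in handler:
    #             print(sentence)
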
    def _init_recognizer(
        self, languages: Optional[Union[str, list[str]]] = None, phrase_list: Optional[list[str]] = None
    ) -> None:
        """
        Initializes an instance of the Microsoft Azure Cognitive Services Speech SDK `SpeechRecognizer` class and
        configures it with any user-provided custom settings, or alternatively its package defaults, then opens a
        connection to reduce overhead.

        Args:
            languages (Union[str, list[str]], optional): The language(s) the speech-to-text recognizer expects to hear.
            phrase_list (list[str], optional): Optionally provide the recognizer with context to improve recognition.
        """
        logging.debug("SpeechRecognitionService initialized")

        # Initialize the speech configuration with the Azure subscription and region
        self._speech_config = speechsdk.SpeechConfig(
            subscription=os.environ.get(EnvVar.AZURE_SPEECH_KEY.value),
            region=os.environ.get(EnvVar.AZURE_SPEECH_REGION.value),
        )

        # Allow the speech configuration to return profane words unmasked
        self._speech_config.set_profanity(speechsdk.ProfanityOption.Raw)

        # Activate the receipt of data pertaining to word timings
        self._speech_config.request_word_level_timestamps()

        # Initialize the speech recognizer with the speech configuration and language settings
        self._recognizer = speechsdk.SpeechRecognizer(
            speech_config=self._speech_config, **self._language_kwargs(languages=languages)
        )

        # Initialize an instance of `PhraseListGrammar`, which is used to provide context to improve speech
        # recognition
        self._phrase_list_grammar = speechsdk.PhraseListGrammar.from_recognizer(self._recognizer)

        # Add any phrases in `phrase_list` to the `PhraseListGrammar`.
        if phrase_list:
            self.phrases_add(phrase_list)

        # Connect the speech recognizer events to their corresponding callbacks
        self._callbacks_connect()

        # Create a new instance of the `Connection` class
        self._connection = speechsdk.Connection.from_recognizer(self._recognizer)

        # Pre-connect the speech recognizer for reduced latency
        self._connection.open(True)

    def _language_kwargs(self, languages: Optional[Union[str, list[str]]] = None) -> dict:
        """
        Sets the language(s) being recognized by the speech-to-text recognizer in one of two modes: if a string is
        given to argument `languages` in the `__init__` method, the language is set explicitly in the Azure Speech
        SDK `SpeechRecognizer` object in this `SpeechRecognitionService` instance. If a list of languages is given,
        the `AutoDetectSourceLanguageConfig` object from the Speech SDK is used to allow the `SpeechRecognizer` to
        automatically choose the input language from one of those provided.

        Args:
            languages (Union[str, list[str]], optional): The language(s) the speech-to-text recognizer expects to hear.

        Returns:
            dict: A dictionary of keyword arguments for the speech recognizer.
        """
        # Determine whether language auto-detection should be activated
        self._languages = DEFAULT_LANGUAGE if languages is None else languages
        self._auto_detection = not isinstance(self._languages, str)

        if self._auto_detection:
            self._speech_config.set_property(
                property_id=speechsdk.PropertyId.SpeechServiceConnection_LanguageIdMode,
                value="Continuous",
            )
            config = speechsdk.AutoDetectSourceLanguageConfig(languages=self._languages)
            kwargs = {"auto_detect_source_language_config": config}
        else:
            config = speechsdk.SourceLanguageConfig(language=self._languages)
            kwargs = {"source_language_config": config}

        return kwargs

    def _callback_completed(self, event: speechsdk.SessionEventArgs) -> None:
        """
        Detects when the speech recognizer has disconnected and closes the output queue so that consumers stop
        waiting for new events.

        Args:
            event (speechsdk.SessionEventArgs): Event arguments containing information about the stopped recognition.
        """
        logging.debug("SpeechRecognitionService disconnected")
        self._queue.close()

    def _callback_started(self, event: speechsdk.SessionEventArgs) -> None:
        """
        Callback function for the recognition started event. Signals that the recognition process has started.

        Args:
            event (speechsdk.SessionEventArgs): Event arguments containing information about the started recognition.
        """
        logging.debug("SpeechRecognitionService connected")
        self._start_recognition_time = time.perf_counter_ns()

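    # Illustrative sketch: the two shapes `_language_kwargs` returns depending on the type of
    # `languages` (the keys are the actual `SpeechRecognizer` keyword arguments used above; the
    # language codes are placeholders).
    #
    #     service._language_kwargs(languages="en-US")
    #     # -> {"source_language_config": speechsdk.SourceLanguageConfig(language="en-US")}
    #
    #     service._language_kwargs(languages=["en-US", "fr-FR"])
    #     # -> {"auto_detect_source_language_config":
    #     #         speechsdk.AutoDetectSourceLanguageConfig(languages=["en-US", "fr-FR"])}
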
    def exception_handler(self, log: list[StreamLogEntry], index: int, shared_data: dict):
        """
        Handles exceptions that occur during the processing of the stream log: entries that begin after the
        interruption cutoff stop the stream, while entries that straddle the cutoff are trimmed to it.
        """
        cutoff = (
            datetime.timedelta(microseconds=1e-3 * (shared_data["interrupt"] - shared_data["init_time"]))
            + INTERRUPTION_DELAY
        )
        if log[index].offset - self._total_offset > cutoff:
            raise StopIteration
        elif log[index].offset_end - self._total_offset > cutoff:
            if (event := log[index].from_cutoff(cutoff=cutoff)) is not None:
                log[index] = event
        return log[index]

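    # Worked example of the cutoff arithmetic above: `time.perf_counter_ns` timestamps are in
    # nanoseconds, so multiplying their difference by 1e-3 yields microseconds. An interruption
    # 2 seconds after `init_time` gives 2e9 ns * 1e-3 = 2e6 us, i.e. a cutoff of
    # `datetime.timedelta(seconds=2)` plus the configured `INTERRUPTION_DELAY`.
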
    def _callback_recognized(self, event: speechsdk.SpeechRecognitionEventArgs) -> None:
        """
        Processes the recognized speech event by appending it to the list of events and setting the `new_events`
        flag.

        Args:
            event (speechsdk.SpeechRecognitionEventArgs): The recognized speech event.
        """
        if self._start_recognition_time > self._last_total_offset:
            self._last_total_offset = self._start_recognition_time
            # Azure reports offsets in 100-nanosecond ticks; dividing by 10 converts them to microseconds.
            self._total_offset = datetime.timedelta(microseconds=event.offset / 10)
        self._queue.put(
            SpeechRecognitionInput.from_recognition_result(
                event.result, self._languages if not self._auto_detection else None
            )
        )

    def _callbacks_connect(self) -> None:
        """
        Connects the recognizer events to their corresponding callback methods.
        """
        self._recognizer.session_started.connect(self._callback_started)
        self._recognizer.recognized.connect(self._callback_recognized)
        self._recognizer.canceled.connect(self._callback_completed)
        self._recognizer.session_stopped.connect(self._callback_completed)