diff options
Diffstat (limited to 'swr2_asr')
-rw-r--r-- | swr2_asr/inference.py | 35 | ||||
-rw-r--r-- | swr2_asr/utils/decoder.py | 121 |
2 files changed, 102 insertions, 54 deletions
diff --git a/swr2_asr/inference.py b/swr2_asr/inference.py index 3c58af0..64a6eeb 100644 --- a/swr2_asr/inference.py +++ b/swr2_asr/inference.py @@ -6,25 +6,10 @@ import torchaudio import yaml from swr2_asr.model_deep_speech import SpeechRecognitionModel +from swr2_asr.utils.decoder import decoder_factory from swr2_asr.utils.tokenizer import CharTokenizer -def greedy_decoder(output, tokenizer: CharTokenizer, collapse_repeated=True): - """Greedily decode a sequence.""" - arg_maxes = torch.argmax(output, dim=2) # pylint: disable=no-member - blank_label = tokenizer.get_blank_token() - decodes = [] - for args in arg_maxes: - decode = [] - for j, index in enumerate(args): - if index != blank_label: - if collapse_repeated and j != 0 and index == args[j - 1]: - continue - decode.append(index.item()) - decodes.append(tokenizer.decode(decode)) - return decodes - - @click.command() @click.option( "--config_path", @@ -46,11 +31,16 @@ def main(config_path: str, file_path: str) -> None: model_config = config_dict.get("model", {}) tokenizer_config = config_dict.get("tokenizer", {}) inference_config = config_dict.get("inference", {}) + decoder_config = config_dict.get("decoder", {}) - if inference_config["device"] == "cpu": + if inference_config.get("device", "") == "cpu": device = "cpu" - elif inference_config["device"] == "cuda": + elif inference_config.get("device", "") == "cuda": device = "cuda" if torch.cuda.is_available() else "cpu" + elif inference_config.get("device", "") == "mps": + device = "mps" + else: + device = "cpu" device = torch.device(device) # pylint: disable=no-member tokenizer = CharTokenizer.from_file(tokenizer_config["tokenizer_path"]) @@ -90,11 +80,16 @@ def main(config_path: str, file_path: str) -> None: spec = spec.unsqueeze(0) spec = spec.transpose(1, 2) spec = spec.unsqueeze(0) + spec = spec.to(device) output = model(spec) # pylint: disable=not-callable output = F.log_softmax(output, dim=2) # (batch, time, n_class) - decoded_preds = greedy_decoder(output, tokenizer) - print(decoded_preds) + decoder = decoder_factory(decoder_config["type"])(tokenizer, decoder_config) + + preds = decoder(output) + preds = " ".join(preds[0][0].words).strip() + + print(preds) if __name__ == "__main__": diff --git a/swr2_asr/utils/decoder.py b/swr2_asr/utils/decoder.py index 098f6a4..2b6d29b 100644 --- a/swr2_asr/utils/decoder.py +++ b/swr2_asr/utils/decoder.py @@ -1,4 +1,5 @@ """Decoder for CTC-based ASR.""" "" +from dataclasses import dataclass import os import torch @@ -9,37 +10,39 @@ from swr2_asr.utils.data import create_lexicon from swr2_asr.utils.tokenizer import CharTokenizer -# TODO: refactor to use torch CTC decoder class -def greedy_decoder( - output, labels, label_lengths, tokenizer: CharTokenizer, collapse_repeated=True -): # pylint: disable=redefined-outer-name - """Greedily decode a sequence.""" - blank_label = tokenizer.get_blank_token() - arg_maxes = torch.argmax(output, dim=2) # pylint: disable=no-member - decodes = [] - targets = [] - for i, args in enumerate(arg_maxes): - decode = [] - targets.append(tokenizer.decode(labels[i][: label_lengths[i]].tolist())) - for j, index in enumerate(args): - if index != blank_label: - if collapse_repeated and j != 0 and index == args[j - 1]: - continue - decode.append(index.item()) - decodes.append(tokenizer.decode(decode)) - return decodes, targets - - -def beam_search_decoder( +@dataclass +class DecoderOutput: + """Decoder output.""" + + words: list[str] + + +def decoder_factory(decoder_type: str = "greedy") -> callable: + """Decoder factory.""" + if decoder_type == "greedy": + return get_greedy_decoder + if decoder_type == "lm": + return get_beam_search_decoder + raise NotImplementedError + + +def get_greedy_decoder( + tokenizer: CharTokenizer, # pylint: disable=redefined-outer-name + *_, +): + """Greedy decoder.""" + return GreedyDecoder(tokenizer) + + +def get_beam_search_decoder( tokenizer: CharTokenizer, # pylint: disable=redefined-outer-name - tokens_path: str, - lang_model_path: str, - language: str, hparams: dict, # pylint: disable=redefined-outer-name ): """Beam search decoder.""" - - n_gram, beam_size, beam_threshold, n_best, lm_weight, word_score = ( + hparams = hparams.get("lm", {}) + language, lang_model_path, n_gram, beam_size, beam_threshold, n_best, lm_weight, word_score = ( + hparams["language"], + hparams["language_model_path"], hparams["n_gram"], hparams["beam_size"], hparams["beam_threshold"], @@ -53,6 +56,7 @@ def beam_search_decoder( torch.hub.download_url_to_file(url, f"data/mls_lm_{language}.tar.gz") _extract_tar("data/mls_lm_{language}.tar.gz", overwrite=True) + tokens_path = os.path.join(lang_model_path, f"mls_lm_{language}", "tokens.txt") if not os.path.isfile(tokens_path): tokenizer.create_tokens_txt(tokens_path) @@ -79,11 +83,66 @@ def beam_search_decoder( return decoder +class GreedyDecoder: + """Greedy decoder.""" + + def __init__(self, tokenizer: CharTokenizer): # pylint: disable=redefined-outer-name + self.tokenizer = tokenizer + + def __call__( + self, output, greedy_type: str = "inference", labels=None, label_lengths=None + ): # pylint: disable=redefined-outer-name + """Greedily decode a sequence.""" + if greedy_type == "train": + res = self.train(output, labels, label_lengths) + if greedy_type == "inference": + res = self.inference(output) + + res = [[DecoderOutput(words=res)]] + return res + + def train(self, output, labels, label_lengths): + """Greedily decode a sequence with known labels.""" + blank_label = tokenizer.get_blank_token() + arg_maxes = torch.argmax(output, dim=2) # pylint: disable=no-member + decodes = [] + targets = [] + for i, args in enumerate(arg_maxes): + decode = [] + targets.append(self.tokenizer.decode(labels[i][: label_lengths[i]].tolist())) + for j, index in enumerate(args): + if index != blank_label: + if j != 0 and index == args[j - 1]: + continue + decode.append(index.item()) + decodes.append(self.tokenizer.decode(decode)) + return decodes, targets + + def inference(self, output): + """Greedily decode a sequence.""" + collapse_repeated = True + arg_maxes = torch.argmax(output, dim=2) # pylint: disable=no-member + blank_label = self.tokenizer.get_blank_token() + decodes = [] + for args in arg_maxes: + decode = [] + for j, index in enumerate(args): + if index != blank_label: + if collapse_repeated and j != 0 and index == args[j - 1]: + continue + decode.append(index.item()) + decodes.append(self.tokenizer.decode(decode)) + + return decodes + + if __name__ == "__main__": tokenizer = CharTokenizer.from_file("data/tokenizers/char_tokenizer_german.json") tokenizer.create_tokens_txt("data/tokenizers/tokens_german.txt") hparams = { + "language": "german", + "lang_model_path": "data", "n_gram": 3, "beam_size": 100, "beam_threshold": 100, @@ -92,10 +151,4 @@ if __name__ == "__main__": "word_score": 1.0, } - beam_search_decoder( - tokenizer, - "data/tokenizers/tokens_german.txt", - "data", - "german", - hparams, - ) + get_beam_search_decoder(tokenizer, hparams) |