aboutsummaryrefslogtreecommitdiff
path: root/swr2_asr/utils
diff options
context:
space:
mode:
authorPherkel2023-09-18 18:13:46 +0200
committerGitHub2023-09-18 18:13:46 +0200
commitf94506764bde3e4d41dc593e9d11aa7330c00e30 (patch)
tree6fc438536a72e195805c1aea97926f4c9bbd4f85 /swr2_asr/utils
parent8b3a0b47813733ef67befa6959a4d24f8518b5b7 (diff)
parent21a3b1d7cc8544fa0031b8934283382bdfd1d8f1 (diff)
Merge pull request #38 from Algo-Boys/decoder
Decoder
Diffstat (limited to 'swr2_asr/utils')
-rw-r--r--swr2_asr/utils/data.py20
-rw-r--r--swr2_asr/utils/decoder.py169
-rw-r--r--swr2_asr/utils/tokenizer.py14
3 files changed, 182 insertions, 21 deletions
diff --git a/swr2_asr/utils/data.py b/swr2_asr/utils/data.py
index d551c98..74cd572 100644
--- a/swr2_asr/utils/data.py
+++ b/swr2_asr/utils/data.py
@@ -6,7 +6,7 @@ import numpy as np
import torch
import torchaudio
from torch import Tensor, nn
-from torch.utils.data import DataLoader, Dataset
+from torch.utils.data import Dataset
from torchaudio.datasets.utils import _extract_tar
from swr2_asr.utils.tokenizer import CharTokenizer
@@ -343,3 +343,21 @@ class MLSDataset(Dataset):
dataset_lookup_entry["chapterid"],
idx,
) # type: ignore
+
+
+def create_lexicon(vocab_counts_path, lexicon_path):
+ """Create a lexicon from the vocab_counts.txt file"""
+ words_list = []
+ with open(vocab_counts_path, "r", encoding="utf-8") as file:
+ for line in file:
+ words = line.split()
+ if len(words) >= 1:
+ word = words[0]
+ words_list.append(word)
+
+ with open(lexicon_path, "w", encoding="utf-8") as file:
+ for word in words_list:
+ file.write(f"{word} ")
+ for char in word:
+ file.write(char + " ")
+ file.write("<SPACE>")
diff --git a/swr2_asr/utils/decoder.py b/swr2_asr/utils/decoder.py
index fcddb79..1fd002a 100644
--- a/swr2_asr/utils/decoder.py
+++ b/swr2_asr/utils/decoder.py
@@ -1,26 +1,155 @@
"""Decoder for CTC-based ASR.""" ""
+import os
+from dataclasses import dataclass
+
import torch
+from torchaudio.datasets.utils import _extract_tar
+from torchaudio.models.decoder import ctc_decoder
+from swr2_asr.utils.data import create_lexicon
from swr2_asr.utils.tokenizer import CharTokenizer
-# TODO: refactor to use torch CTC decoder class
-def greedy_decoder(output, labels, label_lengths, tokenizer: CharTokenizer, collapse_repeated=True):
- """Greedily decode a sequence."""
- blank_label = tokenizer.get_blank_token()
- arg_maxes = torch.argmax(output, dim=2) # pylint: disable=no-member
- decodes = []
- targets = []
- for i, args in enumerate(arg_maxes):
- decode = []
- targets.append(tokenizer.decode(labels[i][: label_lengths[i]].tolist()))
- for j, index in enumerate(args):
- if index != blank_label:
- if collapse_repeated and j != 0 and index == args[j - 1]:
- continue
- decode.append(index.item())
- decodes.append(tokenizer.decode(decode))
- return decodes, targets
-
-
-# TODO: add beam search decoder
+@dataclass
+class DecoderOutput:
+ """Decoder output."""
+
+ words: list[str]
+
+
+def decoder_factory(decoder_type: str = "greedy") -> callable:
+ """Decoder factory."""
+ if decoder_type == "greedy":
+ return get_greedy_decoder
+ if decoder_type == "lm":
+ return get_beam_search_decoder
+ raise NotImplementedError
+
+
+def get_greedy_decoder(
+ tokenizer: CharTokenizer, # pylint: disable=redefined-outer-name
+ *_,
+):
+ """Greedy decoder."""
+ return GreedyDecoder(tokenizer)
+
+
+def get_beam_search_decoder(
+ tokenizer: CharTokenizer, # pylint: disable=redefined-outer-name
+ hparams: dict, # pylint: disable=redefined-outer-name
+):
+ """Beam search decoder."""
+ hparams = hparams.get("lm", {})
+ language, lang_model_path, n_gram, beam_size, beam_threshold, n_best, lm_weight, word_score = (
+ hparams["language"],
+ hparams["language_model_path"],
+ hparams["n_gram"],
+ hparams["beam_size"],
+ hparams["beam_threshold"],
+ hparams["n_best"],
+ hparams["lm_weight"],
+ hparams["word_score"],
+ )
+
+ if not os.path.isdir(os.path.join(lang_model_path, f"mls_lm_{language}")):
+ url = f"https://dl.fbaipublicfiles.com/mls/mls_lm_{language}.tar.gz"
+ torch.hub.download_url_to_file(url, f"data/mls_lm_{language}.tar.gz")
+ _extract_tar("data/mls_lm_{language}.tar.gz", overwrite=True)
+
+ tokens_path = os.path.join(lang_model_path, f"mls_lm_{language}", "tokens.txt")
+ if not os.path.isfile(tokens_path):
+ tokenizer.create_tokens_txt(tokens_path)
+
+ lexicon_path = os.path.join(lang_model_path, f"mls_lm_{language}", "lexicon.txt")
+ if not os.path.isfile(lexicon_path):
+ occurences_path = os.path.join(lang_model_path, f"mls_lm_{language}", "vocab_counts.txt")
+ create_lexicon(occurences_path, lexicon_path)
+
+ lm_path = os.path.join(lang_model_path, f"mls_lm_{language}", f"{n_gram}-gram_lm.arpa")
+
+ decoder = ctc_decoder(
+ lexicon=lexicon_path,
+ tokens=tokens_path,
+ lm=lm_path,
+ blank_token="_",
+ sil_token="<SPACE>",
+ unk_word="<UNK>",
+ nbest=n_best,
+ beam_size=beam_size,
+ beam_threshold=beam_threshold,
+ lm_weight=lm_weight,
+ word_score=word_score,
+ )
+ return decoder
+
+
+class GreedyDecoder:
+ """Greedy decoder."""
+
+ def __init__(self, tokenizer: CharTokenizer): # pylint: disable=redefined-outer-name
+ self.tokenizer = tokenizer
+
+ def __call__(
+ self, output, greedy_type: str = "inference", labels=None, label_lengths=None
+ ): # pylint: disable=redefined-outer-name
+ """Greedily decode a sequence."""
+ if greedy_type == "train":
+ res = self.train(output, labels, label_lengths)
+ if greedy_type == "inference":
+ res = self.inference(output)
+
+ res = [x.split(" ") for x in res]
+ res = [[DecoderOutput(x)] for x in res]
+ return res
+
+ def train(self, output, labels, label_lengths):
+ """Greedily decode a sequence with known labels."""
+ blank_label = tokenizer.get_blank_token()
+ arg_maxes = torch.argmax(output, dim=2) # pylint: disable=no-member
+ decodes = []
+ targets = []
+ for i, args in enumerate(arg_maxes):
+ decode = []
+ targets.append(self.tokenizer.decode(labels[i][: label_lengths[i]].tolist()))
+ for j, index in enumerate(args):
+ if index != blank_label:
+ if j != 0 and index == args[j - 1]:
+ continue
+ decode.append(index.item())
+ decodes.append(self.tokenizer.decode(decode))
+ return decodes, targets
+
+ def inference(self, output):
+ """Greedily decode a sequence."""
+ collapse_repeated = True
+ arg_maxes = torch.argmax(output, dim=2) # pylint: disable=no-member
+ blank_label = self.tokenizer.get_blank_token()
+ decodes = []
+ for args in arg_maxes:
+ decode = []
+ for j, index in enumerate(args):
+ if index != blank_label:
+ if collapse_repeated and j != 0 and index == args[j - 1]:
+ continue
+ decode.append(index.item())
+ decodes.append(self.tokenizer.decode(decode))
+
+ return decodes
+
+
+if __name__ == "__main__":
+ tokenizer = CharTokenizer.from_file("data/tokenizers/char_tokenizer_german.json")
+ tokenizer.create_tokens_txt("data/tokenizers/tokens_german.txt")
+
+ hparams = {
+ "language": "german",
+ "lang_model_path": "data",
+ "n_gram": 3,
+ "beam_size": 100,
+ "beam_threshold": 100,
+ "n_best": 1,
+ "lm_weight": 0.5,
+ "word_score": 1.0,
+ }
+
+ get_beam_search_decoder(tokenizer, hparams)
diff --git a/swr2_asr/utils/tokenizer.py b/swr2_asr/utils/tokenizer.py
index 1cc7b84..4e3fddd 100644
--- a/swr2_asr/utils/tokenizer.py
+++ b/swr2_asr/utils/tokenizer.py
@@ -29,9 +29,17 @@ class CharTokenizer:
"""Use a character map and convert integer labels to an text sequence"""
string = []
for i in labels:
+ i = int(i)
string.append(self.index_map[i])
return "".join(string).replace("<SPACE>", " ")
+ def decode_batch(self, labels: list[list[int]]) -> list[str]:
+ """Use a character map and convert integer labels to an text sequence"""
+ string = []
+ for label in labels:
+ string.append(self.decode(label))
+ return string
+
def get_vocab_size(self) -> int:
"""Get the number of unique characters in the dataset"""
return len(self.char_map)
@@ -120,3 +128,9 @@ class CharTokenizer:
load_tokenizer.char_map[char] = int(index)
load_tokenizer.index_map[int(index)] = char
return load_tokenizer
+
+ def create_tokens_txt(self, path: str):
+ """Create a txt file with all the characters"""
+ with open(path, "w", encoding="utf-8") as file:
+ for char, _ in self.char_map.items():
+ file.write(f"{char}\n")