# Environment Setup

In [1]:
!pip install hazm  # Requires Restart

Collecting hazm
  Downloading hazm-0.10.0-py3-none-any.whl (892 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m892.6/892.6 kB[0m [31m6.5 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting fasttext-wheel<0.10.0,>=0.9.2 (from hazm)
  Downloading fasttext_wheel-0.9.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (4.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.4/4.4 MB[0m [31m13.1 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting flashtext<3.0,>=2.7 (from hazm)
  Downloading flashtext-2.7.tar.gz (14 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting numpy==1.24.3 (from hazm)
  Downloading numpy-1.24.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (17.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m17.3/17.3 MB[0m [31m21.3 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting python-crfsuite<0.10.0,>=0.9.9 (from hazm)
  Downloading python_crfsuite-0.9.10-cp310-cp310-manylinux_2_17_x86_64

In [1]:
! pip install pydub

Collecting pydub
  Using cached pydub-0.25.1-py2.py3-none-any.whl (32 kB)
Installing collected packages: pydub
Successfully installed pydub-0.25.1


## Setup parsi io

In [2]:
! git clone https://github.com/language-ml/parsi.io.git

fatal: destination path 'parsi.io' already exists and is not an empty directory.


In [3]:
mv parsi.io parsi_io

## Setup Perpos POS Tagger

In [4]:
! pip install sklearn-crfsuite

Collecting sklearn-crfsuite
  Downloading sklearn_crfsuite-0.3.6-py2.py3-none-any.whl (12 kB)
Installing collected packages: sklearn-crfsuite
Successfully installed sklearn-crfsuite-0.3.6


In [5]:
! git clone https://github.com/mhbashari/perpos.git

Cloning into 'perpos'...
remote: Enumerating objects: 41, done.[K
remote: Total 41 (delta 0), reused 0 (delta 0), pack-reused 41[K
Receiving objects: 100% (41/41), 17.33 MiB | 8.56 MiB/s, done.
Resolving deltas: 100% (11/11), done.


In [6]:
import string

from nltk import tree2conlltags


def read_conll(path, col=2):
    """Yield the sentences of a CoNLL-style file one at a time.

    Each line with more than one whitespace-separated field contributes one
    row made of the first ``col - 1`` fields followed by the last field.
    Any other line (blank, or holding a single field) ends the current
    sentence, which is then yielded as a list of rows.

    Args:
        path: Path of the UTF-8 encoded file to read.
        col: Number of fields to keep per row (leading fields + last field).

    Yields:
        list[list[str]]: The rows of one sentence. Consecutive separator
        lines yield empty lists, as before.
    """
    with open(path, "r", encoding="utf-8") as conll:
        out = []
        for sent in conll:
            split = sent.strip("\r\n").split()
            if len(split) > 1:
                out.append(split[:col - 1] + split[-1:])
            else:
                yield out
                out = []
        # A file that does not end with a separator line would otherwise
        # silently lose its final sentence.
        if out:
            yield out


def template(word):
    """Map a word to its shape: "a" for each of the letters آ/ا/ی/و, "x" otherwise."""
    long_vowels = "آایو"
    shape = []
    for char in word:
        shape.append("a" if char in long_vowels else "x")
    return "".join(shape)


def isdigit(word):
    """Return True when every character is a Persian/ASCII digit or a dot.

    An empty string has no offending character and therefore yields True.
    """
    numeric_chars = "۱۲۳۴۵۶۷۸۹۰1234567890."
    for char in word:
        if char not in numeric_chars:
            return False
    return True


def ngram(word, leng=2):
    """Yield ``(feature_name, substring)`` pairs for slices of ``word``.

    One slice of length ``leng`` is produced for every start index in
    ``range(len(word) - 1)``; slices near the end of the word may be shorter.
    """
    for start in range(len(word) - 1):
        stop = start + leng
        key = 'word[' + str(start) + ":" + str(stop) + "]"
        yield key, word[start:stop]


def tree2brackets(tree):
    """Render a chunk tree as bracketed text, e.g. ``[w1 w2 NP] w3``.

    The tree is flattened with ``tree2conlltags``; every chunk opened by a
    ``B-<tag>`` label is wrapped in ``[`` ... ``<tag>]``.
    """
    pieces = []
    open_tag = ''
    for row in tree2conlltags(tree):
        iob = row[2]
        # A new chunk ("B") or an outside token ("O") closes any open chunk.
        if iob[0] in {'B', 'O'} and open_tag:
            pieces.append(open_tag + '] ')
            open_tag = ''

        if iob[0] == 'B':
            open_tag = iob.split('-')[1]
            pieces.append('[')
        pieces.append(row[0] + ' ')

    # Close a chunk that runs to the end of the sentence.
    if open_tag:
        pieces.append(open_tag + '] ')

    return ''.join(pieces).strip()

def word2features(sent, i):
    """Build the CRF feature dictionary for the token at position ``i``.

    Args:
        sent: Sequence of tokens. The current token is used as a string
            (``sent[i]`` is concatenated with other strings below).
        i: Index of the token to describe.

    Returns:
        dict: Feature name -> value, covering the token itself, its
        character n-grams, and a window of up to two neighbours on each side.

    NOTE(review): neighbours are read as ``sent[j][0]``. When ``sent`` is a
    list of plain strings (which is what ``POSTagger.parse_sentences`` passes
    on), that expression is the FIRST CHARACTER of the neighbouring token,
    not the whole token. Presumably the shipped model was trained with
    exactly these features — confirm before "fixing" it.
    """
    W = sent[i]
    features = {
        'B': 1.0,                         # constant bias feature
        'W': W,                           # the token itself
        'P': W in string.punctuation,     # substring test against ASCII punctuation
        'T': template(W),                 # shape built by template()
        'D(W)': isdigit(W),               # all characters are digits or '.'
    }
    # Character n-grams for every length from 0 up to max(5, len(W)).
    for leng in range(max(4 + 1, len(W)) + 1):
        for k, v in ngram(W, leng=leng):
            features[k] = v
    if i > 0:
        # Previous element (see the NOTE in the docstring about `[0]`).
        W = sent[i - 1][0]
        features.update({
            '-1W[-3': W[-3:],
            '-1W[-2': W[-2:],
            '-1W[-1': W[-1:],
            '-1W': W,
            '-1W0W': W + sent[i],
            '-1P': W in string.punctuation,
            '-1T': template(W)
        })
    else:
        # No previous element: mark beginning of sentence.
        features['BOS'] = True
    if i > 1:
        # Element two positions back.
        W = sent[i - 2][0]
        features.update({
            '-2W[-3': W[-3:],
            '-2W[-2': W[-2:],
            '-2W[-1': W[-1:],
            '-2P': W in string.punctuation,
            '-2T': template(W)
        })

    if i < len(sent) - 2:
        # Element two positions ahead.
        W = sent[i + 2][0]
        features.update({
            '+2W[-1': W[-1:],
            '+2W[-2': W[-2:],
            '+2W': W,
            '+2P': W in string.punctuation,
            '+2T': template(W)
        })
    if i < len(sent) - 1:
        # Next element.
        W = sent[i + 1][0]
        features.update({
            '+1W[-1': W[-1:],
            '+1W': W,
            '+1W0W': W + sent[i],
            '+1W[-2': W[-2:],
            '+1:P': W in string.punctuation,
            '+1:T': template(W)
        })
    else:
        # No next element: mark end of sentence.
        features['EOS'] = True
    if 0 < i < len(sent) - 1:
        # Joint feature; note the value is written next/previous although the key reads -1W/+1W.
        features['-1W/+1W'] = sent[i + 1][0] + "/" + sent[i - 1][0]
    return features


def token2features(token_list):
    """Return one ``word2features`` dictionary per position of ``token_list``."""
    features = []
    for position in range(len(token_list)):
        features.append(word2features(token_list, position))
    return features


def sent2labels(sent):
    """Extract the POS tags from a sequence of ``(token, postag)`` pairs."""
    labels = []
    for _token, tag in sent:
        labels.append(tag)
    return labels


def sent2tokens(sent):
    """Extract the tokens from a sequence of ``(token, postag)`` pairs."""
    tokens = []
    for word, _tag in sent:
        tokens.append(word)
    return tokens


import pickle

class POSTagger:
    """Part-of-speech tagger backed by a pickled model.

    The unpickled object only needs a ``predict`` method that accepts a list
    of per-sentence feature lists (presumably an sklearn-crfsuite CRF —
    confirm against the model file).
    """

    def __init__(self, model_path):
        """Load the pickled model stored at ``model_path``.

        SECURITY: ``pickle.load`` can execute arbitrary code while loading.
        Only point this at model files from a trusted source.
        """
        self.model_path = model_path
        # Use a context manager so the file handle is closed deterministically.
        with open(model_path, "rb") as model_file:
            self.crf = pickle.load(model_file)

    def parse(self, token_stream):
        """Tag one tokenized sentence; returns a list of ``(token, tag)`` pairs."""
        return self.parse_sentences([token_stream])[0]

    def parse_sentences(self, list_of_token_stream):
        """Tag several tokenized sentences.

        Args:
            list_of_token_stream: Sequence of sentences, each a sequence of tokens.

        Returns:
            list[list[tuple]]: For every sentence, its ``(token, tag)`` pairs.
        """
        features = [token2features(tokens) for tokens in list_of_token_stream]
        predictions = self.crf.predict(features)
        return [
            list(zip(tokens, labels))
            for tokens, labels in zip(list_of_token_stream, predictions)
        ]


In [7]:
pos_tagger = POSTagger("perpos/model/perpos.model")

## Setup Aeneas

In [8]:
!pip install -q numpy==1.22.4
!apt-get install ffmpeg
!apt-get install espeak
!pip install -q beautifulsoup4
!pip install -q lxml
!apt-get install libgdal-dev

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m16.8/16.8 MB[0m [31m19.9 MB/s[0m eta [36m0:00:00[0m
[?25h[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
chex 0.1.86 requires numpy>=1.24.1, but you have numpy 1.22.4 which is incompatible.
cudf-cu12 24.4.1 requires numpy<2.0a0,>=1.23, but you have numpy 1.22.4 which is incompatible.
hazm 0.10.0 requires numpy==1.24.3, but you have numpy 1.22.4 which is incompatible.
pandas-stubs 2.0.3.230814 requires numpy>=1.25.0; python_version >= "3.9", but you have numpy 1.22.4 which is incompatible.
plotnine 0.12.4 requires numpy>=1.23.0, but you have numpy 1.22.4 which is incompatible.
rmm-cu12 24.4.0 requires numpy<2.0a0,>=1.23, but you have numpy 1.22.4 which is incompatible.
tensorflow 2.15.0 requires numpy<2.0.0,>=1.23.5, but you have numpy 1.22.4 which is incompatible.[0m[31m
Reading packag

In [9]:
!wget https://raw.githubusercontent.com/readbeyond/aeneas/master/install_dependencies.sh
!bash install_dependencies.sh

--2024-06-03 15:43:07--  https://raw.githubusercontent.com/readbeyond/aeneas/master/install_dependencies.sh
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2759 (2.7K) [text/plain]
Saving to: ‘install_dependencies.sh’


2024-06-03 15:43:07 (32.3 MB/s) - ‘install_dependencies.sh’ saved [2759/2759]

[INFO] A.1 Adding deb-multimedia to apt sources...
[INFO] A.1 Adding deb-multimedia to apt sources... done
[INFO] A.2 Updating apt...
Get:1 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Get:3 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:5 http://archive.ubuntu.c

In [10]:
# Clone aeneas, install its requirements, build its C extensions and run its setup check.
# NOTE(review): every `!` line runs in its own subshell, so the `!cd` below does NOT change
# the working directory for the lines that follow it; they rely on absolute paths instead.
# Use `%cd` (persistent) or chain commands with `&&` if the build must run inside the repo.
!git clone https://github.com/ReadBeyond/aeneas.git
!cd /content/aeneas
!sudo pip install -r /content/aeneas/requirements.txt
!python /content/aeneas/setup.py build_ext --inplace
!python /content/aeneas/aeneas_check_setup.py

Cloning into 'aeneas'...
remote: Enumerating objects: 5636, done.[K
remote: Counting objects: 100% (19/19), done.[K
remote: Compressing objects: 100% (18/18), done.[K
remote: Total 5636 (delta 0), reused 10 (delta 0), pack-reused 5617[K
Receiving objects: 100% (5636/5636), 29.86 MiB | 22.90 MiB/s, done.
Resolving deltas: 100% (4272/4272), done.
[39mrunning build_ext[0m
[39mbuilding 'aeneas.cdtw.cdtw' extension[0m
Make sure that Python modules winreg, win32api or win32con are installed.[0m
[39mINFO: C compiler: x86_64-linux-gnu-gcc -Wno-unused-result -Wsign-compare -DNDEBUG -g -fwrapv -O2 -Wall -g -fstack-protector-strong -Wformat -Werror=format-security -g -fwrapv -O2 -fPIC
[0m
[39mcreating build[0m
[39mcreating build/temp.linux-x86_64-cpython-310[0m
[39mcreating build/temp.linux-x86_64-cpython-310/aeneas[0m
[39mcreating build/temp.linux-x86_64-cpython-310/aeneas/cdtw[0m
[39mcreating build/temp.linux-x86_64-cpython-310/aeneas/cint[0m
[39mINFO: compile options: '-I

In [11]:
!pip install -q aeneas

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m5.5/5.5 MB[0m [31m12.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for aeneas (setup.py) ... [?25l[?25hdone


In [12]:
!python -m aeneas.diagnostics

[92m[INFO] ffprobe        OK[0m
[92m[INFO] ffmpeg         OK[0m
[92m[INFO] espeak         OK[0m
[92m[INFO] aeneas.tools   OK[0m
[93m[WARN]   The default input encoding of your shell is not UTF-8[0m
[93m[WARN]   The default output encoding of your shell is not UTF-8[0m
[INFO]   If you plan to use aeneas on the command line,
[INFO]   you might want to 'export PYTHONIOENCODING=UTF-8' in your shell
[92m[INFO] aeneas.cdtw    AVAILABLE[0m
[92m[INFO] aeneas.cmfcc   AVAILABLE[0m
[92m[INFO] aeneas.cew     AVAILABLE[0m
[92m[INFO] All required dependencies are met and all available Python C extensions are working[0m


## Import Libraries

In [31]:
import os
import re
import subprocess
import shutil
import json
from functools import reduce
from pydub import AudioSegment
from hazm import Normalizer, sent_tokenize, word_tokenize
from parsi_io.parsi_io.modules.number_extractor import NumberExtractor
from parsi_io.parsi_io.modules.convert_number_to_text import ConvertNumberToText

# Convert Audio to Mono Mp3

In [None]:
raw_data_dir = "raw-data/"

processed_data_dir = "processed-data/"
os.makedirs(processed_data_dir, exist_ok=True)

In [None]:
def convert_audio_to_mono_mp3(input_file_path, output_file_path):
    """Convert an audio file to a single-channel MP3.

    Args:
        input_file_path: Path of the audio file to read.
        output_file_path: Path the MP3 is written to.

    Returns:
        The ``output_file_path`` that was written.
    """
    # Load the audio file
    sound = AudioSegment.from_file(input_file_path)

    # Convert stereo to mono
    sound = sound.set_channels(1)

    # Export the audio in MP3 format
    sound.export(output_file_path, format="mp3")

    return output_file_path


In [None]:
# Convert every .m4a file of the raw-data directory to a mono MP3
for filename in os.listdir(raw_data_dir):
    if filename.endswith('.m4a'):
        # Construct full file path
        source_file_path = os.path.join(raw_data_dir, filename)
        # Swap only the extension; str.replace would also rewrite a '.m4a'
        # occurring in the middle of the file name.
        destination_file_name = os.path.splitext(filename)[0] + '.mp3'
        destination_file_path = os.path.join(processed_data_dir, destination_file_name)

        convert_audio_to_mono_mp3(source_file_path, destination_file_path)
        print(f'Converted {filename} to MP3.')

print('All.m4a files have been converted to.mp3.')


Converted 2.m4a to MP3.
Converted 1.m4a to MP3.
Converted 3.m4a to MP3.
Converted 4.m4a to MP3.
Converted 9.m4a to MP3.
Converted 12.m4a to MP3.
Converted 14.m4a to MP3.
Converted 16.m4a to MP3.
Converted 20.m4a to MP3.
Converted 22.m4a to MP3.
Converted 24.m4a to MP3.
Converted 26.m4a to MP3.
Converted 30.m4a to MP3.
Converted 40.m4a to MP3.
Converted 50.m4a to MP3.
Converted 52.m4a to MP3.
Converted 54.m4a to MP3.
Converted 56.m4a to MP3.
Converted 64.m4a to MP3.
Converted 72.m4a to MP3.
Converted 76.m4a to MP3.
Converted 101.m4a to MP3.
Converted 103.m4a to MP3.
Converted 107.m4a to MP3.
Converted 111.m4a to MP3.
All.m4a files have been converted to.mp3.


# Process Text

## Normalization

In [16]:
normalizer = Normalizer()


def normalize_text(text):
    """Return ``text`` after passing it through the shared Hazm ``Normalizer``."""
    normalized = normalizer.normalize(text)
    return normalized

## Symbol Substitution
This step is designed to unify various forms of symbols into their more commonly used counterparts.

In [17]:
# Single-character replacements: rarer glyphs / invisible marks -> their common counterpart.
substitution_dict = {
    'ﯽ': 'ی',
    '—': '–',
    '\u200f': '\u200c',
    '\xad': '\u200c',
    '\u200e': '\u200c',
    '\u200d': '\u200c',
}


def substitute_symbols(text):
    """Replace every character listed in ``substitution_dict`` by its mapped value."""
    return text.translate(str.maketrans(substitution_dict))

## Remove In-text References
This step is designed to remove the references that come inside the text but are not read aloud. For example:
> They have introduced a new tool [1] which ...

In [18]:
def remove_inline_references(text):
    """Replace bracketed numeric citations such as ``[1]`` or ``[۱۲]`` with a space.

    For ``str`` patterns ``\\d`` matches any Unicode decimal digit, so one
    pattern covers both ASCII and Persian digits. The former extra
    alternatives (``\\[۰-۹]+\\]`` / ``\\[0-9]+\\]``) were missing the opening
    ``[`` of the character class and only matched the literal text
    ``[0-9]]`` / ``[۰-۹]]``; they have been removed.

    Args:
        text: Input text.

    Returns:
        The text with every ``[<digits>]`` marker replaced by a single space.
    """
    reference_pattern = r"\[\d+\]"
    return re.sub(reference_pattern, " ", text)

## Remove Reference Lines
This step is designed to remove the references that come at the end of the text but are not read aloud. For example:
> [1] Roshan-AI. Hazm. https://www.roshan-ai.ir/hazm/docs/index.html. Accessed:
May 3, 2024.
>
> [2] ...



In [19]:
def remove_references_lines(text):
    """Drop every line that starts with a bracketed number such as ``[1]``.

    ``\\d`` matches any Unicode decimal digit for ``str`` patterns, so Persian
    and ASCII digits are both covered by one pattern. The former second
    alternatives (``\\[۰-۹]+\\]`` / ``\\[0-9]+\\]``) were malformed and only
    matched the literal text ``[0-9]]`` / ``[۰-۹]]``; they have been removed.

    Only lines that themselves begin with a marker are removed; wrapped
    continuation lines of a reference entry are left untouched.

    Args:
        text: Input text with ``\\n`` line separators.

    Returns:
        The text without the reference lines.
    """
    reference_line = re.compile(r"^\s*\[\d+\]")

    # Keep every line that does not begin with a "[NUM]" marker
    cleaned_lines = [
        line for line in text.split('\n')
        if not reference_line.match(line.strip())
    ]

    return '\n'.join(cleaned_lines)

## Remove Link Lines
This step is designed to remove the links and urls that come at the end of the text but are not read aloud. For example:
> Resources:
>
> https://www.roshan-ai.ir/hazm/docs/index.html
>
> https://virgool.io/
>
> ...

In [20]:
def remove_link_lines(text):
    """Drop every line whose first non-blank characters are ``http`` or ``www``."""
    link_start = re.compile(r"^\s*(?:http|www)")

    kept_lines = []
    for line in text.split('\n'):
        # Links that appear later inside a line do not cause removal.
        if link_start.match(line.strip()) is None:
            kept_lines.append(line)

    return '\n'.join(kept_lines)

## Convert Numbers to Text
This step is designed to convert the numbers in digit format into their spoken version. For example:

> 22 → twenty two

In [21]:
persian_digits_pattern = re.compile(r'[۰۱۲۳۴۵۶۷۸۹0123456789]')
num2text = ConvertNumberToText()
extractor = NumberExtractor()

In [22]:
def replace_numbers_with_text(text):
    """Spell out every number that is written with digits.

    ``extractor.run`` supplies the number spans; each entry is read through
    its ``'span'`` (start, end), ``'phrase'`` and ``'value'`` keys. Spans
    whose phrase contains no digit (numbers already written as words) are
    left alone.

    The result is assembled from slices of the ORIGINAL text, so no offset
    bookkeeping is needed and the outcome does not depend on the phrase
    length matching the span length or on the order the spans arrive in.

    Args:
        text: Input text.

    Returns:
        The text with digit-based numbers replaced by ``num2text`` output.
    """
    # Find all number spans in the text
    number_spans = extractor.run(text)

    # Keep only the spans whose phrase contains at least one digit
    digit_spans = [span for span in number_spans if persian_digits_pattern.search(span['phrase'])]

    pieces = []
    cursor = 0  # End of the last span already handled, in original-text coordinates
    for span in sorted(digit_spans, key=lambda item: item['span'][0]):
        start, end = span['span']
        if start < cursor:
            # Overlaps a span that was already replaced; skip it.
            continue

        pieces.append(text[cursor:start])
        pieces.append(num2text.run(str(span['value'])))
        cursor = end

    pieces.append(text[cursor:])
    return ''.join(pieces)

## Remove Symbols
This step is designed to remove some of the symbols that are not very common or do not affect the TTS-ASR models' outputs. This helps simplify the input to the models.

In [23]:
# Characters that are replaced by a space before the text reaches the models.
symbols_to_remove = "«»*[]\"'^&<>{}|٫《》•\x9d\u200b\x7f"


def remove_symbols(text):
    """Replace each character of ``symbols_to_remove`` found in ``text`` by a space."""
    space_table = dict.fromkeys(map(ord, symbols_to_remove), ' ')
    return text.translate(space_table)

## Remove Extra White Spaces
This step is designed to remove extra white space, including runs of consecutive white-space characters and new lines.

In [24]:
def remove_extra_white_spaces(text):
    """Collapse each whitespace run (newlines included) to one space and trim both ends."""
    whitespace_run = re.compile(r'\s+')
    return whitespace_run.sub(' ', text).strip()

## Full Pipeline
Here we define the complete text processing pipeline and the processing code.

In [25]:
# Text-processing steps, applied in order.
# The line-based removals must run BEFORE remove_inline_references: that step
# replaces every "[N]" marker (including the ones that open a reference line)
# with a space, after which remove_references_lines can no longer recognise
# the reference lines it is supposed to drop.
pipeline = [
    normalize_text,
    substitute_symbols,
    remove_references_lines,
    remove_link_lines,
    remove_inline_references,
    replace_numbers_with_text,
    remove_symbols,
    remove_extra_white_spaces
  ]

In [26]:
def process_text(input_file_path, output_dir_path):
  """Run the text pipeline on one file and write the result as ``<stem>.txt``.

  The stem is the input file name without its final extension, so dotted
  names such as ``a.b.txt`` keep their full stem (``a.b``). Files are read
  and written as UTF-8 explicitly instead of relying on the platform default.

  Args:
      input_file_path: Path of the raw text file.
      output_dir_path: Directory the processed file is written to.

  Returns:
      Path of the processed text file (existing files are not regenerated).
  """
  input_file_name = os.path.splitext(os.path.basename(input_file_path))[0]
  output_file_path = os.path.join(output_dir_path, input_file_name + '.txt')

  # Check if the output file already exists
  if os.path.exists(output_file_path):
    print(f"Skipping file {input_file_name}. Processed text file already exists.")
    return output_file_path

  # Apply the text processing pipeline: each step receives the previous step's output
  with open(input_file_path, 'r', encoding='utf-8') as f:
    text = reduce(lambda txt, func: func(txt), pipeline, f.read())

  # Export the processed text
  with open(output_file_path, 'w', encoding='utf-8') as f:
    f.write(text)

  return output_file_path

## Run the text processing pipeline

In [27]:
def process_all_text_files(directory_path, output_dir_path):
    """Run ``process_text`` on every ``.txt`` file found directly in ``directory_path``.

    Args:
        directory_path: Directory holding the raw text files.
        output_dir_path: Directory for the processed files; created if missing.
    """
    # Ensure the output directory exists
    os.makedirs(output_dir_path, exist_ok=True)

    # Iterate over all files in the directory
    for filename in os.listdir(directory_path):
        if filename.endswith('.txt'):  # Check if the file is a text file
            input_file_path = os.path.join(directory_path, filename)

            # process_text derives the output file name itself
            process_text(input_file_path, output_dir_path)

In [28]:
process_all_text_files(raw_data_dir, processed_data_dir)

Skipping file 1. Processed text file already exists.
Skipping file 2. Processed text file already exists.
Skipping file 3. Processed text file already exists.
Skipping file 4. Processed text file already exists.
Skipping file 9. Processed text file already exists.
Skipping file 12. Processed text file already exists.
Skipping file 14. Processed text file already exists.
Skipping file 16. Processed text file already exists.
Skipping file 20. Processed text file already exists.
Skipping file 22. Processed text file already exists.
Skipping file 24. Processed text file already exists.
Skipping file 26. Processed text file already exists.
Skipping file 30. Processed text file already exists.
Skipping file 40. Processed text file already exists.
Skipping file 50. Processed text file already exists.
Skipping file 52. Processed text file already exists.
Skipping file 54. Processed text file already exists.
Skipping file 56. Processed text file already exists.
Skipping file 64. Processed text 

# Forced Alignment
Forced alignment is the task of chunking the audio and text files into smaller parts of a few seconds and a few words. The resulting audio-text chunks contain the same content. I.e. the text files are the transcript of the audio files.

Here we use the [Aeneas](https://github.com/readbeyond/aeneas) forced alignment tool which is a good choice for audio and text files that are an exact match.

This tool requires the text be tokenized to sentences. We use the [Hazm](https://www.roshan-ai.ir/hazm/docs/index.html) sentence tokenizer and [Perpos](https://github.com/mhbashari/perpos) POS Tagger and develop a new sentence tokenization tool that keeps the sentences in a predefined length range.

## Sentence Tokenize

In [None]:
def contains_word_letter(input_string):
    """Return True if ``input_string`` holds at least one ``\\w`` character."""
    return re.search(r'\w+', input_string) is not None

The implemented sentence tokenizer first uses the Hazm sentence tokenizer, which mostly tokenizes based on punctuation. It then uses the Perpos POS Tagger to identify VERB and EZAFE tags. It uses the position of VERB tags to further split the sentences into smaller chunks. This tokenizer considers several criteria during tokenization to output meaningful and well-formed splits:

*   It appends all the symbols to the verb of the last sentence. This is because the symbols can affect the pronunciation of the verb (consider how a `?` mark changes intonation).
*   It appends the conjunction `و` to the verb of the last sentence. Because the pronunciation of this word is usually integrated with the previous word by a vowel sound `\o\` and should not be interrupted.
*   It keeps sentences under a maximum length
*   It merges small chunks together to keep sentences longer than a minimum length
*   It avoids splitting the sentences at the EZAFE tags. This is because these words are pronounced connected to the previous words by a vowel `\e\` and should not be interrupted.

In [None]:
def get_sub_sentences(tagged_words, min_split_len=5, max_split_len=12):
    """Split one POS-tagged sentence into shorter chunks.

    Args:
        tagged_words: Sequence of ``(word, pos)`` pairs for one sentence.
        min_split_len: A chunk is closed after a verb only once it holds
            more than this many items.
        max_split_len: A chunk is closed as soon as it holds at least this
            many items (the grouping rules below may push it past the limit).

    Returns:
        list[str]: The chunks, each with its items joined by single spaces.
        Every input word ends up in exactly one chunk.
    """
    i = 0                                                                             # Index of next word in the original sentence
    current_words = []
    sub_sentences = []

    while i < len(tagged_words):
      # Consume one word; `pos` drives the grouping decisions below.
      word, pos = tagged_words[i]
      current_words.append(word)
      i += 1

      while i < len(tagged_words) and not contains_word_letter(tagged_words[i][0]):   # Append symbols to previous sentence
        current_words.append(tagged_words[i][0])
        i += 1

      if pos == "V" and i < len(tagged_words) and tagged_words[i][0] == "و":          # Append "و" to previous verb
        current_words.append("و")
        i += 1

      # `pos` is updated inside this loop, so it stops at the first consumed word whose tag does not end in "e".
      while i < len(tagged_words) and pos.endswith("e"):                              # Append all EZAFE POSes to previous sentence
        word, pos = tagged_words[i]
        current_words.append(word)
        i += 1

      # Close the chunk at the end of input, at the maximum length, or after a verb once past the minimum length.
      if i >= len(tagged_words) or len(current_words) >= max_split_len or (len(current_words) > min_split_len and pos == "V"):
        sub_sentences.append(' '.join(current_words))
        current_words = []

    return sub_sentences

In [None]:
def split_sentences(text, min_split_len=5, max_split_len=12):
    """Tokenize ``text`` into sentences and split each into length-bounded chunks.

    Each Hazm sentence is word-tokenized, POS-tagged with ``pos_tagger`` and
    handed to ``get_sub_sentences``; all chunks are returned in one flat list.
    """
    print("Tokenizing to sentences...")

    chunks = []
    for sentence in sent_tokenize(text):
        tagged = pos_tagger.parse(word_tokenize(sentence))
        chunks += get_sub_sentences(tagged, min_split_len, max_split_len)

    return chunks

In [None]:
def write_splitted_text(input_file_path, output_file_path='temp_splitted_text.txt'):
  """Split a text file into chunks and write them one per line.

  Both files are opened as UTF-8 explicitly: the output is later read back
  with ``encoding='utf-8'``, and the platform default may be a different
  encoding.

  Args:
      input_file_path: Path of the processed text file.
      output_file_path: Path the chunked text is written to.
  """
  with open(input_file_path, 'r', encoding='utf-8') as f:
    text = f.read()

  splitted_text = '\n'.join(split_sentences(text))

  # Remove _ artifact from hazm word tokenizer
  splitted_text = splitted_text.replace('_', '\u200c')

  with open(output_file_path, 'w', encoding='utf-8') as f:
    f.write(splitted_text)

## Forced Alignment

In [None]:
def write_forced_alignment_map(audio_file, text_file, output_json='temp_alignment_map.json'):
    """Run the aeneas ``execute_task`` tool and write the alignment map as JSON.

    Args:
        audio_file: Path of the audio file to align.
        text_file: Path of the plain-text file, one fragment per line.
        output_json: Path the JSON alignment map is written to.

    Raises:
        subprocess.CalledProcessError: If the aeneas process exits with a
            non-zero status. The error is still printed, but it is re-raised
            so the caller does not go on to read a JSON file that was never
            produced.
    """
    print("Executing forced alignment...")
    command = [
        "python", "-m", "aeneas.tools.execute_task",
        audio_file,
        text_file,
        "task_language=fas|os_task_file_format=json|is_text_type=plain",
        output_json
    ]
    try:
        subprocess.run(command, check=True)
    except subprocess.CalledProcessError as e:
        print("Error:", e)
        raise
    print("Forced alignment completed successfully.")

In [None]:
def read_json(json_file):
    """Load and return the content of a UTF-8 encoded JSON file.

    The encoding is given explicitly so non-ASCII text in the file is decoded
    correctly even when the platform default encoding is not UTF-8.
    """
    with open(json_file, 'r', encoding='utf-8') as file:
        return json.load(file)

In [None]:
def get_aligned_audio_text(audio_file_path, text_file_path, temp_text_file='temp_splitted_text.txt'):
    """Align an audio file with its text and return the fragment time spans.

    The chunked text is written to ``temp_text_file`` and is left in place
    for the caller; the intermediate JSON alignment map is deleted.

    Returns:
        list[tuple[float, float]]: ``(begin, end)`` of every fragment, in
        the order they appear in the alignment map.
    """
    alignment_json_path = 'temp_alignment_map.json'

    # Chunk the text, then let aeneas align the chunks against the audio.
    write_splitted_text(text_file_path, temp_text_file)
    write_forced_alignment_map(audio_file_path, temp_text_file, alignment_json_path)

    # Pull the begin/end time of each fragment out of the alignment map.
    fragments = read_json(alignment_json_path)['fragments']
    spans = [(float(item['begin']), float(item['end'])) for item in fragments]

    # The JSON map is no longer needed once the spans are extracted.
    os.remove(alignment_json_path)

    return spans

## Run Forced Alignment

In [None]:
def truncate_audio(audio_file, start, end):
    """Overwrite the MP3 at ``audio_file`` with its ``[start, end]`` slice (values multiplied by 1000 for slicing)."""
    full_audio = AudioSegment.from_mp3(audio_file)
    start_ms = float(start) * 1000
    end_ms = float(end) * 1000
    clip = full_audio[start_ms:end_ms]
    clip.export(audio_file, format="mp3")

In [None]:
def process_audio_files(source_dir, destination_dir, processed_files_log):
    """Force-align every MP3/text pair of ``source_dir`` and write the chunks.

    For each ``<name>.mp3`` that has a matching ``<name>.txt`` and is not yet
    listed in the log, a directory ``<destination_dir>/<name>/`` is filled
    with ``<name>-<k>.txt`` / ``<name>-<k>.mp3`` pairs, and ``<name>`` is
    appended to the log.

    Args:
        source_dir: Directory holding the processed MP3 and text files.
        destination_dir: Directory that receives one sub-directory per file.
        processed_files_log: Existing text file listing one already-aligned
            name per line.
    """
    with open(processed_files_log, 'r') as f:
        processed_filenames = {fname.strip() for fname in f}

    # Get total number of audio files in the source directory
    audio_files = [audio_file for audio_file in os.listdir(source_dir) if audio_file.endswith('.mp3')]
    total_files = len(audio_files)

    temp_text_file = 'temp_splitted_text.txt'

    # Iterate through all audio files in the source directory
    for file_idx, audio_file in enumerate(audio_files, start=1):
        print(f"({file_idx}/{total_files}): Processing file {audio_file}")
        audio_name = os.path.splitext(audio_file)[0]
        audio_file_path = os.path.join(source_dir, audio_file)
        # Derive the text name from the stem so only the extension is swapped.
        text_file_path = os.path.join(source_dir, audio_name + '.txt')

        if not os.path.exists(text_file_path):
            print("\x1b[31m\"Processed text file does not exist!\"\x1b[0m")
            continue

        if audio_name in processed_filenames:
            print(f"Skippin audio file {audio_name}... already forced aligned!")
            continue

        alignment_dir = os.path.join(destination_dir, audio_name)

        try:
            # Process audio and text file
            audio_segments = get_aligned_audio_text(audio_file_path, text_file_path, temp_text_file)

            print("Writing output pairs...")
            os.makedirs(alignment_dir, exist_ok=True)

            with open(temp_text_file, 'r', encoding='utf-8') as f:
                lines = f.readlines()

            if len(lines) != len(audio_segments):
                print(f"Warning: {len(lines)} text lines but {len(audio_segments)} audio segments for {audio_name}.")

            # Write each line in temp text output file to corresponding txt file
            for segment_idx, line in enumerate(lines):
                segment_text_path = os.path.join(alignment_dir, f"{audio_name}-{segment_idx}.txt")
                with open(segment_text_path, 'w', encoding='utf-8') as segment_file:
                    segment_file.write(line.strip())

            # Decode the source audio once and slice it in memory, instead of
            # copying and re-decoding the whole file for every segment.
            source_audio = AudioSegment.from_mp3(audio_file_path)
            for segment_idx, (start, end) in enumerate(audio_segments):
                segment_audio_path = os.path.join(alignment_dir, f"{audio_name}-{segment_idx}.mp3")
                segment_audio = source_audio[float(start) * 1000:float(end) * 1000]
                segment_audio.export(segment_audio_path, format="mp3")
        finally:
            # Remove the temporary text file even when alignment or export fails.
            if os.path.exists(temp_text_file):
                os.remove(temp_text_file)

        # Only log the file once all of its output pairs were written.
        with open(processed_files_log, 'a') as f:
            f.write(f'{audio_name}\n')


In [None]:
force_aligned_audio_text_files_dir = "forced-aligned-data/"
os.makedirs(force_aligned_audio_text_files_dir, exist_ok=True)

In [None]:
# This file keeps track of the files already forced aligned not to repeat the processing
forced_aligned_files_log = force_aligned_audio_text_files_dir + 'forced-aligned-files-log.txt'
with open(forced_aligned_files_log, 'a') as file: pass

In [None]:
process_audio_files(processed_data_dir, force_aligned_audio_text_files_dir, forced_aligned_files_log)

(1/25): Processing file 2.mp3
Skippin audio file 2... already forced aligned!
(2/25): Processing file 1.mp3
Skippin audio file 1... already forced aligned!
(3/25): Processing file 3.mp3
Skippin audio file 3... already forced aligned!
(4/25): Processing file 4.mp3
Skippin audio file 4... already forced aligned!
(5/25): Processing file 9.mp3
Skippin audio file 9... already forced aligned!
(6/25): Processing file 12.mp3
Skippin audio file 12... already forced aligned!
(7/25): Processing file 14.mp3
Skippin audio file 14... already forced aligned!
(8/25): Processing file 16.mp3
Skippin audio file 16... already forced aligned!
(9/25): Processing file 20.mp3
Skippin audio file 20... already forced aligned!
(10/25): Processing file 22.mp3
Skippin audio file 22... already forced aligned!
(11/25): Processing file 24.mp3
Skippin audio file 24... already forced aligned!
(12/25): Processing file 26.mp3
Skippin audio file 26... already forced aligned!
(13/25): Processing file 30.mp3
Skippin audio f