{ "cells": [ { "cell_type": "markdown", "metadata": { "id": "ABgLYF9R8viP" }, "source": [ "# Setup" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "Qs-J5B3ykaYz", "outputId": "66f79f0c-f2e3-47b0-fc05-f56416962f0c" }, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/media/external_10TB/mahta_fetrat/venv/lib/python3.10/site-packages/tqdm/auto.py:21: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html\n", " from .autonotebook import tqdm as notebook_tqdm\n" ] } ], "source": [ "from GE2PE import GE2PE\n", "\n", "g2p = GE2PE(model_path='GE2PE/phase3-t5/checkpoint-487100', GPU=True)\n" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "id": "ymcwy5hG9H5l" }, "outputs": [], "source": [ "import pandas as pd\n", "import re\n", "from jiwer import cer\n", "from difflib import SequenceMatcher" ] }, { "cell_type": "markdown", "metadata": { "id": "EOZGZa2lMfPe" }, "source": [ "# Setup POSTagger" ] }, { "cell_type": "code", "execution_count": 3, "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "8TJZ2DrKMg7F", "outputId": "c2597cbc-6d60-40c7-e246-983b9eaf8437" }, "outputs": [], "source": [ "# ! git clone https://huggingface.co/roshan-research/spacy_pos_tagger_parsbertpostagger" ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "id": "pCiJ-VnfMiwj" }, "outputs": [], "source": [ "import spacy\n", "# spacy.require_gpu()\n", "\"\"\"این ماژول شامل کلاسها و توابعی برای برچسبگذاری توکنهاست.\"\"\"\n", "\n", "from nltk.tag import stanford # noqa: I001\n", "from hazm import SequenceTagger\n", "\n", "import os\n", "import subprocess\n", "\n", "from spacy.tokens import Doc\n", "from spacy.tokens import DocBin\n", "from spacy.vocab import Vocab\n", "\n", "from sklearn.metrics import classification_report,f1_score,accuracy_score,precision_score,recall_score\n", "\n", "from tqdm import tqdm\n", "\n", "\n", "punctuation_list = [\n", " '\"',\n", " \"#\",\n", " \"(\",\n", " \")\",\n", " \"*\",\n", " \",\",\n", " \"-\",\n", " \".\",\n", " \"/\",\n", " \":\",\n", " \"[\",\n", " \"]\",\n", " \"«\",\n", " \"»\",\n", " \"،\",\n", " \";\",\n", " \"?\",\n", " \"!\",\n", "]\n", "\n", "\n", "class POSTagger(SequenceTagger):\n", " \"\"\"این کلاسها شامل توابعی برای برچسبگذاری توکنهاست.\"\"\"\n", "\n", " def __init__(\n", " self: \"POSTagger\", model=None, data_maker=None, universal_tag=False,\n", " ) -> None:\n", " data_maker = self.data_maker if data_maker is None else data_maker\n", " self.__is_universal = universal_tag\n", " super().__init__(model, data_maker)\n", "\n", " def __universal_converter(self: \"POSTagger\", tagged_list):\n", " return [(word, tag.split(\",\")[0]) for word, tag in tagged_list]\n", "\n", " def __is_punc(self: \"POSTagger\", word):\n", " return word in punctuation_list\n", "\n", " def data_maker(self: \"POSTagger\", tokens):\n", " \"\"\"تابعی که لیستی از لیستی از کلمات توکنایز شده را گرفته و لیست دو بعدی از از دیکشنریهایی که تعیینکننده ویژگیها هر کلمه هستند را برمیگرداند.\n", "\n", " Examples:\n", " >>> posTagger = POSTagger(model = 'pos_tagger.model')\n", " >>> posTagger.data_maker(tokens = [['دلم', 'اینجا', 'ماندهاست', '.']])\n", " [[{'word': 'دلم', 'is_first': True, 'is_last': False, 'prefix-1': 'د', 'prefix-2': 'دل', 'prefix-3': 'دلم', 'suffix-1': 'م', 'suffix-2': 'لم', 'suffix-3': 'دلم', 'prev_word': '', 'two_prev_word': '', 'next_word': 'اینجا', 'two_next_word': 'مانده\\u200cاست', 'is_numeric': False, 'prev_is_numeric': '', 'next_is_numeric': False, 'is_punc': False, 'prev_is_punc': '', 'next_is_punc': False}, {'word': 'اینجا', 'is_first': False, 'is_last': False, 'prefix-1': 'ا', 'prefix-2': 'ای', 'prefix-3': 'این', 'suffix-1': 'ا', 'suffix-2': 'جا', 'suffix-3': 'نجا', 'prev_word': 'دلم', 'two_prev_word': '.', 'next_word': 'مانده\\u200cاست', 'two_next_word': '.', 'is_numeric': False, 'prev_is_numeric': False, 'next_is_numeric': False, 'is_punc': False, 'prev_is_punc': False, 'next_is_punc': False}, {'word': 'مانده\\u200cاست', 'is_first': False, 'is_last': False, 'prefix-1': 'م', 'prefix-2': 'ما', 'prefix-3': 'مان', 'suffix-1': 'ت', 'suffix-2': 'ست', 'suffix-3': 'است', 'prev_word': 'اینجا', 'two_prev_word': 'دلم', 'next_word': '.', 'two_next_word': '', 'is_numeric': False, 'prev_is_numeric': False, 'next_is_numeric': False, 'is_punc': False, 'prev_is_punc': False, 'next_is_punc': True}, {'word': '.', 'is_first': False, 'is_last': True, 'prefix-1': '.', 'prefix-2': '.', 'prefix-3': '.', 'suffix-1': '.', 'suffix-2': '.', 'suffix-3': '.', 'prev_word': 'مانده\\u200cاست', 'two_prev_word': 'اینجا', 'next_word': '', 'two_next_word': '', 'is_numeric': False, 'prev_is_numeric': False, 'next_is_numeric': '', 'is_punc': True, 'prev_is_punc': False, 'next_is_punc': ''}]]\n", "\n", " Args:\n", " tokens (List[List[str]]): جملاتی که نیاز به تبدیل آن به برداری از ویژگیها است.\n", "\n", " Returns:\n", " List(List(Dict())): لیستی از لیستی از دیکشنریهای بیانکننده ویژگیهای یک کلمه.\n", " \"\"\"\n", " return [\n", " [self.features(token, index) for index in range(len(token))]\n", " for token in tokens\n", " ]\n", "\n", " def features(self: \"POSTagger\", sentence, index):\n", " \"\"\"features.\"\"\"\n", " return {\n", " \"word\": sentence[index],\n", " \"is_first\": index == 0,\n", " \"is_last\": index == len(sentence) - 1,\n", " # *ix\n", " \"prefix-1\": sentence[index][0],\n", " \"prefix-2\": sentence[index][:2],\n", " \"prefix-3\": sentence[index][:3],\n", " \"suffix-1\": sentence[index][-1],\n", " \"suffix-2\": sentence[index][-2:],\n", " \"suffix-3\": sentence[index][-3:],\n", " # word\n", " \"prev_word\": \"\" if index == 0 else sentence[index - 1],\n", " \"two_prev_word\": \"\" if index == 0 else sentence[index - 2],\n", " \"next_word\": \"\" if index == len(sentence) - 1 else sentence[index + 1],\n", " \"two_next_word\": (\n", " \"\"\n", " if index in {len(sentence) - 1, len(sentence) - 2}\n", " else sentence[index + 2]\n", " ),\n", " # digit\n", " \"is_numeric\": sentence[index].isdigit(),\n", " \"prev_is_numeric\": \"\" if index == 0 else sentence[index - 1].isdigit(),\n", " \"next_is_numeric\": (\n", " \"\" if index == len(sentence) - 1 else sentence[index + 1].isdigit()\n", " ),\n", " # punc\n", " \"is_punc\": self.__is_punc(sentence[index]),\n", " \"prev_is_punc\": \"\" if index == 0 else self.__is_punc(sentence[index - 1]),\n", " \"next_is_punc\": (\n", " \"\"\n", " if index == len(sentence) - 1\n", " else self.__is_punc(sentence[index + 1])\n", " ),\n", " }\n", "\n", " def tag(self: \"POSTagger\", tokens):\n", " \"\"\"یک جمله را در قالب لیستی از توکنها دریافت میکند و در خروجی لیستی از\n", " `(توکن، برچسب)`ها برمیگرداند.\n", "\n", " Examples:\n", " >>> posTagger = POSTagger(model = 'pos_tagger.model')\n", " >>> posTagger.tag(tokens = ['من', 'به', 'مدرسه', 'ایران', 'رفته_بودم', '.'])\n", " [('من', 'PRON'), ('به', 'ADP'), ('مدرسه', 'NOUN,EZ'), ('ایران', 'NOUN'), ('رفته_بودم', 'VERB'), ('.', 'PUNCT')]\n", "\n", " >>> posTagger = POSTagger(model = 'pos_tagger.model', universal_tag = True)\n", " >>> posTagger.tag(tokens = ['من', 'به', 'مدرسه', 'ایران', 'رفته_بودم', '.'])\n", " [('من', 'PRON'), ('به', 'ADP'), ('مدرسه', 'NOUN'), ('ایران', 'NOUN'), ('رفته_بودم', 'VERB'), ('.', 'PUNCT')]\n", "\n", " Args:\n", " tokens (List[str]): لیستی از توکنهای یک جمله که باید برچسبگذاری شود.\n", "\n", " Returns:\n", " (List[Tuple[str,str]]): لیستی از `(توکن، برچسب)`ها.\n", "\n", " \"\"\"\n", " tagged_token = super().tag(tokens)\n", " return (\n", " self.__universal_converter(tagged_token)\n", " if self.__is_universal\n", " else tagged_token\n", " )\n", "\n", " def tag_sents(self: \"POSTagger\", sentences):\n", " \"\"\"جملات را در قالب لیستی از توکنها دریافت میکند\n", " و در خروجی، لیستی از لیستی از `(توکن، برچسب)`ها برمیگرداند.\n", "\n", " هر لیست از `(توکن، برچسب)`ها مربوط به یک جمله است.\n", "\n", " Examples:\n", " >>> posTagger = POSTagger(model = 'pos_tagger.model')\n", " >>> posTagger.tag_sents(sentences = [['من', 'به', 'مدرسه', 'ایران', 'رفته_بودم', '.']])\n", " [[('من', 'PRON'), ('به', 'ADP'), ('مدرسه', 'NOUN,EZ'), ('ایران', 'NOUN'), ('رفته_بودم', 'VERB'), ('.', 'PUNCT')]]\n", "\n", " >>> posTagger = POSTagger(model = 'pos_tagger.model', universal_tag = True)\n", " >>> posTagger.tag_sents(sentences = [['من', 'به', 'مدرسه', 'ایران', 'رفته_بودم', '.']])\n", " [[('من', 'PRON'), ('به', 'ADP'), ('مدرسه', 'NOUN'), ('ایران', 'NOUN'), ('رفته_بودم', 'VERB'), ('.', 'PUNCT')]]\n", "\n", " Args:\n", " sentences (List[List[str]]): لیستی از جملات که باید برچسبگذاری شود.\n", "\n", " Returns:\n", " (List[List[Tuple[str,str]]]): لیستی از لیستی از `(توکن، برچسب)`ها.\n", " هر لیست از `(توکن،برچسب)`ها مربوط به یک جمله است.\n", "\n", " \"\"\"\n", " tagged_sents = super().tag_sents(sentences)\n", " return (\n", " [self.__universal_converter(tagged_sent) for tagged_sent in tagged_sents]\n", " if self.__is_universal\n", " else tagged_sents\n", " )\n", "\n", "\n", "class StanfordPOSTagger(stanford.StanfordPOSTagger):\n", " \"\"\"StanfordPOSTagger.\"\"\"\n", "\n", " def __init__(\n", " self: \"StanfordPOSTagger\",\n", " model_filename: \"str\",\n", " path_to_jar: str,\n", " *args, # noqa: ANN002\n", " **kwargs, # noqa: ANN003\n", " ) -> None:\n", " self._SEPARATOR = \"/\"\n", " super(stanford.StanfordPOSTagger, self).__init__(\n", " model_filename=model_filename,\n", " path_to_jar=path_to_jar,\n", " *args, # noqa: B026\n", " **kwargs,\n", " )\n", "\n", " def tag(self: \"StanfordPOSTagger\", tokens):\n", " \"\"\"tag.\n", "\n", " Examples:\n", " >>> tagger = StanfordPOSTagger(model_filename='persian.tagger', path_to_jar='stanford_postagger.jar')\n", " >>> tagger.tag(['من', 'به', 'مدرسه', 'رفته_بودم', '.'])\n", " [('من', 'PRO'), ('به', 'P'), ('مدرسه', 'N'), ('رفته_بودم', 'V'), ('.', 'PUNC')]\n", "\n", " \"\"\"\n", " return self.tag_sents([tokens])[0]\n", "\n", " def tag_sents(self: \"StanfordPOSTagger\", sentences):\n", " \"\"\"tag_sents.\"\"\"\n", " refined = ([w.replace(\" \", \"_\") for w in s] for s in sentences)\n", " return super(stanford.StanfordPOSTagger, self).tag_sents(refined)\n", "\n", "\n", "class SpacyPOSTagger(POSTagger):\n", " def __init__(\n", " self: \"SpacyPOSTagger\",\n", " model_path=None,\n", " using_gpu=False,\n", " gpu_id=0\n", " ):\n", " \"\"\"\n", " Initialize the SpacyPOSTagger with a model and data paths.\n", "\n", " Args:\n", " - model_path: Path to a pre-trained spaCy model.\n", " - test_dataset: Test dataset for evaluation. It has a similar structure to the training dataset.\n", " - test_directory: Directory to save the test data in spaCy format.\n", " - using_gpu: Set to True if you want use gpu (if you dont have one and set this to True the function use cpu automatically)\n", " This constructor calls the constructor of the parent class POSTagger.\n", " \"\"\"\n", " super().__init__(universal_tag=True)\n", " self.model_path = model_path #### Usually an output directory for spacy model contain two other directory name model-last , model-best,You should give model_path like this : output/model-last\n", " self.using_gpu = using_gpu\n", " self.gpu_id = gpu_id\n", " self.tagger = None\n", " self._setup()\n", "\n", " def _setup(self: \"SpacyPOSTagger\"):\n", " \"\"\"\n", " Set up the configuration for the spaCy model, including GPU settings and data preparation.\n", "\n", " This function initializes and configures the spaCy model, checks for GPU availability, and prepares the training and testing datasets in spaCy format.\n", "\n", " If using GPU, GPU settings are configured to enhance processing speed. Then, the pre-trained spaCy model is loaded based on the provided model path.\n", "\n", " Training and testing datasets are prepared and saved in the respective directories for use during model training and evaluation.\n", " \"\"\" # noqa: D212\n", " if self.using_gpu:\n", " self._setup_gpu()\n", " else:\n", " print(\"------------- You Prefer to use CPU --------------\")\n", "\n", "\n", " def _setup_model(self: \"SpacyPOSTagger\",sents):\n", " \"\"\"\n", " Initialize and configure the spaCy model for tagging without GPU settings.\n", "\n", " This method loads and configures the spaCy model based on the provided model path. It also sets up a custom tokenizer for text processing and constructs a dictionary for reference.\n", "\n", " Args:\n", " - model_path: Path to the pre-trained spaCy model.\n", "\n", " This method is typically called during setup to prepare the model for tagging tasks.\n", " \"\"\"\n", " self.peykare_dict = {} # Initialize a dictionary for reference\n", " self.tagger = spacy.load(self.model_path) # Load the spaCy model\n", " self._set_peykare_dictionary(sents) # Construct a reference dictionary\n", " self.tagger.tokenizer = self._custom_tokenizer # Set a custom tokenizer for the model.\n", "\n", " def _setup_gpu(self: \"SpacyPOSTagger\"):\n", " \"\"\"\n", " Check GPU availability and configure spaCy to use it if possible.\n", "\n", " This method checks whether a GPU is available and, if so, configures spaCy to utilize it for improved processing speed. It sets the 'gpu_availability' attribute to 'True' or 'False' accordingly.\n", "\n", " This check is performed during setup to make use of available GPU resources for enhanced performance.\n", " \"\"\"\n", " print(\"------------------ GPU Setup Process Started ---------------------\")\n", " gpu_available = spacy.prefer_gpu(gpu_id=self.gpu_id) # Check if a GPU is available\n", " if gpu_available:\n", " print(\"------------ GPU is available and ready for use -------------\")\n", " spacy.require_gpu(gpu_id=self.gpu_id) # Configure spaCy to use the GPU\n", " self.gpu_availability = True\n", " else:\n", " print(\"------------ GPU is not available; spaCy will use CPU -------------\")\n", " self.gpu_availability = False\n", "\n", "\n", " def _setup_dataset(self: \"SpacyPOSTagger\", dataset,saved_directory,data_type='train'):\n", " \"\"\"\n", " Setup the training dataset in spaCy's binary format.\n", "\n", " This function prepares the training dataset and saves it in spaCy's binary format.\n", " \"\"\"\n", " assert data_type in ['train','test']\n", " db = DocBin()\n", " for sent in tqdm(dataset):\n", " words = [word[0] for word in sent]\n", " tags = [word[1] for word in sent]\n", " doc = Doc(Vocab(strings=words), words=words)\n", " for d, tag in zip(doc, tags):\n", " d.tag_ = tag\n", " db.add(doc)\n", "\n", " self._handle_data_path(saved_directory)\n", " db.to_disk(f'{saved_directory}/{data_type}.spacy')\n", "\n", " def _handle_data_path(self,path='POSTaggerDataset'):\n", " \"\"\"\n", " Create the directory if it doesn't exist.\n", "\n", " This method checks if the specified directory exists, and if not, it creates the directory to store the data.\n", "\n", " Args:\n", " - path: The path to the directory (default is 'POSTaggerDataset').\n", "\n", " This method is called to ensure the directory is available for saving processed data.\n", " \"\"\"\n", " if not os.path.exists(path):\n", " os.makedirs(path)\n", "\n", "\n", " def _custom_tokenizer(self: \"SpacyPOSTagger\", text):\n", " \"\"\"\n", " Implement a custom tokenization method for spaCy.\n", "\n", " This method defines a custom tokenization method for spaCy. It is used to tokenize input text based on a custom dictionary, or it raises an error if tokenization is not available.\n", "\n", " Args:\n", " - text: The input text to be tokenized.\n", "\n", " This custom tokenization method is used by the spaCy model during processing.\n", "\n", " \"\"\"\n", "\n", " if text in self.peykare_dict:\n", " return Doc(self.tagger.vocab, self.peykare_dict[text])\n", " else:\n", " raise ValueError('No tokenization available for input.')\n", "\n", " def _set_peykare_dictionary(self: \"SpacyPOSTagger\", sents):\n", " \"\"\"\n", " Create a dictionary for custom tokenization.\n", "\n", " This method constructs a dictionary to store custom tokenization mappings based on input sentences. It is used for custom tokenization in spaCy.\n", "\n", " Args:\n", " - sents: Input sentences to build the custom tokenization dictionary.\n", "\n", " This method is called during setup to establish a dictionary for tokenization.\n", " \"\"\"\n", " self.peykare_dict = {' '.join([w for w in item]): [w for w in item] for item in sents}\n", "\n", "\n", " def _add_to_dict(self: \"SpacyPOSTagger\", sents):\n", " \"\"\"\n", " Add the sentences to dictianory if it doesnt exist already\n", " \"\"\"\n", " for sent in sents:\n", " key = ' '.join(sent)\n", " if key not in self.peykare_dict:\n", " self.peykare_dict[key] = sent\n", "\n", "\n", " def tag(self: \"SpacyPOSTagger\", tokens,universal_tag=True):\n", " \"\"\"یک جمله را در قالب لیستی از توکنها دریافت میکند و در خروجی لیستی از\n", " `(توکن، برچسب)`ها برمیگرداند.\n", "\n", " Examples:\n", " >>> posTagger = POSTagger(model = 'pos_tagger.model')\n", " >>> posTagger.tag(tokens = ['من', 'به', 'مدرسه', 'ایران', 'رفته_بودم', '.'])\n", " [('من', 'PRON'), ('به', 'ADP'), ('مدرسه', 'NOUN,EZ'), ('ایران', 'NOUN'), ('رفته_بودم', 'VERB'), ('.', 'PUNCT')]\n", "\n", " >>> posTagger = POSTagger(model = 'pos_tagger.model', universal_tag = True)\n", " >>> posTagger.tag(tokens = ['من', 'به', 'مدرسه', 'ایران', 'رفته_بودم', '.'])\n", " [('من', 'PRON'), ('به', 'ADP'), ('مدرسه', 'NOUN'), ('ایران', 'NOUN'), ('رفته_بودم', 'VERB'), ('.', 'PUNCT')]\n", "\n", " Args:\n", " tokens (List[str]): لیستی از توکنهای یک جمله که باید برچسبگذاری شود.\n", "\n", " Returns:\n", " (List[Tuple[str,str]]): لیستی از `(توکن، برچسب)`ها.\n", "\n", " \"\"\"\n", " if self.tagger == None:\n", " self._setup_model([tokens])\n", " self._add_to_dict([tokens])\n", "\n", " text = ' '.join(tokens)\n", " doc = self.tagger(text)\n", " if not universal_tag:\n", " tags = [tok.tag_ for tok in doc]\n", " else:\n", " tags = [tok.tag_.replace(',EZ','') for tok in doc]\n", "\n", " return list(zip(tokens,tags))\n", " # noqa: W293\n", "\n", " def tag_sents(self:\"SpacyPOSTagger\",sents,universal_tag=True,batch_size=128):\n", " \"\"\"\n", " Args:\n", " sents : List[List[Tokens]]\n", " batch_size : number of batches give to model for processing sentences each time\n", " \"\"\"\n", " \"\"\"\n", " Returns : List[List[Tuple(str,str)]]\n", " \"\"\"\n", " if self.tagger == None:\n", " self._setup_model(sents)\n", "\n", " self._add_to_dict(sents)\n", "\n", " docs = list(self.tagger.pipe((' '.join([w for w in sent]) for sent in sents), batch_size=batch_size))\n", " if not universal_tag:\n", " tags = [[w.tag_ for w in doc] for doc in docs]\n", " else:\n", " tags = [[w.tag_.replace(',EZ','') for w in doc] for doc in docs]\n", "\n", " combined_out = [list(zip(tok,tag)) for tok,tag in zip(sents,tags)]\n", " return combined_out\n", "\n", " def train(\n", " self: \"SpacyPOSTagger\",\n", " train_dataset,\n", " test_dataset,\n", " data_directory,\n", " base_config_file,\n", " train_config_path,\n", " output_dir,\n", " use_direct_config=False\n", " ):\n", " \"\"\"\n", " Train the spaCy model using a subprocess and a configuration file.\n", "\n", " This method executes the training process for the spaCy model by invoking spaCy's training module using subprocess. It takes input configuration files, training and testing data, and GPU settings (if available).\n", "\n", " Args:\n", " - train_dataset: Training dataset for the tagger. It is a list of sentences, where each sentence is a list of token-tag pairs.\n", " - test_dataset: Testing dataset for the tagger. It is a list of sentences, where each sentence is a list of token-tag pairs.\n", " - data_directory: Directory to save the training and testing data in spaCy format.\n", " - base_config_file: Path to the base configuration file for spaCy.\n", " - train_config_path: if use_direct_config set to True this is the path of config file for training that you will use\n", " if use_direct_config set to False this is the path that you want train config file will create with base_config\n", " - output_dir: Directory for storing the trained model and training logs.\n", "\n", " Upon successful training, this method updates the model path to the trained model.\n", "\n", " This method is typically called to initiate the training process of the spaCy model.\n", " \"\"\"\n", "\n", " self.spacy_train_directory = data_directory\n", " self.train_dataset = train_dataset ### List[List[Tuple]]\n", " self.test_dataset = test_dataset\n", " if self.train_dataset:\n", " # Set up the training dataset configuration\n", " self._setup_dataset(dataset=self.train_dataset, saved_directory=self.spacy_train_directory, data_type='train')\n", "\n", " if self.test_dataset:\n", " self._setup_dataset(test_dataset,saved_directory=data_directory,data_type='test')\n", "\n", " train_data = f'{data_directory}/train.spacy'\n", " test_data = f'{data_directory}/test.spacy'\n", "\n", " if use_direct_config == False:\n", " self._setup_train_config(base_config_file, train_config_path=train_config_path)\n", " else:\n", " self.train_config_file = train_config_path\n", "\n", " command = f\"python -m spacy train {self.train_config_file} --output ./{output_dir} --paths.train ./{train_data} --paths.dev ./{test_data}\"\n", " if self.gpu_availability:\n", " command += f\" --gpu-id {self.gpu_id}\"\n", "\n", " subprocess.run(command, shell=True)\n", " self.model_path = f\"{output_dir}/model-last\"\n", " self._setup_model([[w for w,_ in sent] for sent in test_dataset])\n", "\n", " def _setup_train_config(self: \"SpacyPOSTagger\", base_config, train_config_path):\n", " \"\"\"\n", " Create and configure the training configuration file for spaCy.\n", "\n", " This method sets up the training configuration file by copying a base configuration file and customizing it according to the specified parameters.\n", "\n", " Args:\n", " - base_config: Path to the base configuration file.\n", " - train_config_file_name: Name of the training configuration file for saving it.\n", "\n", " This method is called to generate the training configuration file used in the training process.\n", " \"\"\"\n", " self.train_config_file = train_config_path\n", " print(\"----------------- Setting up the training configuration file ----------------------\")\n", " command = f\"python -m spacy init fill-config {base_config} {train_config_path}\" # Generate the training configuration file\n", " subprocess.run(command, shell=True)\n", " print(\"----------------- Training configuration file created successfully ----------------------\")\n", " print(f\"----------------- Training Config file address is {train_config_path} --------------------\")\n", "\n", " def evaluate(self: \"SpacyPOSTagger\", test_sents,batch_size):\n", " \"\"\"\n", " Evaluate the spaCy model on input sentences using different tag options.\n", "\n", " This method evaluates the spaCy model on input sentences with and without 'EZ' tags and reports classification metrics.\n", "\n", " Args:\n", " - sents: List of sentences for evaluation.\n", " - batch_size : number of batches that model should process each time\n", " This method calls the internal evaluation method for both tag options.\n", "\n", " This method is typically used for model evaluation and reporting metrics.\n", " \"\"\"\n", " self._setup_model([[w for w,_ in sent] for sent in test_sents])\n", " if self.tagger:\n", " golds, predictions = self._get_labels_and_predictions(test_sents,batch_size)\n", " print(\"-----------------------------------------\")\n", " self._evaluate_tags(test_sents, golds, predictions, use_ez_tags=True,batch_size=batch_size)\n", " print(\"-----------------------------------------\")\n", " self._evaluate_tags(test_sents, golds, predictions, use_ez_tags=False,batch_size=batch_size)\n", " else:\n", " raise ValueError(\"Model does not exist.Please train a new one with train method of this instance or give a model_path , setup the model with self._setup_model() and then call evaluate\")\n", "\n", " def _evaluate_tags(self, sents, golds=None, predictions=None, use_ez_tags=True,batch_size=128):\n", " \"\"\"\n", " Evaluate model predictions and report classification metrics.\n", "\n", " This method evaluates model predictions and reports classification metrics for the specified tag option.\n", "\n", " Args:\n", " - sents: List of sentences for evaluation.\n", " - golds: List of gold labels (optional).\n", " - predictions: List of model predictions (optional).\n", " - use_ez_tags: A flag indicating whether to consider 'EZ' tags.\n", " - batch_size : number of batches model should process\n", "\n", " If `golds` and `predictions` are not provided, they are automatically extracted from the input sentences.\n", "\n", " This method calculates and displays precision, recall, and F1-score for the specified tag option.\n", "\n", " This method is called by the `evaluate` method to perform model evaluation.\n", " \"\"\"\n", " if golds is None or predictions is None:\n", " golds, predictions = self._get_labels_and_predictions(sents,batch_size)\n", "\n", " predictions_cleaned = []\n", " golds_cleaned = []\n", " if use_ez_tags:\n", " get_tag_func = self._get_ez_tags\n", " else:\n", " get_tag_func = self._remove_ez_tags\n", "\n", " for preds, golds in zip(predictions, golds):\n", " for pred in preds:\n", " pred_cleaned = get_tag_func(pred)\n", " predictions_cleaned.append(pred_cleaned)\n", " for gold in golds:\n", " gold_cleaned = get_tag_func(gold)\n", " golds_cleaned.append(gold_cleaned)\n", "\n", " print(classification_report(golds_cleaned, predictions_cleaned))\n", " print('Precision: %.5f' % precision_score(golds_cleaned, predictions_cleaned, average='weighted'))\n", " print('Recall: %.5f' % recall_score(golds_cleaned, predictions_cleaned, average='weighted'))\n", " print('F1-Score: %.5f' % f1_score(golds_cleaned, predictions_cleaned, average='macro'))\n", "\n", " def _get_ez_tags(self, label):\n", " \"\"\"\n", " Extract 'EZ' tags from labels.\n", "\n", " This method extracts 'EZ' tags from labels if they are present and returns them.\n", "\n", " Args:\n", " - label: The label containing 'EZ' tags.\n", "\n", " Returns:\n", " The 'EZ' tags or '-' if 'EZ' tags are not present.\n", " \"\"\"\n", " if 'EZ' in label:\n", " label = 'EZ'\n", " else:\n", " label = '-'\n", "\n", " return label\n", "\n", " def _remove_ez_tags(self, label):\n", " \"\"\"\n", " Remove 'EZ' tags from labels.\n", "\n", " This method removes 'EZ' tags from labels if they are present and returns the cleaned label.\n", "\n", " Args:\n", " - label: The label containing 'EZ' tags.\n", "\n", " Returns:\n", " The label with 'EZ' tags removed.\n", " \"\"\"\n", " return label.replace(',EZ', '') if 'EZ' in label else label\n", "\n", " def _evaluate_ez_tags(self, sents):\n", " \"\"\"\n", " Evaluate model predictions with 'EZ' tags included.\n", "\n", " This method evaluates model predictions with 'EZ' tags included.\n", " \"\"\"\n", " self._evaluate_tags(sents, use_ez_tags=True)\n", "\n", " def _evaluate_normal_tags(self, sents):\n", " \"\"\"\n", " Evaluate model predictions without 'EZ' tags.\n", "\n", " This method evaluates model predictions without 'EZ' tags.\n", " \"\"\"\n", " self._evaluate_tags(sents, use_ez_tags=False)\n", "\n", " def _get_labels_and_predictions(self: \"SpacyPOSTagger\", sents,batch_size):\n", " \"\"\"\n", " Extract gold labels and model predictions for evaluation.\n", "\n", " This method extracts gold labels and model predictions from input sentences.\n", "\n", " Args:\n", " - sents: List of sentences for evaluation.\n", "\n", " Returns:\n", " Lists of gold labels and model predictions.\n", "\n", " This method is typically used for gathering data to perform model evaluation.\n", " \"\"\"\n", " gold_labels = [[tag for _, tag in sent] for sent in sents]\n", " tokens = [[w for w,_ in sent] for sent in sents]\n", " prediction_labels = self.tag_sents(tokens,batch_size)\n", " return gold_labels, prediction_labels\n" ] }, { "cell_type": "code", "execution_count": 5, "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "5_IBJXq1Mnfo", "outputId": "c07d6be2-70cb-45b1-ed9f-a4eb49fafb90" }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "------------------ GPU Setup Process Started ---------------------\n", "------------ GPU is not available; spaCy will use CPU -------------\n" ] } ], "source": [ "spacy_posTagger = SpacyPOSTagger(model_path = '/content/spacy_pos_tagger_parsbertpostagger', using_gpu=True)" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "from hazm import WordTokenizer, Normalizer\n", "\n", "tokenizer = WordTokenizer()\n", "normalizer = Normalizer()" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "# Ezafe Correction\n", "def get_last_subword(word):\n", " # Split the word by word boundaries\n", " subwords = re.findall(r'\\b\\w+\\b', word)\n", "\n", " if len(subwords) > 1 and subwords[-1] in ['های', 'ی']:\n", " return (subwords[-2], subwords[-1])\n", "\n", " return (subwords[-1], '')" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [], "source": [ "def get_EZ_tags(grapheme, compound=False):\n", " grapheme = re.sub('ۀ', 'هی', grapheme)\n", " tokens = tokenizer.tokenize(normalizer.normalize(grapheme))\n", " tags = spacy_posTagger.tag(tokens=tokens, universal_tag=False)\n", " tags = [(t[0], t[1], '') for t in tags if 'EZ' in t[1]]\n", "\n", " if compound:\n", " return tags\n", "\n", " splitted_tags = []\n", " for t in tags:\n", " subword1, subword2 = get_last_subword(t[0])\n", " splitted_tags.append((subword1, t[1], subword2))\n", " return splitted_tags" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "def get_naive_phonetic(word):\n", " char_map = {\n", " 'ا': 'A', 'ب': 'b', 'پ': 'p', 'ت': 't', 'ث': 's', 'ج': 'j', 'چ': 'C',\n", " 'ح': 'h', 'خ': 'x', 'د': 'd', 'ذ': 'z', 'ر': 'r', 'ز': 'z', 'ژ': 'Z',\n", " 'س': 's', 'ش': 'S', 'ص': 's', 'ض': 'z', 'ط': 't', 'ظ': 'z', 'ع': '?',\n", " 'غ': 'q', 'ف': 'f', 'ق': 'q', 'ک': 'k', 'گ': 'g', 'ل': 'l', 'م': 'm',\n", " 'ن': 'n', 'و': 'v', 'ه': 'h', 'ی': 'y', 'ء': '?','ئ': '?', 'ؤ': '?',\n", " 'آ': 'A', 'أ': '?', 'إ': '?', 'ۀ': 'eye'\n", " }\n", " mapped_string = ''.join(char_map.get(char, char) for char in word)\n", " return mapped_string" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "merged_dict_path = 'final_merged_dict_pruned_POS_prob.csv'\n", "merged_dict_df = pd.read_csv(merged_dict_path)\n", "\n", "merged_dict = {}\n", "\n", "for idx, row in merged_dict_df.iterrows():\n", " g, ps, nodiff, pos, prob = row['grapheme'], eval(row['phoneme']), row['nodiff'], eval(row['POS']), eval(row['prob'])\n", " if g not in merged_dict:\n", " merged_dict[g] = {'phoneme': [], 'nodiff': nodiff, 'POS': pos, 'prob': prob}\n", "\n", " for p in ps:\n", " merged_dict[g]['phoneme'].append(''.join(p))\n", "\n", "\n", "inverted_merged_dict = {}\n", "\n", "for key, value_list in merged_dict.items():\n", " for value in value_list['phoneme']:\n", " inverted_merged_dict[value] = key\n", "\n", "\n", "def word_in_dict(word, inverted_dictionary=inverted_merged_dict):\n", " return word in inverted_dictionary" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [], "source": [ "import itertools\n", "def get_word_phonetic_candidates(word):\n", " subwords = []\n", " for match in re.finditer(r'\\b(\\w+)\\b', word):\n", " match_text = match.group()\n", " match_span = match.span()\n", " subwords.append((match_text, match_span))\n", "\n", " subword_candidates = []\n", " for subword, _ in subwords:\n", " if subword in merged_dict:\n", " subword_candidates.append(merged_dict[subword]['phoneme'])\n", " else:\n", " subword_candidates.append([get_naive_phonetic(subword)])\n", "\n", " # Generate all possible combinations and concatenate\n", " word_candidates = [''.join(comb) for comb in itertools.product(*subword_candidates)]\n", " return word_candidates" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [], "source": [ "def get_updated_span(match_span, displacements):\n", " new_start, new_end = match_span[0], match_span[1]\n", " for start, displacement in displacements:\n", " if start <= new_start:\n", " new_start += displacement\n", " new_end += displacement\n", "\n", " return (new_start, new_end)" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [], "source": [ "def add_1_for_EZ(phonemes):\n", " # Convert 'a' to 'A' in '-ha' and '-haye'\n", " phonemes = phonemes.replace('-ha', '-hA').replace('-haye', '-hAye')\n", " \n", " # Add '1' after '-e', '-ye', and '-hAye'\n", " phonemes = phonemes.replace('-e', '-e1').replace('-ye', '-ye1').replace('-hAye', '-hAye1')\n", " \n", " return phonemes" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [], "source": [ "def correct_phonetic_model_EZ_by_tags(grapheme, model_output):\n", " EZ_tags = get_EZ_tags(grapheme, compound=False)\n", "\n", " matches = []\n", " matched_spans = set()\n", "\n", " for word, tag, ending in EZ_tags:\n", " phonetic_candidates = get_word_phonetic_candidates(word)\n", "\n", " for phonetic in phonetic_candidates:\n", " if phonetic.endswith('Aye') and not word.endswith('ه'): continue\n", " for match in re.finditer(r'\\b(\\w+)\\b', model_output):\n", " match_text = match.group()\n", " match_span = match.span()\n", " \n", " if match_text not in ['e', 'ye', 'i', 'ha', 'hA', 'hAye', 'haye'] and match_span not in matched_spans and SequenceMatcher(None, match_text, phonetic).ratio() > 0.75:\n", " matches.append((match_span, match_text, phonetic, ending))\n", " matched_spans.add(match_span)\n", "\n", " non_matches = []\n", " for match in re.finditer(r'\\b(\\w+)\\b', model_output):\n", " match_text = match.group()\n", " match_span = match.span()\n", " if not match_text.endswith('e') and not match_text.endswith('ye') and not match_text.endswith('i') and not match_text.endswith('ha')and not match_text.endswith('hAye') and not match_text.endswith('haye') and match_span not in matched_spans:\n", " non_matches.append((match_text, match_span))\n", "\n", " displacements = []\n", " for match_span, _, phonetic, ending in matches:\n", " match_span = get_updated_span(match_span, displacements)\n", " if model_output[match_span[1]:].startswith('-e') or model_output[match_span[1]:].startswith('-ye') or model_output[match_span[1]:].startswith('-hAye') or model_output[match_span[1]:].startswith('-haye') or model_output[match_span[1]:].startswith('-hA-ye') or model_output[match_span[1]:].startswith('-ha-ye'):\n", " continue\n", "\n", " output_word = model_output[match_span[0]:match_span[1]]\n", " if len(output_word) >= 4 and output_word[-3] in 'еeiuoaāäâāɒáA' and output_word.endswith('ye') and \\\n", " ((word_in_dict(output_word[:-2], inverted_merged_dict) and not word_in_dict(output_word, inverted_merged_dict)) or \\\n", " SequenceMatcher(None, output_word[:-2], phonetic).ratio() > SequenceMatcher(None, output_word, phonetic).ratio()):\n", " model_output = model_output[:match_span[1] - 2] + '-' + model_output[match_span[1] - 2:]\n", " displacements.append((match_span[1] - 2, 1))\n", " continue\n", "\n", " if len(output_word) >= 3 and output_word.endswith('e') and \\\n", " ((word_in_dict(output_word[:-1], inverted_merged_dict) and not word_in_dict(output_word, inverted_merged_dict)) or \\\n", " SequenceMatcher(None, output_word[:-1], phonetic).ratio() > SequenceMatcher(None, output_word, phonetic).ratio()):\n", " model_output = model_output[:match_span[1] - 1] + '-' + model_output[match_span[1] - 1:]\n", " displacements.append((match_span[1] - 1, 1))\n", " continue\n", "\n", " if ending == 'ی' and len(output_word) >= 4 and output_word[-2:] == 'ye' and output_word[-3] == phonetic[-1]:\n", " model_output = model_output[:match_span[1] - 2] + '-' + model_output[match_span[1] - 2:]\n", " displacements.append((match_span[1] - 2, 1))\n", " continue\n", "\n", " if ending == 'های' and len(output_word) >= 7 and output_word[-5:] == 'hAye':\n", " model_output = model_output[:match_span[1] - 4] + '-' + model_output[match_span[1] - 4:]\n", " displacements.append((match_span[1] - 5, 1))\n", " continue\n", "\n", " if ending == 'های' and len(output_word) >= 6 and output_word[-4:] == 'haye':\n", " model_output = model_output[:match_span[1] - 4] + '-hAye' + model_output[match_span[1]:]\n", " displacements.append((match_span[1] - 4, 1))\n", " continue\n", "\n", " if ending not in ['ی', 'های'] and len(output_word) >= 3 and output_word[-2] == phonetic[-1] and output_word[-1] == 'e':\n", " model_output = model_output[:match_span[1] - 1] + '-' + model_output[match_span[1] - 1:]\n", " displacements.append((match_span[1] - 1, 1))\n", " continue\n", "\n", " if output_word[-1] in 'еeiuoaāäâāɒáA' and not output_word.endswith('haye') and not output_word.endswith('hAye'):\n", " model_output = model_output[:match_span[1]] + '-ye' + model_output[match_span[1]:]\n", " displacements.append((match_span[1], 3))\n", " continue\n", "\n", " if not output_word.endswith('e'):\n", " model_output = model_output[:match_span[1]] + '-e' + model_output[match_span[1]:]\n", " displacements.append((match_span[1], 2))\n", "\n", " for non_match, match_span in non_matches:\n", " match_span = get_updated_span(match_span, displacements)\n", " output_word = model_output[match_span[0]:match_span[1]]\n", " if re.match(r'^-e\\b', model_output[match_span[1]:]):\n", " model_output = model_output[:match_span[1]] + model_output[match_span[1] + 2:]\n", " displacements.append((match_span[1] + 2, -2))\n", " continue\n", "\n", " if re.match(r'^-ye\\b', model_output[match_span[1]:]):\n", " model_output = model_output[:match_span[1]] + model_output[match_span[1] + 3:]\n", " displacements.append((match_span[1] + 3, -3))\n", " continue\n", "\n", " if len(output_word) >= 4 and output_word[-3] in 'еeiuoaāäâāɒáA' and output_word.endswith('ye') and (word_in_dict(output_word[:-2], inverted_merged_dict) and not word_in_dict(output_word, inverted_merged_dict)):\n", " model_output = model_output[:match_span[1] - 2] + model_output[match_span[1]:]\n", " displacements.append((match_span[1], -2))\n", " continue\n", "\n", " if len(output_word) >= 3 and output_word.endswith('e') and (word_in_dict(output_word[:-1], inverted_merged_dict) and not word_in_dict(output_word, inverted_merged_dict)):\n", " model_output = model_output[:match_span[1] - 1] + model_output[match_span[1]:]\n", " displacements.append((match_span[1], -1))\n", "\n", "\n", " model_output = add_1_for_EZ(model_output)\n", "\n", " return model_output" ] }, { "cell_type": "markdown", "metadata": { "id": "AdU8VMTIOWLZ" }, "source": [ "# Get Data" ] }, { "cell_type": "markdown", "metadata": { "id": "XhbCA2tkR45b" }, "source": [ "## Get Merged Dict" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "dGYh5bDyRfTg" }, "outputs": [], "source": [ "merged_dict_path = \"final_merged_dict_pruned_POS_prob.csv\"" ] }, { "cell_type": "code", "execution_count": 16, "metadata": { "id": "WV2x_iLQRhHI" }, "outputs": [], "source": [ "import pandas as pd\n", "merged_dict_df = pd.read_csv(merged_dict_path)" ] }, { "cell_type": "code", "execution_count": 17, "metadata": { "colab": { "base_uri": "https://localhost:8080/" }, "id": "NTDIcLdNUbNX", "outputId": "736b59cd-67cf-4d4f-d4da-4466bff487a1" }, "outputs": [ { "data": { "text/plain": [ "(116630, 5)" ] }, "execution_count": 17, "metadata": {}, "output_type": "execute_result" } ], "source": [ "merged_dict_df.shape" ] }, { "cell_type": "code", "execution_count": 18, "metadata": { "id": "JoW9aygRdcP6" }, "outputs": [], "source": [ "consonants_regex = '(?=' + '|'.join(['q', 'r', 't', 'y', 'p', 's', 'd', 'f', 'g', 'h', 'j', 'k', 'l', 'z', 'x', 'c', 'v', 'b', 'n', 'm', 'Q', 'R', 'T', 'Y', 'P', 'S', 'D', 'F', 'G', 'H', 'J', 'K', 'L', 'Z', 'X', 'C', 'V', 'B', 'N', 'M' ]) + ')'\n", "vowels_regex = '(?=' + '|'.join(['a', 'A', 'e', 'i', 'u', 'o']) + ')'" ] }, { "cell_type": "code", "execution_count": 19, "metadata": { "id": "OJTyOEoMR-cV" }, "outputs": [], "source": [ "merged_dict = {}\n", "\n", "for idx, row in merged_dict_df.iterrows():\n", " g, ps, nodiff, pos, prob = row['grapheme'], eval(row['phoneme']), row['nodiff'], eval(row['POS']), eval(row['prob'])\n", " if g not in merged_dict:\n", " merged_dict[g] = {'phoneme': [], 'nodiff': nodiff, 'POS': pos, 'prob': prob}\n", "\n", " for p in ps:\n", " merged_dict[g]['phoneme'].append(''.join(p))" ] }, { "cell_type": "code", "execution_count": 23, "metadata": { "id": "ujrYr29iy9TJ" }, "outputs": [], "source": [ "inverted_merged_dict = {}\n", "\n", "for key, value_list in merged_dict.items():\n", " for value in value_list:\n", " inverted_merged_dict[value] = key" ] }, { "cell_type": "code", "execution_count": 24, "metadata": { "id": "OoaIwa8nOegN" }, "outputs": [], "source": [ "def word_in_dict(word, inverted_dictionary=inverted_merged_dict):\n", " return word in inverted_dictionary" ] }, { "cell_type": "markdown", "metadata": { "id": "XjAPkfq7SF87" }, "source": [ "## Get Evaluation Data" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "id": "hJO-UAPDQvcb" }, "outputs": [], "source": [ "sentence_bench = pd.read_csv('SentenceBench.csv')" ] }, { "cell_type": "code", "execution_count": 26, "metadata": { "colab": { "base_uri": "https://localhost:8080/", "height": 143 }, "id": "qlYbrnUa9LAN", "outputId": "ce68f665-681a-454a-9a02-d5d14047fc68" }, "outputs": [ { "data": { "text/html": [ "
\n", " | dataset | \n", "grapheme | \n", "phoneme | \n", "homograph word | \n", "pronunciation | \n", "
---|---|---|---|---|---|
0 | \n", "polyphone | \n", "من قدر تو را میدانم | \n", "man qadr-e to rA mi-dAnam | \n", "قدر | \n", "qadr | \n", "
1 | \n", "polyphone | \n", "از قضای الهی به قدر الهی پناه میبرم | \n", "?az qazAy ?elAhi be qadar-e ?elAhi panAh mi-baram | \n", "قدر | \n", "qadar | \n", "
2 | \n", "polyphone | \n", "به دست و صورتم کرم زدم | \n", "be dast-o suratam kerem zadam | \n", "کرم | \n", "kerem | \n", "