- # %%
- import os
- from dataclasses import dataclass
- from typing import Dict, List, Union
-
- import numpy as np
- import pandas as pd
- import torch
-
- import evaluate
- from datasets import Dataset
- from transformers import (
-     AutoTokenizer,
-     Seq2SeqTrainer,
-     Seq2SeqTrainingArguments,
-     T5Config,
-     T5ForConditionalGeneration,
-     set_seed,
- )
-
- os.environ["WANDB_DISABLED"] = "true"
-
- # %%
- set_seed(41)
-
- # %%
- def prepare_dataset(batch):
-     # Use the raw grapheme string as the model input and the mapped phoneme
-     # string as the target; tokenization happens later, in the data collator.
-     batch['input_ids'] = batch['Grapheme']
-     batch['labels'] = batch['Mapped Phoneme']
-     return batch
-
- # %%
- # Data collator for padding
- @dataclass
- class DataCollatorWithPadding:
-     tokenizer: AutoTokenizer
-     padding: Union[bool, str] = True
-
-     def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
-         words = [feature["input_ids"] for feature in features]
-         prons = [feature["labels"] for feature in features]
-         # Tokenize the grapheme inputs and the phoneme targets as separate padded batches
-         batch = self.tokenizer(words, padding=self.padding, add_special_tokens=False, return_attention_mask=True, return_tensors='pt')
-         pron_batch = self.tokenizer(prons, padding=self.padding, add_special_tokens=True, return_attention_mask=True, return_tensors='pt')
-         # Mask padded label positions with -100 so they are ignored by the loss
-         batch['labels'] = pron_batch['input_ids'].masked_fill(pron_batch.attention_mask.ne(1), -100)
-         return batch
-
- # %%
- # Compute metrics (CER and WER)
- def compute_metrics(pred):
-     labels_ids = pred.label_ids
-     pred_ids = pred.predictions
-     pred_str = tokenizer.batch_decode(pred_ids, skip_special_tokens=True)
-     # Restore the padding token wherever the collator masked labels with -100, then decode
-     labels_ids[labels_ids == -100] = tokenizer.pad_token_id
-     label_str = tokenizer.batch_decode(labels_ids, skip_special_tokens=True)
-     cer = cer_metric.compute(predictions=pred_str, references=label_str)
-     wer = wer_metric.compute(predictions=pred_str, references=label_str)
-     return {"cer": cer, "wer": wer}
-
- # Load the evaluation metrics (character error rate and word error rate)
- cer_metric = evaluate.load("cer")
- wer_metric = evaluate.load('wer')
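-
- # Quick sanity check of the metrics on a toy pair (illustrative strings, not
- # taken from the dataset): one wrong character out of five gives CER 0.2,
- # and a single fully wrong word gives WER 1.0.
- print(cer_metric.compute(predictions=['ketab'], references=['ketAb']))  # -> 0.2
- print(wer_metric.compute(predictions=['ketab'], references=['ketAb']))  # -> 1.0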
-
- # %% [markdown]
- # # Phase 1
-
- # %%
- def load_pronunciation_dictionary(path, train=True, homograph_only=False, human=False) -> Dataset:
-     # Read the CSV file
-     df = pd.read_csv(path, index_col=[0])
-
-     # For homograph-only data, optionally restrict to human- or machine-sourced rows
-     if homograph_only:
-         if human:
-             df = df[df['Source'] == 'human']
-         else:
-             df = df[df['Source'] != 'human']
-
-     # Drop unnecessary columns
-     df = df.drop(['Source', 'Source ID'], axis=1)
-
-     # Drop rows where 'Mapped Phoneme' is NaN
-     df = df.dropna(subset=['Mapped Phoneme'])
-
-     # Filter out rows whose phoneme sequence exceeds the generation length limit
-     Plen = np.array([len(i) for i in df['Mapped Phoneme']])
-     df = df.iloc[Plen < 512, :]
-
-     # Keep only homograph rows (or only non-homograph rows) based on 'Homograph Grapheme'
-     if homograph_only:
-         df = df[df['Homograph Grapheme'].notna() & (df['Homograph Grapheme'] != '')]
-     else:
-         df = df[df['Homograph Grapheme'].isna() | (df['Homograph Grapheme'] == '')]
-
-     # Shuffle with a fixed seed so the train/dev split is identical across calls
-     df = df.sample(frac=1, random_state=41)
-
-     # Hold out the last 90 rows as the dev set
-     if train:
-         return Dataset.from_pandas(df.iloc[:len(df)-90, :])
-     else:
-         return Dataset.from_pandas(df.iloc[len(df)-90:, :])
-
- # %%
- # Load datasets (non-homograph rows only; homograph entries are used in Phases 2 and 3)
- train_data = load_pronunciation_dictionary('PersianG2P_final.csv', train=True)
- train_data = train_data.map(prepare_dataset)
- train_dataset = train_data
-
- dev_data = load_pronunciation_dictionary('PersianG2P_final.csv', train=False)
- dev_data = dev_data.map(prepare_dataset)
- dev_dataset = dev_data
-
- # Load the ByT5 tokenizer; the model itself is initialized from scratch below
- tokenizer = AutoTokenizer.from_pretrained('google/byt5-small')
-
- # Start from the byt5-small configuration, then shrink the architecture:
- # 2 encoder and 2 decoder layers, d_model=512, d_ff=512, 64-dim attention heads
- config = T5Config.from_pretrained('google/byt5-small')
- config.num_decoder_layers = 2
- config.num_layers = 2
- config.d_kv = 64
- config.d_model = 512
- config.d_ff = 512
-
- print('Initializing a ByT5 model...')
- model = T5ForConditionalGeneration(config)
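-
- # Rough size check on the reduced model; num_parameters() is a standard
- # transformers helper, and the exact count follows from the config above.
- print(f'Initialized a ByT5 model with {model.num_parameters():,} parameters')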
-
-
- # Data collator
- data_collator = DataCollatorWithPadding(tokenizer=tokenizer)
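-
- # Optional sanity check of the collator on a single toy feature; the grapheme
- # and phoneme strings below are placeholders, not rows from PersianG2P_final.csv.
- sample_batch = data_collator([{'input_ids': 'کتاب', 'labels': 'ketAb'}])
- print({k: v.shape for k, v in sample_batch.items()})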
-
- # Training arguments for Phase 1 (training the reduced model from scratch on non-homograph data)
- training_args = Seq2SeqTrainingArguments(
- output_dir="./phase1-t5", # Directory to save the fine-tuned model
- predict_with_generate=True,
- generation_num_beams=5,
- generation_max_length=512,
- evaluation_strategy="steps",
- per_device_train_batch_size=32, # Default batch size
- per_device_eval_batch_size=100, # Default batch size
- num_train_epochs=5, # Fewer epochs for this step
- learning_rate=5e-4, # Default learning rate
- warmup_steps=1000, # Default warmup steps
- logging_steps=1000, # Default logging steps
- save_steps=4000, # Default save steps
- eval_steps=1000, # Default evaluation steps
- save_total_limit=2, # Keep only the last 2 checkpoints
- load_best_model_at_end=True, # Load the best model at the end of training
- fp16=False, # Disable FP16 by default
- remove_unused_columns=False,
- )
-
- # Trainer
- trainer = Seq2SeqTrainer(
- model=model,
- tokenizer=tokenizer,
- args=training_args,
- compute_metrics=compute_metrics,
- train_dataset=train_dataset,
- eval_dataset=dev_dataset,
- data_collator=data_collator,
- )
-
- # Fine-tune the model
- trainer.train()
-
- # Save the fine-tuned model
- trainer.save_model("./phase1-t5")
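-
- # With load_best_model_at_end=True the trainer has restored the best checkpoint,
- # so this reports the final dev-set CER/WER for the Phase 1 model.
- print(trainer.evaluate())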
-
- # %%
- import matplotlib.pyplot as plt
-
- def plot_loss_history(trainer, filename):
-     # Extract training and validation loss from the trainer's log history
-     train_loss = [log["loss"] for log in trainer.state.log_history if "loss" in log]
-     val_loss = [log["eval_loss"] for log in trainer.state.log_history if "eval_loss" in log]
-
-     # Plot the training and validation loss
-     plt.figure(figsize=(10, 6))
-     plt.plot(train_loss, label="Training Loss", marker="o")
-     plt.plot(val_loss, label="Validation Loss", marker="o")
-     plt.xlabel("Steps")
-     plt.ylabel("Loss")
-     plt.title("Training and Validation Loss")
-     plt.legend()
-     plt.grid()
-
-     # Save the plot to disk and close the figure to free memory
-     plt.savefig(filename)
-     plt.close()
-
- plot_loss_history(trainer, "phase1-t5.png")
-
- # %% [markdown]
- # # Phase 2
-
- # %%
- # Load datasets (homograph rows only, excluding human-annotated entries)
- train_data = load_pronunciation_dictionary('PersianG2P_final.csv',
-                                             train=True,
-                                             homograph_only=True)
- train_data = train_data.map(prepare_dataset)
- train_dataset = train_data
-
- dev_data = load_pronunciation_dictionary('PersianG2P_final.csv',
-                                           train=False,
-                                           homograph_only=True)
- dev_data = dev_data.map(prepare_dataset)
- dev_dataset = dev_data
-
- # Load tokenizer and model from the previous fine-tuning step
- checkpoint_path = "./phase1-t5" # Path to the model from Phase 1
- tokenizer = AutoTokenizer.from_pretrained(checkpoint_path)
- model = T5ForConditionalGeneration.from_pretrained(checkpoint_path)
-
- # Data collator
- data_collator = DataCollatorWithPadding(tokenizer=tokenizer)
-
- # Training arguments for Phase 2 (fine-tuning the Phase 1 model on homograph data)
- training_args = Seq2SeqTrainingArguments(
- output_dir="./phase2-t5", # Directory to save the final fine-tuned model
- predict_with_generate=True,
- generation_num_beams=5,
- generation_max_length=512,
- evaluation_strategy="steps",
- per_device_train_batch_size=32, # Default batch size
- per_device_eval_batch_size=100, # Default batch size
- num_train_epochs=30, # More epochs for this step
- learning_rate=5e-4, # Same learning rate as Phase 1
- warmup_steps=1000, # Default warmup steps
- logging_steps=1000, # Default logging steps
- save_steps=4000, # Default save steps
- eval_steps=1000, # Default evaluation steps
- save_total_limit=2, # Keep only the last 2 checkpoints
- load_best_model_at_end=True, # Load the best model at the end of training
- fp16=False, # Disable FP16 by default
- )
-
- # Trainer
- trainer = Seq2SeqTrainer(
- model=model,
- tokenizer=tokenizer,
- args=training_args,
- compute_metrics=compute_metrics,
- train_dataset=train_dataset,
- eval_dataset=dev_dataset,
- data_collator=data_collator,
- )
-
- # Fine-tune the model
- trainer.train()
-
- # Save the fine-tuned model
- trainer.save_model("./phase2-t5")
-
-
- # %%
- # Plot and save the Phase 2 loss curves, reusing the helper defined after Phase 1
- plot_loss_history(trainer, "phase2-t5.png")
-
- # %% [markdown]
- # # Phase 3
-
- # %%
- # Load datasets (human-annotated homograph rows from the augmented data)
- train_data = load_pronunciation_dictionary('PersianG2P_final_augmented_final.csv',
-                                             train=True,
-                                             homograph_only=True,
-                                             human=True)
- train_data = train_data.map(prepare_dataset)
- train_dataset = train_data
-
- dev_data = load_pronunciation_dictionary('PersianG2P_final_augmented_final.csv',
-                                           train=False,
-                                           homograph_only=True,
-                                           human=True)
- dev_data = dev_data.map(prepare_dataset)
- dev_dataset = dev_data
-
- # Load tokenizer and model from the previous fine-tuning step
- checkpoint_path = "./phase2-t5" # Path to the model from Phase 2
- tokenizer = AutoTokenizer.from_pretrained(checkpoint_path)
- model = T5ForConditionalGeneration.from_pretrained(checkpoint_path)
-
- # Data collator
- data_collator = DataCollatorWithPadding(tokenizer=tokenizer)
-
- # Training arguments for Phase 3 (fine-tuning the Phase 2 model on human-annotated homograph data)
- training_args = Seq2SeqTrainingArguments(
- output_dir="./phase3-t5", # Directory to save the final fine-tuned model
- predict_with_generate=True,
- generation_num_beams=5,
- generation_max_length=512,
- evaluation_strategy="steps",
- per_device_train_batch_size=32, # Default batch size
- per_device_eval_batch_size=100, # Default batch size
- num_train_epochs=50, # More epochs for this step
- learning_rate=5e-4, # Same learning rate as the earlier phases
- warmup_steps=1000, # Default warmup steps
- logging_steps=1000, # Default logging steps
- save_steps=4000, # Default save steps
- eval_steps=1000, # Default evaluation steps
- save_total_limit=2, # Keep only the last 2 checkpoints
- load_best_model_at_end=True, # Load the best model at the end of training
- fp16=False, # Disable FP16 by default
- )
-
- # Trainer
- trainer = Seq2SeqTrainer(
- model=model,
- tokenizer=tokenizer,
- args=training_args,
- compute_metrics=compute_metrics,
- train_dataset=train_dataset,
- eval_dataset=dev_dataset,
- data_collator=data_collator,
- )
-
- # Fine-tune the model
- trainer.train()
-
- # Save the fine-tuned model
- trainer.save_model("./phase3-t5")
-
-
- # %%
- # Plot and save the Phase 3 loss curves
- plot_loss_history(trainer, "phase3-t5.png")
-
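- # %%
- # A minimal sketch of running inference with the saved Phase 3 model. The Persian
- # sentence below is an arbitrary placeholder, and the generation settings mirror
- # the beam size and maximum length used during evaluation.
- device = 'cuda' if torch.cuda.is_available() else 'cpu'
- model = T5ForConditionalGeneration.from_pretrained('./phase3-t5').to(device)
- tokenizer = AutoTokenizer.from_pretrained('./phase3-t5')
-
- inputs = tokenizer(['کتاب خوب است'], add_special_tokens=False, return_tensors='pt').to(device)
- with torch.no_grad():
-     output_ids = model.generate(**inputs, num_beams=5, max_length=512)
- print(tokenizer.batch_decode(output_ids, skip_special_tokens=True))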