123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119 |
- import argparse
- import numpy as np
- import torch
- from sklearn.model_selection import KFold
- from DeepTraCDR_model import DeepTraCDR, Optimizer
- from data_sampler import RegressionSampler
- from data_loader import load_data
-
def parse_arguments(argv: "list[str] | None" = None) -> argparse.Namespace:
    """Parse command-line arguments for the DeepTraCDR regression task.

    Args:
        argv: Optional list of argument strings to parse instead of the
            process command line. When ``None`` (the default) argparse reads
            ``sys.argv[1:]``, so existing zero-argument callers are unaffected.

    Returns:
        Parsed arguments as a Namespace object with the attributes
        ``device``, ``data``, ``wd``, ``layer_size``, ``gamma``, ``epochs``,
        ``test_freq``, ``lr`` and ``patience``.
    """
    # Resolve the default once so the add_argument call below stays readable.
    default_device = "cuda:0" if torch.cuda.is_available() else "cpu"

    parser = argparse.ArgumentParser(description="DeepTraCDR Regression Task")

    # The legacy single-dash spellings (-device / -data) are kept so existing
    # launch scripts keep working; the double-dash aliases match the style of
    # every other option. Both spellings store into the same attribute.
    parser.add_argument('-device', '--device', type=str, default=default_device,
                        help="Device to run the model on (e.g., 'cuda:0' or 'cpu')")
    parser.add_argument('-data', '--data', type=str, default='gdsc',
                        help="Dataset to use (default: gdsc)")

    parser.add_argument('--wd', type=float, default=1e-5,
                        help="Weight decay for optimizer")
    parser.add_argument('--layer_size', nargs='+', type=int, default=[512],
                        help="Layer sizes for the model")
    # argparse does not run ``type`` on non-string defaults, so the default is
    # written as a float to match what a user-supplied value would produce.
    parser.add_argument('--gamma', type=float, default=15.0,
                        help="Gamma parameter for decoder")
    parser.add_argument('--epochs', type=int, default=1000,
                        help="Number of training epochs")
    parser.add_argument('--test_freq', type=int, default=50,
                        help="Frequency of evaluation during training")
    parser.add_argument('--lr', type=float, default=0.0001,
                        help="Learning rate for optimizer")
    parser.add_argument('--patience', type=int, default=20,
                        help="Patience for early stopping")

    return parser.parse_args(argv)
-
def normalize_adj_matrix(adj_matrix: np.ndarray) -> torch.Tensor:
    """Shift the adjacency matrix so its minimum is zero and return a float tensor.

    The global minimum of the matrix is subtracted from every entry (no
    scaling is applied) and the result is returned as a ``torch.float32``
    tensor.

    Args:
        adj_matrix: Input adjacency matrix. A NumPy array is the expected
            input; a ``torch.Tensor`` or any array-like accepted by
            ``np.asarray`` is also handled.

    Returns:
        The min-shifted adjacency matrix as a float32 torch tensor. An empty
        input is returned as an empty float32 tensor.
    """
    if isinstance(adj_matrix, torch.Tensor):
        # np.min cannot reduce a torch tensor, so use the tensor's own
        # reduction; this also keeps the data on its current device.
        if adj_matrix.numel() > 0:
            adj_matrix = adj_matrix - adj_matrix.min()
        return adj_matrix.float()

    adj_array = np.asarray(adj_matrix)
    # Reducing an empty array raises, and there is nothing to shift anyway.
    if adj_array.size > 0:
        adj_array = adj_array - np.min(adj_array)
    return torch.from_numpy(adj_array).float()
-
def main():
    """Run the DeepTraCDR regression task with repeated k-fold cross-validation.

    Loads the dataset selected on the command line, min-shifts the adjacency
    matrix, then trains a fresh model on every fold of ``n_kfolds`` differently
    seeded ``k``-fold splits. Per-fold RMSE/PCC/SCC are printed as they
    complete, followed by the mean and standard deviation over all folds.
    """
    # Set precision for matrix multiplication
    torch.set_float32_matmul_precision('high')

    # Parse command-line arguments
    args = parse_arguments()

    # Load dataset; load_data also hands back the (possibly updated) args.
    full_adj, drug_fingerprints, exprs, null_mask, pos_num, args = load_data(args)
    print(f"Original full_adj shape: {full_adj.shape}")

    # Normalize before reporting the "normalized" shape, so the second line
    # describes the matrix that is actually used for training.
    full_adj = normalize_adj_matrix(full_adj)
    # tuple(...) keeps the printed form identical to the NumPy shape above.
    print(f"Normalized full_adj shape: {tuple(full_adj.shape)}")

    print("\n--- Data Shapes ---")
    print(f"Expression data shape: {exprs.shape}")
    print(f"Null mask shape: {null_mask.shape}")

    # Cross-validation layout: n_kfolds repetitions of a k-fold split.
    k = 5
    n_kfolds = 5
    all_metrics = {'rmse': [], 'pcc': [], 'scc': []}

    for n_kfold in range(n_kfolds):
        # Seeding with the repetition index gives a different but
        # reproducible shuffle for each repetition.
        kfold = KFold(n_splits=k, shuffle=True, random_state=n_kfold)
        # NOTE(review): pos_num is presumably the number of observed
        # drug-cell pairs being split into folds; confirm against load_data.
        for fold, (train_idx, test_idx) in enumerate(kfold.split(np.arange(pos_num))):
            # Initialize data sampler
            sampler = RegressionSampler(full_adj, train_idx, test_idx, null_mask)

            # A new model per fold so no weights carry over between folds.
            model = DeepTraCDR(
                adj_mat=full_adj,
                cell_exprs=exprs,
                drug_finger=drug_fingerprints,
                layer_size=args.layer_size,
                gamma=args.gamma,
                device=args.device,
            )

            # Initialize optimizer
            opt = Optimizer(
                model=model,
                train_data=sampler.train_data,
                test_data=sampler.test_data,
                test_mask=sampler.test_mask,
                train_mask=sampler.train_mask,
                adj_matrix=full_adj,
                lr=args.lr,
                wd=args.wd,
                epochs=args.epochs,
                test_freq=args.test_freq,
                device=args.device,
                patience=args.patience,
            )

            # The first two returned values (true, pred) are not used here;
            # only the best metrics are kept.
            _, _, best_rmse, best_pcc, best_scc = opt.train()
            all_metrics['rmse'].append(best_rmse)
            all_metrics['pcc'].append(best_pcc)
            all_metrics['scc'].append(best_scc)

            # Folds are numbered consecutively across repetitions (1..k*n_kfolds).
            print(f"Fold {n_kfold * k + fold + 1}: RMSE={best_rmse:.4f}, PCC={best_pcc:.4f}, SCC={best_scc:.4f}")

    # Compute and display final average metrics
    print("\nFinal Average Metrics:")
    for metric, values in all_metrics.items():
        mean = np.mean(values)
        std = np.std(values)
        print(f"{metric.upper()}: {mean:.4f} ± {std:.4f}")
-
# Run the experiment only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|