
Upload files to 'Scenario3/External'

master · Zahra Asgari · 6 days ago · commit a653772053
2 changed files with 641 additions and 0 deletions:
  1. Scenario3/External/main.py (+180, -0)
  2. Scenario3/External/model.py (+461, -0)

Scenario3/External/main.py (+180, -0)

@@ -0,0 +1,180 @@
# main.py
import argparse
import numpy as np
import torch
from typing import Dict, List, Tuple
from model import DeepTraCDR, Optimizer
from utils import evaluate_auc
from data_sampler import ExterSampler
from data_loader import load_data


def parse_arguments() -> argparse.Namespace:
    """
    Parse command-line arguments for the DeepTraCDR model training pipeline.

    Returns:
        argparse.Namespace: Parsed arguments.
    """
    parser = argparse.ArgumentParser(
        description="DeepTraCDR Advanced: Graph-based Neural Network for Drug Response Prediction"
    )
    parser.add_argument('--device', type=str,
                        default="cuda:0" if torch.cuda.is_available() else "cpu",
                        help="Device to run the model on (cuda:0 or cpu)")
    parser.add_argument('--data', type=str, default='tcga', help="Dataset to use (e.g., tcga)")
    parser.add_argument('--wd', type=float, default=1e-7, help="Weight decay for optimizer")
    parser.add_argument('--layer_size', nargs='+', type=int, default=[512],
                        help="List of layer sizes for the GCN model")
    parser.add_argument('--gamma', type=float, default=20.0, help="Gamma parameter for the model")
    parser.add_argument('--epochs', type=int, default=1000, help="Number of training epochs")
    parser.add_argument('--test_freq', type=int, default=50, help="Frequency of evaluation during training")
    parser.add_argument('--lr', type=float, default=0.0005, help="Learning rate for optimizer")
    return parser.parse_args()
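

# Example invocation (editor's sketch; the flag values shown simply mirror the
# CLI defaults above, and the dataset name is whatever data_loader.load_data accepts):
#   python main.py --data tcga --epochs 1000 --lr 0.0005 --gamma 20.0 --layer_size 512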


def initialize_data(args: argparse.Namespace) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, int, argparse.Namespace]:
    """
    Load and preprocess the dataset for training.

    Args:
        args (argparse.Namespace): Command-line arguments.

    Returns:
        Tuple containing adjacency matrix, drug fingerprints, expression data,
        null mask, positive sample count, and args.
    """
    try:
        full_adj, drug_fingerprints, exprs, null_mask, pos_num, args = load_data(args)
        print("Data loaded successfully:")
        print(f"  - Adjacency matrix shape: {full_adj.shape}")
        print(f"  - Expression data shape: {exprs.shape}")
        print(f"  - Null mask shape: {null_mask.shape}")
        # drug_fingerprints is a list of per-view matrices, so report each shape.
        print(f"  - Drug fingerprint shapes: {[fp.shape for fp in drug_fingerprints]}")
        return full_adj, drug_fingerprints, exprs, null_mask, pos_num, args
    except Exception as e:
        raise RuntimeError(f"Failed to load data: {e}") from e


def convert_to_tensor(data: np.ndarray | torch.Tensor, device: str) -> torch.Tensor:
    """
    Convert a NumPy array to a PyTorch tensor and move it to the specified device.

    Args:
        data (np.ndarray | torch.Tensor): Input array or tensor.
        device (str): Target device (e.g., 'cuda:0' or 'cpu').

    Returns:
        torch.Tensor: Float tensor on the specified device.
    """
    if isinstance(data, np.ndarray):
        return torch.from_numpy(data).float().to(device)
    return data.float().to(device)


def train_single_fold(
    fold_idx: int,
    full_adj: torch.Tensor,
    exprs: torch.Tensor,
    drug_fingerprints: torch.Tensor,
    null_mask: torch.Tensor,
    pos_num: int,
    args: argparse.Namespace
) -> Tuple[float, float]:
    """
    Train the DeepTraCDR model for a single fold and return evaluation metrics.

    Args:
        fold_idx (int): Current fold index.
        full_adj (torch.Tensor): Adjacency matrix.
        exprs (torch.Tensor): Gene expression data.
        drug_fingerprints (torch.Tensor): Drug fingerprint data.
        null_mask (torch.Tensor): Null mask for sampling.
        pos_num (int): Number of positive samples.
        args (argparse.Namespace): Command-line arguments.

    Returns:
        Tuple[float, float]: Best AUC and AUPRC for the fold.
    """
    # Define train/test split
    train_index = np.arange(pos_num)
    test_index = np.arange(full_adj.shape[0] - pos_num) + pos_num
    # Initialize sampler
    sampler = ExterSampler(full_adj, null_mask, train_index, test_index)
    # Initialize model
    model = DeepTraCDR(
        adj_mat=full_adj,
        cell_exprs=exprs,
        drug_finger=drug_fingerprints,
        layer_size=args.layer_size,
        gamma=args.gamma,
        device=args.device
    )
    # Initialize optimizer
    optimizer = Optimizer(
        model=model,
        train_data=sampler.train_data,
        test_data=sampler.test_data,
        test_mask=sampler.test_mask,
        train_mask=sampler.train_mask,
        adj_matrix=full_adj,
        evaluate_fun=evaluate_auc,
        lr=args.lr,
        wd=args.wd,
        epochs=args.epochs,
        test_freq=args.test_freq,
        device=args.device
    )
    # Train model and collect metrics
    _, _, best_auc, best_auprc = optimizer.train()
    print(f"Fold {fold_idx + 1}: AUC={best_auc:.4f}, AUPRC={best_auprc:.4f}")
    return best_auc, best_auprc


def summarize_metrics(metrics: Dict[str, List[float]]) -> None:
    """
    Summarize metrics across all folds by computing mean and standard deviation.

    Args:
        metrics (Dict[str, List[float]]): Dictionary of metrics (e.g., {'auc': [...], 'auprc': [...]}).
    """
    print("\nFinal Average Metrics:")
    for metric, values in metrics.items():
        mean_val = np.mean(values)
        std_val = np.std(values)
        print(f"{metric.upper()}: {mean_val:.4f} ± {std_val:.4f}")


def main():
    """
    Main function to orchestrate the DeepTraCDR training and evaluation pipeline.
    """
    # Set precision for matrix multiplications
    torch.set_float32_matmul_precision('high')
    # Parse arguments
    args = parse_arguments()
    # Load and preprocess data
    full_adj, drug_fingerprints, exprs, null_mask, pos_num, args = initialize_data(args)
    # Convert adjacency matrix to tensor
    full_adj = convert_to_tensor(full_adj, args.device)
    # Initialize metrics storage
    metrics = {'auc': [], 'auprc': []}
    n_folds = 25
    # Repeat training n_folds times on the fixed external split to average out
    # run-to-run variance (the train/test indices are identical across runs).
    for fold_idx in range(n_folds):
        best_auc, best_auprc = train_single_fold(
            fold_idx, full_adj, exprs, drug_fingerprints, null_mask, pos_num, args
        )
        metrics['auc'].append(best_auc)
        metrics['auprc'].append(best_auprc)
    # Summarize results
    summarize_metrics(metrics)


if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        print(f"Error occurred: {e}")
        raise

Scenario3/External/model.py (+461, -0)

@@ -0,0 +1,461 @@
# model.py
import copy
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import MultiheadAttention, TransformerEncoder, TransformerEncoderLayer
from typing import Tuple, List
from sklearn.metrics import roc_auc_score, average_precision_score
from utils import torch_corr_x_y, cross_entropy_loss, prototypical_loss


class ConstructAdjMatrix(nn.Module):
    """
    Constructs normalized adjacency matrices for graph-based operations.
    """
    def __init__(self, original_adj_mat: torch.Tensor | np.ndarray, device: str = "cpu"):
        """
        Initialize the adjacency matrix construction module.

        Args:
            original_adj_mat (torch.Tensor | np.ndarray): Input adjacency matrix.
            device (str): Device to run computations on (e.g., 'cuda:0' or 'cpu').
        """
        super().__init__()
        self.device = device
        # Convert to tensor if input is a NumPy array
        if isinstance(original_adj_mat, np.ndarray):
            original_adj_mat = torch.from_numpy(original_adj_mat).float()
        self.adj = original_adj_mat.to(device)

    def forward(self) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:
        """
        Compute normalized adjacency matrices for cells and drugs.

        Returns:
            Tuple of aggregated and self-loop adjacency matrices for cells and drugs.
        """
        with torch.no_grad():
            # Degree normalization for cells (rows) and drugs (columns)
            d_x = torch.diag(torch.pow(torch.sum(self.adj, dim=1) + 1, -0.5))
            d_y = torch.diag(torch.pow(torch.sum(self.adj, dim=0) + 1, -0.5))
            # Aggregated Laplacian matrices
            agg_cell_lp = torch.mm(torch.mm(d_x, self.adj), d_y)
            agg_drug_lp = torch.mm(torch.mm(d_y, self.adj.T), d_x)
            # Self-loop matrices
            self_cell_lp = torch.diag(torch.add(torch.pow(torch.sum(self.adj, dim=1) + 1, -1), 1))
            self_drug_lp = torch.diag(torch.add(torch.pow(torch.sum(self.adj, dim=0) + 1, -1), 1))
        return (
            agg_cell_lp.to(self.device),
            agg_drug_lp.to(self.device),
            self_cell_lp.to(self.device),
            self_drug_lp.to(self.device)
        )
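

# Editor's note (sketch): in matrix form, with D_c = diag(rowsum(A)) and
# D_d = diag(colsum(A)) for the cell-drug adjacency A, the operators above are
#     agg_cell_lp = (D_c + I)^(-1/2) @ A   @ (D_d + I)^(-1/2)   # [num_cells x num_drugs]
#     agg_drug_lp = (D_d + I)^(-1/2) @ A.T @ (D_c + I)^(-1/2)   # [num_drugs x num_cells]
# i.e., a symmetric degree normalization of the bipartite graph, with the +1
# smoothing playing the role of self-loops.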


class LoadFeature(nn.Module):
    """
    Loads and processes cell expression and drug fingerprint features.
    """
    def __init__(self, cell_exprs: np.ndarray, drug_fingerprints: List[np.ndarray], device: str = "cpu"):
        """
        Initialize the feature loading module.

        Args:
            cell_exprs (np.ndarray): Cell expression data.
            drug_fingerprints (List[np.ndarray]): List of drug fingerprint matrices,
                one [num_drugs x dim_i] matrix per fingerprint view.
            device (str): Device to run computations on (e.g., 'cuda:0' or 'cpu').
        """
        super().__init__()
        self.device = device
        # Convert inputs to tensors
        self.cell_exprs = torch.from_numpy(cell_exprs).float().to(device)
        self.drug_fingerprints = [torch.from_numpy(fp).float().to(device) for fp in drug_fingerprints]
        # Per-view drug projection layers
        self.drug_proj = nn.ModuleList([
            nn.Sequential(
                nn.Linear(fp.shape[1], 512),
                nn.BatchNorm1d(512),
                nn.GELU(),
                nn.Dropout(0.5)
            ).to(device) for fp in drug_fingerprints
        ])
        # Transformer encoder fusing the fingerprint views of each drug
        self.transformer = TransformerEncoder(
            TransformerEncoderLayer(
                d_model=512,
                nhead=8,
                dim_feedforward=2048,
                batch_first=True
            ),
            num_layers=1
        ).to(device)
        # Normalization layers
        self.cell_norm = nn.LayerNorm(cell_exprs.shape[1]).to(device)
        self.drug_norm = nn.LayerNorm(512).to(device)
        # Cell encoder
        self.cell_encoder = nn.Sequential(
            nn.Linear(cell_exprs.shape[1], 1024),
            nn.BatchNorm1d(1024),
            nn.GELU(),
            nn.Dropout(0.5),
            nn.Linear(1024, 512)
        ).to(device)

    def forward(self) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Process cell and drug features.

        Returns:
            Tuple of encoded cell features and drug features.
        """
        # Process cell features
        cell_feat = self.cell_norm(self.cell_exprs)    # [num_cells x num_genes]
        cell_encoded = self.cell_encoder(cell_feat)    # [num_cells x 512]
        # Process drug features: project each fingerprint view to 512 dims
        projected = [proj(fp) for proj, fp in zip(self.drug_proj, self.drug_fingerprints)]  # each [num_drugs x 512]
        stacked = torch.stack(projected, dim=1)            # [num_drugs x num_views x 512]
        drug_feat = self.transformer(stacked)              # [num_drugs x num_views x 512]
        drug_feat = self.drug_norm(drug_feat.mean(dim=1))  # [num_drugs x 512]
        return cell_encoded, drug_feat
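

# Editor's note (hypothetical shapes): with 3 drugs and two fingerprint views of
# widths 881 and 200, drug_proj maps each view to [3 x 512]; stacking gives
# [3 x 2 x 512], the transformer attends across the 2 views of each drug, and
# the mean over views yields the final [3 x 512] drug matrix consumed by GEncoder.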


class GEncoder(nn.Module):
    """
    Graph encoder for combining cell and drug features with graph structure.
    """
    def __init__(
        self,
        agg_c_lp: torch.Tensor,
        agg_d_lp: torch.Tensor,
        self_c_lp: torch.Tensor,
        self_d_lp: torch.Tensor,
        device: str = "cpu"
    ):
        """
        Initialize the graph encoder.

        Args:
            agg_c_lp (torch.Tensor): Aggregated cell Laplacian matrix.
            agg_d_lp (torch.Tensor): Aggregated drug Laplacian matrix.
            self_c_lp (torch.Tensor): Self-loop cell Laplacian matrix.
            self_d_lp (torch.Tensor): Self-loop drug Laplacian matrix.
            device (str): Device to run computations on (e.g., 'cuda:0' or 'cpu').
        """
        super().__init__()
        self.agg_c_lp = agg_c_lp
        self.agg_d_lp = agg_d_lp
        self.self_c_lp = self_c_lp
        self.self_d_lp = self_d_lp
        self.device = device
        # Cell and drug encoders
        self.cell_encoder = nn.Sequential(
            nn.Linear(512, 1024),
            nn.BatchNorm1d(1024),
            nn.GELU(),
            nn.Dropout(0.5),
            nn.Linear(1024, 512)
        ).to(device)
        self.drug_encoder = nn.Sequential(
            nn.Linear(512, 1024),
            nn.BatchNorm1d(1024),
            nn.GELU(),
            nn.Dropout(0.5),
            nn.Linear(1024, 512)
        ).to(device)
        # Attention mechanism
        self.attention = MultiheadAttention(embed_dim=512, num_heads=8, batch_first=True).to(device)
        self.residual = nn.Linear(512, 512).to(device)
        # Final fully connected layer
        # NOTE: self.fc is defined but not used in forward().
        self.fc = nn.Sequential(
            nn.Linear(1024, 512),
            nn.BatchNorm1d(512),
            nn.GELU(),
            nn.Dropout(0.5)
        ).to(device)

    def forward(self, cell_f: torch.Tensor, drug_f: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Encode cell and drug features using graph structure and attention.

        Args:
            cell_f (torch.Tensor): Cell features.
            drug_f (torch.Tensor): Drug features.

        Returns:
            Tuple of encoded cell and drug embeddings.
        """
        # Aggregate features using Laplacian matrices
        cell_agg = torch.mm(self.agg_c_lp, drug_f) + torch.mm(self.self_c_lp, cell_f)  # [num_cells x 512]
        drug_agg = torch.mm(self.agg_d_lp, cell_f) + torch.mm(self.self_d_lp, drug_f)  # [num_drugs x 512]
        # Encode aggregated features
        cell_fc = self.cell_encoder(cell_agg)  # [num_cells x 512]
        drug_fc = self.drug_encoder(drug_agg)  # [num_drugs x 512]
        # Apply attention: cells act as queries over the drug embeddings
        attn_output, _ = self.attention(
            query=cell_fc.unsqueeze(0),  # [1 x num_cells x 512]
            key=drug_fc.unsqueeze(0),    # [1 x num_drugs x 512]
            value=drug_fc.unsqueeze(0)   # [1 x num_drugs x 512]
        )
        attn_output = attn_output.squeeze(0)             # [num_cells x 512]
        cell_emb = cell_fc + self.residual(attn_output)  # [num_cells x 512]
        # Apply final activation
        cell_emb = F.gelu(cell_emb)  # [num_cells x 512]
        drug_emb = F.gelu(drug_fc)   # [num_drugs x 512]
        return cell_emb, drug_emb
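

# Editor's note (sketch): the aggregation step above is one round of bipartite
# message passing,
#     H_c' = agg_c_lp @ H_d + self_c_lp @ H_c   # drugs -> cells
#     H_d' = agg_d_lp @ H_c + self_d_lp @ H_d   # cells -> drugs
# followed by MLP encoders and a cross-attention pass with a residual
# connection on the cell side.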


class GDecoder(nn.Module):
    """
    Decoder to predict interaction scores between cells and drugs.
    """
    def __init__(self, emb_dim: int, gamma: float):
        """
        Initialize the decoder.

        Args:
            emb_dim (int): Embedding dimension (512 as instantiated by DeepTraCDR).
            gamma (float): Scaling factor for output scores.
        """
        super().__init__()
        self.gamma = gamma
        self.decoder = nn.Sequential(
            nn.Linear(2 * emb_dim, 1024),
            nn.BatchNorm1d(1024),
            nn.GELU(),
            nn.Dropout(0.2),
            nn.Linear(1024, 1)
        )
        self.corr_weight = nn.Parameter(torch.tensor(0.5))

    def forward(self, cell_emb: torch.Tensor, drug_emb: torch.Tensor) -> torch.Tensor:
        """
        Predict interaction scores between cells and drugs.

        Args:
            cell_emb (torch.Tensor): Cell embeddings.
            drug_emb (torch.Tensor): Drug embeddings.

        Returns:
            torch.Tensor: Predicted interaction scores.
        """
        # Expand dimensions for pairwise combination
        cell_exp = cell_emb.unsqueeze(1).repeat(1, drug_emb.size(0), 1)  # [num_cells x num_drugs x 512]
        drug_exp = drug_emb.unsqueeze(0).repeat(cell_emb.size(0), 1, 1)  # [num_cells x num_drugs x 512]
        combined = torch.cat([cell_exp, drug_exp], dim=-1)               # [num_cells x num_drugs x 1024]
        # Decode combined features
        scores = self.decoder(
            combined.view(-1, 2 * cell_emb.size(1))
        ).view(cell_emb.size(0), drug_emb.size(0))     # [num_cells x num_drugs]
        corr = torch_corr_x_y(cell_emb, drug_emb)      # [num_cells x num_drugs]
        # Combine MLP scores and correlation with a learned weighting
        return torch.sigmoid(self.gamma * (self.corr_weight * scores + (1 - self.corr_weight) * corr))
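

# Editor's note (sketch): the predicted interaction matrix is therefore
#     Y_hat = sigmoid(gamma * (w * MLP([h_c ; h_d]) + (1 - w) * corr(h_c, h_d)))
# where w = corr_weight is a learned scalar and corr is the cell-drug
# correlation computed by utils.torch_corr_x_y.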


class DeepTraCDR(nn.Module):
    """
    DeepTraCDR model for predicting cell-drug interactions.
    """
    def __init__(
        self,
        adj_mat: torch.Tensor | np.ndarray,
        cell_exprs: np.ndarray,
        drug_finger: List[np.ndarray],
        layer_size: List[int],
        gamma: float,
        device: str = "cpu"
    ):
        """
        Initialize the DeepTraCDR model.

        Args:
            adj_mat (torch.Tensor | np.ndarray): Adjacency matrix.
            cell_exprs (np.ndarray): Cell expression data.
            drug_finger (List[np.ndarray]): List of drug fingerprint matrices.
            layer_size (List[int]): Sizes of hidden layers. NOTE: currently unused;
                the encoder/decoder widths are fixed at 512. Kept for CLI compatibility.
            gamma (float): Scaling factor for decoder.
            device (str): Device to run computations on (e.g., 'cuda:0' or 'cpu').
        """
        super().__init__()
        self.device = device
        if isinstance(adj_mat, np.ndarray):
            adj_mat = torch.from_numpy(adj_mat).float()
        self.adj_mat = adj_mat.to(device)
        # Initialize submodules
        self.construct_adj = ConstructAdjMatrix(self.adj_mat, device=device)
        self.load_feat = LoadFeature(cell_exprs, drug_finger, device=device)
        # Compute fixed adjacency matrices
        agg_c, agg_d, self_c, self_d = self.construct_adj()
        # Initialize encoder and decoder
        self.encoder = GEncoder(agg_c, agg_d, self_c, self_d, device=device).to(device)
        self.decoder = GDecoder(emb_dim=512, gamma=gamma).to(device)

    def forward(self) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
        """
        Forward pass of the DeepTraCDR model.

        Returns:
            Tuple of predicted scores, cell embeddings, and drug embeddings.
        """
        cell_f, drug_f = self.load_feat()
        cell_emb, drug_emb = self.encoder(cell_f, drug_f)
        scores = self.decoder(cell_emb, drug_emb)
        return scores, cell_emb, drug_emb


class Optimizer:
    """
    Optimizer for training the DeepTraCDR model with early stopping and evaluation.
    """
    def __init__(
        self,
        model: DeepTraCDR,
        train_data: torch.Tensor,
        test_data: torch.Tensor,
        test_mask: torch.Tensor,
        train_mask: torch.Tensor,
        adj_matrix: torch.Tensor | np.ndarray,
        evaluate_fun,
        lr: float = 0.001,
        wd: float = 1e-05,
        epochs: int = 200,
        test_freq: int = 20,
        patience: int = 200,
        device: str = "cpu"
    ):
        """
        Initialize the optimizer.

        Args:
            model (DeepTraCDR): The DeepTraCDR model to train.
            train_data (torch.Tensor): Training data.
            test_data (torch.Tensor): Test data.
            test_mask (torch.Tensor): Mask for test data.
            train_mask (torch.Tensor): Mask for training data.
            adj_matrix (torch.Tensor | np.ndarray): Adjacency matrix.
            evaluate_fun: Function to evaluate model performance.
            lr (float): Learning rate.
            wd (float): Weight decay.
            epochs (int): Number of training epochs.
            test_freq (int): Frequency of evaluation.
            patience (int): Patience for early stopping.
            device (str): Device to run computations on (e.g., 'cuda:0' or 'cpu').
        """
        self.model = model.to(device)
        self.train_data = train_data.float().to(device)
        self.test_data = test_data.float().to(device)
        self.train_mask = train_mask.to(device)
        # Boolean views of the masks, required by torch.masked_select
        self.train_mask_bool = self.train_mask.bool()
        self.test_mask_bool = test_mask.to(device).bool()
        self.device = device
        # Convert adjacency matrix to tensor
        if isinstance(adj_matrix, np.ndarray):
            adj_matrix = torch.from_numpy(adj_matrix).float()
        self.adj_matrix = adj_matrix.to(device)
        self.evaluate_fun = evaluate_fun
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=lr, weight_decay=wd)
        self.epochs = epochs
        self.test_freq = test_freq
        self.patience = patience
        self.best_auc = 0.0
        self.best_auprc = 0.0
        self.best_weights = None
        self.counter = 0
    def train(self) -> Tuple[np.ndarray, np.ndarray, float, float]:
        """
        Train the DeepTraCDR model and evaluate performance.

        Returns:
            Tuple of true labels, predicted scores, best AUC, and best AUPRC.
        """
        true_data = torch.masked_select(self.test_data, self.test_mask_bool).cpu().numpy()
        for epoch in range(self.epochs):
            self.model.train()
            # Forward pass and compute loss
            pred_train, cell_emb, drug_emb = self.model()
            ce_loss = cross_entropy_loss(self.train_data, pred_train, self.train_mask)
            proto_loss = prototypical_loss(cell_emb, drug_emb, self.adj_matrix)
            total_loss = 0.7 * ce_loss + 0.3 * proto_loss
            # Backward pass
            self.optimizer.zero_grad()
            total_loss.backward()
            self.optimizer.step()
            # Evaluate model (one forward pass serves both train and test metrics)
            self.model.eval()
            with torch.no_grad():
                pred_eval, _, _ = self.model()
                # Training metrics
                train_pred_masked = torch.masked_select(pred_eval, self.train_mask_bool).cpu().numpy()
                train_true_data = torch.masked_select(self.train_data, self.train_mask_bool).cpu().numpy()
                try:
                    train_auc = roc_auc_score(train_true_data, train_pred_masked)
                    train_auprc = average_precision_score(train_true_data, train_pred_masked)
                except ValueError:
                    # Metrics are undefined when the masked labels contain one class.
                    train_auc, train_auprc = 0.0, 0.0
                # Test metrics
                pred_masked = torch.masked_select(pred_eval, self.test_mask_bool).cpu().numpy()
                try:
                    auc = roc_auc_score(true_data, pred_masked)
                    auprc = average_precision_score(true_data, pred_masked)
                except ValueError:
                    auc, auprc = 0.0, 0.0
            # Update best metrics and weights
            if auc > self.best_auc:
                self.best_auc = auc
                self.best_auprc = auprc
                # Deep-copy so later optimizer steps cannot mutate the saved weights.
                self.best_weights = copy.deepcopy(self.model.state_dict())
                self.counter = 0
            else:
                self.counter += 1
            # Log progress
            if epoch % self.test_freq == 0 or epoch == self.epochs - 1:
                print(f"Epoch {epoch}: Loss={total_loss.item():.4f}, "
                      f"Train AUC={train_auc:.4f}, Train AUPRC={train_auprc:.4f}, "
                      f"Test AUC={auc:.4f}, Test AUPRC={auprc:.4f}")
            # Early stopping
            if self.counter >= self.patience:
                print(f"\nEarly stopping at epoch {epoch}: no AUC improvement for {self.patience} epochs.")
                break
        # Load best weights
        if self.best_weights is not None:
            self.model.load_state_dict(self.best_weights)
        # Final evaluation
        self.model.eval()
        with torch.no_grad():
            final_pred, _, _ = self.model()
            final_pred_masked = torch.masked_select(final_pred, self.test_mask_bool).cpu().numpy()
            best_auc = roc_auc_score(true_data, final_pred_masked)
            best_auprc = average_precision_score(true_data, final_pred_masked)
        print("\nBest Metrics (Test Data):")
        print(f"AUC: {best_auc:.4f}")
        print(f"AUPRC: {best_auprc:.4f}")
        return true_data, final_pred_masked, best_auc, best_auprc
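

# Editor's sketch: a minimal smoke test on random data. All shapes and values
# below are hypothetical, and utils.torch_corr_x_y is assumed to return a
# [num_cells x num_drugs] matrix (as the GDecoder comments indicate).
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    adj = (rng.random((5, 3)) > 0.5).astype(np.float32)  # 5 cells x 3 drugs
    exprs = rng.random((5, 100)).astype(np.float32)      # 100 genes per cell
    fps = [rng.random((3, 881)).astype(np.float32),      # two fingerprint views
           rng.random((3, 200)).astype(np.float32)]
    model = DeepTraCDR(adj, exprs, fps, layer_size=[512], gamma=20.0, device="cpu")
    model.eval()  # eval mode so BatchNorm/Dropout behave deterministically
    with torch.no_grad():
        scores, cell_emb, drug_emb = model()
    print(scores.shape, cell_emb.shape, drug_emb.shape)
    # Expected: torch.Size([5, 3]) torch.Size([5, 512]) torch.Size([3, 512])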
