# model.py
from typing import Tuple, List

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import MultiheadAttention, TransformerEncoder, TransformerEncoderLayer
from sklearn.metrics import roc_auc_score, average_precision_score

from utils import torch_corr_x_y, cross_entropy_loss, prototypical_loss


class ConstructAdjMatrix(nn.Module):
    """
    Constructs normalized adjacency matrices for graph-based operations.

    Rows of the adjacency matrix are treated as cells and columns as drugs.
    """

    def __init__(self, original_adj_mat: torch.Tensor | np.ndarray, device: str = "cpu"):
        """
        Initialize the adjacency matrix construction module.

        Args:
            original_adj_mat (torch.Tensor | np.ndarray): Input adjacency matrix
                (cells along dim 0, drugs along dim 1).
            device (str): Device to run computations on (e.g., 'cuda:0' or 'cpu').
        """
        super().__init__()
        self.device = device

        # Convert to tensor if input is NumPy array
        if isinstance(original_adj_mat, np.ndarray):
            original_adj_mat = torch.from_numpy(original_adj_mat)

        # Always work in float32: the downstream torch.mm calls multiply these
        # matrices with float32 features, so int/bool/float64 inputs would
        # otherwise fail with a dtype mismatch.
        self.adj = original_adj_mat.float().to(device)

    def forward(self) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:
        """
        Compute normalized adjacency matrices for cells and drugs.

        Returns:
            Tuple ``(agg_cell_lp, agg_drug_lp, self_cell_lp, self_drug_lp)``:
            the two symmetric-normalized aggregation matrices
            ([num_cells x num_drugs] and [num_drugs x num_cells]) and the two
            diagonal self-loop matrices ([num_cells x num_cells] and
            [num_drugs x num_drugs]).
        """
        with torch.no_grad():
            # Degrees with +1 smoothing so isolated nodes do not divide by zero
            cell_degree = torch.sum(self.adj, dim=1) + 1
            drug_degree = torch.sum(self.adj, dim=0) + 1

            # Degree normalization for cells and drugs: D^{-1/2}
            d_x = torch.diag(torch.pow(cell_degree, -0.5))
            d_y = torch.diag(torch.pow(drug_degree, -0.5))

            # Aggregated Laplacian matrices: D_x^{-1/2} A D_y^{-1/2} and its transpose
            agg_cell_lp = torch.mm(torch.mm(d_x, self.adj), d_y)
            agg_drug_lp = torch.mm(torch.mm(d_y, self.adj.T), d_x)

            # Self-loop matrices: diag(1 / degree + 1)
            self_cell_lp = torch.diag(torch.pow(cell_degree, -1) + 1)
            self_drug_lp = torch.diag(torch.pow(drug_degree, -1) + 1)

        return (
            agg_cell_lp.to(self.device),
            agg_drug_lp.to(self.device),
            self_cell_lp.to(self.device),
            self_drug_lp.to(self.device),
        )


class LoadFeature(nn.Module):
    """
    Loads and processes cell expression and drug fingerprint features.

    Both feature sets are stored on the module at construction time, so
    ``forward`` takes no arguments and always encodes the full data set.
    """

    def __init__(self, cell_exprs: np.ndarray, drug_fingerprints: List[np.ndarray], device: str = "cpu"):
        """
        Initialize the feature loading module.

        Args:
            cell_exprs (np.ndarray): Cell expression data, [num_cells x num_features].
            drug_fingerprints (List[np.ndarray]): List of drug fingerprint matrices,
                one per fingerprint type. Each is 2-D; they are stacked along a new
                axis in ``forward`` and therefore must share the same first dimension.
            device (str): Device to run computations on (e.g., 'cuda:0' or 'cpu').
        """
        super().__init__()
        self.device = device

        # Convert inputs to tensors
        self.cell_exprs = torch.from_numpy(cell_exprs).float().to(device)
        self.drug_fingerprints = [torch.from_numpy(fp).float().to(device) for fp in drug_fingerprints]

        # Drug projection layers: one per fingerprint type, each mapping to 512 dims
        self.drug_proj = nn.ModuleList([
            nn.Sequential(
                nn.Linear(fp.shape[1], 512),
                nn.BatchNorm1d(512),
                nn.GELU(),
                nn.Dropout(0.5)
            ).to(device)
            for fp in drug_fingerprints
        ])

        # Transformer encoder for drug features
        self.transformer = TransformerEncoder(
            TransformerEncoderLayer(
                d_model=512,
                nhead=8,
                dim_feedforward=2048,
                batch_first=True
            ),
            num_layers=1
        ).to(device)

        # Normalization layers
        self.cell_norm = nn.LayerNorm(cell_exprs.shape[1]).to(device)
        self.drug_norm = nn.LayerNorm(512).to(device)

        # Cell encoder
        self.cell_encoder = nn.Sequential(
            nn.Linear(cell_exprs.shape[1], 1024),
            nn.BatchNorm1d(1024),
            nn.GELU(),
            nn.Dropout(0.5),
            nn.Linear(1024, 512)
        ).to(device)

    def forward(self) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Process cell and drug features.

        Returns:
            Tuple of encoded cell features [num_cells x 512] and
            drug features [num_drugs x 512].
        """
        # Process cell features
        cell_feat = self.cell_norm(self.cell_exprs)  # [num_cells x num_features]
        cell_encoded = self.cell_encoder(cell_feat)  # [num_cells x 512]

        # Process drug features: project every fingerprint type to a common size
        projected = [
            proj(fp) for proj, fp in zip(self.drug_proj, self.drug_fingerprints)
        ]  # List of [num_drugs x 512], one entry per fingerprint type

        # The fingerprint types form the "sequence" axis the transformer attends over
        stacked = torch.stack(projected, dim=1)  # [num_drugs x num_fingerprint_types x 512]
        drug_feat = self.transformer(stacked)  # [num_drugs x num_fingerprint_types x 512]

        # Average over fingerprint types, then normalize
        drug_feat = self.drug_norm(drug_feat.mean(dim=1))  # [num_drugs x 512]

        return cell_encoded, drug_feat


class GEncoder(nn.Module):
    """
    Graph encoder for combining cell and drug features with graph structure.
    """

    def __init__(
        self,
        agg_c_lp: torch.Tensor,
        agg_d_lp: torch.Tensor,
        self_c_lp: torch.Tensor,
        self_d_lp: torch.Tensor,
        device: str = "cpu"
    ):
        """
        Initialize the graph encoder.

        Args:
            agg_c_lp (torch.Tensor): Aggregated cell Laplacian matrix.
            agg_d_lp (torch.Tensor): Aggregated drug Laplacian matrix.
            self_c_lp (torch.Tensor): Self-loop cell Laplacian matrix.
            self_d_lp (torch.Tensor): Self-loop drug Laplacian matrix.
            device (str): Device to run computations on (e.g., 'cuda:0' or 'cpu').
        """
        super().__init__()
        # Fixed graph operators; stored as plain attributes (not parameters)
        self.agg_c_lp = agg_c_lp
        self.agg_d_lp = agg_d_lp
        self.self_c_lp = self_c_lp
        self.self_d_lp = self_d_lp
        self.device = device

        # Cell and drug encoders
        self.cell_encoder = nn.Sequential(
            nn.Linear(512, 1024),
            nn.BatchNorm1d(1024),
            nn.GELU(),
            nn.Dropout(0.5),
            nn.Linear(1024, 512)
        ).to(device)

        self.drug_encoder = nn.Sequential(
            nn.Linear(512, 1024),
            nn.BatchNorm1d(1024),
            nn.GELU(),
            nn.Dropout(0.5),
            nn.Linear(1024, 512)
        ).to(device)

        # Attention mechanism
        self.attention = MultiheadAttention(embed_dim=512, num_heads=8, batch_first=True).to(device)
        self.residual = nn.Linear(512, 512).to(device)

        # Final fully connected layer.
        # NOTE: not referenced by forward(); kept so the module's parameter set
        # and state_dict keys stay unchanged.
        self.fc = nn.Sequential(
            nn.Linear(1024, 512),
            nn.BatchNorm1d(512),
            nn.GELU(),
            nn.Dropout(0.5)
        ).to(device)

    def forward(self, cell_f: torch.Tensor, drug_f: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Encode cell and drug features using graph structure and attention.

        Args:
            cell_f (torch.Tensor): Cell features, [num_cells x 512].
            drug_f (torch.Tensor): Drug features, [num_drugs x 512].

        Returns:
            Tuple of encoded cell embeddings [num_cells x 512] and
            drug embeddings [num_drugs x 512].
        """
        # Aggregate features using Laplacian matrices: neighbours + self-loop term
        cell_agg = torch.mm(self.agg_c_lp, drug_f) + torch.mm(self.self_c_lp, cell_f)  # [num_cells x 512]
        drug_agg = torch.mm(self.agg_d_lp, cell_f) + torch.mm(self.self_d_lp, drug_f)  # [num_drugs x 512]

        # Encode aggregated features
        cell_fc = self.cell_encoder(cell_agg)  # [num_cells x 512]
        drug_fc = self.drug_encoder(drug_agg)  # [num_drugs x 512]

        # Apply attention mechanism: cells query the drugs (single batch entry)
        drug_seq = drug_fc.unsqueeze(0)  # [1 x num_drugs x 512]
        attn_output, _ = self.attention(
            query=cell_fc.unsqueeze(0),  # [1 x num_cells x 512]
            key=drug_seq,
            value=drug_seq
        )
        attn_output = attn_output.squeeze(0)  # [num_cells x 512]

        # Residual connection around the (linearly projected) attention output
        cell_emb = cell_fc + self.residual(attn_output)  # [num_cells x 512]

        # Apply final activation
        cell_emb = F.gelu(cell_emb)  # [num_cells x 512]
        drug_emb = F.gelu(drug_fc)  # [num_drugs x 512]

        return cell_emb, drug_emb


class GDecoder(nn.Module):
    """
    Decoder to predict interaction scores between cells and drugs.
    """

    def __init__(self, emb_dim: int, gamma: float):
        """
        Initialize the decoder.

        Args:
            emb_dim (int): Embedding dimension of both cell and drug embeddings.
            gamma (float): Scaling factor applied before the final sigmoid.
        """
        super().__init__()
        self.gamma = gamma
        self.decoder = nn.Sequential(
            nn.Linear(2 * emb_dim, 1024),
            nn.BatchNorm1d(1024),
            nn.GELU(),
            nn.Dropout(0.2),
            nn.Linear(1024, 1)
        )
        # Learnable mixing weight between MLP scores and the correlation term
        self.corr_weight = nn.Parameter(torch.tensor(0.5))

    def forward(self, cell_emb: torch.Tensor, drug_emb: torch.Tensor) -> torch.Tensor:
        """
        Predict interaction scores between cells and drugs.

        Args:
            cell_emb (torch.Tensor): Cell embeddings, [num_cells x emb_dim].
            drug_emb (torch.Tensor): Drug embeddings, [num_drugs x emb_dim].

        Returns:
            torch.Tensor: Predicted interaction scores in (0, 1),
            [num_cells x num_drugs].
        """
        num_cells, num_drugs = cell_emb.size(0), drug_emb.size(0)

        # Build every (cell, drug) pair. expand() creates broadcast views, so
        # only the concatenated tensor is actually materialized.
        cell_exp = cell_emb.unsqueeze(1).expand(-1, num_drugs, -1)  # [num_cells x num_drugs x emb_dim]
        drug_exp = drug_emb.unsqueeze(0).expand(num_cells, -1, -1)  # [num_cells x num_drugs x emb_dim]
        combined = torch.cat([cell_exp, drug_exp], dim=-1)  # [num_cells x num_drugs x 2*emb_dim]

        # Decode combined features (flattened so BatchNorm1d sees a 2-D batch)
        flat_pairs = combined.view(-1, combined.size(-1))  # [num_cells*num_drugs x 2*emb_dim]
        scores = self.decoder(flat_pairs).view(num_cells, num_drugs)  # [num_cells x num_drugs]

        # Helper from utils; its result is mixed element-wise with `scores`,
        # so it is expected to be [num_cells x num_drugs] (or broadcastable).
        corr = torch_corr_x_y(cell_emb, drug_emb)

        # Combine scores and correlation with learned weighting
        mixed = self.corr_weight * scores + (1 - self.corr_weight) * corr
        return torch.sigmoid(self.gamma * mixed)


class DeepTraCDR(nn.Module):
    """
    DeepTraCDR model for predicting cell-drug interactions.
    """

    def __init__(
        self,
        adj_mat: torch.Tensor | np.ndarray,
        cell_exprs: np.ndarray,
        drug_finger: List[np.ndarray],
        layer_size: List[int],
        gamma: float,
        device: str = "cpu"
    ):
        """
        Initialize the DeepTraCDR model.

        Args:
            adj_mat (torch.Tensor | np.ndarray): Adjacency matrix.
            cell_exprs (np.ndarray): Cell expression data.
            drug_finger (List[np.ndarray]): List of drug fingerprint matrices.
            layer_size (List[int]): Sizes of hidden layers. Accepted for
                interface compatibility; not read by this constructor.
            gamma (float): Scaling factor for decoder.
            device (str): Device to run computations on (e.g., 'cuda:0' or 'cpu').
        """
        super().__init__()
        self.device = device

        if isinstance(adj_mat, np.ndarray):
            adj_mat = torch.from_numpy(adj_mat).float()
        self.adj_mat = adj_mat.to(device)

        # Initialize submodules
        self.construct_adj = ConstructAdjMatrix(self.adj_mat, device=device)
        self.load_feat = LoadFeature(cell_exprs, drug_finger, device=device)

        # Compute fixed adjacency matrices (done once; they carry no gradients)
        agg_c, agg_d, self_c, self_d = self.construct_adj()

        # Initialize encoder and decoder
        self.encoder = GEncoder(agg_c, agg_d, self_c, self_d, device=device).to(device)
        self.decoder = GDecoder(emb_dim=512, gamma=gamma).to(device)

    def forward(self) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
        """
        Forward pass of the DeepTraCDR model.

        Returns:
            Tuple of predicted scores, cell embeddings, and drug embeddings.
        """
        cell_f, drug_f = self.load_feat()
        cell_emb, drug_emb = self.encoder(cell_f, drug_f)
        scores = self.decoder(cell_emb, drug_emb)
        return scores, cell_emb, drug_emb


class Optimizer:
    """
    Optimizer for training the DeepTraCDR model with early stopping and evaluation.
    """

    def __init__(
        self,
        model: DeepTraCDR,
        train_data: torch.Tensor,
        test_data: torch.Tensor,
        test_mask: torch.Tensor,
        train_mask: torch.Tensor,
        adj_matrix: torch.Tensor | np.ndarray,
        evaluate_fun,
        lr: float = 0.001,
        wd: float = 1e-05,
        epochs: int = 200,
        test_freq: int = 20,
        patience: int = 200,
        device: str = "cpu"
    ):
        """
        Initialize the optimizer.

        Args:
            model (DeepTraCDR): The DeepTraCDR model to train.
            train_data (torch.Tensor): Training data.
            test_data (torch.Tensor): Test data.
            test_mask (torch.Tensor): Mask for test data.
            train_mask (torch.Tensor): Mask for training data.
            adj_matrix (torch.Tensor | np.ndarray): Adjacency matrix.
            evaluate_fun: Function to evaluate model performance (stored only).
            lr (float): Learning rate.
            wd (float): Weight decay.
            epochs (int): Maximum number of training epochs.
            test_freq (int): Progress is printed every ``test_freq`` epochs
                (metrics themselves are computed every epoch).
            patience (int): Number of consecutive epochs without a test-AUC
                improvement after which training stops early.
            device (str): Device to run computations on (e.g., 'cuda:0' or 'cpu').
        """
        self.model = model.to(device)
        self.train_data = train_data.float().to(device)
        self.test_data = test_data.float().to(device)

        # The mask is passed to cross_entropy_loss exactly as supplied, while a
        # boolean copy is kept for torch.masked_select, which only accepts
        # boolean masks.
        self.train_mask = train_mask.to(device)
        self.train_mask_bool = self.train_mask.bool()
        self.test_mask_bool = test_mask.to(device).bool()
        self.device = device

        # Convert adjacency matrix to tensor
        if isinstance(adj_matrix, np.ndarray):
            adj_matrix = torch.from_numpy(adj_matrix).float()
        self.adj_matrix = adj_matrix.to(device)

        self.evaluate_fun = evaluate_fun
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=lr, weight_decay=wd)
        self.epochs = epochs
        self.test_freq = test_freq
        self.patience = patience
        self.best_auc = 0.0
        self.best_auprc = 0.0
        self.best_weights = None
        self.counter = 0

    @staticmethod
    def _safe_metrics(y_true: np.ndarray, y_score: np.ndarray) -> Tuple[float, float]:
        """Return (AUC, AUPRC), or (0.0, 0.0) if sklearn raises ValueError."""
        try:
            return roc_auc_score(y_true, y_score), average_precision_score(y_true, y_score)
        except ValueError:
            return 0.0, 0.0

    def train(self) -> Tuple[np.ndarray, np.ndarray, float, float]:
        """
        Train the DeepTraCDR model and evaluate performance.

        Returns:
            Tuple of true labels, predicted scores, best AUC, and best AUPRC
            (all on the masked test entries, using the best-AUC weights).

        Raises:
            ValueError: Propagated from scikit-learn if the final test metrics
                cannot be computed.
        """
        # Ground-truth labels never change, so extract them once up front
        true_data = torch.masked_select(self.test_data, self.test_mask_bool).cpu().numpy()
        train_true_data = torch.masked_select(self.train_data, self.train_mask_bool).cpu().numpy()

        for epoch in range(self.epochs):
            self.model.train()

            # Forward pass and compute loss
            pred_train, cell_emb, drug_emb = self.model()
            ce_loss = cross_entropy_loss(self.train_data, pred_train, self.train_mask)
            proto_loss = prototypical_loss(cell_emb, drug_emb, self.adj_matrix)
            total_loss = 0.7 * ce_loss + 0.3 * proto_loss

            # Backward pass
            self.optimizer.zero_grad()
            total_loss.backward()
            self.optimizer.step()

            # Evaluate model. One eval-mode forward pass yields the full score
            # matrix; train and test metrics only differ in the mask applied.
            self.model.eval()
            with torch.no_grad():
                pred_eval, _, _ = self.model()
                train_pred_masked = torch.masked_select(pred_eval, self.train_mask_bool).cpu().numpy()
                pred_masked = torch.masked_select(pred_eval, self.test_mask_bool).cpu().numpy()

            train_auc, train_auprc = self._safe_metrics(train_true_data, train_pred_masked)
            auc, auprc = self._safe_metrics(true_data, pred_masked)

            # Update best metrics and weights.
            # NOTE(review): model selection is driven by the test-set AUC.
            if auc > self.best_auc:
                self.best_auc = auc
                self.best_auprc = auprc
                # Clone every tensor: state_dict() returns references to the
                # live parameters, so a shallow dict copy would be silently
                # overwritten by subsequent optimizer steps.
                self.best_weights = {
                    name: tensor.detach().clone()
                    for name, tensor in self.model.state_dict().items()
                }
                self.counter = 0
            else:
                self.counter += 1

            # Log progress
            if epoch % self.test_freq == 0 or epoch == self.epochs - 1:
                print(f"Epoch {epoch}: Loss={total_loss.item():.4f}, "
                      f"Train AUC={train_auc:.4f}, Train AUPRC={train_auprc:.4f}, "
                      f"Test AUC={auc:.4f}, Test AUPRC={auprc:.4f}")

            # Early stopping
            if self.counter >= self.patience:
                print(f"\nEarly stopping at epoch {epoch}: No AUC improvement for {self.patience} epochs.")
                break

        # Load best weights
        if self.best_weights is not None:
            self.model.load_state_dict(self.best_weights)

        # Final evaluation
        self.model.eval()
        with torch.no_grad():
            final_pred, _, _ = self.model()
            final_pred_masked = torch.masked_select(final_pred, self.test_mask_bool).cpu().numpy()

        best_auc = roc_auc_score(true_data, final_pred_masked)
        best_auprc = average_precision_score(true_data, final_pred_masked)

        print("\nBest Metrics (Test Data):")
        print(f"AUC: {best_auc:.4f}")
        print(f"AUPRC: {best_auprc:.4f}")

        return true_data, final_pred_masked, best_auc, best_auprc