import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import MultiheadAttention, TransformerEncoder, TransformerEncoderLayer
from scipy.stats import pearsonr, spearmanr

from utils import mse_loss, torch_corr_x_y

# Width of every cell/drug embedding produced by LoadFeature and GEncoder.
_EMB_DIM = 512


def _as_float_tensor(data, device):
    """Return *data* (numpy array or tensor) as a float32 tensor on *device*."""
    return torch.as_tensor(data, dtype=torch.float32, device=device)


class ConstructAdjMatrix(nn.Module):
    """Constructs adjacency matrices for graph-based operations."""

    def __init__(self, original_adj_mat, device="cpu"):
        """Store the adjacency matrix as a float32 tensor on ``device``.

        Args:
            original_adj_mat: 2-D numpy array or tensor. Rows are treated as
                cells and columns as drugs by ``forward``.
            device: Device the matrix is placed on.
        """
        super().__init__()
        # Non-persistent buffer: follows ``.to(device)`` with the module but
        # is not written to ``state_dict``.
        self.register_buffer(
            "adj", _as_float_tensor(original_adj_mat, device), persistent=False
        )
        self.device = device

    def forward(self):
        """Computes Laplacian matrices for cells and drugs.

        Returns:
            Tuple ``(agg_cell_lp, agg_drug_lp, self_cell_lp, self_drug_lp)``:
            the degree-normalised adjacency ``D_x^-1/2 A D_y^-1/2``, the
            transposed counterpart ``D_y^-1/2 A^T D_x^-1/2``, and two diagonal
            matrices holding ``1 / (degree + 1) + 1`` for rows and columns.
        """
        with torch.no_grad():
            # "+ 1" keeps zero-degree rows/columns away from a zero base.
            row_deg = torch.sum(self.adj, dim=1) + 1
            col_deg = torch.sum(self.adj, dim=0) + 1
            d_x = torch.pow(row_deg, -0.5)
            d_y = torch.pow(col_deg, -0.5)

            # Scaling rows and columns by broadcasting gives the same values
            # as multiplying by dense diagonal matrices, without the matmuls.
            agg_cell_lp = d_x.unsqueeze(1) * self.adj * d_y.unsqueeze(0)
            agg_drug_lp = (d_y.unsqueeze(1) * self.adj.T * d_x.unsqueeze(0)).contiguous()

            # These stay dense diagonal matrices because GEncoder feeds them
            # to torch.mm.
            self_cell_lp = torch.diag(torch.pow(row_deg, -1) + 1)
            self_drug_lp = torch.diag(torch.pow(col_deg, -1) + 1)
        return agg_cell_lp, agg_drug_lp, self_cell_lp, self_drug_lp


class LoadFeature(nn.Module):
    """Loads and processes cell and drug features."""

    def __init__(self, cell_exprs, drug_fingerprints, device="cpu"):
        """Build the feature tensors and their encoders.

        Args:
            cell_exprs: 2-D array ``(n_cells, n_cell_features)``.
            drug_fingerprints: Sequence of 2-D arrays, one per fingerprint
                type. ``forward`` stacks their projections, so all of them
                must have the same number of rows.
            device: Device for the data tensors and the layers.
        """
        super().__init__()
        self.device = device

        # Input data lives in non-persistent buffers so that it moves with
        # the module on ``.to(device)`` while ``state_dict`` stays unchanged.
        self.register_buffer(
            "cell_exprs", _as_float_tensor(cell_exprs, device), persistent=False
        )
        self._num_fingerprints = len(drug_fingerprints)
        for idx, fp in enumerate(drug_fingerprints):
            self.register_buffer(
                f"_drug_fp_{idx}", _as_float_tensor(fp, device), persistent=False
            )

        # One projection head per fingerprint type, all mapping to _EMB_DIM.
        self.drug_proj = nn.ModuleList([
            nn.Sequential(
                nn.Linear(fp.shape[1], _EMB_DIM),
                nn.BatchNorm1d(_EMB_DIM),
                nn.GELU(),
                nn.Dropout(0.3),
            )
            for fp in drug_fingerprints
        ]).to(device)

        # Transformer encoder applied across the projected fingerprint types.
        self.transformer = TransformerEncoder(
            TransformerEncoderLayer(
                d_model=_EMB_DIM, nhead=8, dim_feedforward=2048, batch_first=True
            ),
            num_layers=3,
        ).to(device)

        # Normalization layers
        self.cell_norm = nn.LayerNorm(cell_exprs.shape[1]).to(device)
        self.drug_norm = nn.LayerNorm(_EMB_DIM).to(device)

        # Cell feature encoder
        self.cell_encoder = nn.Sequential(
            nn.Linear(cell_exprs.shape[1], 1024),
            nn.BatchNorm1d(1024),
            nn.GELU(),
            nn.Dropout(0.3),
            nn.Linear(1024, _EMB_DIM),
        ).to(device)

    @property
    def drug_fingerprints(self):
        """List of fingerprint tensors, in the order given to ``__init__``."""
        return [
            getattr(self, f"_drug_fp_{idx}") for idx in range(self._num_fingerprints)
        ]

    def forward(self):
        """Processes cell and drug features to generate encoded representations.

        Returns:
            Tuple ``(cell_encoded, drug_feat)`` of shapes
            ``(n_cells, 512)`` and ``(n_drugs, 512)``.
        """
        # Normalize and encode cell features
        cell_encoded = self.cell_encoder(self.cell_norm(self.cell_exprs))

        # Project each fingerprint type, then stack to
        # (n_drugs, n_fingerprint_types, 512) for the batch-first transformer.
        projected = [
            proj(fp) for proj, fp in zip(self.drug_proj, self.drug_fingerprints)
        ]
        stacked = torch.stack(projected, dim=1)
        drug_feat = self.transformer(stacked)
        # Average over fingerprint types to get one vector per drug.
        drug_feat = self.drug_norm(drug_feat.mean(dim=1))
        return cell_encoded, drug_feat


class GEncoder(nn.Module):
    """Encodes cell and drug features using graph-based operations."""

    def __init__(self, agg_c_lp, agg_d_lp, self_c_lp, self_d_lp, device="cpu"):
        """Store the Laplacians and build the encoder layers.

        Args:
            agg_c_lp: Tensor ``(n_cells, n_drugs)`` aggregating drug features
                into cells.
            agg_d_lp: Tensor ``(n_drugs, n_cells)`` aggregating cell features
                into drugs.
            self_c_lp: Tensor ``(n_cells, n_cells)`` applied to cell features.
            self_d_lp: Tensor ``(n_drugs, n_drugs)`` applied to drug features.
            device: Device for the layers.
        """
        super().__init__()
        # Non-persistent buffers keep the Laplacians on the same device as the
        # parameters when the module is moved, without touching state_dict.
        self.register_buffer("agg_c_lp", agg_c_lp, persistent=False)
        self.register_buffer("agg_d_lp", agg_d_lp, persistent=False)
        self.register_buffer("self_c_lp", self_c_lp, persistent=False)
        self.register_buffer("self_d_lp", self_d_lp, persistent=False)
        self.device = device

        # Cell feature encoder
        self.cell_encoder = nn.Sequential(
            nn.Linear(_EMB_DIM, 1024),
            nn.BatchNorm1d(1024),
            nn.GELU(),
            nn.Dropout(0.3),
            nn.Linear(1024, _EMB_DIM),
        ).to(device)

        # Drug feature encoder
        self.drug_encoder = nn.Sequential(
            nn.Linear(_EMB_DIM, 1024),
            nn.BatchNorm1d(1024),
            nn.GELU(),
            nn.Dropout(0.3),
            nn.Linear(1024, _EMB_DIM),
        ).to(device)

        # Attention mechanism and residual connection
        self.attention = MultiheadAttention(
            embed_dim=_EMB_DIM, num_heads=8, batch_first=True
        ).to(device)
        self.residual = nn.Linear(_EMB_DIM, _EMB_DIM).to(device)
        # NOTE(review): ``fc`` is not used by ``forward``. It is kept so that
        # the parameter set and initialisation order stay unchanged.
        self.fc = nn.Sequential(
            nn.Linear(2 * _EMB_DIM, _EMB_DIM),
            nn.BatchNorm1d(_EMB_DIM),
            nn.GELU(),
            nn.Dropout(0.2),
        ).to(device)

    def forward(self, cell_f, drug_f):
        """Encodes cell and drug features with graph-based attention.

        Args:
            cell_f: Cell features ``(n_cells, 512)``.
            drug_f: Drug features ``(n_drugs, 512)``.

        Returns:
            Tuple ``(cell_emb, drug_emb)`` with the same shapes as the inputs.
        """
        # Aggregate features using Laplacian matrices
        cell_agg = torch.mm(self.agg_c_lp, drug_f) + torch.mm(self.self_c_lp, cell_f)
        drug_agg = torch.mm(self.agg_d_lp, cell_f) + torch.mm(self.self_d_lp, drug_f)

        # Encode aggregated features
        cell_fc = self.cell_encoder(cell_agg)
        drug_fc = self.drug_encoder(drug_agg)

        # Cross-attention: cells query drugs. unsqueeze(0) adds the batch
        # dimension expected by the batch-first attention layer.
        drug_seq = drug_fc.unsqueeze(0)
        attn_output, _ = self.attention(
            query=cell_fc.unsqueeze(0), key=drug_seq, value=drug_seq
        )
        cell_emb = cell_fc + self.residual(attn_output.squeeze(0))

        # Apply final activation
        return F.gelu(cell_emb), F.gelu(drug_fc)


class GDecoder(nn.Module):
    """Decodes combined cell and drug embeddings to predict scores."""

    def __init__(self, emb_dim, gamma):
        """Build the pairwise decoder.

        Args:
            emb_dim: Width of each cell/drug embedding; the decoder input is
                the concatenation of one of each.
            gamma: Scalar multiplier applied to the final output.
        """
        super().__init__()
        self.gamma = gamma

        # Decoder network
        self.decoder = nn.Sequential(
            nn.Linear(2 * emb_dim, 1024),
            nn.BatchNorm1d(1024),
            nn.GELU(),
            nn.Dropout(0.2),
            nn.Linear(1024, 1),
        )

        # Learnable mixing weight between decoder scores and correlation.
        self.corr_weight = nn.Parameter(torch.tensor(0.5))

    def forward(self, cell_emb, drug_emb):
        """Generates prediction scores from cell and drug embeddings.

        Args:
            cell_emb: Tensor ``(n_cells, emb_dim)``.
            drug_emb: Tensor ``(n_drugs, emb_dim)``.

        Returns:
            ``gamma * (w * scores + (1 - w) * corr)``, where ``scores`` has
            shape ``(n_cells, n_drugs)`` and ``w`` is ``corr_weight``.
        """
        n_cells, n_drugs = cell_emb.size(0), drug_emb.size(0)

        # ``expand`` creates broadcast views, so the only full-size allocation
        # is the concatenated tensor itself.
        cell_exp = cell_emb.unsqueeze(1).expand(-1, n_drugs, -1)
        drug_exp = drug_emb.unsqueeze(0).expand(n_cells, -1, -1)
        combined = torch.cat([cell_exp, drug_exp], dim=-1)

        # Decode every (cell, drug) pair, then restore the matrix layout.
        flat = combined.reshape(-1, 2 * cell_emb.size(1))
        scores = self.decoder(flat).view(n_cells, n_drugs)

        # NOTE(review): presumably a cell-by-drug correlation matrix matching
        # ``scores`` in shape -- confirm against utils.torch_corr_x_y.
        corr = torch_corr_x_y(cell_emb, drug_emb)

        # Combine scores and correlation
        return self.gamma * (self.corr_weight * scores + (1 - self.corr_weight) * corr)


class DeepTraCDR(nn.Module):
    """Graph Convolutional Network for cell-drug interaction prediction."""

    def __init__(self, adj_mat, cell_exprs, drug_finger, layer_size, gamma, device="cpu"):
        """Assemble feature loader, graph encoder and decoder.

        Args:
            adj_mat: Cell-by-drug adjacency matrix (numpy array or tensor).
            cell_exprs: Cell feature matrix passed to ``LoadFeature``.
            drug_finger: Sequence of drug fingerprint matrices passed to
                ``LoadFeature``.
            layer_size: Sequence whose last entry is the decoder's embedding
                width; it must equal the encoder output width (512).
            gamma: Output scale passed to ``GDecoder``.
            device: Device for all tensors and layers.

        Raises:
            ValueError: If ``layer_size[-1]`` differs from the encoder output
                width, which would otherwise fail inside the decoder's first
                linear layer during ``forward``.
        """
        super().__init__()
        self.device = device

        if layer_size[-1] != _EMB_DIM:
            raise ValueError(
                f"layer_size[-1] must be {_EMB_DIM} to match the encoder output, "
                f"got {layer_size[-1]}"
            )

        # Convert adjacency matrix to a float32 tensor on the target device.
        self.register_buffer("adj_mat", _as_float_tensor(adj_mat, device), persistent=False)

        # Initialize components
        self.construct_adj = ConstructAdjMatrix(self.adj_mat, device=device)
        self.load_feat = LoadFeature(cell_exprs, drug_finger, device=device)

        # Precompute adjacency matrices
        agg_c, agg_d, self_c, self_d = self.construct_adj()

        # Initialize encoder and decoder
        self.encoder = GEncoder(agg_c, agg_d, self_c, self_d, device=device).to(device)
        self.decoder = GDecoder(layer_size[-1], gamma).to(device)

    def forward(self):
        """Generates predictions and embeddings for cell-drug interactions.

        Returns:
            Tuple ``(pred, cell_emb, drug_emb)``.
        """
        cell_f, drug_f = self.load_feat()
        cell_emb, drug_emb = self.encoder(cell_f, drug_f)
        return self.decoder(cell_emb, drug_emb), cell_emb, drug_emb


class Optimizer:
    """Handles training and evaluation of the DeepTraCDR model."""

    def __init__(self, model, train_data, test_data, test_mask, train_mask, adj_matrix,
                 lr=0.001, wd=1e-05, epochs=200, test_freq=20, device="cpu", patience=50):
        """Move model and data to ``device`` and set up Adam.

        Args:
            model: Module whose ``forward()`` returns
                ``(pred, cell_emb, drug_emb)``.
            train_data: Tensor of training targets.
            test_data: Tensor of test targets.
            test_mask: Tensor selecting the test entries (cast to bool).
            train_mask: Tensor passed to ``mse_loss`` together with the
                training targets.
            adj_matrix: Adjacency tensor; stored but not used by this class.
            lr: Adam learning rate.
            wd: Adam weight decay.
            epochs: Maximum number of training epochs.
            test_freq: Evaluate every ``test_freq`` epochs.
            device: Device for model and tensors.
            patience: Number of epochs without RMSE improvement after which
                training stops.
        """
        self.model = model.to(device)
        self.train_data = train_data.float().to(device)
        self.test_data = test_data.float().to(device)
        self.train_mask = train_mask.to(device)
        self.test_mask = test_mask.to(device).bool()
        self.adj_matrix = adj_matrix.to(device)
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=lr, weight_decay=wd)
        self.epochs = epochs
        self.test_freq = test_freq
        self.patience = patience
        self.best_rmse = float('inf')
        self.epochs_no_improve = 0
        # Ground-truth test values, extracted once and reused by every evaluation.
        self.true_masked_np = torch.masked_select(self.test_data, self.test_mask).cpu().numpy()

    def evaluate_metrics(self, pred_tensor):
        """Computes RMSE, PCC, and SCC for model evaluation.

        Args:
            pred_tensor: Prediction tensor with the same shape as the test mask.

        Returns:
            Tuple ``(rmse, pcc, scc, pred_masked_np)`` computed on the test
            entries only.
        """
        pred_masked_np = torch.masked_select(pred_tensor, self.test_mask).cpu().numpy()
        rmse = np.sqrt(np.mean((self.true_masked_np - pred_masked_np) ** 2))
        pcc, _ = pearsonr(self.true_masked_np, pred_masked_np)
        scc, _ = spearmanr(self.true_masked_np, pred_masked_np)
        return rmse, pcc, scc, pred_masked_np

    def train(self):
        """Trains the model with early stopping based on RMSE.

        The model is evaluated every ``test_freq`` epochs. Training stops once
        ``patience`` epochs have passed without a lower test RMSE.

        Returns:
            Tuple ``(true_masked_np, best_pred_np, best_rmse, best_pcc,
            best_scc)`` for the evaluation with the lowest RMSE.
            ``best_pred_np`` is ``None`` if no evaluation improved on the
            initial infinite RMSE.
        """
        # Start every run from a clean early-stopping state so that a repeated
        # call is not cut short by counters left over from the previous run.
        self.best_rmse = float('inf')
        self.epochs_no_improve = 0
        best_pcc = 0.0
        best_scc = 0.0
        best_pred_np = None

        for epoch in range(self.epochs):
            self.model.train()
            pred, _, _ = self.model()

            # Compute and optimize loss
            total_loss = mse_loss(self.train_data, pred, self.train_mask)
            self.optimizer.zero_grad()
            total_loss.backward()
            self.optimizer.step()

            if epoch % self.test_freq != 0:
                continue

            self.model.eval()
            with torch.no_grad():
                pred_eval, _, _ = self.model()
            rmse, pcc, scc, pred_masked = self.evaluate_metrics(pred_eval)

            if rmse < self.best_rmse:
                # New best: remember its metrics and reset the patience counter.
                self.best_rmse = rmse
                self.epochs_no_improve = 0
                best_pcc = pcc
                best_scc = scc
                best_pred_np = pred_masked.copy()
            else:
                # Evaluations are ``test_freq`` epochs apart, so advance the
                # counter by that many epochs; ``patience`` is in epochs.
                self.epochs_no_improve += self.test_freq

            print(
                f"Epoch {epoch}: Loss = {total_loss.item():.4f}, RMSE = {rmse:.4f}, "
                f"PCC = {pcc:.4f}, SCC = {scc:.4f}"
            )

            # Early stopping
            if self.epochs_no_improve >= self.patience:
                print(f"Early stopping at epoch {epoch} (no improvement for {self.patience} epochs).")
                break

        best_rmse = self.best_rmse
        print("\nBest Results:")
        print(f"RMSE: {best_rmse:.4f}")
        print(f"PCC: {best_pcc:.4f}")
        print(f"SCC: {best_scc:.4f}")
        return self.true_masked_np, best_pred_np, best_rmse, best_pcc, best_scc