|  |  | @@ -0,0 +1,426 @@ | 
		
	
		
			
			|  |  |  | # model.py | 
		
	
		
			
			|  |  |  | import numpy as np | 
		
	
		
			
			|  |  |  | import torch | 
		
	
		
			
			|  |  |  | import torch.nn as nn | 
		
	
		
			
			|  |  |  | import torch.nn.functional as F | 
		
	
		
			
			|  |  |  | from torch.nn import MultiheadAttention, TransformerEncoder, TransformerEncoderLayer | 
		
	
		
			
			|  |  |  | from utils import torch_corr_x_y, cross_entropy_loss, prototypical_loss | 
		
	
		
			
			|  |  |  | from sklearn.metrics import roc_auc_score, average_precision_score | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | class ConstructAdjMatrix(nn.Module): | 
		
	
		
			
			|  |  |  | """Module to construct normalized adjacency matrices for graph-based operations.""" | 
		
	
		
			
			|  |  |  | def __init__(self, original_adj_mat, device="cpu"): | 
		
	
		
			
			|  |  |  | """ | 
		
	
		
			
			|  |  |  | Initialize the adjacency matrix constructor. | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | Args: | 
		
	
		
			
			|  |  |  | original_adj_mat (np.ndarray or torch.Tensor): Original adjacency matrix. | 
		
	
		
			
			|  |  |  | device (str): Device to perform computations on (e.g., 'cpu' or 'cuda:0'). | 
		
	
		
			
			|  |  |  | """ | 
		
	
		
			
			|  |  |  | super().__init__() | 
		
	
		
			
			|  |  |  | # Convert NumPy array to PyTorch tensor if necessary | 
		
	
		
			
			|  |  |  | if isinstance(original_adj_mat, np.ndarray): | 
		
	
		
			
			|  |  |  | original_adj_mat = torch.from_numpy(original_adj_mat).float() | 
		
	
		
			
			|  |  |  | self.adj = original_adj_mat.to(device) | 
		
	
		
			
			|  |  |  | self.device = device | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | def forward(self): | 
		
	
		
			
			|  |  |  | """ | 
		
	
		
			
			|  |  |  | Compute normalized adjacency matrices for cells and drugs. | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | Returns: | 
		
	
		
			
			|  |  |  | tuple: (agg_cell_lp, agg_drug_lp, self_cell_lp, self_drug_lp) normalized matrices. | 
		
	
		
			
			|  |  |  | """ | 
		
	
		
			
			|  |  |  | with torch.no_grad(): | 
		
	
		
			
			|  |  |  | # Compute degree-normalized matrices | 
		
	
		
			
			|  |  |  | d_x = torch.diag(torch.pow(torch.sum(self.adj, dim=1) + 1, -0.5)) | 
		
	
		
			
			|  |  |  | d_y = torch.diag(torch.pow(torch.sum(self.adj, dim=0) + 1, -0.5)) | 
		
	
		
			
			|  |  |  | agg_cell_lp = torch.mm(torch.mm(d_x, self.adj), d_y) | 
		
	
		
			
			|  |  |  | agg_drug_lp = torch.mm(torch.mm(d_y, self.adj.T), d_x) | 
		
	
		
			
			|  |  |  | self_cell_lp = torch.diag(torch.add(torch.pow(torch.sum(self.adj, dim=1) + 1, -1), 1)) | 
		
	
		
			
			|  |  |  | self_drug_lp = torch.diag(torch.add(torch.pow(torch.sum(self.adj, dim=0) + 1, -1), 1)) | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | return ( | 
		
	
		
			
			|  |  |  | agg_cell_lp.to(self.device), | 
		
	
		
			
			|  |  |  | agg_drug_lp.to(self.device), | 
		
	
		
			
			|  |  |  | self_cell_lp.to(self.device), | 
		
	
		
			
			|  |  |  | self_drug_lp.to(self.device) | 
		
	
		
			
			|  |  |  | ) | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | class LoadFeature(nn.Module): | 
		
	
		
			
			|  |  |  | """Module to load and preprocess cell and drug features.""" | 
		
	
		
			
			|  |  |  | def __init__(self, cell_exprs, drug_fingerprints, device="cpu"): | 
		
	
		
			
			|  |  |  | """ | 
		
	
		
			
			|  |  |  | Initialize feature loading and preprocessing layers. | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | Args: | 
		
	
		
			
			|  |  |  | cell_exprs (np.ndarray): Cell expression data. | 
		
	
		
			
			|  |  |  | drug_fingerprints (list): List of drug fingerprint arrays. | 
		
	
		
			
			|  |  |  | device (str): Device to perform computations on. | 
		
	
		
			
			|  |  |  | """ | 
		
	
		
			
			|  |  |  | super().__init__() | 
		
	
		
			
			|  |  |  | self.device = device | 
		
	
		
			
			|  |  |  | self.cell_exprs = torch.from_numpy(cell_exprs).float().to(device) | 
		
	
		
			
			|  |  |  | self.drug_fingerprints = [torch.from_numpy(fp).float().to(device) for fp in drug_fingerprints] | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | # Projection layers for drug fingerprints | 
		
	
		
			
			|  |  |  | self.drug_proj = nn.ModuleList([ | 
		
	
		
			
			|  |  |  | nn.Sequential( | 
		
	
		
			
			|  |  |  | nn.Linear(fp.shape[1], 512), | 
		
	
		
			
			|  |  |  | nn.BatchNorm1d(512), | 
		
	
		
			
			|  |  |  | nn.GELU(), | 
		
	
		
			
			|  |  |  | nn.Dropout(0.3) | 
		
	
		
			
			|  |  |  | ).to(device) for fp in drug_fingerprints | 
		
	
		
			
			|  |  |  | ]) | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | # Transformer encoder for drug features | 
		
	
		
			
			|  |  |  | self.transformer = TransformerEncoder( | 
		
	
		
			
			|  |  |  | TransformerEncoderLayer( | 
		
	
		
			
			|  |  |  | d_model=512, | 
		
	
		
			
			|  |  |  | nhead=8, | 
		
	
		
			
			|  |  |  | dim_feedforward=2048, | 
		
	
		
			
			|  |  |  | batch_first=True | 
		
	
		
			
			|  |  |  | ), | 
		
	
		
			
			|  |  |  | num_layers=3 | 
		
	
		
			
			|  |  |  | ).to(device) | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | # Normalization layers | 
		
	
		
			
			|  |  |  | self.cell_norm = nn.LayerNorm(cell_exprs.shape[1]).to(device) | 
		
	
		
			
			|  |  |  | self.drug_norm = nn.LayerNorm(512).to(device) | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | # Encoder for cell features | 
		
	
		
			
			|  |  |  | self.cell_encoder = nn.Sequential( | 
		
	
		
			
			|  |  |  | nn.Linear(cell_exprs.shape[1], 1024), | 
		
	
		
			
			|  |  |  | nn.BatchNorm1d(1024), | 
		
	
		
			
			|  |  |  | nn.GELU(), | 
		
	
		
			
			|  |  |  | nn.Dropout(0.3), | 
		
	
		
			
			|  |  |  | nn.Linear(1024, 512) | 
		
	
		
			
			|  |  |  | ).to(device) | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | def forward(self): | 
		
	
		
			
			|  |  |  | """ | 
		
	
		
			
			|  |  |  | Process cell and drug features through encoding and transformation. | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | Returns: | 
		
	
		
			
			|  |  |  | tuple: (cell_encoded, drug_feat) encoded cell and drug features. | 
		
	
		
			
			|  |  |  | """ | 
		
	
		
			
			|  |  |  | # Normalize and encode cell features | 
		
	
		
			
			|  |  |  | cell_feat = self.cell_norm(self.cell_exprs) | 
		
	
		
			
			|  |  |  | cell_encoded = self.cell_encoder(cell_feat) | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | # Project and transform drug fingerprints | 
		
	
		
			
			|  |  |  | projected = [proj(fp) for proj, fp in zip(self.drug_proj, self.drug_fingerprints)] | 
		
	
		
			
			|  |  |  | stacked = torch.stack(projected, dim=1) | 
		
	
		
			
			|  |  |  | drug_feat = self.transformer(stacked) | 
		
	
		
			
			|  |  |  | drug_feat = self.drug_norm(drug_feat.mean(dim=1)) | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | return cell_encoded, drug_feat | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | class GEncoder(nn.Module): | 
		
	
		
			
			|  |  |  | """Graph encoder module for processing cell and drug embeddings.""" | 
		
	
		
			
			|  |  |  | def __init__(self, agg_c_lp, agg_d_lp, self_c_lp, self_d_lp, device="cpu"): | 
		
	
		
			
			|  |  |  | """ | 
		
	
		
			
			|  |  |  | Initialize the graph encoder. | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | Args: | 
		
	
		
			
			|  |  |  | agg_c_lp (torch.Tensor): Aggregated cell Laplacian matrix. | 
		
	
		
			
			|  |  |  | agg_d_lp (torch.Tensor): Aggregated drug Laplacian matrix. | 
		
	
		
			
			|  |  |  | self_c_lp (torch.Tensor): Self-loop cell Laplacian matrix. | 
		
	
		
			
			|  |  |  | self_d_lp (torch.Tensor): Self-loop drug Laplacian matrix. | 
		
	
		
			
			|  |  |  | device (str): Device to perform computations on. | 
		
	
		
			
			|  |  |  | """ | 
		
	
		
			
			|  |  |  | super().__init__() | 
		
	
		
			
			|  |  |  | self.agg_c_lp = agg_c_lp | 
		
	
		
			
			|  |  |  | self.agg_d_lp = agg_d_lp | 
		
	
		
			
			|  |  |  | self.self_c_lp = self_c_lp | 
		
	
		
			
			|  |  |  | self.self_d_lp = self_d_lp | 
		
	
		
			
			|  |  |  | self.device = device | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | # Cell feature encoder | 
		
	
		
			
			|  |  |  | self.cell_encoder = nn.Sequential( | 
		
	
		
			
			|  |  |  | nn.Linear(512, 1024), | 
		
	
		
			
			|  |  |  | nn.BatchNorm1d(1024), | 
		
	
		
			
			|  |  |  | nn.GELU(), | 
		
	
		
			
			|  |  |  | nn.Dropout(0.3), | 
		
	
		
			
			|  |  |  | nn.Linear(1024, 512) | 
		
	
		
			
			|  |  |  | ).to(device) | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | # Drug feature encoder | 
		
	
		
			
			|  |  |  | self.drug_encoder = nn.Sequential( | 
		
	
		
			
			|  |  |  | nn.Linear(512, 1024), | 
		
	
		
			
			|  |  |  | nn.BatchNorm1d(1024), | 
		
	
		
			
			|  |  |  | nn.GELU(), | 
		
	
		
			
			|  |  |  | nn.Dropout(0.3), | 
		
	
		
			
			|  |  |  | nn.Linear(1024, 512) | 
		
	
		
			
			|  |  |  | ).to(device) | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | # Attention mechanism | 
		
	
		
			
			|  |  |  | self.attention = MultiheadAttention(embed_dim=512, num_heads=8, batch_first=True).to(device) | 
		
	
		
			
			|  |  |  | self.residual = nn.Linear(512, 512).to(device) | 
		
	
		
			
			|  |  |  | self.fc = nn.Sequential( | 
		
	
		
			
			|  |  |  | nn.Linear(1024, 512), | 
		
	
		
			
			|  |  |  | nn.BatchNorm1d(512), | 
		
	
		
			
			|  |  |  | nn.GELU(), | 
		
	
		
			
			|  |  |  | nn.Dropout(0.2) | 
		
	
		
			
			|  |  |  | ).to(device) | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | def forward(self, cell_f, drug_f): | 
		
	
		
			
			|  |  |  | """ | 
		
	
		
			
			|  |  |  | Encode cell and drug features using graph-based aggregation and attention. | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | Args: | 
		
	
		
			
			|  |  |  | cell_f (torch.Tensor): Cell features. | 
		
	
		
			
			|  |  |  | drug_f (torch.Tensor): Drug features. | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | Returns: | 
		
	
		
			
			|  |  |  | tuple: (cell_emb, drug_emb) encoded embeddings. | 
		
	
		
			
			|  |  |  | """ | 
		
	
		
			
			|  |  |  | # Aggregate features using Laplacian matrices | 
		
	
		
			
			|  |  |  | cell_agg = torch.mm(self.agg_c_lp, drug_f) + torch.mm(self.self_c_lp, cell_f) | 
		
	
		
			
			|  |  |  | drug_agg = torch.mm(self.agg_d_lp, cell_f) + torch.mm(self.self_d_lp, drug_f) | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | # Encode aggregated features | 
		
	
		
			
			|  |  |  | cell_fc = self.cell_encoder(cell_agg) | 
		
	
		
			
			|  |  |  | drug_fc = self.drug_encoder(drug_agg) | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | # Apply attention mechanism | 
		
	
		
			
			|  |  |  | attn_output, _ = self.attention( | 
		
	
		
			
			|  |  |  | query=cell_fc.unsqueeze(0), | 
		
	
		
			
			|  |  |  | key=drug_fc.unsqueeze(0), | 
		
	
		
			
			|  |  |  | value=drug_fc.unsqueeze(0) | 
		
	
		
			
			|  |  |  | ) | 
		
	
		
			
			|  |  |  | attn_output = attn_output.squeeze(0) | 
		
	
		
			
			|  |  |  | cell_emb = cell_fc + self.residual(attn_output) | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | # Apply final activation | 
		
	
		
			
			|  |  |  | cell_emb = F.gelu(cell_emb) | 
		
	
		
			
			|  |  |  | drug_emb = F.gelu(drug_fc) | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | return cell_emb, drug_emb | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | class GDecoder(nn.Module): | 
		
	
		
			
			|  |  |  | """Decoder module to predict interaction scores from embeddings.""" | 
		
	
		
			
			|  |  |  | def __init__(self, emb_dim, gamma): | 
		
	
		
			
			|  |  |  | """ | 
		
	
		
			
			|  |  |  | Initialize the decoder. | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | Args: | 
		
	
		
			
			|  |  |  | emb_dim (int): Embedding dimension. | 
		
	
		
			
			|  |  |  | gamma (float): Scaling factor for output scores. | 
		
	
		
			
			|  |  |  | """ | 
		
	
		
			
			|  |  |  | super().__init__() | 
		
	
		
			
			|  |  |  | self.gamma = gamma | 
		
	
		
			
			|  |  |  | self.decoder = nn.Sequential( | 
		
	
		
			
			|  |  |  | nn.Linear(2 * emb_dim, 1024), | 
		
	
		
			
			|  |  |  | nn.BatchNorm1d(1024), | 
		
	
		
			
			|  |  |  | nn.GELU(), | 
		
	
		
			
			|  |  |  | nn.Dropout(0.2), | 
		
	
		
			
			|  |  |  | nn.Linear(1024, 1) | 
		
	
		
			
			|  |  |  | ) | 
		
	
		
			
			|  |  |  | self.corr_weight = nn.Parameter(torch.tensor(0.5)) | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | def forward(self, cell_emb, drug_emb): | 
		
	
		
			
			|  |  |  | """ | 
		
	
		
			
			|  |  |  | Decode embeddings to predict interaction scores. | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | Args: | 
		
	
		
			
			|  |  |  | cell_emb (torch.Tensor): Cell embeddings. | 
		
	
		
			
			|  |  |  | drug_emb (torch.Tensor): Drug embeddings. | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | Returns: | 
		
	
		
			
			|  |  |  | torch.Tensor: Predicted interaction scores. | 
		
	
		
			
			|  |  |  | """ | 
		
	
		
			
			|  |  |  | # Expand embeddings for pairwise combinations | 
		
	
		
			
			|  |  |  | cell_exp = cell_emb.unsqueeze(1).repeat(1, drug_emb.size(0), 1) | 
		
	
		
			
			|  |  |  | drug_exp = drug_emb.unsqueeze(0).repeat(cell_emb.size(0), 1, 1) | 
		
	
		
			
			|  |  |  | combined = torch.cat([cell_exp, drug_exp], dim=-1) | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | # Decode combined embeddings | 
		
	
		
			
			|  |  |  | scores = self.decoder(combined.view(-1, 2 * cell_emb.size(1))).view(cell_emb.size(0), drug_emb.size(0)) | 
		
	
		
			
			|  |  |  | corr = torch_corr_x_y(cell_emb, drug_emb) | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | # Combine scores with correlation and apply sigmoid | 
		
	
		
			
			|  |  |  | return torch.sigmoid(self.gamma * (self.corr_weight * scores + (1 - self.corr_weight) * corr)) | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | class DeepTraCDR(nn.Module): | 
		
	
		
			
			|  |  |  | """DeepTraCDR model for cell-drug interaction prediction.""" | 
		
	
		
			
			|  |  |  | def __init__(self, adj_mat, cell_exprs, drug_finger, layer_size, gamma, device="cpu"): | 
		
	
		
			
			|  |  |  | """ | 
		
	
		
			
			|  |  |  | Initialize the DeepTraCDR model. | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | Args: | 
		
	
		
			
			|  |  |  | adj_mat (np.ndarray or torch.Tensor): Adjacency matrix. | 
		
	
		
			
			|  |  |  | cell_exprs (np.ndarray): Cell expression data. | 
		
	
		
			
			|  |  |  | drug_finger (list): Drug fingerprints. | 
		
	
		
			
			|  |  |  | layer_size (list): Layer sizes for the model. | 
		
	
		
			
			|  |  |  | gamma (float): Scaling factor for decoder. | 
		
	
		
			
			|  |  |  | device (str): Device to perform computations on. | 
		
	
		
			
			|  |  |  | """ | 
		
	
		
			
			|  |  |  | super().__init__() | 
		
	
		
			
			|  |  |  | self.device = device | 
		
	
		
			
			|  |  |  | if isinstance(adj_mat, np.ndarray): | 
		
	
		
			
			|  |  |  | adj_mat = torch.from_numpy(adj_mat).float() | 
		
	
		
			
			|  |  |  | self.adj_mat = adj_mat.to(device) | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | # Initialize submodules | 
		
	
		
			
			|  |  |  | self.construct_adj = ConstructAdjMatrix(self.adj_mat, device=device) | 
		
	
		
			
			|  |  |  | self.load_feat = LoadFeature(cell_exprs, drug_finger, device=device) | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | # Compute fixed adjacency matrices | 
		
	
		
			
			|  |  |  | agg_c, agg_d, self_c, self_d = self.construct_adj() | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | self.encoder = GEncoder(agg_c, agg_d, self_c, self_d, device=device).to(device) | 
		
	
		
			
			|  |  |  | self.decoder = GDecoder(512, gamma).to(device)  # Fixed emb_dim to 512 | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | def forward(self): | 
		
	
		
			
			|  |  |  | """ | 
		
	
		
			
			|  |  |  | Forward pass through the DeepTraCDR model. | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | Returns: | 
		
	
		
			
			|  |  |  | tuple: (predicted_scores, cell_emb, drug_emb) predicted scores and embeddings. | 
		
	
		
			
			|  |  |  | """ | 
		
	
		
			
			|  |  |  | cell_f, drug_f = self.load_feat() | 
		
	
		
			
			|  |  |  | cell_emb, drug_emb = self.encoder(cell_f, drug_f) | 
		
	
		
			
			|  |  |  | return self.decoder(cell_emb, drug_emb), cell_emb, drug_emb | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | def get_cell_embeddings(self): | 
		
	
		
			
			|  |  |  | """ | 
		
	
		
			
			|  |  |  | Retrieve cell embeddings from the model. | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | Returns: | 
		
	
		
			
			|  |  |  | torch.Tensor: Cell embeddings. | 
		
	
		
			
			|  |  |  | """ | 
		
	
		
			
			|  |  |  | cell_f, drug_f = self.load_feat() | 
		
	
		
			
			|  |  |  | cell_emb, _ = self.encoder(cell_f, drug_f) | 
		
	
		
			
			|  |  |  | return cell_emb | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | class Optimizer: | 
		
	
		
			
			|  |  |  | """Optimizer class for training the DeepTraCDR model with early stopping.""" | 
		
	
		
			
			|  |  |  | def __init__(self, model, train_data, test_data, test_mask, train_mask, adj_matrix, evaluate_fun, | 
		
	
		
			
			|  |  |  | lr=0.001, wd=1e-05, epochs=200, test_freq=20, patience=50, device="cpu"): | 
		
	
		
			
			|  |  |  | """ | 
		
	
		
			
			|  |  |  | Initialize the optimizer. | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | Args: | 
		
	
		
			
			|  |  |  | model (nn.Module): DeepTraCDR model to optimize. | 
		
	
		
			
			|  |  |  | train_data (torch.Tensor): Training data. | 
		
	
		
			
			|  |  |  | test_data (torch.Tensor): Test data. | 
		
	
		
			
			|  |  |  | test_mask (torch.Tensor): Test mask. | 
		
	
		
			
			|  |  |  | train_mask (torch.Tensor): Training mask. | 
		
	
		
			
			|  |  |  | adj_matrix (torch.Tensor): Adjacency matrix. | 
		
	
		
			
			|  |  |  | evaluate_fun (callable): Evaluation function. | 
		
	
		
			
			|  |  |  | lr (float): Learning rate. | 
		
	
		
			
			|  |  |  | wd (float): Weight decay. | 
		
	
		
			
			|  |  |  | epochs (int): Number of training epochs. | 
		
	
		
			
			|  |  |  | test_freq (int): Frequency of evaluation. | 
		
	
		
			
			|  |  |  | patience (int): Patience for early stopping. | 
		
	
		
			
			|  |  |  | device (str): Device to perform computations on. | 
		
	
		
			
			|  |  |  | """ | 
		
	
		
			
			|  |  |  | self.model = model.to(device) | 
		
	
		
			
			|  |  |  | self.train_data = train_data.float().to(device) | 
		
	
		
			
			|  |  |  | self.test_data = test_data.float().to(device) | 
		
	
		
			
			|  |  |  | self.train_mask = train_mask.to(device) | 
		
	
		
			
			|  |  |  | self.test_mask_bool = test_mask.to(device).bool() | 
		
	
		
			
			|  |  |  | self.adj_matrix = adj_matrix.to(device) | 
		
	
		
			
			|  |  |  | self.evaluate_fun = evaluate_fun | 
		
	
		
			
			|  |  |  | self.optimizer = torch.optim.Adam(self.model.parameters(), lr=lr, weight_decay=wd) | 
		
	
		
			
			|  |  |  | self.epochs = epochs | 
		
	
		
			
			|  |  |  | self.test_freq = test_freq | 
		
	
		
			
			|  |  |  | self.patience = patience | 
		
	
		
			
			|  |  |  | self.best_auc = 0.0 | 
		
	
		
			
			|  |  |  | self.best_auprc = 0.0 | 
		
	
		
			
			|  |  |  | self.best_weights = None | 
		
	
		
			
			|  |  |  | self.counter = 0 | 
		
	
		
			
			|  |  |  | self.device = device | 
		
	
		
			
			|  |  |  | self.best_epoch_auc = None | 
		
	
		
			
			|  |  |  | self.best_epoch_auprc = None | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | def train(self): | 
		
	
		
			
			|  |  |  | """ | 
		
	
		
			
			|  |  |  | Train the model with early stopping and evaluate performance. | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | Returns: | 
		
	
		
			
			|  |  |  | tuple: (true_data, final_pred_masked, best_auc, best_auprc) evaluation results. | 
		
	
		
			
			|  |  |  | """ | 
		
	
		
			
			|  |  |  | true_data = torch.masked_select(self.test_data, self.test_mask_bool).cpu().numpy() | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | for epoch in range(self.epochs): | 
		
	
		
			
			|  |  |  | self.model.train() | 
		
	
		
			
			|  |  |  | # Forward pass and compute loss | 
		
	
		
			
			|  |  |  | pred_train, cell_emb, drug_emb = self.model() | 
		
	
		
			
			|  |  |  | ce_loss = cross_entropy_loss(self.train_data, pred_train, self.train_mask) | 
		
	
		
			
			|  |  |  | proto_loss = prototypical_loss(cell_emb, drug_emb, self.adj_matrix) | 
		
	
		
			
			|  |  |  | total_loss = 0.7 * ce_loss + 0.3 * proto_loss | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | # Backpropagation | 
		
	
		
			
			|  |  |  | self.optimizer.zero_grad() | 
		
	
		
			
			|  |  |  | total_loss.backward() | 
		
	
		
			
			|  |  |  | self.optimizer.step() | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | self.model.eval() | 
		
	
		
			
			|  |  |  | with torch.no_grad(): | 
		
	
		
			
			|  |  |  | # Evaluate training performance | 
		
	
		
			
			|  |  |  | train_pred, _, _ = self.model() | 
		
	
		
			
			|  |  |  | train_pred_masked = torch.masked_select(train_pred, self.train_mask).cpu().numpy() | 
		
	
		
			
			|  |  |  | train_true_data = torch.masked_select(self.train_data, self.train_mask).cpu().numpy() | 
		
	
		
			
			|  |  |  | try: | 
		
	
		
			
			|  |  |  | train_auc = roc_auc_score(train_true_data, train_pred_masked) | 
		
	
		
			
			|  |  |  | train_auprc = average_precision_score(train_true_data, train_pred_masked) | 
		
	
		
			
			|  |  |  | except ValueError: | 
		
	
		
			
			|  |  |  | train_auc, train_auprc = 0.0, 0.0 | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | # Evaluate test performance | 
		
	
		
			
			|  |  |  | pred_eval, _, _ = self.model() | 
		
	
		
			
			|  |  |  | pred_masked = torch.masked_select(pred_eval, self.test_mask_bool).cpu().numpy() | 
		
	
		
			
			|  |  |  | try: | 
		
	
		
			
			|  |  |  | auc = roc_auc_score(true_data, pred_masked) | 
		
	
		
			
			|  |  |  | auprc = average_precision_score(true_data, pred_masked) | 
		
	
		
			
			|  |  |  | except ValueError: | 
		
	
		
			
			|  |  |  | auc, auprc = 0.0, 0.0 | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | # Update best metrics and weights | 
		
	
		
			
			|  |  |  | if auc > self.best_auc: | 
		
	
		
			
			|  |  |  | self.best_auc = auc | 
		
	
		
			
			|  |  |  | self.best_auprc = auprc | 
		
	
		
			
			|  |  |  | self.best_weights = self.model.state_dict().copy() | 
		
	
		
			
			|  |  |  | self.counter = 0 | 
		
	
		
			
			|  |  |  | self.best_epoch_auc = auc | 
		
	
		
			
			|  |  |  | self.best_epoch_auprc = auprc | 
		
	
		
			
			|  |  |  | else: | 
		
	
		
			
			|  |  |  | self.counter += 1 | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | # Log progress | 
		
	
		
			
			|  |  |  | if epoch % self.test_freq == 0 or epoch == self.epochs - 1: | 
		
	
		
			
			|  |  |  | print(f"Epoch {epoch}: Loss={total_loss.item():.4f}, Train AUC={train_auc:.4f}, " | 
		
	
		
			
			|  |  |  | f"Train AUPRC={train_auprc:.4f}, Test AUC={auc:.4f}, Test AUPRC={auprc:.4f}") | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | # Check early stopping | 
		
	
		
			
			|  |  |  | if self.counter >= self.patience: | 
		
	
		
			
			|  |  |  | print(f"\nEarly stopping triggered at epoch {epoch}!") | 
		
	
		
			
			|  |  |  | print(f"No improvement in AUC for {self.patience} consecutive epochs.") | 
		
	
		
			
			|  |  |  | break | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | # Restore best weights | 
		
	
		
			
			|  |  |  | if self.best_weights is not None: | 
		
	
		
			
			|  |  |  | self.model.load_state_dict(self.best_weights) | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | # Final evaluation | 
		
	
		
			
			|  |  |  | self.model.eval() | 
		
	
		
			
			|  |  |  | with torch.no_grad(): | 
		
	
		
			
			|  |  |  | final_pred, _, _ = self.model() | 
		
	
		
			
			|  |  |  | final_pred_masked = torch.masked_select(final_pred, self.test_mask_bool).cpu().numpy() | 
		
	
		
			
			|  |  |  | best_auc = roc_auc_score(true_data, final_pred_masked) | 
		
	
		
			
			|  |  |  | best_auprc = average_precision_score(true_data, final_pred_masked) | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | # Log final results | 
		
	
		
			
			|  |  |  | print("\nBest Metrics After Training (on Test Data):") | 
		
	
		
			
			|  |  |  | print(f"AUC: {self.best_auc:.4f}") | 
		
	
		
			
			|  |  |  | print(f"AUPRC: {self.best_auprc:.4f}") | 
		
	
		
			
			|  |  |  |  | 
		
	
		
			
			|  |  |  | return true_data, final_pred_masked, self.best_auc, self.best_auprc |