import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import MultiheadAttention, TransformerEncoder, TransformerEncoderLayer
from sklearn.metrics import roc_auc_score, average_precision_score, precision_score, recall_score, f1_score

from utils import torch_corr_x_y, cross_entropy_loss, prototypical_loss


class ConstructAdjMatrix(nn.Module):
    """Constructs normalized adjacency matrices for graph-based computations."""

    def __init__(self, original_adj_mat, device="cpu"):
        super().__init__()
        self.adj = (
            torch.from_numpy(original_adj_mat).float().to(device)
            if isinstance(original_adj_mat, np.ndarray)
            else original_adj_mat.to(device)
        )
        self.device = device

    def forward(self):
        """Computes degree-normalized aggregation and self-loop matrices for cells and drugs."""
        with torch.no_grad():
            # Inverse-sqrt degree matrices (degrees offset by 1 for self-loops)
            d_x = torch.diag(torch.pow(torch.sum(self.adj, dim=1) + 1, -0.5))
            d_y = torch.diag(torch.pow(torch.sum(self.adj, dim=0) + 1, -0.5))

            # Degree-normalized bipartite aggregation matrices for cells and drugs
            agg_cell_lp = torch.mm(torch.mm(d_x, self.adj), d_y)
            agg_drug_lp = torch.mm(torch.mm(d_y, self.adj.T), d_x)

            # Self-loop matrices for cells and drugs
            self_cell_lp = torch.diag(torch.add(torch.pow(torch.sum(self.adj, dim=1) + 1, -1), 1))
            self_drug_lp = torch.diag(torch.add(torch.pow(torch.sum(self.adj, dim=0) + 1, -1), 1))

        return (
            agg_cell_lp.to(self.device),
            agg_drug_lp.to(self.device),
            self_cell_lp.to(self.device),
            self_drug_lp.to(self.device),
        )
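
# Illustrative sketch (hypothetical values, not part of the module): for a
# 3-cell x 4-drug response matrix `adj`, calling ConstructAdjMatrix(adj)()
# yields agg_cell_lp of shape (3, 4) (propagates drug-side features to cells),
# agg_drug_lp of shape (4, 3) (the reverse direction), and diagonal self-loop
# matrices self_cell_lp (3, 3) and self_drug_lp (4, 4).

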
class LoadFeature(nn.Module):
    """Loads and processes cell expression and drug fingerprint features."""

    def __init__(self, cell_exprs, drug_fingerprints, device="cpu"):
        super().__init__()
        self.device = device
        self.cell_exprs = torch.from_numpy(cell_exprs).float().to(device)
        self.drug_fingerprints = [torch.from_numpy(fp).float().to(device) for fp in drug_fingerprints]

        # Per-fingerprint projection layers mapping each fingerprint type to a common 512-dim space
        self.drug_proj = nn.ModuleList([
            nn.Sequential(
                nn.Linear(fp.shape[1], 512),
                nn.BatchNorm1d(512),
                nn.GELU(),
                nn.Dropout(0.3),
            ).to(device) for fp in drug_fingerprints
        ])

        # Transformer for drug feature encoding (each fingerprint type is one token)
        self.transformer = TransformerEncoder(
            TransformerEncoderLayer(d_model=512, nhead=8, dim_feedforward=2048, batch_first=True),
            num_layers=3,
        ).to(device)

        # Normalization layers
        self.cell_norm = nn.LayerNorm(cell_exprs.shape[1]).to(device)
        self.drug_norm = nn.LayerNorm(512).to(device)

        # Cell feature encoder
        self.cell_encoder = nn.Sequential(
            nn.Linear(cell_exprs.shape[1], 1024),
            nn.BatchNorm1d(1024),
            nn.GELU(),
            nn.Dropout(0.3),
            nn.Linear(1024, 512),
        ).to(device)

    def forward(self):
        """Encodes cell and drug features into a unified embedding space."""
        cell_feat = self.cell_norm(self.cell_exprs)
        cell_encoded = self.cell_encoder(cell_feat)

        # Project each fingerprint type, stack as a token sequence, and encode
        projected = [proj(fp) for proj, fp in zip(self.drug_proj, self.drug_fingerprints)]
        stacked = torch.stack(projected, dim=1)  # (n_drugs, n_fingerprint_types, 512)
        drug_feat = self.transformer(stacked)
        drug_feat = self.drug_norm(drug_feat.mean(dim=1))  # mean-pool over fingerprint tokens

        return cell_encoded, drug_feat
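
# Illustrative shape sketch (hypothetical sizes): with 16 cells, 8 drugs, and
# two fingerprint matrices of widths 881 and 166, each fingerprint type is
# projected to 512 dims and stacked as one transformer token, so `stacked` is
# (8, 2, 512); mean-pooling the encoded tokens gives drug_feat of shape
# (8, 512) alongside cell_encoded of shape (16, 512).

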
class GEncoder(nn.Module):
    """Graph encoder for cell and drug feature aggregation with attention."""

    def __init__(self, agg_c_lp, agg_d_lp, self_c_lp, self_d_lp, device="cpu"):
        super().__init__()
        self.agg_c_lp = agg_c_lp
        self.agg_d_lp = agg_d_lp
        # Self-loop matrices (stored; not used in forward below)
        self.self_c_lp = self_c_lp
        self.self_d_lp = self_d_lp
        self.device = device

        # Cell feature encoder
        self.cell_encoder = nn.Sequential(
            nn.Linear(512, 1024),
            nn.BatchNorm1d(1024),
            nn.GELU(),
            nn.Dropout(0.3),
            nn.Linear(1024, 512),
        ).to(device)

        # Drug feature encoder
        self.drug_encoder = nn.Sequential(
            nn.Linear(512, 1024),
            nn.BatchNorm1d(1024),
            nn.GELU(),
            nn.Dropout(0.3),
            nn.Linear(1024, 512),
        ).to(device)

        # Attention mechanism for cross-modal interaction
        self.attention = MultiheadAttention(embed_dim=512, num_heads=8, batch_first=True).to(device)
        self.residual = nn.Linear(512, 512).to(device)

        # Final feature fusion (defined for a 1024-dim input; not used in forward below)
        self.fc = nn.Sequential(
            nn.Linear(1024, 512),
            nn.BatchNorm1d(512),
            nn.GELU(),
            nn.Dropout(0.2),
        ).to(device)

    def forward(self, cell_f, drug_f):
        """Aggregates and encodes cell and drug features using graph convolution and attention."""
        # Aggregate features across the bipartite graph
        cell_agg = torch.mm(self.agg_c_lp, drug_f)
        drug_agg = torch.mm(self.agg_d_lp, cell_f)

        # Encode aggregated features
        cell_fc = self.cell_encoder(cell_agg)
        drug_fc = self.drug_encoder(drug_agg)

        # Cross-modal attention: cells attend over drug representations
        attn_output, _ = self.attention(
            query=cell_fc.unsqueeze(0),
            key=drug_fc.unsqueeze(0),
            value=drug_fc.unsqueeze(0),
        )
        attn_output = attn_output.squeeze(0)
        cell_emb = cell_fc + self.residual(attn_output)

        # Apply final activation
        return F.gelu(cell_emb), F.gelu(drug_fc)
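
# Shape sketch for GEncoder.forward (hypothetical sizes): with agg_c_lp of
# shape (n_cells, n_drugs) and drug_f of shape (n_drugs, 512), cell_agg is
# (n_cells, 512). The attention call treats all cells as a single
# length-n_cells "sequence" (batch size 1) attending over the n_drugs drug
# tokens, so attn_output is (n_cells, 512) after the squeeze.

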
class GDecoder(nn.Module):
    """Decodes cell and drug embeddings into interaction scores."""

    def __init__(self, emb_dim, gamma):
        super().__init__()
        self.gamma = gamma
        self.decoder = nn.Sequential(
            nn.Linear(2 * emb_dim, 1024),
            nn.BatchNorm1d(1024),
            nn.GELU(),
            nn.Dropout(0.2),
            nn.Linear(1024, 1),
        )
        # Learnable balance between the MLP scores and the correlation term
        self.corr_weight = nn.Parameter(torch.tensor(0.5))

    def forward(self, cell_emb, drug_emb):
        """Predicts interaction scores from all cell-drug embedding pairs."""
        # Build every (cell, drug) pair: (n_cells, n_drugs, 2 * emb_dim)
        cell_exp = cell_emb.unsqueeze(1).repeat(1, drug_emb.size(0), 1)
        drug_exp = drug_emb.unsqueeze(0).repeat(cell_emb.size(0), 1, 1)
        combined = torch.cat([cell_exp, drug_exp], dim=-1)

        # MLP scores for every pair, reshaped back to (n_cells, n_drugs)
        scores = self.decoder(combined.view(-1, 2 * cell_emb.size(1))).view(
            cell_emb.size(0), drug_emb.size(0)
        )
        corr = torch_corr_x_y(cell_emb, drug_emb)
        return torch.sigmoid(self.gamma * (self.corr_weight * scores + (1 - self.corr_weight) * corr))
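
# Illustrative sketch: for cell_emb (16, 512) and drug_emb (8, 512), `combined`
# is (16, 8, 1024) and the MLP scores reshape back to (16, 8). torch_corr_x_y
# (from the project-local utils) is assumed to return a matching (16, 8)
# correlation matrix, so the output is a (16, 8) matrix of probabilities.

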
class DeepTraCDR(nn.Module):
    """Main model integrating adjacency matrix construction, feature loading, encoding, and decoding."""

    def __init__(self, adj_mat, cell_exprs, drug_finger, layer_size, gamma, device="cpu"):
        super().__init__()
        self.device = device
        self.adj_mat = (
            torch.from_numpy(adj_mat).float().to(device)
            if isinstance(adj_mat, np.ndarray)
            else adj_mat.to(device)
        )

        self.construct_adj = ConstructAdjMatrix(self.adj_mat, device=device)
        self.load_feat = LoadFeature(cell_exprs, drug_finger, device=device)

        # Precompute adjacency matrices
        agg_c, agg_d, self_c, self_d = self.construct_adj()

        self.encoder = GEncoder(agg_c, agg_d, self_c, self_d, device=device).to(device)
        self.decoder = GDecoder(layer_size[-1], gamma).to(device)

    def forward(self):
        """Executes the full forward pass of the model."""
        cell_f, drug_f = self.load_feat()
        cell_emb, drug_emb = self.encoder(cell_f, drug_f)
        return self.decoder(cell_emb, drug_emb), cell_emb, drug_emb
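
# Illustrative usage (hypothetical data; a runnable smoke test appears at the
# bottom of this file):
#   model = DeepTraCDR(adj, cell_exprs, drug_finger, layer_size=[512], gamma=8.0)
#   pred, cell_emb, drug_emb = model()

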
class Optimizer:
    """Handles model training and evaluation with performance metrics."""

    def __init__(self, model, train_data, test_data, test_mask, train_mask, adj_matrix,
                 evaluate_fun, lr=0.001, wd=1e-5, epochs=200, test_freq=20, device="cpu"):
        self.model = model.to(device)
        self.train_data = train_data.float().to(device)
        self.test_data = test_data.float().to(device)
        self.train_mask = train_mask.to(device)
        self.test_mask_bool = test_mask.to(device).bool()
        self.adj_matrix = adj_matrix.to(device)
        # External evaluation callback (stored; not called inside train below)
        self.evaluate_fun = evaluate_fun
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=lr, weight_decay=wd)
        self.epochs = epochs
        self.test_freq = test_freq

    def train(self):
        """Trains the model and periodically evaluates performance on test data."""
        true_data = torch.masked_select(self.test_data, self.test_mask_bool).cpu().numpy()
        best_metrics = {'auc': 0.0, 'auprc': 0.0, 'precision': 0.0, 'recall': 0.0, 'f1': 0.0}
        best_pred = None

        for epoch in range(self.epochs):
            self.model.train()
            pred, cell_emb, drug_emb = self.model()

            # Weighted combination of masked cross-entropy and prototypical losses
            ce_loss = cross_entropy_loss(self.train_data, pred, self.train_mask)
            proto_loss = prototypical_loss(cell_emb, drug_emb, self.adj_matrix)
            total_loss = 0.7 * ce_loss + 0.3 * proto_loss

            # Backpropagation
            self.optimizer.zero_grad()
            total_loss.backward()
            self.optimizer.step()

            # Evaluate periodically (reuses the predictions from this epoch's forward pass)
            if epoch % self.test_freq == 0:
                self.model.eval()
                with torch.no_grad():
                    pred_masked = torch.masked_select(pred, self.test_mask_bool).cpu().numpy()
                    metrics = self._compute_metrics(true_data, pred_masked)

                # Track the best epoch by AUC
                if metrics['auc'] > best_metrics['auc']:
                    best_metrics.update(metrics)
                    best_pred = pred_masked.copy()

                print(f"Epoch {epoch}: Loss={total_loss.item():.4f}, AUC={metrics['auc']:.4f}, "
                      f"AUPRC={metrics['auprc']:.4f}, Precision={metrics['precision']:.4f}, "
                      f"Recall={metrics['recall']:.4f}, F1-Score={metrics['f1']:.4f}")

        # Print final best metrics
        print("\nBest Metrics:")
        for metric, value in best_metrics.items():
            print(f"{metric.upper()}: {value:.4f}")

        return true_data, best_pred, *best_metrics.values()

    def _compute_metrics(self, true_data, pred_masked):
        """Computes evaluation metrics for model predictions."""
        try:
            auc = roc_auc_score(true_data, pred_masked)
            auprc = average_precision_score(true_data, pred_masked)
        except ValueError:
            # Degenerate case: the test labels contain a single class
            auc = auprc = 0.0

        pred_labels = (pred_masked >= 0.5).astype(int)
        return {
            'auc': auc,
            'auprc': auprc,
            'precision': precision_score(true_data, pred_labels, zero_division=0),
            'recall': recall_score(true_data, pred_labels, zero_division=0),
            'f1': f1_score(true_data, pred_labels, zero_division=0),
        }
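

if __name__ == "__main__":
    # Minimal smoke test (illustrative sketch, not part of the training pipeline).
    # All sizes and hyperparameters below are arbitrary choices for shape-checking;
    # the project-local `utils` module must be importable for torch_corr_x_y.
    n_cells, n_drugs, n_genes = 16, 8, 100
    adj = np.random.randint(0, 2, size=(n_cells, n_drugs)).astype(np.float32)
    cell_exprs = np.random.randn(n_cells, n_genes).astype(np.float32)
    # Two hypothetical fingerprint types (e.g., 881-bit and 166-bit vectors)
    drug_finger = [np.random.randn(n_drugs, dim).astype(np.float32) for dim in (881, 166)]

    model = DeepTraCDR(adj, cell_exprs, drug_finger, layer_size=[512], gamma=8.0)
    pred, cell_emb, drug_emb = model()
    print(pred.shape, cell_emb.shape, drug_emb.shape)  # expect (16, 8), (16, 512), (8, 512)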