
Upload files to 'Scenario1/Regression'

Zahra Asgari committed 6 days ago · branch: master · commit 59515385b6
2 changed files with 397 additions and 0 deletions:

  1. Scenario1/Regression/DeepTraCDR_model.py (+278, -0)
  2. Scenario1/Regression/train_random.py (+119, -0)

Scenario1/Regression/DeepTraCDR_model.py (+278, -0)

@@ -0,0 +1,278 @@
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import MultiheadAttention, TransformerEncoder, TransformerEncoderLayer
from scipy.stats import pearsonr, spearmanr
from utils import mse_loss, torch_corr_x_y


class ConstructAdjMatrix(nn.Module):
    """Constructs adjacency matrices for graph-based operations."""

    def __init__(self, original_adj_mat, device="cpu"):
        super().__init__()
        # Convert numpy array to torch tensor if needed
        self.adj = torch.from_numpy(original_adj_mat).float() if isinstance(original_adj_mat, np.ndarray) else original_adj_mat
        self.adj = self.adj.to(device)
        self.device = device

    def forward(self):
        """Computes Laplacian matrices for cells and drugs."""
        with torch.no_grad():
            # Compute degree matrices for normalization
            d_x = torch.diag(torch.pow(torch.sum(self.adj, dim=1) + 1, -0.5))
            d_y = torch.diag(torch.pow(torch.sum(self.adj, dim=0) + 1, -0.5))
            # Compute aggregated Laplacian matrices
            agg_cell_lp = torch.mm(torch.mm(d_x, self.adj), d_y)
            agg_drug_lp = torch.mm(torch.mm(d_y, self.adj.T), d_x)
            # Compute self-loop Laplacian matrices
            self_cell_lp = torch.diag(torch.add(torch.pow(torch.sum(self.adj, dim=1) + 1, -1), 1))
            self_drug_lp = torch.diag(torch.add(torch.pow(torch.sum(self.adj, dim=0) + 1, -1), 1))
        return agg_cell_lp.to(self.device), agg_drug_lp.to(self.device), self_cell_lp.to(self.device), self_drug_lp.to(self.device)

class LoadFeature(nn.Module):
    """Loads and processes cell and drug features."""

    def __init__(self, cell_exprs, drug_fingerprints, device="cpu"):
        super().__init__()
        self.device = device
        # Convert input data to torch tensors
        self.cell_exprs = torch.from_numpy(cell_exprs).float().to(device)
        self.drug_fingerprints = [torch.from_numpy(fp).float().to(device) for fp in drug_fingerprints]
        # Define projection layers for drug fingerprints
        self.drug_proj = nn.ModuleList([
            nn.Sequential(
                nn.Linear(fp.shape[1], 512),
                nn.BatchNorm1d(512),
                nn.GELU(),
                nn.Dropout(0.3)
            ).to(device) for fp in drug_fingerprints
        ])
        # Initialize transformer encoder for drug features
        self.transformer = TransformerEncoder(
            TransformerEncoderLayer(d_model=512, nhead=8, dim_feedforward=2048, batch_first=True),
            num_layers=3
        ).to(device)
        # Normalization layers
        self.cell_norm = nn.LayerNorm(cell_exprs.shape[1]).to(device)
        self.drug_norm = nn.LayerNorm(512).to(device)
        # Cell feature encoder
        self.cell_encoder = nn.Sequential(
            nn.Linear(cell_exprs.shape[1], 1024),
            nn.BatchNorm1d(1024),
            nn.GELU(),
            nn.Dropout(0.3),
            nn.Linear(1024, 512)
        ).to(device)

    def forward(self):
        """Processes cell and drug features to generate encoded representations."""
        # Normalize and encode cell features
        cell_feat = self.cell_norm(self.cell_exprs)
        cell_encoded = self.cell_encoder(cell_feat)
        # Project and process drug fingerprints
        projected = [proj(fp) for proj, fp in zip(self.drug_proj, self.drug_fingerprints)]
        stacked = torch.stack(projected, dim=1)
        drug_feat = self.transformer(stacked)
        drug_feat = self.drug_norm(drug_feat.mean(dim=1))
        return cell_encoded, drug_feat

class GEncoder(nn.Module):
    """Encodes cell and drug features using graph-based operations."""

    def __init__(self, agg_c_lp, agg_d_lp, self_c_lp, self_d_lp, device="cpu"):
        super().__init__()
        self.agg_c_lp = agg_c_lp
        self.agg_d_lp = agg_d_lp
        self.self_c_lp = self_c_lp
        self.self_d_lp = self_d_lp
        self.device = device
        # Cell feature encoder
        self.cell_encoder = nn.Sequential(
            nn.Linear(512, 1024),
            nn.BatchNorm1d(1024),
            nn.GELU(),
            nn.Dropout(0.3),
            nn.Linear(1024, 512)
        ).to(device)
        # Drug feature encoder
        self.drug_encoder = nn.Sequential(
            nn.Linear(512, 1024),
            nn.BatchNorm1d(1024),
            nn.GELU(),
            nn.Dropout(0.3),
            nn.Linear(1024, 512)
        ).to(device)
        # Attention mechanism and residual connection
        self.attention = MultiheadAttention(embed_dim=512, num_heads=8, batch_first=True).to(device)
        self.residual = nn.Linear(512, 512).to(device)
        self.fc = nn.Sequential(
            nn.Linear(1024, 512),
            nn.BatchNorm1d(512),
            nn.GELU(),
            nn.Dropout(0.2)
        ).to(device)

    def forward(self, cell_f, drug_f):
        """Encodes cell and drug features with graph-based attention."""
        # Aggregate features using Laplacian matrices
        cell_agg = torch.mm(self.agg_c_lp, drug_f) + torch.mm(self.self_c_lp, cell_f)
        drug_agg = torch.mm(self.agg_d_lp, cell_f) + torch.mm(self.self_d_lp, drug_f)
        # Encode aggregated features
        cell_fc = self.cell_encoder(cell_agg)
        drug_fc = self.drug_encoder(drug_agg)
        # Apply attention mechanism (cells attend over drug embeddings)
        attn_output, _ = self.attention(
            query=cell_fc.unsqueeze(0),
            key=drug_fc.unsqueeze(0),
            value=drug_fc.unsqueeze(0)
        )
        attn_output = attn_output.squeeze(0)
        cell_emb = cell_fc + self.residual(attn_output)
        # Apply final activation
        cell_emb = F.gelu(cell_emb)
        drug_emb = F.gelu(drug_fc)
        return cell_emb, drug_emb

class GDecoder(nn.Module):
    """Decodes combined cell and drug embeddings to predict scores."""

    def __init__(self, emb_dim, gamma):
        super().__init__()
        self.gamma = gamma
        # Decoder network
        self.decoder = nn.Sequential(
            nn.Linear(2 * emb_dim, 1024),
            nn.BatchNorm1d(1024),
            nn.GELU(),
            nn.Dropout(0.2),
            nn.Linear(1024, 1)
        )
        # Learnable correlation weight
        self.corr_weight = nn.Parameter(torch.tensor(0.5))

    def forward(self, cell_emb, drug_emb):
        """Generates prediction scores from cell and drug embeddings."""
        # Combine cell and drug embeddings into all cell-drug pairs
        cell_exp = cell_emb.unsqueeze(1).repeat(1, drug_emb.size(0), 1)
        drug_exp = drug_emb.unsqueeze(0).repeat(cell_emb.size(0), 1, 1)
        combined = torch.cat([cell_exp, drug_exp], dim=-1)
        # Decode combined features
        scores = self.decoder(combined.view(-1, 2 * cell_emb.size(1))).view(cell_emb.size(0), drug_emb.size(0))
        corr = torch_corr_x_y(cell_emb, drug_emb)
        # Combine decoder scores with the embedding correlation matrix
        return self.gamma * (self.corr_weight * scores + (1 - self.corr_weight) * corr)

class DeepTraCDR(nn.Module):
    """Graph convolutional network for cell-drug interaction prediction."""

    def __init__(self, adj_mat, cell_exprs, drug_finger, layer_size, gamma, device="cpu"):
        super().__init__()
        self.device = device
        # Convert adjacency matrix to tensor if needed
        self.adj_mat = torch.from_numpy(adj_mat).float().to(device) if isinstance(adj_mat, np.ndarray) else adj_mat.to(device)
        # Initialize components
        self.construct_adj = ConstructAdjMatrix(self.adj_mat, device=device)
        self.load_feat = LoadFeature(cell_exprs, drug_finger, device=device)
        # Precompute adjacency matrices
        agg_c, agg_d, self_c, self_d = self.construct_adj()
        # Initialize encoder and decoder
        self.encoder = GEncoder(agg_c, agg_d, self_c, self_d, device=device).to(device)
        self.decoder = GDecoder(layer_size[-1], gamma).to(device)

    def forward(self):
        """Generates predictions and embeddings for cell-drug interactions."""
        cell_f, drug_f = self.load_feat()
        cell_emb, drug_emb = self.encoder(cell_f, drug_f)
        return self.decoder(cell_emb, drug_emb), cell_emb, drug_emb

class Optimizer:
    """Handles training and evaluation of the DeepTraCDR model."""

    def __init__(self, model, train_data, test_data, test_mask, train_mask, adj_matrix,
                 lr=0.001, wd=1e-05, epochs=200, test_freq=20, device="cpu", patience=50):
        self.model = model.to(device)
        self.train_data = train_data.float().to(device)
        self.test_data = test_data.float().to(device)
        self.train_mask = train_mask.to(device)
        self.test_mask = test_mask.to(device).bool()
        self.adj_matrix = adj_matrix.to(device)
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=lr, weight_decay=wd)
        self.epochs = epochs
        self.test_freq = test_freq
        self.patience = patience
        self.best_rmse = float('inf')
        self.epochs_no_improve = 0
        # Cache the ground-truth values at the masked (test) positions
        self.true_masked_np = torch.masked_select(self.test_data, self.test_mask).cpu().numpy()

    def evaluate_metrics(self, pred_tensor):
        """Computes RMSE, PCC, and SCC for model evaluation."""
        pred_masked_np = torch.masked_select(pred_tensor, self.test_mask).cpu().numpy()
        rmse = np.sqrt(np.mean((self.true_masked_np - pred_masked_np) ** 2))
        pcc, _ = pearsonr(self.true_masked_np, pred_masked_np)
        scc, _ = spearmanr(self.true_masked_np, pred_masked_np)
        return rmse, pcc, scc, pred_masked_np

    def train(self):
        """Trains the model with early stopping based on RMSE."""
        best_rmse = float('inf')
        best_pcc = 0.0
        best_scc = 0.0
        best_pred_np = None
        for epoch in range(self.epochs):
            self.model.train()
            pred, cell_emb, drug_emb = self.model()
            # Compute and optimize loss
            mse_loss_val = mse_loss(self.train_data, pred, self.train_mask)
            total_loss = mse_loss_val
            self.optimizer.zero_grad()
            total_loss.backward()
            self.optimizer.step()
            if epoch % self.test_freq == 0:
                self.model.eval()
                with torch.no_grad():
                    pred_eval, _, _ = self.model()
                    rmse, pcc, scc, pred_masked = self.evaluate_metrics(pred_eval)
                # Update early stopping criteria (counted per evaluation, not per epoch)
                if rmse < self.best_rmse:
                    self.best_rmse = rmse
                    self.epochs_no_improve = 0
                else:
                    self.epochs_no_improve += 1
                # Track best results
                if rmse < best_rmse:
                    best_rmse = rmse
                    best_pcc = pcc
                    best_scc = scc
                    best_pred_np = pred_masked.copy()
                print(f"Epoch {epoch}: Loss = {total_loss.item():.4f}, RMSE = {rmse:.4f}, PCC = {pcc:.4f}, SCC = {scc:.4f}")
                # Early stopping
                if self.epochs_no_improve >= self.patience:
                    print(f"Early stopping at epoch {epoch} (no improvement for {self.patience} evaluations).")
                    break
        print("\nBest Results:")
        print(f"RMSE: {best_rmse:.4f}")
        print(f"PCC: {best_pcc:.4f}")
        print(f"SCC: {best_scc:.4f}")
        return self.true_masked_np, best_pred_np, best_rmse, best_pcc, best_scc
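Note (illustrative, not part of the committed files): the model above can be smoke-tested on random arrays, which makes the expected input shapes explicit. The sizes and fingerprint dimensions below are made up, and the snippet assumes it runs next to DeepTraCDR_model.py with the repository's utils.py (providing mse_loss and torch_corr_x_y) importable.

# Illustrative smoke test for DeepTraCDR (shapes only; sizes are hypothetical).
import numpy as np
import torch
from DeepTraCDR_model import DeepTraCDR

n_cells, n_drugs, n_genes = 60, 10, 200
adj = np.random.rand(n_cells, n_drugs).astype(np.float32)        # cell x drug response matrix
exprs = np.random.rand(n_cells, n_genes).astype(np.float32)      # cell expression features
fingers = [np.random.rand(n_drugs, 881).astype(np.float32),      # one matrix per fingerprint type
           np.random.rand(n_drugs, 1024).astype(np.float32)]

model = DeepTraCDR(adj_mat=adj, cell_exprs=exprs, drug_finger=fingers,
                   layer_size=[512], gamma=15, device="cpu")
with torch.no_grad():
    scores, cell_emb, drug_emb = model()
print(scores.shape)  # (n_cells, n_drugs), mirroring the adjacency matrix

The returned score matrix has one row per cell line and one column per drug, so it can be masked the same way as the train/test adjacency matrices in the Optimizer.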

Scenario1/Regression/train_random.py (+119, -0)

@@ -0,0 +1,119 @@
import argparse
import numpy as np
import torch
from sklearn.model_selection import KFold
from Regression.DeepTraCDR_model import DeepTraCDR, Optimizer
from data_sampler import RegressionSampler
from data_loader import load_data


def parse_arguments() -> argparse.Namespace:
    """
    Parses command-line arguments for the DeepTraCDR regression task.

    Returns:
        Parsed arguments as a Namespace object.
    """
    parser = argparse.ArgumentParser(description="DeepTraCDR Regression Task")
    parser.add_argument('-device', type=str, default="cuda:0" if torch.cuda.is_available() else "cpu",
                        help="Device to run the model on (e.g., 'cuda:0' or 'cpu')")
    parser.add_argument('-data', type=str, default='gdsc', help="Dataset to use (default: gdsc)")
    parser.add_argument('--wd', type=float, default=1e-5, help="Weight decay for optimizer")
    parser.add_argument('--layer_size', nargs='+', type=int, default=[512], help="Layer sizes for the model")
    parser.add_argument('--gamma', type=float, default=15, help="Gamma parameter for decoder")
    parser.add_argument('--epochs', type=int, default=1000, help="Number of training epochs")
    parser.add_argument('--test_freq', type=int, default=50, help="Frequency of evaluation during training")
    parser.add_argument('--lr', type=float, default=0.0001, help="Learning rate for optimizer")
    parser.add_argument('--patience', type=int, default=20, help="Patience for early stopping")
    return parser.parse_args()


def normalize_adj_matrix(adj_matrix: np.ndarray) -> torch.Tensor:
    """
    Normalizes the adjacency matrix using min-shift normalization and converts it to a torch tensor.

    Args:
        adj_matrix: Input adjacency matrix as a NumPy array.

    Returns:
        Normalized adjacency matrix as a torch tensor.
    """
    # Shift values so the minimum becomes zero
    adj_matrix = adj_matrix - np.min(adj_matrix)
    if isinstance(adj_matrix, np.ndarray):
        adj_matrix = torch.from_numpy(adj_matrix).float()
    return adj_matrix

def main():
    """
    Main function to run the DeepTraCDR regression task with k-fold cross-validation.
    """
    # Set precision for matrix multiplication
    torch.set_float32_matmul_precision('high')
    # Parse command-line arguments
    args = parse_arguments()
    # Load dataset
    full_adj, drug_fingerprints, exprs, null_mask, pos_num, args = load_data(args)
    print(f"Original full_adj shape: {full_adj.shape}")
    # Normalize adjacency matrix
    full_adj = normalize_adj_matrix(full_adj)
    print(f"Normalized full_adj shape: {full_adj.shape}")
    print("\n--- Data Shapes ---")
    print(f"Expression data shape: {exprs.shape}")
    print(f"Null mask shape: {null_mask.shape}")
    # Initialize k-fold cross-validation parameters
    k = 5
    n_kfolds = 5
    all_metrics = {'rmse': [], 'pcc': [], 'scc': []}
    # Perform k-fold cross-validation, repeated n_kfolds times with different seeds
    for n_kfold in range(n_kfolds):
        kfold = KFold(n_splits=k, shuffle=True, random_state=n_kfold)
        for fold, (train_idx, test_idx) in enumerate(kfold.split(np.arange(pos_num))):
            # Initialize data sampler
            sampler = RegressionSampler(full_adj, train_idx, test_idx, null_mask)
            # Initialize model
            model = DeepTraCDR(
                adj_mat=full_adj,
                cell_exprs=exprs,
                drug_finger=drug_fingerprints,
                layer_size=args.layer_size,
                gamma=args.gamma,
                device=args.device
            )
            # Initialize optimizer
            opt = Optimizer(
                model=model,
                train_data=sampler.train_data,
                test_data=sampler.test_data,
                test_mask=sampler.test_mask,
                train_mask=sampler.train_mask,
                adj_matrix=full_adj,
                lr=args.lr,
                wd=args.wd,
                epochs=args.epochs,
                test_freq=args.test_freq,
                device=args.device,
                patience=args.patience
            )
            # Train model and collect metrics
            true, pred, best_rmse, best_pcc, best_scc = opt.train()
            all_metrics['rmse'].append(best_rmse)
            all_metrics['pcc'].append(best_pcc)
            all_metrics['scc'].append(best_scc)
            print(f"Fold {n_kfold * k + fold + 1}: RMSE={best_rmse:.4f}, PCC={best_pcc:.4f}, SCC={best_scc:.4f}")
    # Compute and display final average metrics
    print("\nFinal Average Metrics:")
    for metric, values in all_metrics.items():
        mean = np.mean(values)
        std = np.std(values)
        print(f"{metric.upper()}: {mean:.4f} ± {std:.4f}")


if __name__ == "__main__":
    main()
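Usage note (illustrative, not part of the commit): training is driven entirely by the flags defined in parse_arguments, so a run with near-default settings reduces to a single command such as

    python -m Regression.train_random -data gdsc --lr 0.0001 --epochs 1000 --test_freq 50 --patience 20

This assumes the working directory is Scenario1/ so the Regression package import resolves, and that data_loader.py and data_sampler.py are importable from there; the exact layout of those helper modules is not shown in this commit.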
