
Commit 2039c7c647 on branch master, by Zahra Asgari, 4 weeks ago

    Upload files to ''

5 changed files with 976 additions and 2 deletions:
1. README.md (+44 -2)
2. data_loader.py (+163 -0)
3. data_sampler.py (+360 -0)
4. requirements.txt (+8 -0)
5. utils.py (+401 -0)

README.md (+44 -2)

@@ -1,2 +1,44 @@
# DeepTraCDR: Predicting Cancer Drug Response Using Multimodal Deep Learning with Transformers
<!--
## Abstract
<div align="justify">
</div> -->
<!-- ## Method
<img width="810" alt="image" src="https://github.com/akianfar/Deep-CBN/blob/main/assest/Artboard%202.jpg">
<img width="810" alt="image" src="https://github.com/akianfar/Deep-CBN/blob/main/assest/Artboard%203.jpg"> -->
### Requirements
To run this project, first install the required dependencies by executing the following command in your terminal:
```bash
pip install -r requirements.txt
```
## DeepTraCDR Model Overview
DeepTraCDR is a modular model consisting of **Common Modules** and **Experimental Modules**.
### Common Modules
- **Data**: Datasets used for model training and evaluation (a minimal loading sketch follows this list):
  - **GDSC**: Contains `cell_drug.csv` (log-IC50 matrix), `cell_drug_binary.csv` (binary response matrix), `cell_exprs.csv` (gene expression), `drug_feature.csv` (drug fingerprints), `null_mask.csv` (mask of missing values), and `threshold.csv` (sensitivity threshold).
  - **CCLE**: Similar to GDSC, with `cell_drug.csv`, `cell_drug_binary.csv`, `cell_exprs.csv`, and `drug_feature.csv`.
  - **PDX**: Includes `pdx_response.csv` (binary patient-drug matrix), `pdx_exprs.csv` (gene expression), `pdx_null_mask.csv` (mask of missing values), and `drug_feature.csv`.
  - **TCGA**: Contains `patient_drug_binary.csv` (binary matrix), `tcga_exprs.csv` (gene expression), `tcga_null_mask.csv` (mask of missing values), and `drug_feature.csv`.
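
For orientation, here is a minimal, illustrative sketch of how a response matrix and its null mask can be inspected. The relative `Data/GDSC` path is a placeholder; point it at your local copy of the data:

```python
import numpy as np
import pandas as pd

# Placeholder path; adjust to wherever the GDSC files live locally.
data_dir = "Data/GDSC"

# Rows index cell lines, columns index drugs; entries are 0/1 responses.
response = pd.read_csv(f"{data_dir}/cell_drug_binary.csv", index_col=0)
null_mask = pd.read_csv(f"{data_dir}/null_mask.csv", index_col=0)

print(response.shape, "positive entries:", int(np.nansum(response.values)))
```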
### Experimental Modules
The experimental modules are organized into the following directories, each containing a `main.py` script to run the respective experiment:
- **`case_study`**: Contains scripts for case study experiments (e.g., `main_case_study.py`).
- **`Scenario1`**: Includes experiments for random clearing cross-validation (`Random`) and regression (`Regression`).
- **`Scenario2`**: Includes experiments for single row/column clearing (`new`) and targeted drug experiments (`Target`).
- **`Scenario3`**: Includes external validation experiments from in vitro to in vivo (`External`).
Each `main.py` script outputs the true and predicted values for the test data over multiple cross-validation runs. The `utils.py` module supports performance analysis with metrics such as AUC, AUPRC, ACC, F1, and MCC. The model is built with PyTorch and supports CUDA.
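
As a hedged illustration (synthetic tensors rather than real model output), the metric helpers in `utils.py` can be called as follows:

```python
import torch
from utils import evaluate_all

# Synthetic labels and scores, for illustration only.
true = torch.tensor([1., 0., 1., 1., 0., 0., 1., 0.])
pred = torch.tensor([0.9, 0.2, 0.7, 0.6, 0.4, 0.1, 0.8, 0.3])

auc, ap, acc, f1, mcc, precision, recall = evaluate_all(true, pred)
print(f"AUC={auc:.3f} AUPRC={ap:.3f} ACC={acc:.3f} F1={f1:.3f} MCC={mcc:.3f}")
```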

data_loader.py (+163 -0)

@@ -0,0 +1,163 @@
import pandas as pd
import numpy as np
import scipy.sparse as sp

from utils import *


def load_data(args):
    """
    Loads dataset based on the specified data type.

    Args:
        args: Object containing configuration parameters, including the dataset type.

    Returns:
        Tuple containing adjacency matrix, drug fingerprints, expression data,
        null mask, positive edge count, and updated args.

    Raises:
        NotImplementedError: If the specified dataset is not supported.
    """
    if args.data == 'gdsc':
        return _load_gdsc(args)
    elif args.data == 'ccle':
        return _load_ccle(args)
    elif args.data == 'pdx':
        return _load_pdx(args)
    elif args.data == 'tcga':
        return _load_tcga(args)
    else:
        raise NotImplementedError(f"Dataset {args.data} is not supported.")


def _load_gdsc(args):
    """
    Loads GDSC dataset, including cell-drug response, drug fingerprints, gene expression, and null mask.

    Args:
        args: Configuration object to be updated with dataset-specific parameters.

    Returns:
        Tuple of response matrix, drug fingerprints, expression data, null mask,
        positive edge count, and updated args.
    """
    args.alpha = 0.25
    args.layer_size = [512, 512]
    # Load drug fingerprints
    drug_fingerprints = [
        pd.read_csv("/media/external_16TB_1/ali_kianfar/Data/GDSC/ECFP6_fingerprints_GDSC.csv", index_col=0).values.astype(np.float32)
    ]
    # Load response, expression, and null mask data
    res = pd.read_csv("/media/external_16TB_1/ali_kianfar/Data/GDSC/cell_drug_binary.csv", index_col=0).values.astype(np.float32)
    exprs = pd.read_csv("/media/external_16TB_1/ali_kianfar/Data/GDSC/cell_gene/merged_file_GDSC.csv", index_col=0).values.astype(np.float32)
    null_mask = pd.read_csv("/media/external_16TB_1/ali_kianfar/Data/GDSC/null_mask.csv", index_col=0).values.astype(np.float32)
    pos_num = sp.coo_matrix(res).data.shape[0]
    return res, drug_fingerprints, exprs, null_mask, pos_num, args


def _load_ccle(args):
    """
    Loads CCLE dataset, including cell-drug response, drug fingerprints, gene expression, and null mask.

    Args:
        args: Configuration object to be updated with dataset-specific parameters.

    Returns:
        Tuple of response matrix, drug fingerprints, expression data, null mask,
        positive edge count, and updated args.
    """
    args.alpha = 0.45
    args.layer_size = [512, 512]
    # Load drug fingerprints
    drug_fingerprints = [
        pd.read_csv("/media/external_16TB_1/ali_kianfar/Data/CCLE/ECFP6_fingerprints.csv", index_col=0).values.astype(np.float32)
    ]
    # Load response and expression data, initialize null mask
    res = pd.read_csv("/media/external_16TB_1/ali_kianfar/Data/CCLE/cell_drug_binary.csv", index_col=0).values.astype(np.float32)
    exprs = pd.read_csv("/media/external_16TB_1/ali_kianfar/Data/CCLE/merged_file.csv", index_col=0).values.astype(np.float32)
    null_mask = np.zeros(res.shape, dtype=np.float32)
    pos_num = sp.coo_matrix(res).data.shape[0]
    return res, drug_fingerprints, exprs, null_mask, pos_num, args


def _load_pdx(args):
    """
    Loads PDX dataset by merging GDSC and PDX data, aligning gene expression by common genes.

    Args:
        args: Configuration object to be updated with dataset-specific parameters.

    Returns:
        Tuple of merged response matrix, drug fingerprints, merged expression data,
        merged null mask, training row count, and updated args.
    """
    args.alpha = 0.15
    args.layer_size = [1024, 1024]
    # Load response matrices
    gdsc_res = pd.read_csv("/media/external_16TB_1/ali_kianfar/Data/GDSC/cell_drug_binary.csv", index_col=0).values.astype(np.float32)
    pdx_res = pd.read_csv("/media/external_16TB_1/ali_kianfar/Data/PDX/pdx_response.csv", index_col=0).values.astype(np.float32)
    res = np.concatenate((gdsc_res, pdx_res), axis=0)
    train_row = gdsc_res.shape[0]
    # Load drug fingerprints
    drug_finger = [
        pd.read_csv("/media/external_16TB_1/ali_kianfar/Data/GDSC/ECFP6_fingerprints_GDSC.csv", index_col=0).values.astype(np.float32)
    ]
    # Load and align gene expression data
    gdsc_exprs_df = pd.read_csv("/media/external_16TB_1/ali_kianfar/Data/GDSC/cell_gene/merged_file_GDSC.csv", index_col=0)
    pdx_exprs_df = pd.read_csv("/media/external_16TB_1/ali_kianfar/Data/PDX/pdx_exprs.csv", index_col=0)
    common_genes = gdsc_exprs_df.columns.intersection(pdx_exprs_df.columns)
    gdsc_exprs_filtered = gdsc_exprs_df[common_genes].values.astype(np.float32)
    pdx_exprs_filtered = pdx_exprs_df[common_genes].values.astype(np.float32)
    exprs = np.concatenate((gdsc_exprs_filtered, pdx_exprs_filtered), axis=0)
    # Load and merge null masks
    gdsc_null_mask = pd.read_csv("/media/external_16TB_1/ali_kianfar/Data/GDSC/null_mask.csv", index_col=0).values.astype(np.float32)
    pdx_null_mask = pd.read_csv("/media/external_16TB_1/ali_kianfar/Data/PDX/pdx_null_mask.csv", index_col=0).values.astype(np.float32)
    null_mask = np.concatenate((gdsc_null_mask, pdx_null_mask), axis=0)
    return res, drug_finger, exprs, null_mask, train_row, args


def _load_tcga(args):
    """
    Loads TCGA dataset by merging GDSC and TCGA data, aligning gene expression by common genes.

    Args:
        args: Configuration object to be updated with dataset-specific parameters.

    Returns:
        Tuple of merged response matrix, drug fingerprints, merged expression data,
        merged null mask, training row count, and updated args.
    """
    args.alpha = 0.1
    args.layer_size = [1024, 1024]
    # Load response matrices
    gdsc_res = pd.read_csv("/media/external_16TB_1/ali_kianfar/Data/GDSC/cell_drug_binary.csv", index_col=0).values.astype(np.float32)
    tcga_res = pd.read_csv("/media/external_16TB_1/ali_kianfar/Data/TCGA/patient_drug_binary.csv", index_col=0).values.astype(np.float32)
    res = np.concatenate((gdsc_res, tcga_res), axis=0)
    train_row = gdsc_res.shape[0]
    # Load drug fingerprints
    drug_finger = [
        pd.read_csv("/media/external_16TB_1/ali_kianfar/Data/GDSC/ECFP6_fingerprints_GDSC.csv", index_col=0).values.astype(np.float32)
    ]
    # Load and align gene expression data
    gdsc_exprs = pd.read_csv("/media/external_16TB_1/ali_kianfar/Data/GDSC/cell_gene/merged_file_GDSC.csv", index_col=0)
    patient_gene = pd.read_csv("/media/external_16TB_1/ali_kianfar/Data/TCGA/tcga_gene_exprs.csv", index_col=0)
    common_genes = gdsc_exprs.columns.intersection(patient_gene.columns)
    gdsc_exprs_filtered = gdsc_exprs[common_genes].values.astype(np.float32)
    tcga_exprs_filtered = patient_gene[common_genes].values.astype(np.float32)
    exprs = np.concatenate((gdsc_exprs_filtered, tcga_exprs_filtered), axis=0)
    # Load and merge null masks
    gdsc_null_mask = pd.read_csv("/media/external_16TB_1/ali_kianfar/Data/GDSC/null_mask.csv", index_col=0).values.astype(np.float32)
    tcga_null_mask = pd.read_csv("/media/external_16TB_1/ali_kianfar/Data/TCGA/null_mask.csv", index_col=0).values.astype(np.float32)
    null_mask = np.concatenate((gdsc_null_mask, tcga_null_mask), axis=0)
    return res, drug_finger, exprs, null_mask, train_row, args
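
# Minimal usage sketch (illustrative, not part of the module): `load_data` only
# needs an object exposing a `data` attribute and fills in `alpha`/`layer_size`.
# The hard-coded paths above must exist locally for this to run.
#
#   from types import SimpleNamespace
#   args = SimpleNamespace(data="gdsc")
#   res, drug_finger, exprs, null_mask, pos_num, args = load_data(args)
#   print(res.shape, exprs.shape, pos_num, args.alpha)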

data_sampler.py (+360 -0)

@@ -0,0 +1,360 @@
import torch
import numpy as np
import scipy.sparse as sp
from typing import Tuple, Optional

from utils import to_coo_matrix, to_tensor, mask


class RandomSampler:
    """
    Samples edges from an adjacency matrix to create train/test sets.
    Converts the training set into torch.Tensor format.
    """

    def __init__(
        self,
        adj_mat_original: np.ndarray,
        train_index: np.ndarray,
        test_index: np.ndarray,
        null_mask: np.ndarray
    ) -> None:
        self.adj_mat = to_coo_matrix(adj_mat_original)
        self.train_index = train_index
        self.test_index = test_index
        self.null_mask = null_mask
        # Sample positive edges
        self.train_pos = self._sample_edges(train_index)
        self.test_pos = self._sample_edges(test_index)
        # Sample negative edges
        self.train_neg, self.test_neg = self._sample_negative_edges()
        # Create masks
        self.train_mask = mask(self.train_pos, self.train_neg, dtype=int)
        self.test_mask = mask(self.test_pos, self.test_neg, dtype=bool)
        # Convert to tensors
        self.train_data = to_tensor(self.train_pos)
        self.test_data = to_tensor(self.test_pos)

    def _sample_edges(self, index: np.ndarray) -> sp.coo_matrix:
        """Samples edges from the adjacency matrix based on provided indices."""
        row = self.adj_mat.row[index]
        col = self.adj_mat.col[index]
        data = self.adj_mat.data[index]
        return sp.coo_matrix(
            (data, (row, col)),
            shape=self.adj_mat.shape
        )

    def _sample_negative_edges(self) -> Tuple[sp.coo_matrix, sp.coo_matrix]:
        """
        Samples negative edges for training and testing.
        Negative edges are those not present in the adjacency matrix.
        """
        pos_adj_mat = self.null_mask + self.adj_mat.toarray()
        neg_adj_mat = sp.coo_matrix(np.abs(pos_adj_mat - 1))
        all_row, all_col, all_data = neg_adj_mat.row, neg_adj_mat.col, neg_adj_mat.data
        indices = np.arange(all_data.shape[0])
        # Sample negative test edges
        test_n = self.test_index.shape[0]
        test_neg_indices = np.random.choice(indices, test_n, replace=False)
        test_row, test_col, test_data = (
            all_row[test_neg_indices],
            all_col[test_neg_indices],
            all_data[test_neg_indices]
        )
        test_neg = sp.coo_matrix(
            (test_data, (test_row, test_col)),
            shape=self.adj_mat.shape
        )
        # Sample negative train edges
        train_neg_indices = np.delete(indices, test_neg_indices)
        train_row, train_col, train_data = (
            all_row[train_neg_indices],
            all_col[train_neg_indices],
            all_data[train_neg_indices]
        )
        train_neg = sp.coo_matrix(
            (train_data, (train_row, train_col)),
            shape=self.adj_mat.shape
        )
        return train_neg, test_neg
class NewSampler:
    """
    Samples train/test data and masks for a specific target dimension/index.
    """

    def __init__(
        self,
        original_adj_mat: np.ndarray,
        null_mask: np.ndarray,
        target_dim: Optional[int],
        target_index: int
    ) -> None:
        self.adj_mat = original_adj_mat
        self.null_mask = null_mask
        self.dim = target_dim
        self.target_index = target_index
        self.train_data, self.test_data = self._sample_train_test_data()
        self.train_mask, self.test_mask = self._sample_train_test_mask()

    def _sample_target_test_index(self) -> np.ndarray:
        """Samples indices for positive test edges based on target dimension."""
        if self.dim:
            return np.where(self.adj_mat[:, self.target_index] == 1)[0]
        return np.where(self.adj_mat[self.target_index, :] == 1)[0]

    def _sample_train_test_data(self) -> Tuple[torch.Tensor, torch.Tensor]:
        """Samples train and test data based on target indices."""
        test_data = np.zeros(self.adj_mat.shape, dtype=np.float32)
        test_index = self._sample_target_test_index()
        if self.dim:
            test_data[test_index, self.target_index] = 1
        else:
            test_data[self.target_index, test_index] = 1
        train_data = self.adj_mat - test_data
        return torch.from_numpy(train_data), torch.from_numpy(test_data)

    def _sample_train_test_mask(self) -> Tuple[torch.Tensor, torch.Tensor]:
        """Creates train and test masks, including negative sampling."""
        test_index = self._sample_target_test_index()
        neg_value = np.ones(self.adj_mat.shape, dtype=np.float32) - self.adj_mat - self.null_mask
        neg_test_mask = np.zeros(self.adj_mat.shape, dtype=np.float32)
        if self.dim:
            target_neg_index = np.where(neg_value[:, self.target_index] == 1)[0]
        else:
            target_neg_index = np.where(neg_value[self.target_index, :] == 1)[0]
        target_neg_test_index = (
            np.random.choice(target_neg_index, len(test_index), replace=False)
            if len(test_index) < len(target_neg_index)
            else target_neg_index
        )
        if self.dim:
            neg_test_mask[target_neg_test_index, self.target_index] = 1
            neg_value[:, self.target_index] = 0
        else:
            neg_test_mask[self.target_index, target_neg_test_index] = 1
            neg_value[self.target_index, :] = 0
        train_mask = (self.train_data.numpy() + neg_value).astype(bool)
        test_mask = (self.test_data.numpy() + neg_test_mask).astype(bool)
        return torch.from_numpy(train_mask), torch.from_numpy(test_mask)
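
# Illustrative note: NewSampler treats `target_dim` as truthy/falsy. A truthy
# value splits along a column (drug `target_index`); a falsy value (0 or None)
# splits along a row (cell line `target_index`). Hypothetical usage:
#
#   sampler = NewSampler(adj, null_mask, target_dim=1, target_index=3)
#   # positives in column 3 become the test set, with matched negatives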
class SingleSampler:
    """
    Samples train/test data and masks for a specific target index.
    Returns results as torch.Tensor.
    """

    def __init__(
        self,
        origin_adj_mat: np.ndarray,
        null_mask: np.ndarray,
        target_index: int,
        train_index: np.ndarray,
        test_index: np.ndarray
    ) -> None:
        self.adj_mat = origin_adj_mat
        self.null_mask = null_mask
        self.target_index = target_index
        self.train_index = train_index
        self.test_index = test_index
        self.train_data, self.test_data = self._sample_train_test_data()
        self.train_mask, self.test_mask = self._sample_train_test_mask()

    def _sample_train_test_data(self) -> Tuple[torch.Tensor, torch.Tensor]:
        """Samples train and test data for the target index."""
        test_data = np.zeros(self.adj_mat.shape, dtype=np.float32)
        test_data[self.test_index, self.target_index] = 1
        train_data = self.adj_mat - test_data
        return torch.from_numpy(train_data), torch.from_numpy(test_data)

    def _sample_train_test_mask(self) -> Tuple[torch.Tensor, torch.Tensor]:
        """Creates train and test masks with negative sampling."""
        neg_value = np.ones(self.adj_mat.shape, dtype=np.float32) - self.adj_mat - self.null_mask
        neg_test_mask = np.zeros(self.adj_mat.shape, dtype=np.float32)
        target_neg_index = np.where(neg_value[:, self.target_index] == 1)[0]
        target_neg_test_index = np.random.choice(target_neg_index, len(self.test_index), replace=False)
        neg_test_mask[target_neg_test_index, self.target_index] = 1
        neg_value[target_neg_test_index, self.target_index] = 0
        train_mask = (self.train_data.numpy() + neg_value).astype(bool)
        test_mask = (self.test_data.numpy() + neg_test_mask).astype(bool)
        return torch.from_numpy(train_mask), torch.from_numpy(test_mask)
class TargetSampler:
    """
    Samples train/test data and masks for multiple target indices.
    """

    def __init__(
        self,
        response_mat: np.ndarray,
        null_mask: np.ndarray,
        target_indexes: np.ndarray,
        pos_train_index: np.ndarray,
        pos_test_index: np.ndarray
    ) -> None:
        self.response_mat = response_mat
        self.null_mask = null_mask
        self.target_indexes = target_indexes
        self.pos_train_index = pos_train_index
        self.pos_test_index = pos_test_index
        self.train_data, self.test_data = self._sample_train_test_data()
        self.train_mask, self.test_mask = self._sample_train_test_mask()

    def _sample_train_test_data(self) -> Tuple[torch.Tensor, torch.Tensor]:
        """Samples train and test data for multiple target indices."""
        n_target = self.target_indexes.shape[0]
        target_response = self.response_mat[:, self.target_indexes].reshape((-1, n_target))
        train_data = self.response_mat.copy()
        train_data[:, self.target_indexes] = 0
        target_pos_value = sp.coo_matrix(target_response)
        target_train_data = sp.coo_matrix(
            (
                target_pos_value.data[self.pos_train_index],
                (target_pos_value.row[self.pos_train_index], target_pos_value.col[self.pos_train_index])
            ),
            shape=target_response.shape
        ).toarray()
        target_test_data = sp.coo_matrix(
            (
                target_pos_value.data[self.pos_test_index],
                (target_pos_value.row[self.pos_test_index], target_pos_value.col[self.pos_test_index])
            ),
            shape=target_response.shape
        ).toarray()
        test_data = np.zeros(self.response_mat.shape, dtype=np.float32)
        for i, value in enumerate(self.target_indexes):
            train_data[:, value] = target_train_data[:, i]
            test_data[:, value] = target_test_data[:, i]
        return torch.from_numpy(train_data), torch.from_numpy(test_data)

    def _sample_train_test_mask(self) -> Tuple[torch.Tensor, torch.Tensor]:
        """Creates train and test masks with negative sampling for target indices."""
        target_response = self.response_mat[:, self.target_indexes]
        target_ones = np.ones(target_response.shape, dtype=np.float32)
        target_neg_value = target_ones - target_response - self.null_mask[:, self.target_indexes]
        target_neg_value = sp.coo_matrix(target_neg_value)
        ids = np.arange(target_neg_value.data.shape[0])
        target_neg_test_index = np.random.choice(ids, self.pos_test_index.shape[0], replace=False)
        target_neg_test_mask = sp.coo_matrix(
            (
                target_neg_value.data[target_neg_test_index],
                (target_neg_value.row[target_neg_test_index], target_neg_value.col[target_neg_test_index])
            ),
            shape=target_response.shape
        ).toarray()
        neg_test_mask = np.zeros(self.response_mat.shape, dtype=np.float32)
        for i, value in enumerate(self.target_indexes):
            neg_test_mask[:, value] = target_neg_test_mask[:, i]
        other_neg_value = (
            np.ones(self.response_mat.shape, dtype=np.float32)
            - neg_test_mask
            - self.response_mat
            - self.null_mask
        )
        test_mask = (self.test_data.numpy() + neg_test_mask).astype(bool)
        train_mask = (self.train_data.numpy() + other_neg_value).astype(bool)
        # Return order must match the unpacking in __init__: (train_mask, test_mask).
        return torch.from_numpy(train_mask), torch.from_numpy(test_mask)
class ExterSampler:
    """
    Samples train/test data and masks based on row indices.
    """

    def __init__(
        self,
        original_adj_mat: np.ndarray,
        null_mask: np.ndarray,
        train_index: np.ndarray,
        test_index: np.ndarray
    ) -> None:
        self.adj_mat = original_adj_mat
        self.null_mask = null_mask
        self.train_index = train_index
        self.test_index = test_index
        self.train_data, self.test_data = self._sample_train_test_data()
        self.train_mask, self.test_mask = self._sample_train_test_mask()

    def _sample_train_test_data(self) -> Tuple[torch.Tensor, torch.Tensor]:
        """Samples train and test data based on row indices."""
        test_data = self.adj_mat.copy()
        test_data[self.train_index, :] = 0
        train_data = self.adj_mat - test_data
        return torch.from_numpy(train_data), torch.from_numpy(test_data)

    def _sample_train_test_mask(self) -> Tuple[torch.Tensor, torch.Tensor]:
        """Creates train and test masks with negative sampling."""
        neg_value = np.ones(self.adj_mat.shape, dtype=np.float32) - self.adj_mat - self.null_mask
        neg_train = neg_value.copy()
        neg_train[self.test_index, :] = 0
        neg_test = neg_value.copy()
        neg_test[self.train_index, :] = 0
        train_mask = (self.train_data.numpy() + neg_train).astype(bool)
        test_mask = (self.test_data.numpy() + neg_test).astype(bool)
        return torch.from_numpy(train_mask), torch.from_numpy(test_mask)
class RegressionSampler(object):
    """
    Builds boolean train/test masks from flat (row-major) indices for
    regression over the full response matrix; entries flagged in null_mask
    are excluded from both masks.
    """

    def __init__(self, adj_mat_original, train_index, test_index, null_mask):
        super(RegressionSampler, self).__init__()
        if isinstance(adj_mat_original, torch.Tensor):
            adj_mat_np = adj_mat_original.cpu().numpy()
        else:
            adj_mat_np = adj_mat_original.copy()
        self.full_data = torch.FloatTensor(adj_mat_np)
        rows, cols = adj_mat_np.shape
        train_mask = np.zeros((rows, cols), dtype=bool)
        test_mask = np.zeros((rows, cols), dtype=bool)
        # Flat indices are decoded row-major: idx = row * cols + col
        for idx in train_index:
            row = idx // cols
            col = idx % cols
            if not null_mask[row, col]:
                train_mask[row, col] = True
        for idx in test_index:
            row = idx // cols
            col = idx % cols
            if not null_mask[row, col]:
                test_mask[row, col] = True
        self.train_mask = torch.BoolTensor(train_mask)
        self.test_mask = torch.BoolTensor(test_mask)
        self.train_data = self.full_data.clone()
        self.test_data = self.full_data.clone()
        assert not torch.any(self.train_mask & self.test_mask), "Train and test masks have overlap!"

    def get_train_indices(self):
        return torch.nonzero(self.train_mask)

    def get_test_indices(self):
        return torch.nonzero(self.test_mask)
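
# Minimal usage sketch for RandomSampler (synthetic data, illustrative only):
# train/test indices refer to positions in the COO list of positive edges.
#
#   adj = np.random.binomial(1, 0.2, size=(30, 12)).astype(np.float32)
#   null_mask = np.zeros_like(adj)
#   perm = np.random.permutation(int(adj.sum()))
#   split = int(0.8 * len(perm))
#   sampler = RandomSampler(adj, perm[:split], perm[split:], null_mask)
#   print(sampler.train_data.shape, int(sampler.test_mask.sum()))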

requirements.txt (+8 -0)

@@ -0,0 +1,8 @@
pubchempy
torch==1.13.0
numpy
scipy
pandas
scikit-learn
seaborn
hickle

utils.py (+401 -0)

@@ -0,0 +1,401 @@
import os
import time
from typing import Tuple, List, Union, Optional
import torch
import numpy as np
import pandas as pd
import torch.nn as nn
import seaborn as sns
import pubchempy as pcp
import scipy.sparse as sp
from sklearn.metrics import roc_auc_score, average_precision_score
import itertools as it
import torch.nn.functional as F
# ----------------------------------------------------------------------------
# Model Evaluation Functions
# ----------------------------------------------------------------------------
def roc_auc(true_data: torch.Tensor, predict_data: torch.Tensor) -> float:
    """Calculate ROC-AUC score for binary classification."""
    assert torch.all((true_data >= 0) & (true_data <= 1)), "True labels must be 0 or 1"
    return roc_auc_score(true_data.cpu().numpy(), predict_data.cpu().numpy())


def ap_score(true_data: torch.Tensor, predict_data: torch.Tensor) -> float:
    """Calculate Average Precision (area under the Precision-Recall curve)."""
    assert torch.all((true_data >= 0) & (true_data <= 1)), "True labels must be 0 or 1"
    return average_precision_score(true_data.cpu().numpy(), predict_data.cpu().numpy())


def f1_score_binary(true_data: torch.Tensor, predict_data: torch.Tensor) -> Tuple[float, float]:
    """Calculate the maximum F1 score over all candidate thresholds and the optimal threshold."""
    thresholds = torch.unique(predict_data)
    n_samples = true_data.size(0)
    ones = torch.ones((thresholds.size(0), n_samples), device=true_data.device)
    zeros = torch.zeros((thresholds.size(0), n_samples), device=true_data.device)
    predict_value = torch.where(predict_data.view(1, -1) >= thresholds.view(-1, 1), ones, zeros)
    # tpn counts correct predictions (TP + TN) per threshold; tp counts true positives
    tpn = torch.sum(torch.where(predict_value == true_data.view(1, -1), ones, zeros), dim=1)
    tp = torch.sum(predict_value * true_data.view(1, -1), dim=1)
    # F1 = 2*TP / (2*TP + FP + FN); the denominator below is an algebraic rearrangement
    scores = (2 * tp) / (n_samples + 2 * tp - tpn)
    max_f1_score = torch.max(scores)
    threshold = thresholds[torch.argmax(scores)]
    return max_f1_score.item(), threshold.item()


def accuracy_binary(true_data: torch.Tensor, predict_data: torch.Tensor, threshold: float) -> float:
    """Calculate accuracy using the specified threshold."""
    predict_value = torch.where(predict_data >= threshold, 1.0, 0.0)
    correct = torch.sum(predict_value == true_data).float()
    return (correct / true_data.size(0)).item()


def precision_binary(true_data: torch.Tensor, predict_data: torch.Tensor, threshold: float) -> float:
    """Calculate precision using the specified threshold."""
    predict_value = torch.where(predict_data >= threshold, 1.0, 0.0)
    tp = torch.sum(true_data * predict_value)
    fp = torch.sum((1 - true_data) * predict_value)
    return (tp / (tp + fp + 1e-8)).item()


def recall_binary(true_data: torch.Tensor, predict_data: torch.Tensor, threshold: float) -> float:
    """Calculate recall using the specified threshold."""
    predict_value = torch.where(predict_data >= threshold, 1.0, 0.0)
    tp = torch.sum(true_data * predict_value)
    fn = torch.sum(true_data * (1 - predict_value))
    return (tp / (tp + fn + 1e-8)).item()


def mcc_binary(true_data: torch.Tensor, predict_data: torch.Tensor, threshold: float) -> float:
    """Calculate Matthews Correlation Coefficient (MCC) using the specified threshold."""
    predict_value = torch.where(predict_data >= threshold, 1.0, 0.0)
    true_neg = 1 - true_data
    predict_neg = 1 - predict_value
    tp = torch.sum(true_data * predict_value)
    tn = torch.sum(true_neg * predict_neg)
    fp = torch.sum(true_neg * predict_value)
    fn = torch.sum(true_data * predict_neg)
    denominator = torch.sqrt((tp + fp) * (tp + fn) * (tn + fp) * (tn + fn) + 1e-8)
    return ((tp * tn - fp * fn) / denominator).item()


def evaluate_all(true_data: torch.Tensor, predict_data: torch.Tensor) -> Tuple[float, ...]:
    """Evaluate multiple metrics: ROC-AUC, AP, accuracy, F1, MCC, precision, and recall."""
    assert torch.all((true_data >= 0) & (true_data <= 1)), "True labels must be 0 or 1"
    auc = roc_auc(true_data, predict_data)
    ap = ap_score(true_data, predict_data)
    f1, threshold = f1_score_binary(true_data, predict_data)
    acc = accuracy_binary(true_data, predict_data, threshold)
    precision = precision_binary(true_data, predict_data, threshold)
    recall = recall_binary(true_data, predict_data, threshold)
    mcc = mcc_binary(true_data, predict_data, threshold)
    return auc, ap, acc, f1, mcc, precision, recall


def evaluate_auc(true_data: torch.Tensor, predict_data: torch.Tensor) -> Tuple[float, float]:
    """Calculate ROC-AUC and Average Precision."""
    assert torch.all((true_data >= 0) & (true_data <= 1)), "True labels must be 0 or 1"
    auc = roc_auc_score(true_data.cpu().numpy(), predict_data.cpu().numpy())
    ap = average_precision_score(true_data.cpu().numpy(), predict_data.cpu().numpy())
    return auc, ap
# ----------------------------------------------------------------------------
# Loss Functions
# ----------------------------------------------------------------------------
def cross_entropy_loss(true_data: torch.Tensor, predict_data: torch.Tensor, masked: torch.Tensor) -> torch.Tensor:
    """Calculate masked binary cross-entropy loss."""
    masked = masked.to(torch.bool)
    true_data = torch.masked_select(true_data, masked)
    pred_data = torch.masked_select(predict_data, masked)
    return nn.BCELoss()(pred_data, true_data)


def mse_loss(true_data: torch.Tensor, predict_data: torch.Tensor, masked: torch.Tensor) -> torch.Tensor:
    """Calculate masked mean squared error loss."""
    true_data = true_data * masked
    predict_data = predict_data * masked
    return nn.MSELoss()(predict_data, true_data)


def prototypical_loss(
    cell_emb: torch.Tensor,
    drug_emb: torch.Tensor,
    adj_matrix: Union[torch.Tensor, np.ndarray],
    margin: float = 2.0
) -> torch.Tensor:
    """Calculate prototypical (margin ranking) loss between positive and randomly sampled negative pairs."""
    if isinstance(adj_matrix, torch.Tensor):
        adj_matrix = sp.coo_matrix(adj_matrix.detach().cpu().numpy())
    else:
        # Accept plain NumPy arrays as well, as the type hint promises
        adj_matrix = sp.coo_matrix(adj_matrix)
    pos_pairs = torch.sum(cell_emb[adj_matrix.row] * drug_emb[adj_matrix.col], dim=1)
    n_pos = len(adj_matrix.row)
    cell_neg = torch.randint(0, cell_emb.size(0), (n_pos,), device=cell_emb.device)
    drug_neg = torch.randint(0, drug_emb.size(0), (n_pos,), device=drug_emb.device)
    neg_pairs = torch.sum(cell_emb[cell_neg] * drug_emb[drug_neg], dim=1)
    labels = torch.ones_like(pos_pairs, device=cell_emb.device)
    return F.margin_ranking_loss(pos_pairs, neg_pairs, labels, margin=margin)
# ----------------------------------------------------------------------------
# Correlation and Normalization Functions
# ----------------------------------------------------------------------------
def torch_z_normalized(tensor: torch.Tensor, dim: int = 0) -> torch.Tensor:
    """Z-normalize a 2-D tensor; `dim` selects the axis whose slices are normalized (statistics are computed over the other axis)."""
    mean = tensor.mean(dim=1 - dim, keepdim=True)
    std = tensor.std(dim=1 - dim, keepdim=True) + 1e-8
    return (tensor - mean) / std


def torch_corr_x_y(x: torch.Tensor, y: torch.Tensor) -> torch.Tensor:
    """Compute the correlation matrix between the row vectors of two matrices."""
    x_center = x - x.mean(dim=1, keepdim=True)
    y_center = y - y.mean(dim=1, keepdim=True)
    x_std = x.std(dim=1, keepdim=True) + 1e-8
    y_std = y.std(dim=1, keepdim=True) + 1e-8
    x_norm = x_center / x_std
    y_norm = y_center / y_std
    # Unbiased std (n - 1) matches the division below
    corr_matrix = x_norm @ y_norm.t() / (x.size(1) - 1)
    return corr_matrix
# ----------------------------------------------------------------------------
# Distance and Similarity Functions
# ----------------------------------------------------------------------------
def torch_euclidean_dist(tensor: torch.Tensor, dim: int = 0) -> torch.Tensor:
    """Calculate Euclidean distances between rows (dim=0) or columns (dim=1) of a tensor."""
    tensor_mul = torch.mm(tensor.t(), tensor) if dim else torch.mm(tensor, tensor.t())
    diag = torch.diag(tensor_mul)
    n_diag = diag.size(0)
    tensor_diag = diag.repeat(n_diag, 1)
    diag = diag.view(n_diag, -1)
    dist = torch.sqrt(tensor_diag + diag - 2 * tensor_mul)
    return dist


def exp_similarity(tensor: torch.Tensor, sigma: torch.Tensor, normalize: bool = True) -> torch.Tensor:
    """Calculate exponential similarity based on Euclidean distance."""
    if normalize:
        tensor = torch_z_normalized(tensor, dim=1)
    tensor_dist = torch_euclidean_dist(tensor, dim=0)
    return torch.exp(-tensor_dist / (2 * sigma.pow(2)))


def full_kernel(exp_dist: torch.Tensor) -> torch.Tensor:
    """Calculate the full kernel matrix from exponential similarity."""
    n = exp_dist.shape[0]
    ones = torch.ones(n, n, device=exp_dist.device)
    diag = torch.diag(ones)
    mask_diag = (ones - diag) * exp_dist
    mask_diag_sum = mask_diag.sum(dim=1, keepdim=True)
    mask_diag = mask_diag / (2 * mask_diag_sum) + 0.5 * diag
    return mask_diag


def sparse_kernel(exp_dist: torch.Tensor, k: int) -> torch.Tensor:
    """Calculate a sparse kernel that keeps only the k nearest neighbours per row."""
    n = exp_dist.shape[0]
    maxk = torch.topk(exp_dist, k, dim=1)
    mink_indices = torch.topk(exp_dist, n - k, dim=1, largest=False).indices
    exp_dist[torch.arange(n, device=exp_dist.device).view(n, -1), mink_indices] = 0
    knn_sum = maxk.values.sum(dim=1, keepdim=True)
    return exp_dist / knn_sum


def scale_sigmoid(tensor: torch.Tensor, alpha: float) -> torch.Tensor:
    """Apply a scaled sigmoid transformation."""
    alpha = torch.tensor(alpha, dtype=torch.float32, device=tensor.device)
    return torch.sigmoid(alpha * tensor)
# ----------------------------------------------------------------------------
# Data Processing and Helper Functions
# ----------------------------------------------------------------------------
def init_seeds(seed: int = 0) -> None:
    """Initialize random seeds for reproducibility."""
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False


def distribute_compute(
    lr_list: List[float],
    wd_list: List[float],
    scale_list: List[float],
    layer_size: List[int],
    sigma_list: List[float],
    beta_list: List[float],
    workers: int,
    id: int
) -> np.ndarray:
    """Distribute hyperparameter combinations across workers."""
    all_combinations = [
        [lr, wd, sc, la, sg, bt]
        for lr, wd, sc, la, sg, bt in it.product(lr_list, wd_list, scale_list, layer_size, sigma_list, beta_list)
    ]
    return np.array_split(all_combinations, workers)[id]


def get_fingerprint(cid: int) -> np.ndarray:
    """Retrieve the PubChem fingerprint for a given compound CID as a bit array."""
    compound = pcp.Compound.from_cid(cid)
    # Each hex character of the fingerprint expands to four bits
    fingerprint = "".join(f"{int(bit, 16):04b}" for bit in compound.fingerprint)
    return np.array([int(b) for b in fingerprint], dtype=np.int32)


def save_fingerprint(cid_list: List[int], last_cid: int, fpath: str) -> None:
    """Save fingerprints for a list of compound CIDs to disk, resuming after last_cid."""
    start_idx = np.where(np.array(cid_list) == last_cid)[0][0] + 1 if last_cid > 0 else 0
    for cid in cid_list[start_idx:]:
        fingerprint = get_fingerprint(cid)
        np.save(os.path.join(fpath, str(cid)), fingerprint)
        print(f"CID {cid} processed successfully.")
        time.sleep(1)
    if start_idx >= len(cid_list):
        print("All compounds have been processed!")


def read_fingerprint_cid(path: str) -> Tuple[np.ndarray, List[int]]:
    """Read fingerprints from .npy files in the specified directory."""
    fingerprint = []
    cids = []
    for file_name in sorted(os.listdir(path)):
        if file_name.endswith(".npy"):
            cid = int(file_name.split(".")[0])
            fing = np.load(os.path.join(path, file_name))
            fingerprint.append(fing)
            cids.append(cid)
    fingerprint = np.array(fingerprint).reshape(-1, 920)
    return fingerprint, cids
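
# Illustrative usage of the fingerprint helpers (requires network access to
# PubChem; the CIDs below are arbitrary examples):
#
#   save_fingerprint([2244, 3672], last_cid=0, fpath="fingerprints/")
#   fingerprints, cids = read_fingerprint_cid("fingerprints/")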
def common_data_index(data_for_index: np.ndarray, data_for_cmp: np.ndarray) -> np.ndarray:
    """Find indices of elements in data_for_index that exist in data_for_cmp."""
    return np.where(np.isin(data_for_index, data_for_cmp))[0]


def to_coo_matrix(adj_mat: Union[np.ndarray, sp.coo_matrix]) -> sp.coo_matrix:
    """Convert an input matrix to scipy.sparse.coo_matrix format."""
    if not sp.isspmatrix_coo(adj_mat):
        adj_mat = sp.coo_matrix(adj_mat)
    return adj_mat


def mask(positive: sp.coo_matrix, negative: sp.coo_matrix, dtype: type = int) -> torch.Tensor:
    """Create a mask combining positive and negative edges."""
    row = np.hstack((positive.row, negative.row))
    col = np.hstack((positive.col, negative.col))
    data = np.ones_like(row)
    masked = sp.coo_matrix((data, (row, col)), shape=positive.shape).toarray().astype(dtype)
    return torch.from_numpy(masked)


def to_tensor(positive: sp.coo_matrix, identity: bool = False) -> torch.Tensor:
    """Convert a sparse matrix to torch.Tensor, optionally adding an identity matrix."""
    data = positive + sp.identity(positive.shape[0]) if identity else positive
    return torch.from_numpy(data.toarray()).float()


def np_delete_value(arr: np.ndarray, obj: np.ndarray) -> np.ndarray:
    """Remove specified values from a NumPy array."""
    indices = [np.where(arr == x)[0][0] for x in obj if x in arr]
    return np.delete(arr, indices)


def translate_result(tensor: Union[torch.Tensor, np.ndarray]) -> pd.DataFrame:
    """Convert a tensor or array to a single-row pandas DataFrame."""
    if isinstance(tensor, torch.Tensor):
        tensor = tensor.detach().cpu().numpy()
    return pd.DataFrame(tensor.reshape(1, -1))


def calculate_train_test_index(
    response: np.ndarray,
    pos_train_index: np.ndarray,
    pos_test_index: np.ndarray
) -> Tuple[np.ndarray, np.ndarray]:
    """Calculate train and test indices combining positive and negative samples."""
    neg_response_index = np.where(response == 0)[0]
    neg_test_index = np.random.choice(neg_response_index, pos_test_index.shape[0], replace=False)
    neg_train_index = np_delete_value(neg_response_index, neg_test_index)
    test_index = np.hstack((pos_test_index, neg_test_index))
    train_index = np.hstack((pos_train_index, neg_train_index))
    return train_index, test_index


def dir_path(k: int = 1) -> str:
    """Get the directory path k levels up from the current file."""
    fpath = os.path.realpath(__file__)
    dir_name = os.path.dirname(fpath).replace("\\", "/")
    for _ in range(k):
        dir_name = os.path.dirname(dir_name)
    return dir_name


def extract_row_data(data: pd.DataFrame, row: int) -> np.ndarray:
    """Extract non-NaN data from a specific row of a DataFrame."""
    target = np.array(data.iloc[row], dtype=np.float32)
    return target[~np.isnan(target)]


def transfer_data(data: pd.DataFrame, label: str) -> pd.DataFrame:
    """Add a label column to a DataFrame."""
    data = data.copy()
    data["label"] = label
    return data


def link_data_frame(*data: pd.DataFrame) -> pd.DataFrame:
    """Concatenate multiple DataFrames vertically."""
    return pd.concat(data, ignore_index=True)


def calculate_limit(*data: pd.DataFrame, key: Union[str, int]) -> Tuple[float, float]:
    """Calculate padded min and max values of a key across multiple DataFrames."""
    temp = pd.concat(data, ignore_index=True)
    return temp[key].min() - 0.1, temp[key].max() + 0.1


def delete_all_sub_str(string: str, sub: str, join_str: str = "") -> str:
    """Remove all occurrences of a substring and join the remaining parts with join_str."""
    parts = string.split(sub)
    parts = [p for p in parts if p]
    return join_str.join(parts)


def get_best_index(fname: str) -> int:
    """Find the index of the AUC closest to the average AUC in a results file."""
    with open(fname, "r") as file:
        content = file.read().replace("\n", "")
    auc_str = content.split("accs")[0].split(":")[1]
    auc_str = delete_all_sub_str(auc_str, " ", ",").replace(",]", "]")
    aucs = np.array(eval(auc_str))
    avg_auc = float(content.split("avg_aucs")[1].split(":")[1].split()[0])
    return int(np.argmin(np.abs(aucs - avg_auc)))


def gather_color_code(*string: str) -> List[Tuple[float, float, float]]:
    """Map color names to seaborn color palette codes."""
    color_str = ["blue", "orange", "green", "red", "purple", "brown", "pink", "grey", "yellow", "cyan"]
    palette = sns.color_palette()
    color_map = dict(zip(color_str, palette))
    return [color_map[color] for color in string]
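
# Quick smoke test (illustrative): compare torch_corr_x_y with NumPy's corrcoef.
#
#   x, y = torch.randn(4, 50), torch.randn(3, 50)
#   ref = np.corrcoef(x.numpy(), y.numpy())[:4, 4:]
#   print(np.allclose(torch_corr_x_y(x, y).numpy(), ref, atol=1e-4))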
