import torch
import numpy as np
import scipy.sparse as sp
from typing import Tuple, Optional
from utils import to_coo_matrix, to_tensor, mask


class RandomSampler:
    """
    Split the edges of an adjacency matrix into train/test sets.

    Positive edges are selected by position in the COO representation of the
    adjacency matrix; negative edges are drawn at random from the entries that
    are zero in both the adjacency matrix and ``null_mask``.

    Attributes set by the constructor: ``train_pos``/``test_pos`` and
    ``train_neg``/``test_neg`` (``scipy.sparse.coo_matrix``), plus
    ``train_mask``/``test_mask`` and ``train_data``/``test_data`` produced by
    the project helpers ``mask`` and ``to_tensor``.
    """

    def __init__(
        self,
        adj_mat_original: np.ndarray,
        train_index: np.ndarray,
        test_index: np.ndarray,
        null_mask: np.ndarray
    ) -> None:
        """
        Args:
            adj_mat_original: Adjacency matrix; converted with ``to_coo_matrix``.
            train_index: Positions into the COO ``row``/``col``/``data`` arrays
                selecting the positive training edges.
            test_index: Same, for the positive test edges.
            null_mask: Array added to the dense adjacency matrix when looking
                for negative candidates (presumably 1 marks entries to
                exclude -- confirm against the caller).
        """
        self.adj_mat = to_coo_matrix(adj_mat_original)
        self.train_index = train_index
        self.test_index = test_index
        self.null_mask = null_mask

        # Sample positive edges
        self.train_pos = self._sample_edges(train_index)
        self.test_pos = self._sample_edges(test_index)

        # Sample negative edges
        self.train_neg, self.test_neg = self._sample_negative_edges()

        # Create masks
        self.train_mask = mask(self.train_pos, self.train_neg, dtype=int)
        self.test_mask = mask(self.test_pos, self.test_neg, dtype=bool)

        # Convert to tensors
        self.train_data = to_tensor(self.train_pos)
        self.test_data = to_tensor(self.test_pos)

    def _sample_edges(self, index: np.ndarray) -> sp.coo_matrix:
        """Return a COO matrix holding only the stored entries picked by ``index``."""
        row = self.adj_mat.row[index]
        col = self.adj_mat.col[index]
        data = self.adj_mat.data[index]
        return sp.coo_matrix(
            (data, (row, col)),
            shape=self.adj_mat.shape
        )

    def _sample_negative_edges(self) -> Tuple[sp.coo_matrix, sp.coo_matrix]:
        """
        Sample negative edges for training and testing.

        Candidates are the non-zero entries of ``|null_mask + adj - 1|``.
        As many candidates as there are test indices are drawn without
        replacement for the test set; all remaining candidates become the
        training negatives.

        Returns:
            ``(train_neg, test_neg)`` as COO matrices with the adjacency shape.

        Raises:
            ValueError: From ``np.random.choice`` when there are fewer
                candidates than test indices.
        """
        pos_adj_mat = self.null_mask + self.adj_mat.toarray()
        neg_adj_mat = sp.coo_matrix(np.abs(pos_adj_mat - 1))
        all_row, all_col, all_data = neg_adj_mat.row, neg_adj_mat.col, neg_adj_mat.data
        indices = np.arange(all_data.shape[0])

        # Sample negative test edges
        test_n = self.test_index.shape[0]
        test_neg_indices = np.random.choice(indices, test_n, replace=False)
        test_row, test_col, test_data = (
            all_row[test_neg_indices],
            all_col[test_neg_indices],
            all_data[test_neg_indices]
        )
        test_neg = sp.coo_matrix(
            (test_data, (test_row, test_col)),
            shape=self.adj_mat.shape
        )

        # Sample negative train edges: every candidate not used for testing
        train_neg_indices = np.delete(indices, test_neg_indices)
        train_row, train_col, train_data = (
            all_row[train_neg_indices],
            all_col[train_neg_indices],
            all_data[train_neg_indices]
        )
        train_neg = sp.coo_matrix(
            (train_data, (train_row, train_col)),
            shape=self.adj_mat.shape
        )

        return train_neg, test_neg


class NewSampler:
    """
    Hold out every positive entry of one row or column as the test set.

    When ``target_dim`` is truthy the target is column ``target_index``;
    otherwise (``0`` or ``None``) it is row ``target_index``.
    """

    def __init__(
        self,
        original_adj_mat: np.ndarray,
        null_mask: np.ndarray,
        target_dim: Optional[int],
        target_index: int
    ) -> None:
        """
        Args:
            original_adj_mat: Dense adjacency matrix; entries equal to 1 are
                treated as positives.
            null_mask: Array subtracted from ``1 - adj`` when looking for
                negative candidates.
            target_dim: Truthy selects a column, falsy selects a row.
            target_index: Index of the target row/column.
        """
        self.adj_mat = original_adj_mat
        self.null_mask = null_mask
        self.dim = target_dim
        self.target_index = target_index
        self.train_data, self.test_data = self._sample_train_test_data()
        self.train_mask, self.test_mask = self._sample_train_test_mask()

    def _sample_target_test_index(self) -> np.ndarray:
        """Return the positions along the target row/column whose value is 1."""
        if self.dim:
            return np.where(self.adj_mat[:, self.target_index] == 1)[0]
        return np.where(self.adj_mat[self.target_index, :] == 1)[0]

    def _sample_train_test_data(self) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Build the train/test value matrices.

        ``test_data`` is 1 at the target's positive entries and 0 elsewhere;
        ``train_data`` is the adjacency matrix with those entries subtracted.
        """
        test_data = np.zeros(self.adj_mat.shape, dtype=np.float32)
        test_index = self._sample_target_test_index()
        if self.dim:
            test_data[test_index, self.target_index] = 1
        else:
            test_data[self.target_index, test_index] = 1
        train_data = self.adj_mat - test_data
        return torch.from_numpy(train_data), torch.from_numpy(test_data)

    def _sample_train_test_mask(self) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Create boolean train and test masks, including negative sampling.

        Negative candidates are entries where ``1 - adj - null_mask == 1``.
        Test negatives are drawn from the target row/column, as many as there
        are test positives (or all of them when there are not enough). The
        whole target row/column is removed from the training negatives.
        """
        test_index = self._sample_target_test_index()
        neg_value = np.ones(self.adj_mat.shape, dtype=np.float32) - self.adj_mat - self.null_mask
        neg_test_mask = np.zeros(self.adj_mat.shape, dtype=np.float32)

        if self.dim:
            target_neg_index = np.where(neg_value[:, self.target_index] == 1)[0]
        else:
            target_neg_index = np.where(neg_value[self.target_index, :] == 1)[0]

        target_neg_test_index = (
            np.random.choice(target_neg_index, len(test_index), replace=False)
            if len(test_index) < len(target_neg_index)
            else target_neg_index
        )

        if self.dim:
            neg_test_mask[target_neg_test_index, self.target_index] = 1
            neg_value[:, self.target_index] = 0
        else:
            neg_test_mask[self.target_index, target_neg_test_index] = 1
            neg_value[self.target_index, :] = 0

        train_mask = (self.train_data.numpy() + neg_value).astype(bool)
        test_mask = (self.test_data.numpy() + neg_test_mask).astype(bool)
        return torch.from_numpy(train_mask), torch.from_numpy(test_mask)


class SingleSampler:
    """
    Build train/test data and masks for a single target column.

    The rows listed in ``test_index`` of column ``target_index`` form the
    positive test set. Results are exposed as ``torch.Tensor`` attributes
    ``train_data``, ``test_data``, ``train_mask`` and ``test_mask``.
    """

    def __init__(
        self,
        origin_adj_mat: np.ndarray,
        null_mask: np.ndarray,
        target_index: int,
        train_index: np.ndarray,
        test_index: np.ndarray
    ) -> None:
        """
        Args:
            origin_adj_mat: Dense adjacency matrix.
            null_mask: Array subtracted from ``1 - adj`` when looking for
                negative candidates.
            target_index: Column that is being held out.
            train_index: Stored on the instance; not read by this class.
            test_index: Row positions in the target column used as test
                positives.
        """
        self.adj_mat = origin_adj_mat
        self.null_mask = null_mask
        self.target_index = target_index
        self.train_index = train_index
        self.test_index = test_index
        self.train_data, self.test_data = self._sample_train_test_data()
        self.train_mask, self.test_mask = self._sample_train_test_mask()

    def _sample_train_test_data(self) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Build the train/test value matrices for the target column.

        ``test_data`` is 1 at ``(test_index, target_index)`` and 0 elsewhere;
        ``train_data`` is the adjacency matrix minus ``test_data``.
        """
        test_data = np.zeros(self.adj_mat.shape, dtype=np.float32)
        test_data[self.test_index, self.target_index] = 1
        train_data = self.adj_mat - test_data
        return torch.from_numpy(train_data), torch.from_numpy(test_data)

    def _sample_train_test_mask(self) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Create boolean train and test masks with negative sampling.

        Test negatives are drawn without replacement from the target column's
        negative candidates (entries where ``1 - adj - null_mask == 1``), one
        per test positive. The sampled entries are removed from the training
        negatives; all other negative candidates stay in the train mask.
        """
        neg_value = np.ones(self.adj_mat.shape, dtype=np.float32) - self.adj_mat - self.null_mask
        neg_test_mask = np.zeros(self.adj_mat.shape, dtype=np.float32)
        target_neg_index = np.where(neg_value[:, self.target_index] == 1)[0]

        n_test = len(self.test_index)
        if n_test > len(target_neg_index):
            # Not enough negatives to match the positives one-for-one: use
            # every candidate instead of letting np.random.choice raise
            # (same fallback NewSampler applies).
            target_neg_test_index = target_neg_index
        else:
            target_neg_test_index = np.random.choice(target_neg_index, n_test, replace=False)

        neg_test_mask[target_neg_test_index, self.target_index] = 1
        neg_value[target_neg_test_index, self.target_index] = 0
        train_mask = (self.train_data.numpy() + neg_value).astype(bool)
        test_mask = (self.test_data.numpy() + neg_test_mask).astype(bool)
        return torch.from_numpy(train_mask), torch.from_numpy(test_mask)


class TargetSampler:
    """
    Build train/test data and masks for multiple target columns.

    Positive entries of the target columns are split by position (in the COO
    representation of the target sub-matrix) into train and test; all other
    columns go entirely to the training data.
    """

    def __init__(
        self,
        response_mat: np.ndarray,
        null_mask: np.ndarray,
        target_indexes: np.ndarray,
        pos_train_index: np.ndarray,
        pos_test_index: np.ndarray
    ):
        """
        Args:
            response_mat: Dense response matrix.
            null_mask: Array subtracted when looking for negative candidates.
            target_indexes: Column indices of the targets.
            pos_train_index: Positions into the COO arrays of the target
                sub-matrix selecting the training positives.
            pos_test_index: Same, for the test positives.
        """
        self.response_mat = response_mat
        self.null_mask = null_mask
        self.target_indexes = target_indexes
        self.pos_train_index = pos_train_index
        self.pos_test_index = pos_test_index
        self.train_data, self.test_data = self.sample_train_test_data()
        self.train_mask, self.test_mask = self.sample_train_test_mask()

    def sample_train_test_data(self):
        """
        Build the train/test value matrices.

        Returns:
            ``(train_data, test_data)`` tensors shaped like ``response_mat``.
            ``train_data`` keeps the non-target columns and only the selected
            training entries of the target columns; ``test_data`` holds only
            the selected test entries of the target columns.
        """
        n_target = self.target_indexes.shape[0]
        target_response = self.response_mat[:, self.target_indexes].reshape((-1, n_target))
        train_data = self.response_mat.copy()
        train_data[:, self.target_indexes] = 0

        target_pos_value = sp.coo_matrix(target_response)
        target_train_data = sp.coo_matrix(
            (target_pos_value.data[self.pos_train_index],
             (target_pos_value.row[self.pos_train_index],
              target_pos_value.col[self.pos_train_index])),
            shape=target_response.shape
        ).toarray()
        target_test_data = sp.coo_matrix(
            (target_pos_value.data[self.pos_test_index],
             (target_pos_value.row[self.pos_test_index],
              target_pos_value.col[self.pos_test_index])),
            shape=target_response.shape
        ).toarray()

        test_data = np.zeros(self.response_mat.shape, dtype=np.float32)
        # Scatter the per-target columns back to their positions in the full matrix.
        for i, value in enumerate(self.target_indexes):
            train_data[:, value] = target_train_data[:, i]
            test_data[:, value] = target_test_data[:, i]

        train_data = torch.from_numpy(train_data)
        test_data = torch.from_numpy(test_data)
        return train_data, test_data

    def sample_train_test_mask(self):
        """
        Create boolean train and test masks with negative sampling.

        Test negatives are drawn without replacement from the target columns,
        one per test positive (or all candidates when there are fewer). Every
        other negative candidate of the full matrix goes to the train mask.

        Returns:
            ``(train_mask, test_mask)`` boolean tensors shaped like
            ``response_mat``.
        """
        target_response = self.response_mat[:, self.target_indexes]
        target_ones = np.ones(target_response.shape, dtype=np.float32)
        target_neg_value = target_ones - target_response - self.null_mask[:, self.target_indexes]
        # NOTE(review): an entry where response and null_mask are both 1
        # becomes -1 here and is still kept as a candidate by coo_matrix --
        # this assumes the two never overlap; confirm.
        target_neg_value = sp.coo_matrix(target_neg_value)

        ids = np.arange(target_neg_value.data.shape[0])
        n_test = self.pos_test_index.shape[0]
        if n_test > ids.shape[0]:
            # Fewer negative candidates than test positives: take them all
            # rather than letting np.random.choice raise.
            target_neg_test_index = ids
        else:
            target_neg_test_index = np.random.choice(ids, n_test, replace=False)

        target_neg_test_mask = sp.coo_matrix(
            (target_neg_value.data[target_neg_test_index],
             (target_neg_value.row[target_neg_test_index],
              target_neg_value.col[target_neg_test_index])),
            shape=target_response.shape
        ).toarray()

        neg_test_mask = np.zeros(self.response_mat.shape, dtype=np.float32)
        for i, value in enumerate(self.target_indexes):
            neg_test_mask[:, value] = target_neg_test_mask[:, i]

        other_neg_value = (
            np.ones(self.response_mat.shape, dtype=np.float32)
            - neg_test_mask
            - self.response_mat
            - self.null_mask
        )
        test_mask = (self.test_data.numpy() + neg_test_mask).astype(bool)
        train_mask = (self.train_data.numpy() + other_neg_value).astype(bool)
        test_mask = torch.from_numpy(test_mask)
        train_mask = torch.from_numpy(train_mask)
        return train_mask, test_mask


class ExterSampler:
    """
    Split train/test data and masks by row indices.

    ``original_adj_mat`` and ``null_mask`` may each be a numpy array or a
    ``torch.Tensor``.
    """

    def __init__(self, original_adj_mat, null_mask, train_index, test_index):
        """
        Args:
            original_adj_mat: Adjacency matrix (numpy array or tensor).
            null_mask: Array/tensor subtracted when computing negatives.
            train_index: Row indices belonging to the training split.
            test_index: Row indices belonging to the test split.
        """
        super().__init__()
        self.adj_mat = original_adj_mat
        self.null_mask = null_mask
        self.train_index = train_index
        self.test_index = test_index
        self.train_data, self.test_data = self.sample_train_test_data()
        self.train_mask, self.test_mask = self.sample_train_test_mask()

    def sample_train_test_data(self):
        """
        Build the train/test value matrices.

        ``test_data`` is the adjacency matrix with the ``train_index`` rows
        zeroed; ``train_data`` is the remainder (``adj - test_data``).
        """
        # If self.adj_mat is already a Tensor, convert it to a numpy array
        if isinstance(self.adj_mat, torch.Tensor):
            adj = self.adj_mat.cpu().numpy().copy()
        else:
            adj = self.adj_mat.copy()

        # Create test_data by copying and zeroing the rows in train_index
        test_data = adj.copy()
        test_data[self.train_index, :] = 0
        train_data = adj - test_data

        # Convert back to Tensor (subsequent operations are performed on Tensors)
        train_data = torch.from_numpy(train_data)
        test_data = torch.from_numpy(test_data)
        return train_data, test_data

    def sample_train_test_mask(self):
        """
        Create boolean train and test masks.

        Negatives are ``1 - adj - null_mask``; the train mask drops the
        negatives in the ``test_index`` rows and the test mask drops those in
        the ``train_index`` rows.
        """
        # Ensure that adj_mat and null_mask are numpy arrays
        if isinstance(self.adj_mat, torch.Tensor):
            adj = self.adj_mat.cpu().numpy()
        else:
            adj = self.adj_mat
        if isinstance(self.null_mask, torch.Tensor):
            null_mask = self.null_mask.cpu().numpy()
        else:
            null_mask = self.null_mask

        neg_value = np.ones(adj.shape, dtype=np.float32)
        neg_train = neg_value - adj - null_mask
        neg_train[self.test_index, :] = 0
        neg_test = neg_value - adj - null_mask
        neg_test[self.train_index, :] = 0

        train_data_np = (
            self.train_data.cpu().numpy()
            if isinstance(self.train_data, torch.Tensor)
            else self.train_data
        )
        test_data_np = (
            self.test_data.cpu().numpy()
            if isinstance(self.test_data, torch.Tensor)
            else self.test_data
        )

        train_mask = (train_data_np + neg_train).astype(bool)
        test_mask = (test_data_np + neg_test).astype(bool)
        train_mask = torch.from_numpy(train_mask)
        test_mask = torch.from_numpy(test_mask)
        return train_mask, test_mask


class RegressionSampler:
    """
    Build boolean train/test masks from flat (row-major) entry indices.

    ``train_data`` and ``test_data`` are both full copies of the input
    matrix; the split is expressed only through ``train_mask`` and
    ``test_mask``. Entries whose ``null_mask`` value is truthy are left out
    of both masks.
    """

    def __init__(self, adj_mat_original, train_index, test_index, null_mask):
        """
        Args:
            adj_mat_original: 2-D matrix (numpy array or tensor).
            train_index: Flat indices (``row * n_cols + col``) of the
                training entries.
            test_index: Flat indices of the test entries.
            null_mask: 2-D array/tensor; truthy entries are excluded.

        Raises:
            AssertionError: If the train and test masks overlap.
        """
        super().__init__()
        if isinstance(adj_mat_original, torch.Tensor):
            adj_mat_np = adj_mat_original.cpu().numpy()
        else:
            adj_mat_np = adj_mat_original.copy()
        self.full_data = torch.FloatTensor(adj_mat_np)

        if isinstance(null_mask, torch.Tensor):
            null_np = null_mask.detach().cpu().numpy()
        else:
            null_np = np.asarray(null_mask)

        train_mask = self._flat_index_mask(train_index, null_np, adj_mat_np.shape)
        test_mask = self._flat_index_mask(test_index, null_np, adj_mat_np.shape)

        self.train_mask = torch.BoolTensor(train_mask)
        self.test_mask = torch.BoolTensor(test_mask)
        self.train_data = self.full_data.clone()
        self.test_data = self.full_data.clone()

        # Raised explicitly (not via `assert`) so the check survives `python -O`.
        if torch.any(self.train_mask & self.test_mask):
            raise AssertionError("Train and test masks have overlap!")

    @staticmethod
    def _flat_index_mask(flat_index, null_np, shape):
        """Return a bool array of ``shape`` that is True at the non-null flat indices."""
        selected = np.zeros(shape, dtype=bool)
        flat = np.asarray(flat_index)
        if flat.size == 0:
            return selected
        # Row-major decomposition: flat = row * n_cols + col
        row, col = np.divmod(flat, shape[1])
        keep = ~null_np[row, col].astype(bool)
        selected[row[keep], col[keep]] = True
        return selected

    def get_train_indices(self):
        """Return the ``(row, col)`` positions that are True in ``train_mask``."""
        indices = torch.nonzero(self.train_mask)
        return indices

    def get_test_indices(self):
        """Return the ``(row, col)`` positions that are True in ``test_mask``."""
        indices = torch.nonzero(self.test_mask)
        return indices