| @@ -1,360 +1,337 @@ | |||
| import torch | |||
| import numpy as np | |||
| import scipy.sparse as sp | |||
| from typing import Tuple, Optional | |||
| from utils import to_coo_matrix, to_tensor, mask | |||
class RandomSampler:
    """
    Split the edges of an adjacency matrix into train and test sets.

    Positive edges are selected by position from the COO form of the
    adjacency matrix. Negative edges are drawn from the cells where
    ``null_mask + adjacency`` differs from 1 (for binary inputs: cells that
    are neither positive nor masked -- assumes both arrays are 0/1, confirm
    with callers). Masks and tensors are produced by the project helpers
    ``mask`` and ``to_tensor``.
    """

    def __init__(
        self,
        adj_mat_original: np.ndarray,
        train_index: np.ndarray,
        test_index: np.ndarray,
        null_mask: np.ndarray
    ) -> None:
        self.adj_mat = to_coo_matrix(adj_mat_original)
        self.train_index = train_index
        self.test_index = test_index
        self.null_mask = null_mask

        # Positive edges of each split, addressed by COO position.
        self.train_pos = self._sample_edges(train_index)
        self.test_pos = self._sample_edges(test_index)

        # Negative edges: a random test subset, the remainder for training.
        negatives = self._sample_negative_edges()
        self.train_neg, self.test_neg = negatives

        # Masks combine the positive and negative edges of each split.
        self.train_mask = mask(self.train_pos, self.train_neg, dtype=int)
        self.test_mask = mask(self.test_pos, self.test_neg, dtype=bool)

        # Tensor versions of the positive edges.
        self.train_data = to_tensor(self.train_pos)
        self.test_data = to_tensor(self.test_pos)

    def _sample_edges(self, index: np.ndarray) -> sp.coo_matrix:
        """Return a COO matrix holding only the stored entries at ``index``."""
        coo = self.adj_mat
        picked = (coo.data[index], (coo.row[index], coo.col[index]))
        return sp.coo_matrix(picked, shape=coo.shape)

    def _sample_negative_edges(self) -> Tuple[sp.coo_matrix, sp.coo_matrix]:
        """
        Draw negative edges for the test set and keep the rest for training.

        The test set receives ``len(self.test_index)`` negatives sampled
        without replacement; every other candidate goes to the training set.

        Returns:
            ``(train_neg, test_neg)`` as COO matrices shaped like the
            adjacency matrix.
        """
        occupied = self.null_mask + self.adj_mat.toarray()
        candidates = sp.coo_matrix(np.abs(occupied - 1))
        out_shape = self.adj_mat.shape

        def subset(selection: np.ndarray) -> sp.coo_matrix:
            # Rebuild a COO matrix from the chosen candidate positions.
            coords = (candidates.row[selection], candidates.col[selection])
            return sp.coo_matrix(
                (candidates.data[selection], coords),
                shape=out_shape
            )

        positions = np.arange(candidates.data.shape[0])
        test_pick = np.random.choice(
            positions, self.test_index.shape[0], replace=False
        )
        train_pick = np.delete(positions, test_pick)
        return subset(train_pick), subset(test_pick)
class NewSampler:
    """
    Hold out one row or one column of an adjacency matrix as the test set.

    When ``target_dim`` is truthy the column ``target_index`` is held out;
    otherwise the row ``target_index`` is held out.
    """

    def __init__(
        self,
        original_adj_mat: np.ndarray,
        null_mask: np.ndarray,
        target_dim: Optional[int],
        target_index: int
    ) -> None:
        self.adj_mat = original_adj_mat
        self.null_mask = null_mask
        self.dim = target_dim
        self.target_index = target_index
        self.train_data, self.test_data = self._sample_train_test_data()
        self.train_mask, self.test_mask = self._sample_train_test_mask()

    def _sample_target_test_index(self) -> np.ndarray:
        """Return the positions equal to 1 along the held-out row/column."""
        if self.dim:
            target_vector = self.adj_mat[:, self.target_index]
        else:
            target_vector = self.adj_mat[self.target_index, :]
        return np.where(target_vector == 1)[0]

    def _sample_train_test_data(self) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Move the positives of the held-out row/column into the test data.

        Returns:
            ``(train_data, test_data)``; the test data is 1 at the held-out
            positives and the train data is the adjacency matrix minus it.
        """
        held_out = self._sample_target_test_index()
        test_data = np.zeros(self.adj_mat.shape, dtype=np.float32)
        if self.dim:
            selector = (held_out, self.target_index)
        else:
            selector = (self.target_index, held_out)
        test_data[selector] = 1
        train_data = self.adj_mat - test_data
        return torch.from_numpy(train_data), torch.from_numpy(test_data)

    def _sample_train_test_mask(self) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Build boolean train/test masks, sampling negatives for the test mask.

        Test negatives are drawn from the held-out row/column, as many as
        there are test positives when enough candidates exist, otherwise all
        candidates. The whole held-out row/column is removed from the
        training negatives.
        """
        pos_test_index = self._sample_target_test_index()
        shape = self.adj_mat.shape
        neg_value = np.ones(shape, dtype=np.float32) - self.adj_mat - self.null_mask
        neg_test_mask = np.zeros(shape, dtype=np.float32)

        # Index expression addressing the entire held-out row or column.
        if self.dim:
            target_line = (slice(None), self.target_index)
        else:
            target_line = (self.target_index, slice(None))
        candidates = np.where(neg_value[target_line] == 1)[0]

        if len(pos_test_index) < len(candidates):
            chosen = np.random.choice(candidates, len(pos_test_index), replace=False)
        else:
            # Not more candidates than positives: take every candidate.
            chosen = candidates

        if self.dim:
            neg_test_mask[chosen, self.target_index] = 1
        else:
            neg_test_mask[self.target_index, chosen] = 1
        # No negative of the held-out row/column may be used for training.
        neg_value[target_line] = 0

        train_mask = (self.train_data.numpy() + neg_value).astype(bool)
        test_mask = (self.test_data.numpy() + neg_test_mask).astype(bool)
        return torch.from_numpy(train_mask), torch.from_numpy(test_mask)
class SingleSampler:
    """
    Samples train/test data and masks for a specific target index.
    Returns results as torch.Tensor.

    The target is the column ``target_index`` of the adjacency matrix; the
    rows listed in ``test_index`` are the positives held out for testing.
    """

    def __init__(
        self,
        origin_adj_mat: np.ndarray,
        null_mask: np.ndarray,
        target_index: int,
        train_index: np.ndarray,
        test_index: np.ndarray
    ) -> None:
        self.adj_mat = origin_adj_mat
        self.null_mask = null_mask
        self.target_index = target_index
        self.train_index = train_index
        self.test_index = test_index
        self.train_data, self.test_data = self._sample_train_test_data()
        self.train_mask, self.test_mask = self._sample_train_test_mask()

    def _sample_train_test_data(self) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Samples train and test data for the target index.

        Returns:
            ``(train_data, test_data)``; the test data is 1 at
            ``(test_index, target_index)`` and the train data is the
            adjacency matrix minus the test data.
        """
        test_data = np.zeros(self.adj_mat.shape, dtype=np.float32)
        test_data[self.test_index, self.target_index] = 1
        train_data = self.adj_mat - test_data
        return torch.from_numpy(train_data), torch.from_numpy(test_data)

    def _sample_train_test_mask(self) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Creates train and test masks with negative sampling.

        As many negatives as there are test positives are drawn from the
        target column without replacement. If the column holds fewer
        negatives than that, all of them are used instead of letting
        ``np.random.choice`` raise ``ValueError``.

        Returns:
            ``(train_mask, test_mask)`` as boolean tensors.
        """
        shape = self.adj_mat.shape
        neg_value = np.ones(shape, dtype=np.float32) - self.adj_mat - self.null_mask
        neg_test_mask = np.zeros(shape, dtype=np.float32)
        target_neg_index = np.where(neg_value[:, self.target_index] == 1)[0]

        n_test = len(self.test_index)
        if n_test <= len(target_neg_index):
            target_neg_test_index = np.random.choice(
                target_neg_index, n_test, replace=False
            )
        else:
            # Too few negatives to match the positives: take every one,
            # the same fallback NewSampler applies.
            target_neg_test_index = target_neg_index

        neg_test_mask[target_neg_test_index, self.target_index] = 1
        # Negatives reserved for testing must not appear in the train mask.
        neg_value[target_neg_test_index, self.target_index] = 0
        train_mask = (self.train_data.numpy() + neg_value).astype(bool)
        test_mask = (self.test_data.numpy() + neg_test_mask).astype(bool)
        return torch.from_numpy(train_mask), torch.from_numpy(test_mask)
class TargetSampler:
    """
    Samples train/test data and masks for multiple target indices.

    The columns listed in ``target_indexes`` are the targets. Their non-zero
    entries, enumerated in COO order over the target sub-matrix, are split
    into train and test positives by ``pos_train_index`` and
    ``pos_test_index``.
    """

    def __init__(
        self,
        response_mat: np.ndarray,
        null_mask: np.ndarray,
        target_indexes: np.ndarray,
        pos_train_index: np.ndarray,
        pos_test_index: np.ndarray
    ) -> None:
        self.response_mat = response_mat
        self.null_mask = null_mask
        self.target_indexes = target_indexes
        self.pos_train_index = pos_train_index
        self.pos_test_index = pos_test_index
        self.train_data, self.test_data = self._sample_train_test_data()
        self.train_mask, self.test_mask = self._sample_train_test_mask()

    def _sample_train_test_data(self) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Samples train and test data for multiple target indices.

        Returns:
            ``(train_data, test_data)``. The train data is the response
            matrix with only the train positives kept in the target columns;
            the test data is zero except for the test positives.
        """
        n_target = self.target_indexes.shape[0]
        target_response = self.response_mat[:, self.target_indexes].reshape((-1, n_target))
        train_data = self.response_mat.copy()
        train_data[:, self.target_indexes] = 0
        target_pos_value = sp.coo_matrix(target_response)
        target_train_data = sp.coo_matrix(
            (
                target_pos_value.data[self.pos_train_index],
                (target_pos_value.row[self.pos_train_index], target_pos_value.col[self.pos_train_index])
            ),
            shape=target_response.shape
        ).toarray()
        target_test_data = sp.coo_matrix(
            (
                target_pos_value.data[self.pos_test_index],
                (target_pos_value.row[self.pos_test_index], target_pos_value.col[self.pos_test_index])
            ),
            shape=target_response.shape
        ).toarray()
        test_data = np.zeros(self.response_mat.shape, dtype=np.float32)
        # Scatter the per-target columns back to their original positions.
        for i, value in enumerate(self.target_indexes):
            train_data[:, value] = target_train_data[:, i]
            test_data[:, value] = target_test_data[:, i]
        return torch.from_numpy(train_data), torch.from_numpy(test_data)

    def _sample_train_test_mask(self) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Creates train and test masks with negative sampling for target indices.

        As many negatives as there are test positives are drawn, without
        replacement, from the target columns for the test mask. Every other
        negative of the matrix goes to the train mask.

        Returns:
            ``(train_mask, test_mask)`` as boolean tensors, in the order
            ``__init__`` unpacks them.
        """
        target_response = self.response_mat[:, self.target_indexes]
        target_ones = np.ones(target_response.shape, dtype=np.float32)
        target_neg_value = target_ones - target_response - self.null_mask[:, self.target_indexes]
        target_neg_value = sp.coo_matrix(target_neg_value)
        ids = np.arange(target_neg_value.data.shape[0])
        target_neg_test_index = np.random.choice(ids, self.pos_test_index.shape[0], replace=False)
        target_neg_test_mask = sp.coo_matrix(
            (
                target_neg_value.data[target_neg_test_index],
                (target_neg_value.row[target_neg_test_index], target_neg_value.col[target_neg_test_index])
            ),
            shape=target_response.shape
        ).toarray()
        neg_test_mask = np.zeros(self.response_mat.shape, dtype=np.float32)
        for i, value in enumerate(self.target_indexes):
            neg_test_mask[:, value] = target_neg_test_mask[:, i]
        # Everything that is not a test negative, a positive or masked.
        other_neg_value = (
            np.ones(self.response_mat.shape, dtype=np.float32)
            - neg_test_mask
            - self.response_mat
            - self.null_mask
        )
        test_mask = (self.test_data.numpy() + neg_test_mask).astype(bool)
        train_mask = (self.train_data.numpy() + other_neg_value).astype(bool)
        # Order must match the ``train_mask, test_mask`` unpacking in __init__.
        return torch.from_numpy(train_mask), torch.from_numpy(test_mask)
class ExterSampler:
    """
    Split an adjacency matrix into train and test parts by whole rows.

    Rows listed in ``train_index`` are cleared from the test data and from
    the test negatives; rows listed in ``test_index`` are cleared from the
    train negatives.
    """

    def __init__(
        self,
        original_adj_mat: np.ndarray,
        null_mask: np.ndarray,
        train_index: np.ndarray,
        test_index: np.ndarray
    ) -> None:
        self.adj_mat = original_adj_mat
        self.null_mask = null_mask
        self.train_index = train_index
        self.test_index = test_index
        self.train_data, self.test_data = self._sample_train_test_data()
        self.train_mask, self.test_mask = self._sample_train_test_mask()

    def _sample_train_test_data(self) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Return ``(train_data, test_data)`` tensors.

        The test data is a copy of the adjacency matrix with the training
        rows zeroed; the train data is the adjacency matrix minus it.
        """
        held_out = self.adj_mat.copy()
        held_out[self.train_index, :] = 0
        remaining = self.adj_mat - held_out
        return torch.from_numpy(remaining), torch.from_numpy(held_out)

    def _sample_train_test_mask(self) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Return boolean ``(train_mask, test_mask)`` tensors.

        Each mask is the split's data plus the negative candidates
        (``1 - adjacency - null_mask``) left after clearing the rows of the
        other split.
        """
        negatives = np.ones(self.adj_mat.shape, dtype=np.float32) - self.adj_mat - self.null_mask

        def without_rows(rows: np.ndarray) -> np.ndarray:
            # Copy of the negative candidates with the given rows zeroed.
            trimmed = negatives.copy()
            trimmed[rows, :] = 0
            return trimmed

        train_negatives = without_rows(self.test_index)
        test_negatives = without_rows(self.train_index)
        train_mask = (self.train_data.numpy() + train_negatives).astype(bool)
        test_mask = (self.test_data.numpy() + test_negatives).astype(bool)
        return torch.from_numpy(train_mask), torch.from_numpy(test_mask)
class RegressionSampler(object):
    """
    Build boolean train/test masks over a dense matrix from flat indices.

    Each entry of ``train_index`` / ``test_index`` is a row-major flat index
    into the matrix; it is converted to ``(row, col)`` with the column count
    and marked in the matching mask unless ``null_mask[row, col]`` is truthy.
    Both ``train_data`` and ``test_data`` are clones of the full matrix.
    """

    def __init__(self, adj_mat_original, train_index, test_index, null_mask):
        super().__init__()

        # Work on a NumPy view/copy regardless of the input container.
        adj_mat_np = (
            adj_mat_original.cpu().numpy()
            if isinstance(adj_mat_original, torch.Tensor)
            else adj_mat_original.copy()
        )
        self.full_data = torch.FloatTensor(adj_mat_np)
        n_rows, n_cols = adj_mat_np.shape

        def build_mask(flat_indices):
            # Mark every non-null cell addressed by a flat index.
            selected = np.zeros((n_rows, n_cols), dtype=bool)
            for flat in flat_indices:
                r = flat // n_cols
                c = flat % n_cols
                if null_mask[r, c]:
                    continue
                selected[r, c] = True
            return selected

        train_mask = build_mask(train_index)
        test_mask = build_mask(test_index)

        self.train_mask = torch.BoolTensor(train_mask)
        self.test_mask = torch.BoolTensor(test_mask)
        self.train_data = self.full_data.clone()
        self.test_data = self.full_data.clone()
        assert not torch.any(self.train_mask & self.test_mask), "Train and test masks have overlap!"

    def get_train_indices(self):
        """Return the ``(row, col)`` positions set in the train mask."""
        return torch.nonzero(self.train_mask)

    def get_test_indices(self):
        """Return the ``(row, col)`` positions set in the test mask."""
        return torch.nonzero(self.test_mask)
| import torch | |||
| import numpy as np | |||
| import scipy.sparse as sp | |||
| from typing import Tuple, Optional | |||
| from utils import to_coo_matrix, to_tensor, mask | |||
class RandomSampler:
    """
    Split the edges of an adjacency matrix into train and test sets.

    Positive edges are selected by position from the COO form of the
    adjacency matrix. Negative edges are drawn from the cells where
    ``null_mask + adjacency`` differs from 1 (for binary inputs: cells that
    are neither positive nor masked -- assumes both arrays are 0/1, confirm
    with callers). Masks and tensors are produced by the project helpers
    ``mask`` and ``to_tensor``.
    """

    def __init__(
        self,
        adj_mat_original: np.ndarray,
        train_index: np.ndarray,
        test_index: np.ndarray,
        null_mask: np.ndarray
    ) -> None:
        self.adj_mat = to_coo_matrix(adj_mat_original)
        self.train_index = train_index
        self.test_index = test_index
        self.null_mask = null_mask

        # Positive edges of each split, addressed by COO position.
        self.train_pos = self._sample_edges(train_index)
        self.test_pos = self._sample_edges(test_index)

        # Negative edges: a random test subset, the remainder for training.
        negatives = self._sample_negative_edges()
        self.train_neg, self.test_neg = negatives

        # Masks combine the positive and negative edges of each split.
        self.train_mask = mask(self.train_pos, self.train_neg, dtype=int)
        self.test_mask = mask(self.test_pos, self.test_neg, dtype=bool)

        # Tensor versions of the positive edges.
        self.train_data = to_tensor(self.train_pos)
        self.test_data = to_tensor(self.test_pos)

    def _sample_edges(self, index: np.ndarray) -> sp.coo_matrix:
        """Return a COO matrix holding only the stored entries at ``index``."""
        coo = self.adj_mat
        picked = (coo.data[index], (coo.row[index], coo.col[index]))
        return sp.coo_matrix(picked, shape=coo.shape)

    def _sample_negative_edges(self) -> Tuple[sp.coo_matrix, sp.coo_matrix]:
        """
        Draw negative edges for the test set and keep the rest for training.

        The test set receives ``len(self.test_index)`` negatives sampled
        without replacement; every other candidate goes to the training set.

        Returns:
            ``(train_neg, test_neg)`` as COO matrices shaped like the
            adjacency matrix.
        """
        occupied = self.null_mask + self.adj_mat.toarray()
        candidates = sp.coo_matrix(np.abs(occupied - 1))
        out_shape = self.adj_mat.shape

        def subset(selection: np.ndarray) -> sp.coo_matrix:
            # Rebuild a COO matrix from the chosen candidate positions.
            coords = (candidates.row[selection], candidates.col[selection])
            return sp.coo_matrix(
                (candidates.data[selection], coords),
                shape=out_shape
            )

        positions = np.arange(candidates.data.shape[0])
        test_pick = np.random.choice(
            positions, self.test_index.shape[0], replace=False
        )
        train_pick = np.delete(positions, test_pick)
        return subset(train_pick), subset(test_pick)
class NewSampler:
    """
    Hold out one row or one column of an adjacency matrix as the test set.

    When ``target_dim`` is truthy the column ``target_index`` is held out;
    otherwise the row ``target_index`` is held out.
    """

    def __init__(
        self,
        original_adj_mat: np.ndarray,
        null_mask: np.ndarray,
        target_dim: Optional[int],
        target_index: int
    ) -> None:
        self.adj_mat = original_adj_mat
        self.null_mask = null_mask
        self.dim = target_dim
        self.target_index = target_index
        self.train_data, self.test_data = self._sample_train_test_data()
        self.train_mask, self.test_mask = self._sample_train_test_mask()

    def _sample_target_test_index(self) -> np.ndarray:
        """Return the positions equal to 1 along the held-out row/column."""
        if self.dim:
            target_vector = self.adj_mat[:, self.target_index]
        else:
            target_vector = self.adj_mat[self.target_index, :]
        return np.where(target_vector == 1)[0]

    def _sample_train_test_data(self) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Move the positives of the held-out row/column into the test data.

        Returns:
            ``(train_data, test_data)``; the test data is 1 at the held-out
            positives and the train data is the adjacency matrix minus it.
        """
        held_out = self._sample_target_test_index()
        test_data = np.zeros(self.adj_mat.shape, dtype=np.float32)
        if self.dim:
            selector = (held_out, self.target_index)
        else:
            selector = (self.target_index, held_out)
        test_data[selector] = 1
        train_data = self.adj_mat - test_data
        return torch.from_numpy(train_data), torch.from_numpy(test_data)

    def _sample_train_test_mask(self) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Build boolean train/test masks, sampling negatives for the test mask.

        Test negatives are drawn from the held-out row/column, as many as
        there are test positives when enough candidates exist, otherwise all
        candidates. The whole held-out row/column is removed from the
        training negatives.
        """
        pos_test_index = self._sample_target_test_index()
        shape = self.adj_mat.shape
        neg_value = np.ones(shape, dtype=np.float32) - self.adj_mat - self.null_mask
        neg_test_mask = np.zeros(shape, dtype=np.float32)

        # Index expression addressing the entire held-out row or column.
        if self.dim:
            target_line = (slice(None), self.target_index)
        else:
            target_line = (self.target_index, slice(None))
        candidates = np.where(neg_value[target_line] == 1)[0]

        if len(pos_test_index) < len(candidates):
            chosen = np.random.choice(candidates, len(pos_test_index), replace=False)
        else:
            # Not more candidates than positives: take every candidate.
            chosen = candidates

        if self.dim:
            neg_test_mask[chosen, self.target_index] = 1
        else:
            neg_test_mask[self.target_index, chosen] = 1
        # No negative of the held-out row/column may be used for training.
        neg_value[target_line] = 0

        train_mask = (self.train_data.numpy() + neg_value).astype(bool)
        test_mask = (self.test_data.numpy() + neg_test_mask).astype(bool)
        return torch.from_numpy(train_mask), torch.from_numpy(test_mask)
class SingleSampler:
    """
    Samples train/test data and masks for a specific target index.
    Returns results as torch.Tensor.

    The target is the column ``target_index`` of the adjacency matrix; the
    rows listed in ``test_index`` are the positives held out for testing.
    """

    def __init__(
        self,
        origin_adj_mat: np.ndarray,
        null_mask: np.ndarray,
        target_index: int,
        train_index: np.ndarray,
        test_index: np.ndarray
    ) -> None:
        self.adj_mat = origin_adj_mat
        self.null_mask = null_mask
        self.target_index = target_index
        self.train_index = train_index
        self.test_index = test_index
        self.train_data, self.test_data = self._sample_train_test_data()
        self.train_mask, self.test_mask = self._sample_train_test_mask()

    def _sample_train_test_data(self) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Samples train and test data for the target index.

        Returns:
            ``(train_data, test_data)``; the test data is 1 at
            ``(test_index, target_index)`` and the train data is the
            adjacency matrix minus the test data.
        """
        test_data = np.zeros(self.adj_mat.shape, dtype=np.float32)
        test_data[self.test_index, self.target_index] = 1
        train_data = self.adj_mat - test_data
        return torch.from_numpy(train_data), torch.from_numpy(test_data)

    def _sample_train_test_mask(self) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Creates train and test masks with negative sampling.

        As many negatives as there are test positives are drawn from the
        target column without replacement. If the column holds fewer
        negatives than that, all of them are used instead of letting
        ``np.random.choice`` raise ``ValueError``.

        Returns:
            ``(train_mask, test_mask)`` as boolean tensors.
        """
        shape = self.adj_mat.shape
        neg_value = np.ones(shape, dtype=np.float32) - self.adj_mat - self.null_mask
        neg_test_mask = np.zeros(shape, dtype=np.float32)
        target_neg_index = np.where(neg_value[:, self.target_index] == 1)[0]

        n_test = len(self.test_index)
        if n_test <= len(target_neg_index):
            target_neg_test_index = np.random.choice(
                target_neg_index, n_test, replace=False
            )
        else:
            # Too few negatives to match the positives: take every one,
            # the same fallback NewSampler applies.
            target_neg_test_index = target_neg_index

        neg_test_mask[target_neg_test_index, self.target_index] = 1
        # Negatives reserved for testing must not appear in the train mask.
        neg_value[target_neg_test_index, self.target_index] = 0
        train_mask = (self.train_data.numpy() + neg_value).astype(bool)
        test_mask = (self.test_data.numpy() + neg_test_mask).astype(bool)
        return torch.from_numpy(train_mask), torch.from_numpy(test_mask)
class TargetSampler(object):
    """
    Split the positives of selected target columns into train and test.

    The columns listed in ``target_indexes`` form the target sub-matrix. Its
    non-zero entries, in COO order, are assigned to the train or test set by
    position through ``pos_train_index`` and ``pos_test_index``.
    """

    def __init__(self, response_mat: np.ndarray, null_mask: np.ndarray, target_indexes: np.ndarray,
                 pos_train_index: np.ndarray, pos_test_index: np.ndarray):
        self.response_mat = response_mat
        self.null_mask = null_mask
        self.target_indexes = target_indexes
        self.pos_train_index = pos_train_index
        self.pos_test_index = pos_test_index
        self.train_data, self.test_data = self.sample_train_test_data()
        self.train_mask, self.test_mask = self.sample_train_test_mask()

    def sample_train_test_data(self):
        """
        Return ``(train_data, test_data)`` tensors.

        The train data is the response matrix whose target columns keep only
        the train positives; the test data is zero except for the test
        positives in the target columns.
        """
        columns = self.target_indexes
        target_block = self.response_mat[:, columns].reshape((-1, columns.shape[0]))
        positives = sp.coo_matrix(target_block)

        def dense_subset(picks):
            # Dense target block holding only the chosen stored entries.
            coords = (positives.row[picks], positives.col[picks])
            return sp.coo_matrix(
                (positives.data[picks], coords), shape=target_block.shape
            ).toarray()

        train_block = dense_subset(self.pos_train_index)
        test_block = dense_subset(self.pos_test_index)

        train_data = self.response_mat.copy()
        train_data[:, columns] = 0
        test_data = np.zeros(self.response_mat.shape, dtype=np.float32)
        # Write each target column back at its original position.
        for position, column in enumerate(columns):
            train_data[:, column] = train_block[:, position]
            test_data[:, column] = test_block[:, position]
        return torch.from_numpy(train_data), torch.from_numpy(test_data)

    def sample_train_test_mask(self):
        """
        Return boolean ``(train_mask, test_mask)`` tensors.

        As many negatives as there are test positives are sampled without
        replacement from the target columns for the test mask; all remaining
        negatives of the matrix are part of the train mask.
        """
        columns = self.target_indexes
        target_block = self.response_mat[:, columns]
        negatives = sp.coo_matrix(
            np.ones(target_block.shape, dtype=np.float32)
            - target_block
            - self.null_mask[:, columns]
        )
        candidate_ids = np.arange(negatives.data.shape[0])
        picks = np.random.choice(candidate_ids, self.pos_test_index.shape[0], replace=False)
        picked_block = sp.coo_matrix(
            (negatives.data[picks], (negatives.row[picks], negatives.col[picks])),
            shape=target_block.shape
        ).toarray()

        neg_test_mask = np.zeros(self.response_mat.shape, dtype=np.float32)
        for position, column in enumerate(columns):
            neg_test_mask[:, column] = picked_block[:, position]

        # Cells that are not test negatives, positives or masked.
        all_ones = np.ones(self.response_mat.shape, dtype=np.float32)
        other_neg_value = all_ones - neg_test_mask - self.response_mat - self.null_mask

        test_mask = torch.from_numpy((self.test_data.numpy() + neg_test_mask).astype(bool))
        train_mask = torch.from_numpy((self.train_data.numpy() + other_neg_value).astype(bool))
        return train_mask, test_mask
class ExterSampler:
    """
    Split an adjacency matrix into train and test parts by whole rows.

    Rows listed in ``train_index`` are cleared from the test data and from
    the test negatives; rows listed in ``test_index`` are cleared from the
    train negatives.
    """

    def __init__(
        self,
        original_adj_mat: np.ndarray,
        null_mask: np.ndarray,
        train_index: np.ndarray,
        test_index: np.ndarray
    ) -> None:
        self.adj_mat = original_adj_mat
        self.null_mask = null_mask
        self.train_index = train_index
        self.test_index = test_index
        self.train_data, self.test_data = self._sample_train_test_data()
        self.train_mask, self.test_mask = self._sample_train_test_mask()

    def _sample_train_test_data(self) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Return ``(train_data, test_data)`` tensors.

        The test data is a copy of the adjacency matrix with the training
        rows zeroed; the train data is the adjacency matrix minus it.
        """
        held_out = self.adj_mat.copy()
        held_out[self.train_index, :] = 0
        remaining = self.adj_mat - held_out
        return torch.from_numpy(remaining), torch.from_numpy(held_out)

    def _sample_train_test_mask(self) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Return boolean ``(train_mask, test_mask)`` tensors.

        Each mask is the split's data plus the negative candidates
        (``1 - adjacency - null_mask``) left after clearing the rows of the
        other split.
        """
        negatives = np.ones(self.adj_mat.shape, dtype=np.float32) - self.adj_mat - self.null_mask

        def without_rows(rows: np.ndarray) -> np.ndarray:
            # Copy of the negative candidates with the given rows zeroed.
            trimmed = negatives.copy()
            trimmed[rows, :] = 0
            return trimmed

        train_negatives = without_rows(self.test_index)
        test_negatives = without_rows(self.train_index)
        train_mask = (self.train_data.numpy() + train_negatives).astype(bool)
        test_mask = (self.test_data.numpy() + test_negatives).astype(bool)
        return torch.from_numpy(train_mask), torch.from_numpy(test_mask)
class RegressionSampler(object):
    """
    Build boolean train/test masks over a dense matrix from flat indices.

    Each entry of ``train_index`` / ``test_index`` is a row-major flat index
    into the matrix; it is converted to ``(row, col)`` with the column count
    and marked in the matching mask unless ``null_mask[row, col]`` is truthy.
    Both ``train_data`` and ``test_data`` are clones of the full matrix.
    """

    def __init__(self, adj_mat_original, train_index, test_index, null_mask):
        super().__init__()

        # Work on a NumPy view/copy regardless of the input container.
        adj_mat_np = (
            adj_mat_original.cpu().numpy()
            if isinstance(adj_mat_original, torch.Tensor)
            else adj_mat_original.copy()
        )
        self.full_data = torch.FloatTensor(adj_mat_np)
        n_rows, n_cols = adj_mat_np.shape

        def build_mask(flat_indices):
            # Mark every non-null cell addressed by a flat index.
            selected = np.zeros((n_rows, n_cols), dtype=bool)
            for flat in flat_indices:
                r = flat // n_cols
                c = flat % n_cols
                if null_mask[r, c]:
                    continue
                selected[r, c] = True
            return selected

        train_mask = build_mask(train_index)
        test_mask = build_mask(test_index)

        self.train_mask = torch.BoolTensor(train_mask)
        self.test_mask = torch.BoolTensor(test_mask)
        self.train_data = self.full_data.clone()
        self.test_data = self.full_data.clone()
        assert not torch.any(self.train_mask & self.test_mask), "Train and test masks have overlap!"

    def get_train_indices(self):
        """Return the ``(row, col)`` positions set in the train mask."""
        return torch.nonzero(self.train_mask)

    def get_test_indices(self):
        """Return the ``(row, col)`` positions set in the test mask."""
        return torch.nonzero(self.test_mask)