123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360 |
- import torch
- import numpy as np
- import scipy.sparse as sp
- from typing import Tuple, Optional
- from utils import to_coo_matrix, to_tensor, mask
-
-
class RandomSampler:
    """
    Split the edges of an adjacency matrix into train and test sets.

    Positive edges are selected by position from the COO form of the
    matrix. Negative edges are the entries where
    ``abs(null_mask + adjacency - 1)`` is non-zero; ``len(test_index)`` of
    them are drawn at random for testing and all remaining ones are used
    for training.

    Attributes set by the constructor:
        adj_mat: result of the project helper ``to_coo_matrix``.
        train_pos / test_pos: positive edges as ``scipy.sparse.coo_matrix``.
        train_neg / test_neg: negative edges as ``scipy.sparse.coo_matrix``.
        train_mask / test_mask: results of the project helper ``mask``
            (called with ``dtype=int`` and ``dtype=bool`` respectively).
        train_data / test_data: results of the project helper ``to_tensor``
            applied to the positive edges.
    """

    def __init__(
        self,
        adj_mat_original: np.ndarray,
        train_index: np.ndarray,
        test_index: np.ndarray,
        null_mask: np.ndarray
    ) -> None:
        self.adj_mat = to_coo_matrix(adj_mat_original)
        self.train_index = train_index
        self.test_index = test_index
        self.null_mask = null_mask

        # Positive edges for each split.
        self.train_pos = self._sample_edges(train_index)
        self.test_pos = self._sample_edges(test_index)

        # Negative edges for each split (consumes the global NumPy RNG).
        negatives = self._sample_negative_edges()
        self.train_neg, self.test_neg = negatives

        # Masks combining positives and negatives of each split.
        self.train_mask = mask(self.train_pos, self.train_neg, dtype=int)
        self.test_mask = mask(self.test_pos, self.test_neg, dtype=bool)

        # Tensor form of the positive edges.
        self.train_data = to_tensor(self.train_pos)
        self.test_data = to_tensor(self.test_pos)

    def _sample_edges(self, index: np.ndarray) -> sp.coo_matrix:
        """Return the stored entries at positions ``index`` as a COO matrix.

        ``index`` addresses the ``row`` / ``col`` / ``data`` arrays of
        ``self.adj_mat``; the result keeps the shape of ``self.adj_mat``.
        """
        source = self.adj_mat
        coordinates = (source.row[index], source.col[index])
        return sp.coo_matrix(
            (source.data[index], coordinates),
            shape=source.shape
        )

    def _sample_negative_edges(self) -> Tuple[sp.coo_matrix, sp.coo_matrix]:
        """
        Draw negative edges for the test split and assign the rest to train.

        Returns:
            ``(train_neg, test_neg)`` as COO matrices shaped like
            ``self.adj_mat``.
        """
        occupied = self.null_mask + self.adj_mat.toarray()
        # Entries equal to 1 in `occupied` become 0 here and therefore drop
        # out of the COO representation; what remains are the candidates.
        candidates = sp.coo_matrix(np.abs(occupied - 1))
        candidate_ids = np.arange(candidates.data.shape[0])
        out_shape = self.adj_mat.shape

        def take(ids: np.ndarray) -> sp.coo_matrix:
            # Sub-select candidate entries by position.
            return sp.coo_matrix(
                (candidates.data[ids], (candidates.row[ids], candidates.col[ids])),
                shape=out_shape
            )

        # As many test negatives as there are test positives.
        n_test = self.test_index.shape[0]
        test_ids = np.random.choice(candidate_ids, n_test, replace=False)
        # Everything not drawn for testing is a training negative.
        train_ids = np.delete(candidate_ids, test_ids)

        return take(train_ids), take(test_ids)
-
-
class NewSampler:
    """
    Hold out every positive of one target row or column as the test set.

    When ``target_dim`` is truthy the target is column ``target_index``;
    otherwise (``0`` or ``None``) it is row ``target_index``.

    Attributes set by the constructor:
        train_data / test_data: tensors shaped like the adjacency matrix;
            ``test_data`` marks the target's positives, ``train_data`` is
            the adjacency matrix with those removed.
        train_mask / test_mask: boolean tensors marking which entries are
            used by each split (positives plus sampled negatives).
    """

    def __init__(
        self,
        original_adj_mat: np.ndarray,
        null_mask: np.ndarray,
        target_dim: Optional[int],
        target_index: int
    ) -> None:
        self.adj_mat = original_adj_mat
        self.null_mask = null_mask
        self.dim = target_dim
        self.target_index = target_index
        self.train_data, self.test_data = self._sample_train_test_data()
        self.train_mask, self.test_mask = self._sample_train_test_mask()

    def _target_cell(self, along):
        """Build the 2-D index addressing ``along`` inside the target line.

        ``along`` may be an index array or ``slice(None)`` for the whole
        target column (truthy ``self.dim``) or target row (falsy).
        """
        if self.dim:
            return (along, self.target_index)
        return (self.target_index, along)

    def _sample_target_test_index(self) -> np.ndarray:
        """Return the positions along the target line whose value equals 1."""
        target_line = self.adj_mat[self._target_cell(slice(None))]
        return np.where(target_line == 1)[0]

    def _sample_train_test_data(self) -> Tuple[torch.Tensor, torch.Tensor]:
        """Return ``(train_data, test_data)`` with the target positives held out."""
        held_out = np.zeros(self.adj_mat.shape, dtype=np.float32)
        held_out[self._target_cell(self._sample_target_test_index())] = 1

        remaining = self.adj_mat - held_out
        return torch.from_numpy(remaining), torch.from_numpy(held_out)

    def _sample_train_test_mask(self) -> Tuple[torch.Tensor, torch.Tensor]:
        """Return boolean ``(train_mask, test_mask)`` including negatives.

        Test negatives are drawn from the target line, as many as there are
        test positives when enough candidates exist, otherwise all of them.
        No negative from the target line is used for training.
        """
        positives = self._sample_target_test_index()
        shape = self.adj_mat.shape
        whole_line = self._target_cell(slice(None))

        # 1 where the entry is neither a positive nor flagged in null_mask.
        negatives = np.ones(shape, dtype=np.float32) - self.adj_mat - self.null_mask
        candidates = np.where(negatives[whole_line] == 1)[0]

        if len(positives) < len(candidates):
            chosen = np.random.choice(candidates, len(positives), replace=False)
        else:
            # Not enough candidates to subsample: use every one of them.
            chosen = candidates

        held_out_neg = np.zeros(shape, dtype=np.float32)
        held_out_neg[self._target_cell(chosen)] = 1
        # The target line contributes nothing to the training negatives.
        negatives[whole_line] = 0

        train_mask = (self.train_data.numpy() + negatives).astype(bool)
        test_mask = (self.test_data.numpy() + held_out_neg).astype(bool)
        return torch.from_numpy(train_mask), torch.from_numpy(test_mask)
-
-
class SingleSampler:
    """
    Build train/test data and masks for a single target column.

    The rows listed in ``test_index`` are held out of column
    ``target_index`` as test positives; the same number of negatives is
    drawn at random from that column for the test mask.

    Attributes set by the constructor:
        train_data / test_data: tensors shaped like the adjacency matrix;
            ``test_data`` has ones at ``(test_index, target_index)`` and
            ``train_data`` is the adjacency matrix minus ``test_data``.
        train_mask / test_mask: boolean tensors marking the entries used
            by each split (positives plus negatives).

    ``train_index`` is stored but not read by the methods of this class.
    """

    def __init__(
        self,
        origin_adj_mat: np.ndarray,
        null_mask: np.ndarray,
        target_index: int,
        train_index: np.ndarray,
        test_index: np.ndarray
    ) -> None:
        self.adj_mat = origin_adj_mat
        self.null_mask = null_mask
        self.target_index = target_index
        self.train_index = train_index
        self.test_index = test_index
        self.train_data, self.test_data = self._sample_train_test_data()
        self.train_mask, self.test_mask = self._sample_train_test_mask()

    def _sample_train_test_data(self) -> Tuple[torch.Tensor, torch.Tensor]:
        """Return ``(train_data, test_data)`` for the target column."""
        test_data = np.zeros(self.adj_mat.shape, dtype=np.float32)
        test_data[self.test_index, self.target_index] = 1
        train_data = self.adj_mat - test_data
        return torch.from_numpy(train_data), torch.from_numpy(test_data)

    def _sample_train_test_mask(self) -> Tuple[torch.Tensor, torch.Tensor]:
        """Return boolean ``(train_mask, test_mask)`` including negatives.

        Test negatives come from the target column. When the column holds
        fewer negatives than there are test positives, all of them are used
        instead of raising from ``np.random.choice`` (this mirrors the
        fallback in ``NewSampler``).
        """
        # 1 where the entry is neither a positive nor flagged in null_mask.
        neg_value = np.ones(self.adj_mat.shape, dtype=np.float32) - self.adj_mat - self.null_mask
        neg_test_mask = np.zeros(self.adj_mat.shape, dtype=np.float32)

        target_neg_index = np.where(neg_value[:, self.target_index] == 1)[0]
        n_test = len(self.test_index)
        if n_test <= len(target_neg_index):
            target_neg_test_index = np.random.choice(target_neg_index, n_test, replace=False)
        else:
            # Sampling without replacement is impossible here; take them all.
            target_neg_test_index = target_neg_index

        neg_test_mask[target_neg_test_index, self.target_index] = 1
        # Negatives reserved for testing must not appear in the train mask.
        neg_value[target_neg_test_index, self.target_index] = 0

        train_mask = (self.train_data.numpy() + neg_value).astype(bool)
        test_mask = (self.test_data.numpy() + neg_test_mask).astype(bool)
        return torch.from_numpy(train_mask), torch.from_numpy(test_mask)
-
-
class TargetSampler:
    """
    Build train/test data and masks for several target columns at once.

    The positives found in the target columns are enumerated through a COO
    view of ``response_mat[:, target_indexes]``; ``pos_train_index`` and
    ``pos_test_index`` are positions into that enumeration.

    Attributes set by the constructor:
        train_data / test_data: tensors shaped like ``response_mat``. In
            the target columns they hold the train / test positives;
            ``train_data`` keeps the original values elsewhere and
            ``test_data`` is zero elsewhere.
        train_mask / test_mask: boolean tensors marking the entries used
            by each split (positives plus negatives).
    """

    def __init__(
        self,
        response_mat: np.ndarray,
        null_mask: np.ndarray,
        target_indexes: np.ndarray,
        pos_train_index: np.ndarray,
        pos_test_index: np.ndarray
    ) -> None:
        self.response_mat = response_mat
        self.null_mask = null_mask
        self.target_indexes = target_indexes
        self.pos_train_index = pos_train_index
        self.pos_test_index = pos_test_index
        self.train_data, self.test_data = self._sample_train_test_data()
        self.train_mask, self.test_mask = self._sample_train_test_mask()

    @staticmethod
    def _dense_subset(entries: sp.coo_matrix, picks: np.ndarray, shape) -> np.ndarray:
        """Return a dense array holding only the COO entries at ``picks``."""
        return sp.coo_matrix(
            (entries.data[picks], (entries.row[picks], entries.col[picks])),
            shape=shape
        ).toarray()

    def _sample_train_test_data(self) -> Tuple[torch.Tensor, torch.Tensor]:
        """Return ``(train_data, test_data)`` for the target columns."""
        n_target = self.target_indexes.shape[0]
        target_response = self.response_mat[:, self.target_indexes].reshape((-1, n_target))

        target_pos_value = sp.coo_matrix(target_response)
        target_train_data = self._dense_subset(
            target_pos_value, self.pos_train_index, target_response.shape
        )
        target_test_data = self._dense_subset(
            target_pos_value, self.pos_test_index, target_response.shape
        )

        # Non-target columns keep their original values in the train data
        # and stay zero in the test data.
        train_data = self.response_mat.copy()
        test_data = np.zeros(self.response_mat.shape, dtype=np.float32)
        train_data[:, self.target_indexes] = target_train_data
        test_data[:, self.target_indexes] = target_test_data

        return torch.from_numpy(train_data), torch.from_numpy(test_data)

    def _sample_train_test_mask(self) -> Tuple[torch.Tensor, torch.Tensor]:
        """Return boolean ``(train_mask, test_mask)`` including negatives.

        Test negatives are drawn from the target columns, one per test
        positive (or all available ones if there are fewer). Every other
        negative is a training negative.
        """
        target_response = self.response_mat[:, self.target_indexes]
        target_ones = np.ones(target_response.shape, dtype=np.float32)
        # 1 where a target entry is neither positive nor flagged in null_mask.
        target_neg_value = sp.coo_matrix(
            target_ones - target_response - self.null_mask[:, self.target_indexes]
        )

        ids = np.arange(target_neg_value.data.shape[0])
        # Cap the draw so sampling without replacement cannot raise.
        n_neg_test = min(self.pos_test_index.shape[0], ids.shape[0])
        target_neg_test_index = np.random.choice(ids, n_neg_test, replace=False)
        target_neg_test_mask = self._dense_subset(
            target_neg_value, target_neg_test_index, target_response.shape
        )

        neg_test_mask = np.zeros(self.response_mat.shape, dtype=np.float32)
        neg_test_mask[:, self.target_indexes] = target_neg_test_mask

        # All negatives that were not reserved for testing.
        other_neg_value = (
            np.ones(self.response_mat.shape, dtype=np.float32)
            - neg_test_mask
            - self.response_mat
            - self.null_mask
        )

        train_mask = (self.train_data.numpy() + other_neg_value).astype(bool)
        test_mask = (self.test_data.numpy() + neg_test_mask).astype(bool)
        # Order must match the ``train_mask, test_mask`` unpacking in __init__.
        return torch.from_numpy(train_mask), torch.from_numpy(test_mask)
-
-
class ExterSampler:
    """
    Split an adjacency matrix into train and test parts by whole rows.

    Attributes set by the constructor:
        train_data / test_data: tensors shaped like the adjacency matrix.
            ``test_data`` is the matrix with the ``train_index`` rows
            zeroed; ``train_data`` is the matrix minus ``test_data``.
        train_mask / test_mask: boolean tensors marking the entries used
            by each split (positives plus negatives).
    """

    def __init__(
        self,
        original_adj_mat: np.ndarray,
        null_mask: np.ndarray,
        train_index: np.ndarray,
        test_index: np.ndarray
    ) -> None:
        self.adj_mat = original_adj_mat
        self.null_mask = null_mask
        self.train_index = train_index
        self.test_index = test_index
        self.train_data, self.test_data = self._sample_train_test_data()
        self.train_mask, self.test_mask = self._sample_train_test_mask()

    def _sample_train_test_data(self) -> Tuple[torch.Tensor, torch.Tensor]:
        """Return ``(train_data, test_data)`` split by row."""
        held_out = self.adj_mat.copy()
        # Rows used for training carry no test signal.
        held_out[self.train_index, :] = 0
        kept = self.adj_mat - held_out
        return torch.from_numpy(kept), torch.from_numpy(held_out)

    def _sample_train_test_mask(self) -> Tuple[torch.Tensor, torch.Tensor]:
        """Return boolean ``(train_mask, test_mask)`` including negatives.

        Negatives are the entries that are neither positive nor flagged in
        ``null_mask``; each split drops the negatives lying in the other
        split's rows.
        """
        unobserved = np.ones(self.adj_mat.shape, dtype=np.float32) - self.adj_mat - self.null_mask

        def without_rows(rows: np.ndarray) -> np.ndarray:
            # Copy of the negatives with the given rows blanked out.
            trimmed = unobserved.copy()
            trimmed[rows, :] = 0
            return trimmed

        train_mask = (self.train_data.numpy() + without_rows(self.test_index)).astype(bool)
        test_mask = (self.test_data.numpy() + without_rows(self.train_index)).astype(bool)
        return torch.from_numpy(train_mask), torch.from_numpy(test_mask)
-
-
class RegressionSampler(object):
    """
    Train/test split over flat (row-major) entry indices of a matrix.

    Each value in ``train_index`` / ``test_index`` is decoded as
    ``row = idx // n_cols`` and ``col = idx % n_cols``. Entries whose
    ``null_mask`` value is truthy are left out of both masks.

    Attributes set by the constructor:
        full_data: float32 tensor copy of the input matrix.
        train_mask / test_mask: boolean tensors shaped like the matrix.
        train_data / test_data: independent clones of ``full_data``.

    Raises:
        AssertionError: if an entry is selected by both masks.
    """

    def __init__(self, adj_mat_original, train_index, test_index, null_mask):
        super(RegressionSampler, self).__init__()

        if isinstance(adj_mat_original, torch.Tensor):
            # detach() first: numpy() refuses tensors that require grad.
            adj_mat_np = adj_mat_original.detach().cpu().numpy()
        else:
            adj_mat_np = adj_mat_original.copy()

        self.full_data = torch.FloatTensor(adj_mat_np)

        shape = adj_mat_np.shape
        excluded = self._to_numpy(null_mask).astype(bool)
        train_mask = self._flat_index_mask(train_index, excluded, shape)
        test_mask = self._flat_index_mask(test_index, excluded, shape)

        self.train_mask = torch.from_numpy(train_mask)
        self.test_mask = torch.from_numpy(test_mask)

        self.train_data = self.full_data.clone()
        self.test_data = self.full_data.clone()

        # Raised explicitly so the check still runs under ``python -O``.
        if torch.any(self.train_mask & self.test_mask):
            raise AssertionError("Train and test masks have overlap!")

    @staticmethod
    def _to_numpy(values):
        """Return ``values`` as a NumPy array, moving tensors to the CPU."""
        if isinstance(values, torch.Tensor):
            return values.detach().cpu().numpy()
        return np.asarray(values)

    @classmethod
    def _flat_index_mask(cls, flat_index, excluded, shape):
        """Return a boolean mask of ``shape`` set at the decoded flat indices.

        ``excluded`` is a boolean array; positions where it is True are
        never set in the result.
        """
        n_rows, n_cols = shape
        flat = cls._to_numpy(flat_index).astype(np.int64).reshape(-1)
        # Floor division/modulo, matching ``idx // cols`` and ``idx % cols``.
        rows, cols = np.divmod(flat, n_cols)
        keep = ~excluded[rows, cols]

        selected = np.zeros((n_rows, n_cols), dtype=bool)
        selected[rows[keep], cols[keep]] = True
        return selected

    def get_train_indices(self):
        """Return the ``(row, col)`` coordinates of the True train-mask entries."""
        return torch.nonzero(self.train_mask)

    def get_test_indices(self):
        """Return the ``(row, col)`` coordinates of the True test-mask entries."""
        return torch.nonzero(self.test_mask)
-
|