import scipy.io as sio
import numpy as np
import torch


class DataLoader:
    """Loads a temporal-graph dataset from a .mat file and prepares
    train/validation/test splits of its sparse tensors."""

    def __init__(self, S_train, S_val, S_test, data_dir, mat_file_path):
        self.S_train = S_train
        self.S_val = S_val
        self.S_test = S_test
        self.data_root_dir = data_dir
        self.mat_file_path = mat_file_path
        self.saved_content = sio.loadmat(self.data_root_dir + self.mat_file_path)

        # number of graphs (time steps) and number of nodes per graph
        self.T = np.max(self.saved_content["A_labels_subs"][0, :]) + 1
        print("Number of total graphs: {}".format(self.T))
        self.N = max(np.max(self.saved_content["A_labels_subs"][1, :]),
                     np.max(self.saved_content["A_labels_subs"][2, :])) + 1
        print("Number of nodes in each graph: {}".format(self.N))

        self.A_size = torch.Size([self.T, self.N, self.N])  # size of the adjacency tensor
        self.C_size = torch.Size([self.T, self.N, self.N])  # C has the same T x N x N shape as the adjacency tensor

        # labels of the edges
        self.A_labels = torch.sparse.FloatTensor(
            torch.tensor(self.saved_content["A_labels_subs"], dtype=torch.long),
            torch.squeeze(torch.tensor(self.saved_content["A_labels_vals"])),
            self.A_size).coalesce()
        # Laplacian transform of the adjacency matrix
        self.C = torch.sparse.FloatTensor(
            torch.tensor(self.saved_content["C_subs"], dtype=torch.long),
            torch.squeeze(torch.tensor(self.saved_content["C_vals"])),
            self.C_size).coalesce()
        # binary adjacency matrix (1 wherever a labelled edge exists)
        self.A = torch.sparse.FloatTensor(
            self.A_labels._indices(),
            torch.ones(self.A_labels._values().shape),
            self.A_size).coalesce()

        # create node features
        self.X = self.create_node_features()

    def split_data(self):
        """Split the C tensor and the node features into train/val/test sets."""
        # pass the explicit N x N size so every per-graph sparse tensor has the same shape,
        # even when the highest-index node has no edges in a given graph
        graph_size = torch.Size([self.N, self.N])

        C_train = []
        for j in range(self.S_train):
            idx = self.C._indices()[0] == j
            C_train.append(torch.sparse.FloatTensor(
                self.C._indices()[1:3, idx], self.C._values()[idx], graph_size))

        C_val = []
        for j in range(self.S_train, self.S_train + self.S_val):
            idx = self.C._indices()[0] == j
            C_val.append(torch.sparse.FloatTensor(
                self.C._indices()[1:3, idx], self.C._values()[idx], graph_size))

        C_test = []
        for j in range(self.S_train + self.S_val, self.S_train + self.S_val + self.S_test):
            idx = self.C._indices()[0] == j
            C_test.append(torch.sparse.FloatTensor(
                self.C._indices()[1:3, idx], self.C._values()[idx], graph_size))

        C = {'C_train': C_train, 'C_val': C_val, 'C_test': C_test}

        X_train = self.X[0:self.S_train].double()
        X_val = self.X[self.S_train:self.S_train + self.S_val].double()
        X_test = self.X[self.S_train + self.S_val:].double()
        data = {'X_train': X_train, 'X_val': X_val, 'X_test': X_test}
        return data, C

    def get_edges_and_labels(self):
        """Return the labelled edges and their binary targets for each split."""
        # training
        subs_train = self.A_labels._indices()[0] < self.S_train
        edges_train = self.A_labels._indices()[:, subs_train]
        labels_train = torch.sign(self.A_labels._values()[subs_train])
        target_train = (labels_train != -1).long()  # target = 0 if the label is -1, and 1 if the label is 0 or +1

        # validation
        subs_val = ((self.A_labels._indices()[0] >= self.S_train) &
                    (self.A_labels._indices()[0] < self.S_train + self.S_val))
        edges_val = self.A_labels._indices()[:, subs_val]
        edges_val[0] -= self.S_train  # re-index graphs so the split starts at 0
        labels_val = torch.sign(self.A_labels._values()[subs_val])
        target_val = (labels_val != -1).long()

        # testing
        subs_test = self.A_labels._indices()[0] >= self.S_train + self.S_val
        edges_test = self.A_labels._indices()[:, subs_test]
        edges_test[0] -= (self.S_train + self.S_val)
        labels_test = torch.sign(self.A_labels._values()[subs_test])
        target_test = (labels_test != -1).long()

        targets = {'target_train': target_train, 'target_val': target_val, 'target_test': target_test}
        edges = {'edges_train': edges_train, 'edges_val': edges_val, 'edges_test': edges_test}
        return targets, edges

    def create_node_features(self):
        """Build two degree features per node from the binary adjacency tensor."""
        X = torch.zeros(self.A.shape[0], self.A.shape[1], 2)
        # degree features: summing A over dim 1 collapses the first node index and summing
        # over dim 2 collapses the second, i.e. in-degrees and out-degrees respectively when
        # A[t, i, j] encodes an edge from node i to node j
        X[:, :, 0] = torch.sparse.sum(self.A, 1).to_dense()
        X[:, :, 1] = torch.sparse.sum(self.A, 2).to_dense()
        return X

    def load_data(self):
        print("Loading the data...")
        data, C = self.split_data()
        targets, edges = self.get_edges_and_labels()
        print("======================")
        return data, C, targets, edges
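

# Minimal usage sketch: the directory and .mat file name below are placeholders, and the
# split sizes are example values; S_train + S_val + S_test is typically chosen to equal the
# number of graphs T stored in the file.
if __name__ == "__main__":
    loader = DataLoader(S_train=80, S_val=10, S_test=10,
                        data_dir="data/",                  # hypothetical directory
                        mat_file_path="saved_tensors.mat")  # hypothetical file name
    data, C, targets, edges = loader.load_data()
    print(data['X_train'].shape)          # (S_train, N, 2) node-feature tensor
    print(len(C['C_train']))              # S_train per-graph sparse N x N tensors
    print(edges['edges_train'].shape)     # (3, E_train): graph, row, and column indices
    print(targets['target_train'].shape)  # (E_train,): binary edge targets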