123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117 |
- from collections import OrderedDict
- import torch
- import torch.nn as nn
- from torch.nn import functional as F
-
class Embedding(nn.Module):
    """Lookup table that turns (head, relation, tail) triples into entity vectors.

    Args:
        num_ent: Number of entities. The table is allocated with
            ``num_ent + 1`` rows.
        parameter: Configuration mapping. Reads ``parameter['device']`` and
            ``parameter['embed_dim']``.
    """

    def __init__(self, num_ent, parameter):
        super(Embedding, self).__init__()
        # Kept as a public attribute for callers that read it; forward() no
        # longer depends on it.
        self.device = torch.device(parameter['device'])
        self.es = parameter['embed_dim']

        self.embedding = nn.Embedding(num_ent + 1, self.es)
        nn.init.xavier_uniform_(self.embedding.weight)

    def forward(self, triples):
        """Embed the first and third element of every triple.

        Args:
            triples: Nested sequence ``triples[b][n]`` where each item is
                indexable and items ``[0]`` and ``[2]`` are integer entity
                ids. Item ``[1]`` is ignored.

        Returns:
            Tensor of shape ``(len(triples), triples_per_batch, 2, embed_dim)``;
            index 0 on axis 2 is the embedding of ``t[0]``, index 1 is the
            embedding of ``t[2]``.
        """
        pairs = [[[t[0], t[2]] for t in batch] for batch in triples]
        # Build the indices on the device that currently holds the weights so
        # the lookup keeps working if the module was moved after construction.
        idx = torch.tensor(pairs, dtype=torch.long,
                           device=self.embedding.weight.device)
        return self.embedding(idx)
-
class MetaLearner(nn.Module):
    """Summarise a short sequence of support embeddings with a 2-layer LSTM.

    Args:
        K: Stored on the instance as ``self.K``; not otherwise used here.
        embed_size: Size of the input embeddings. Also used as the LSTM
            hidden size and as the output size.
        num_hidden1, num_hidden2, out_size, dropout_p: Accepted for interface
            compatibility but not used by this implementation (the LSTM's
            inter-layer dropout is fixed at 0.2).
    """

    def __init__(self, K, embed_size=100, num_hidden1=500, num_hidden2=200, out_size=100, dropout_p=0.5):
        super(MetaLearner, self).__init__()
        self.embed_size = embed_size
        self.K = K
        # Output and hidden sizes are tied to the embedding size; the
        # ``out_size`` argument is deliberately ignored.
        self.out_size = embed_size
        self.hidden_size = embed_size
        self.rnn = nn.LSTM(embed_size, self.hidden_size, 2, dropout=0.2)

    def forward(self, inputs):
        """Encode three support embeddings into one vector per batch row.

        Args:
            inputs: 4-D tensor indexed as ``inputs[batch, pair, side, dim]``.
                At least two pairs are required; only the first two are read.

        Returns:
            Tensor of shape ``(batch, 1, 1, out_size)`` holding the final
            hidden state of the top LSTM layer.
        """
        batch_size = inputs.shape[0]

        # Sequence fed to the LSTM: side 0 of pair 0, side 1 of pair 0,
        # side 1 of pair 1. Stacked time-major because the LSTM was built
        # without batch_first.
        steps = torch.stack(
            [inputs[:, 0, 0, :], inputs[:, 0, 1, :], inputs[:, 1, 1, :]],
            dim=0,
        )

        _, (hidden, _) = self.rnn(steps)
        # hidden is (num_layers, batch, hidden_size); keep the last layer.
        return hidden[-1].view(batch_size, 1, 1, self.out_size)
-
-
class EmbeddingLearner(nn.Module):
    """Parameter-free translational scorer: ``-||h + r - t||_2``."""

    def __init__(self):
        super(EmbeddingLearner, self).__init__()

    def forward(self, h, t, r, pos_num):
        """Score every (h, r, t) candidate and split positives from negatives.

        Args:
            h, t, r: Broadcast-compatible tensors whose last axis is the
                embedding dimension and whose axis 2 has size 1 (it is
                squeezed away after the norm is taken).
            pos_num: Number of leading candidates along axis 1 that are
                positives; the remainder are negatives.

        Returns:
            Tuple ``(p_score, n_score)``: the first ``pos_num`` columns and
            the remaining columns of the score matrix. Scores are negated
            L2 distances.
        """
        distance = torch.norm(h + r - t, p=2, dim=-1)
        score = -distance.squeeze(2)
        return score[:, :pos_num], score[:, pos_num:]
-
-
- class MetaTL(nn.Module):
- def __init__(self, itemnum, parameter):
- super(MetaTL, self).__init__()
- self.device = torch.device(parameter['device'])
- self.beta = parameter['beta']
- # self.dropout_p = parameter['dropout_p']
- self.embed_dim = parameter['embed_dim']
- self.margin = parameter['margin']
- self.embedding = Embedding(itemnum, parameter)
-
- self.relation_learner = MetaLearner(parameter['K'] - 1, embed_size=self.embed_dim, num_hidden1=500,
- num_hidden2=200, out_size=100, dropout_p=0)
-
- self.embedding_learner = EmbeddingLearner()
- self.loss_func = nn.MarginRankingLoss(self.margin)
- self.rel_q_sharing = dict()
-
- def split_concat(self, positive, negative):
- pos_neg_e1 = torch.cat([positive[:, :, 0, :],
- negative[:, :, 0, :]], 1).unsqueeze(2)
- pos_neg_e2 = torch.cat([positive[:, :, 1, :],
- negative[:, :, 1, :]], 1).unsqueeze(2)
- return pos_neg_e1, pos_neg_e2
-
- def forward(self, task, iseval=False, curr_rel=''):
- # transfer task string into embedding
- support, support_negative, query, negative = [self.embedding(t) for t in task]
-
- K = support.shape[1] # num of K
- num_sn = support_negative.shape[1] # num of support negative
- num_q = query.shape[1] # num of query
- num_n = negative.shape[1] # num of query negative
-
- rel = self.relation_learner(support)
- rel.retain_grad()
-
- rel_s = rel.expand(-1, K+num_sn, -1, -1)
-
- if iseval and curr_rel != '' and curr_rel in self.rel_q_sharing.keys():
- rel_q = self.rel_q_sharing[curr_rel]
- else:
- sup_neg_e1, sup_neg_e2 = self.split_concat(support, support_negative)
-
- p_score, n_score = self.embedding_learner(sup_neg_e1, sup_neg_e2, rel_s, K)
-
- y = torch.Tensor([1]).to(self.device)
- self.zero_grad()
- loss = self.loss_func(p_score, n_score, y)
- loss.backward(retain_graph=True)
-
- grad_meta = rel.grad
- rel_q = rel - self.beta*grad_meta
-
-
- self.rel_q_sharing[curr_rel] = rel_q
-
- rel_q = rel_q.expand(-1, num_q + num_n, -1, -1)
-
- que_neg_e1, que_neg_e2 = self.split_concat(query, negative)
- p_score, n_score = self.embedding_learner(que_neg_e1, que_neg_e2, rel_q, num_q)
-
- return p_score, n_score
|