from models import *  # NOTE(review): expected to provide MetaTL (and, previously, torch implicitly)

import copy
import gc
import logging
import os
import random
import shutil
import sys
from operator import itemgetter

import numpy as np
import torch  # FIX: was only available via the wildcard import above; used explicitly throughout


class Trainer:
    """Meta-learning trainer for MetaTL with variance-aware negative sampling.

    Maintains, per user, a candidate pool of negative items (``candidate_cur``),
    a small working subset of that pool (``Mu_idx``), and a short history of
    candidate scores (``score_cand_cur``) used to estimate the variance of each
    candidate's likelihood of out-ranking the positive item.
    """

    def __init__(self, data_loaders, itemnum, parameter, user_train_num, user_train):
        """
        Args:
            data_loaders: (train, dev, test) loaders; train exposes ``next_batch()``,
                dev/test expose ``next_one_on_eval()``.
            itemnum: number of items (item ids appear to be 1..itemnum — TODO confirm).
            parameter: dict of hyper-parameters (see reads below).
            user_train_num: number of training users.
            user_train: dict user -> iterable of that user's training items.
        """
        self.parameter = parameter
        # data loaders
        self.train_data_loader = data_loaders[0]
        self.dev_data_loader = data_loaders[1]
        self.test_data_loader = data_loaders[2]
        # core hyper-parameters
        self.batch_size = parameter['batch_size']
        self.learning_rate = parameter['learning_rate']
        self.epoch = parameter['epoch']
        self.device = parameter['device']

        self.MetaTL = MetaTL(itemnum, parameter)
        self.MetaTL.to(parameter['device'])

        self.optimizer = torch.optim.Adam(self.MetaTL.parameters(), self.learning_rate)

        if parameter['eval_epoch']:
            self.eval_epoch = parameter['eval_epoch']
        else:
            self.eval_epoch = 1000

        self.varset_size = parameter['varset_size']  # size of each user's candidate pool
        self.user_train = user_train
        self.warmup = parameter['warmup']            # warm-up horizon for the variance weight
        self.alpha = parameter['alpha']              # weight of the std term in negative selection
        self.S1 = parameter['S1']                    # size of the working subset Mu
        self.S2_div_S1 = parameter['S2_div_S1']      # expansion ratio when refreshing Mu
        self.temperature = parameter['temperature']  # softmax temperature for Mu resampling
        self.itemnum = itemnum
        self.user_train_num = user_train_num

        # Init the candidate set for monitoring variance; row u = user u's pool.
        # NOTE(review): unlike the re-init in train(), this draw is NOT filtered
        # against user_train (the filtering loop was commented out upstream).
        self.candidate_cur = np.random.choice(itemnum, [user_train_num + 1, self.varset_size])

        # NOTE(review): a 5-deep "next" candidate structure (candidate_nxt /
        # score_cand_nxt) used by change_candidate() was initialised here in a
        # commented-out block upstream and is now never created — see the
        # warning on change_candidate().

        # Random initial working subset (indices into the candidate pool) per user.
        self.Mu_idx = {}
        for i in range(user_train_num + 1):
            Mu_idx_tmp = random.sample(list(range(self.varset_size)), self.S1)
            self.Mu_idx[i] = Mu_idx_tmp

        # todo: calculate score of positive items
        self.score_cand_cur = {}   # user -> (history, varset_size) array of candidate scores
        self.score_pos_cur = {}    # user -> latest positive-item score
        # final negatives chosen by change_mu (consumed by do_one_step next epoch)
        self.final_negative_items = {}

    def change_mu(self, p_score, n_score, epoch_cur, users, train_task):
        """Pick hard negatives for each user and refresh the working subset Mu.

        Selects the top-2 candidates by likelihood + alpha * std (after warm-up
        scaling) and stores them in ``self.final_negative_items``; then expands
        Mu with fresh random pool indices and resamples it down to S1 via a
        temperature-scaled softmax over the model's current scores.

        Args:
            p_score: positive-item scores (torch tensor, one per entry in ``users``).
            n_score: unused here.
            epoch_cur: current epoch counter (drives the warm-up schedule).
            users: batch of user ids (may contain duplicates).
            train_task: batch task structure; ``train_task[2]`` holds the query triples.
        """
        negitems = {}
        negitems_candidates_all = self.Mu_idx.copy()
        ratings_positems = p_score.cpu().detach().numpy()
        ratings_positems = np.reshape(ratings_positems, [-1])

        # Record the latest positive score per user.
        cnt = 0
        for i in users:
            self.score_pos_cur[i] = ratings_positems[cnt]
            cnt += 1

        # Items of each user's current Mu subset.
        # FIX: original indexed negitems_candidates_all with the stale loop
        # variable `i` (the last user of the loop above) instead of `index`,
        # giving every row the same user's Mu indices.
        Mu_items_all = {index: value[negitems_candidates_all[index]]
                        for index, value in enumerate(self.candidate_cur)}

        # Score the Mu candidates with the current model.
        task = np.array(train_task[2])
        task = np.tile(task, reps=(1, self.S1, 1))
        task[:, :, 2] = np.array(itemgetter(*users)(Mu_items_all))
        ratings_candidates_all = self.MetaTL.fast_forward(task, users)

        # Historical candidate scores for each user's Mu subset.
        # FIX: original iterated `for user in users` but indexed with the stale
        # `i`, so every entry was the last user's history.
        hisscore_candidates_all = [self.score_cand_cur[user][:, negitems_candidates_all[user]]
                                   for user in users]
        hisscore_pos_all = ratings_positems.copy()
        hisscore_candidates_all = np.array(hisscore_candidates_all).transpose((1, 0, 2))
        hisscore_pos_all = np.array(hisscore_pos_all)
        hisscore_pos_all = hisscore_pos_all[:, np.newaxis]
        hisscore_pos_all = np.tile(hisscore_pos_all, (hisscore_candidates_all.shape[0], 1, 1))

        # Sigmoid likelihood that a candidate out-scores the positive, per history step.
        hislikelihood_candidates_all = 1 / (1 + np.exp(hisscore_pos_all - hisscore_candidates_all))
        mean_candidates_all = np.mean(hislikelihood_candidates_all[:, :], axis=0)
        variance_candidates_all = np.zeros(mean_candidates_all.shape)
        for i in range(hislikelihood_candidates_all.shape[0]):
            variance_candidates_all += (hislikelihood_candidates_all[i, :, :] - mean_candidates_all) ** 2
        variance_candidates_all = np.sqrt(variance_candidates_all / hislikelihood_candidates_all.shape[0])

        likelihood_candidates_all = \
            1 / (1 + np.exp(np.expand_dims(ratings_positems, -1) - ratings_candidates_all))

        # Top-2 sampling by score + alpha * (warm-up scaled) std.
        item_arg_all = None
        if self.alpha >= 0:
            a = likelihood_candidates_all + self.alpha * min(1, epoch_cur / self.warmup) * variance_candidates_all
            item_arg_all = np.argpartition(a, kth=(-2), axis=1)
            item_arg_all = np.array(item_arg_all)[:, -2:]
        else:
            item_arg_all = np.argmax(variance_candidates_all, axis=1)

        negitems0 = {user: self.candidate_cur[user][negitems_candidates_all[user][item_arg_all[index][0]]]
                     for index, user in enumerate(users)}
        negitems1 = {user: self.candidate_cur[user][negitems_candidates_all[user][item_arg_all[index][1]]]
                     for index, user in enumerate(users)}
        for i in users:
            self.final_negative_items[i] = [negitems0[i], negitems1[i]]

        # ---- update Mu: grow each user's subset with fresh, distinct pool indices ----
        negitems_mu_candidates = {}
        for i in users:
            Mu_set = set(self.Mu_idx[i])
            while len(self.Mu_idx[i]) < self.S1 * (1 + self.S2_div_S1):
                random_item = random.randint(0, self.candidate_cur.shape[1] - 1)
                while random_item in Mu_set:
                    random_item = random.randint(0, self.candidate_cur.shape[1] - 1)
                self.Mu_idx[i].append(random_item)
            negitems_mu_candidates[i] = self.Mu_idx[i]

        negitems_mu = {}
        negitems_mu = {user: self.candidate_cur[user][negitems_mu_candidates[user]] for user in users}

        # Score the expanded subset and turn scores into a sampling distribution.
        task = np.array(train_task[2])
        task = np.tile(task, reps=(1, self.S1 * (1 + self.S2_div_S1), 1))
        task[:, :, 2] = np.array(itemgetter(*users)(negitems_mu))
        ratings_mu_candidates = self.MetaTL.fast_forward(task, users)
        ratings_mu_candidates = ratings_mu_candidates / self.temperature
        if np.any(np.isnan(ratings_mu_candidates)):
            print("nan happend in ratings_mu_candidates")
            ratings_mu_candidates = np.nan_to_num(ratings_mu_candidates)
        ratings_mu_candidates = np.exp(ratings_mu_candidates) / np.reshape(
            np.sum(np.exp(ratings_mu_candidates), axis=1), [-1, 1])
        if np.any(np.isnan(ratings_mu_candidates)):
            # Softmax overflowed/underflowed: re-score, shift, and retry.
            print("nan happend__2 in ratings_mu_candidates")
            ratings_mu_candidates = self.MetaTL.fast_forward(task, users)
            ratings_mu_candidates = ratings_mu_candidates / self.temperature
            ratings_mu_candidates = ratings_mu_candidates + 10
            ratings_mu_candidates = np.exp(ratings_mu_candidates) / np.reshape(
                np.sum(np.exp(ratings_mu_candidates), axis=1), [-1, 1])

        # Resample each (distinct) user's Mu down to S1 indices.
        user_set = set()
        cnt = 0
        for i in users:
            if i in user_set:
                continue
            else:
                user_set.add(i)
                cache_arg = np.random.choice(self.S1 * (1 + self.S2_div_S1), self.S1,
                                             p=ratings_mu_candidates[cnt], replace=False)
                self.Mu_idx[i] = np.array(self.Mu_idx[i])[cache_arg].tolist()
            # NOTE(review): `continue` skips this increment, so duplicate users do
            # not advance cnt and later rows may misalign — TODO confirm intent.
            cnt += 1

        second_cand = 0
        del negitems, ratings_positems, Mu_items_all, task, ratings_candidates_all, hisscore_candidates_all, hisscore_pos_all
        del hislikelihood_candidates_all, mean_candidates_all, variance_candidates_all, likelihood_candidates_all, second_cand
        del negitems_mu, ratings_mu_candidates, user_set
        gc.collect()

    def change_candidate(self, epoch_count):
        """Rotate the 5-deep "next" candidate queue and its score history.

        WARNING(review): unfinished code. It references ``self.candidate_nxt``
        and ``self.score_cand_nxt`` (whose initialisation is commented out in
        __init__) and the undefined globals ``user_train`` / ``train_data``;
        calling it as-is will raise. Kept faithful to the original, including
        its ``todo`` markers.
        """
        score_1epoch_nxt = []
        for c in range(5):
            # todo: implement proper function
            pred = self.MetaTL(self.MetaTL.rel_q_sharing.keys(), self.candidate_nxt[c])
            score_1epoch_nxt.append(np.array(pred))

        # todo: implement proper function
        score_1epoch_pos = self.MetaTL(user_train, train_data)  # NOTE(review): both names undefined here

        # Drop the oldest positive-score slice once the history window is full.
        if epoch_count >= 5 or epoch_count == 0:
            self.score_pos_cur = np.delete(self.score_pos_cur, 0, 0)
        # NOTE(review): original indentation was lost; the appends below are
        # reconstructed as unconditional — TODO confirm against upstream.
        for c in range(5):
            self.score_cand_nxt[c] = np.concatenate([self.score_cand_nxt[c], score_1epoch_nxt[c]], axis=0)
        self.score_pos_cur = np.concatenate([self.score_pos_cur, score_1epoch_pos], axis=0)

        # NOTE(review): these two locals are never used nor assigned to self —
        # presumably self.score_cand_cur / self.candidate_cur were intended.
        score_cand_cur = np.copy(self.score_cand_nxt[0])
        candidate_cur = np.copy(self.candidate_nxt[0])
        for c in range(4):
            self.candidate_nxt[c] = np.copy(self.candidate_nxt[c + 1])
            self.score_cand_nxt[c] = np.copy(self.score_cand_nxt[c + 1])
        # Refill the tail slot with fresh candidates the user has not interacted with.
        self.candidate_nxt[4] = np.random.choice(list(range(1, self.itemnum)),
                                                 [self.user_train_num, self.varset_size])
        for i in range(self.user_train_num):
            for j in range(self.varset_size):
                while self.candidate_nxt[4][i, j] in self.user_train[i]:
                    self.candidate_nxt[4][i, j] = random.randint(0, self.itemnum - 1)
        self.score_cand_nxt[4] = np.delete(self.score_cand_nxt[4], list(range(5)), 0)

    def rank_predict(self, data, x, ranks):
        """Rank the positive score within ``x`` and accumulate metrics into ``data``.

        Args:
            data: running metric dict (MRR / Hits@k / NDCG@k), mutated in place.
            x: 1-D score tensor whose LAST entry is the positive triple's score.
            ranks: list collecting the raw rank of each query.
        """
        # query_idx is the idx of the positive score
        query_idx = x.shape[0] - 1
        # sort descending: a more plausible triple has a higher score
        _, idx = torch.sort(x, descending=True)
        rank = list(idx.cpu().numpy()).index(query_idx) + 1
        ranks.append(rank)
        # update data
        if rank <= 10:
            data['Hits@10'] += 1
            data['NDCG@10'] += 1 / np.log2(rank + 1)
        if rank <= 5:
            data['Hits@5'] += 1
            data['NDCG@5'] += 1 / np.log2(rank + 1)
        if rank == 1:
            data['Hits@1'] += 1
            data['NDCG@1'] += 1 / np.log2(rank + 1)
        data['MRR'] += 1.0 / rank

    def do_one_step(self, task, iseval=False, curr_rel='', epoch=None, train_task=None, epoch_count=None):
        """Run one train (or eval) step.

        Training: substitutes the hard negatives chosen by change_mu into the
        task, does one optimiser step, refreshes each user's candidate-score
        history, then calls change_mu to pick next step's negatives.
        Eval: forward pass only.

        Returns:
            (loss, p_score, n_score) — all 0 when iseval and curr_rel == ''.
        """
        loss, p_score, n_score = 0, 0, 0
        if not iseval:
            # keep an untouched copy of the query triples for candidate scoring below
            task_new = copy.deepcopy(np.array(task[2]))
            cnt = 0
            for user in curr_rel:
                if user in self.final_negative_items:
                    # alternate the two stored negatives over this user's triples
                    for index, t in enumerate(task[1][cnt]):
                        if index % 2 == 0:
                            t[2] = self.final_negative_items[user][0]
                        else:
                            t[2] = self.final_negative_items[user][1]
                # NOTE(review): cnt tracks the user's row in task[1]; reconstructed
                # as advancing for every user — TODO confirm original nesting.
                cnt += 1
            self.optimizer.zero_grad()
            p_score, n_score = self.MetaTL(task, iseval, curr_rel)
            y = torch.Tensor([1]).to(self.device)
            loss = self.MetaTL.loss_func(p_score, n_score, y)
            loss.backward()
            self.optimizer.step()

            # Score the full candidate pool for each user in the batch.
            task_new = np.tile(task_new, reps=(1, self.varset_size, 1))
            task_new[:, :, 2] = np.array(itemgetter(*curr_rel)(self.candidate_cur))
            data = self.MetaTL.fast_forward(task_new, curr_rel)

            # prepare score_cand_cur (make all users have the same history length)
            temp = min(epoch_count, 4)
            for index, user in enumerate(curr_rel):
                if (not user in self.score_cand_cur):
                    self.score_cand_cur[user] = np.array([data[index]])
                elif len(self.score_cand_cur[user]) <= temp:
                    self.score_cand_cur[user] = np.concatenate(
                        [self.score_cand_cur[user], np.array([data[index]])], axis=0)

            self.change_mu(p_score, n_score, epoch_count, curr_rel, task)
        elif curr_rel != '':
            p_score, n_score = self.MetaTL(task, iseval, curr_rel)
            y = torch.Tensor([1]).to(self.device)
            loss = self.MetaTL.loss_func(p_score, n_score, y)
        return loss, p_score, n_score

    def train(self):
        """Main training loop: one batch per epoch, periodic pool resets and evals."""
        # initialization
        best_epoch = 0
        best_value = 0
        bad_counts = 0
        epoch_count = 0   # counts 25-epoch rounds since the last candidate reset

        for e in range(self.epoch):
            if e % 10 == 0:
                print("epoch:", e)
            # sample one batch from the data loader
            train_task, curr_rel = self.train_data_loader.next_batch()
            # one step with change_mu-driven negative samples
            loss, _, _ = self.do_one_step(train_task, iseval=False, curr_rel=curr_rel,
                                          epoch=e, train_task=train_task, epoch_count=epoch_count)

            if (e % 2500 == 0) and e != 0:
                # Hard reset: re-draw candidate pools (filtered against each
                # user's training items), Mu subsets, and all score history.
                self.candidate_cur = np.random.choice(self.itemnum,
                                                      [self.user_train_num + 1, self.varset_size])
                for i in range(1, self.user_train_num + 1):
                    for j in range(self.varset_size):
                        while self.candidate_cur[i, j] in self.user_train[i]:
                            self.candidate_cur[i, j] = random.randint(1, self.itemnum)
                self.Mu_idx = {}
                for i in range(self.user_train_num + 1):
                    Mu_idx_tmp = random.sample(list(range(self.varset_size)), self.S1)
                    self.Mu_idx[i] = Mu_idx_tmp
                self.score_cand_cur = {}
                self.score_pos_cur = {}
                self.final_negative_items = {}
                # resetting epoch_count affects change_mu / do_one_step / train
                epoch_count = 0
            elif e % 25 == 0 and e != 0:
                # Round boundary: pad every user's history to a uniform length.
                self.check_complenteness(epoch_count)
                print("epoch_count:", epoch_count)
                print("=========================\n\n")
                epoch_count += 1

            # do evaluation on specific epochs
            if e % self.eval_epoch == 0 and e != 0:
                loss_num = loss.detach().item()
                print("Epoch: {}\tLoss: {:.4f}".format(e, loss_num))
                print('Epoch {} Validating...'.format(e))
                valid_data = self.eval(istest=False, epoch=e)
                print('Epoch {} Testing...'.format(e))
                test_data = self.eval(istest=True, epoch=e)

        print('Finish')

    def eval(self, istest=False, epoch=None):
        """Evaluate on the dev (default) or test split.

        Iterates the loader until the 'EOT' sentinel, ranks each query with
        rank_predict, prints/append-logs the metrics, and returns the averaged
        metric dict (rounded to 3 decimals).
        """
        torch.backends.cudnn.enabled = False
        self.MetaTL.eval()
        self.MetaTL.rel_q_sharing = dict()

        if istest:
            data_loader = self.test_data_loader
        else:
            data_loader = self.dev_data_loader
        data_loader.curr_tri_idx = 0

        # initial return data of validation
        data = {'MRR': 0, 'Hits@1': 0, 'Hits@5': 0, 'Hits@10': 0,
                'NDCG@1': 0, 'NDCG@5': 0, 'NDCG@10': 0}
        ranks = []
        t = 0
        temp = dict()
        total_loss = 0
        while True:
            # sample eval tasks; the loader returns the sentinel 'EOT' when done
            eval_task, curr_rel = data_loader.next_one_on_eval()
            if eval_task == 'EOT':
                break
            t += 1
            loss, p_score, n_score = self.do_one_step(eval_task, iseval=True, curr_rel=curr_rel)
            total_loss += loss
            x = torch.cat([n_score, p_score], 1).squeeze()
            self.rank_predict(data, x, ranks)
            # running averages (printed below)
            for k in data.keys():
                temp[k] = data[k] / t

        # overall averaged metrics
        for k in data.keys():
            data[k] = round(data[k] / t, 3)
        print("\n")
        if istest:
            print("TEST: \t test_loss: ", total_loss.detach().item())
            print(
                "TEST: \tMRR: {:.3f}\tNDCG@10: {:.3f}\tNDCG@5: {:.3f}\tNDCG@1: {:.3f}\tHits@10: {:.3f}\tHits@5: {:.3f}\tHits@1: {:.3f}\r".format(
                    temp['MRR'], temp['NDCG@10'], temp['NDCG@5'], temp['NDCG@1'],
                    temp['Hits@10'], temp['Hits@5'], temp['Hits@1']))
            with open('results2.txt', 'a') as f:
                f.writelines(
                    "TEST: \tMRR: {:.3f}\tNDCG@10: {:.3f}\tNDCG@5: {:.3f}\tNDCG@1: {:.3f}\tHits@10: {:.3f}\tHits@5: {:.3f}\tHits@1: {:.3f}\r\n\n".format(
                        temp['MRR'], temp['NDCG@10'], temp['NDCG@5'], temp['NDCG@1'],
                        temp['Hits@10'], temp['Hits@5'], temp['Hits@1']))
        else:
            print("VALID: \t validation_loss: ", total_loss.detach().item())
            print(
                "VALID: \tMRR: {:.3f}\tNDCG@10: {:.3f}\tNDCG@5: {:.3f}\tNDCG@1: {:.3f}\tHits@10: {:.3f}\tHits@5: {:.3f}\tHits@1: {:.3f}\r".format(
                    temp['MRR'], temp['NDCG@10'], temp['NDCG@5'], temp['NDCG@1'],
                    temp['Hits@10'], temp['Hits@5'], temp['Hits@1']))
            with open("results2.txt", 'a') as f:
                f.writelines(
                    "VALID: \tMRR: {:.3f}\tNDCG@10: {:.3f}\tNDCG@5: {:.3f}\tNDCG@1: {:.3f}\tHits@10: {:.3f}\tHits@5: {:.3f}\tHits@1: {:.3f}\r".format(
                        temp['MRR'], temp['NDCG@10'], temp['NDCG@5'], temp['NDCG@1'],
                        temp['Hits@10'], temp['Hits@5'], temp['Hits@1']))
        print("\n")

        del total_loss, p_score, n_score
        gc.collect()
        self.MetaTL.train()
        torch.backends.cudnn.enabled = True
        return data

    def check_complenteness(self, epoch_count):
        """Pad every user's score history to the current round and cap it at 4 rows.

        (Name typo preserved — callers in this file use ``check_complenteness``.)
        Users with no history yet get a single zero row; users that missed the
        latest round have their last row repeated.
        """
        for user in list(self.user_train.keys()):
            if not user in self.score_cand_cur:
                self.score_cand_cur[user] = np.array([np.zeros(self.varset_size)])
            num = epoch_count - len(self.score_cand_cur[user]) + 1
            if num > 0 and len(self.score_cand_cur[user]) < 5:
                # repeat the newest row to catch the user up to this round
                self.score_cand_cur[user] = np.concatenate(
                    [self.score_cand_cur[user], np.array([self.score_cand_cur[user][-1]])], axis=0)
        if epoch_count >= 4:
            t = 0
            for user in list(self.score_cand_cur.keys()):
                t = user
                # keep only the 4 most recent rows
                self.score_cand_cur[user] = self.score_cand_cur[user][-4:]