Training Word2vec (CBOW, Skip-gram) word vectors with the sentencepiece tool and unicode character segmentation, and using them to replace the initial word vectors of a TextCNN model for text classification
This experiment is fairly involved, but the idea behind it is worthwhile. Readers with a weaker background may find parts of it hard to follow, so the full code is attached in this post for everyone to study.
This experiment is divided into the following parts:
1. Sentencepiece word segmentation model training
2. Segment the text with the trained sentencepiece model and, alternatively, with unicode character-level segmentation
3. Use CBOW and Skip-gram to train our word vectors
4. Use the trained word vector to replace the initial word vector of the TextCNN model and apply it to the text classification task
Note: this experiment is not trivial. Running the code requires a reasonable foundation in PyTorch and the tools involved, so take it step by step; if you really want to run my code, you can contact me.
1. Sentencepiece word segmentation model training and word segmentation
Note: word2vecdata.txt is a plain-text corpus with one sentence per line.
The training code is as follows:
import os
import sentencepiece as spm

# Train a sentencepiece model from word2vecdata.txt; this produces `m.model` and `m.vocab`.
# `m.vocab` is just a reference and is not used during segmentation.
spm.SentencePieceTrainer.train(
    r'--input=D:\data\1018\word2vecdata.txt --model_prefix=m --vocab_size=8000 --model_type=bpe'
)
os.system('pause')
2. Segment the text with the trained sentencepiece model and, alternatively, with unicode character-level segmentation
This part is relatively simple
The code to call the trained sentencepiece word segmentation model and perform word segmentation is as follows:
import sentencepiece as spm

sp = spm.SentencePieceProcessor()
sp.Load(r"C:\Users\gaoxing\source\repos\wordvec_to_TextCNN\wordvec_to_TextCNN\m.model")  # Load the trained model
a = sp.EncodeAsPieces(test_text)  # Segment test_text (a sentence string) into subword pieces
Unicode encoding word segmentation is very simple:
word = [i for i in strz]  # strz is a sentence string; every unicode character becomes one token
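For a concrete comparison of the two segmentation modes, here is a minimal sketch with a made-up sample sentence, assuming the m.model trained in step 1 is available in the working directory:

import sentencepiece as spm

sp = spm.SentencePieceProcessor()
sp.Load("m.model")  # model trained in step 1 (the path here is an assumption)

sample = "今天天气很好"  # hypothetical sample sentence

pieces = sp.EncodeAsPieces(sample)  # subword pieces; the exact split depends on the training corpus
chars = [c for c in sample]         # unicode characters: ['今', '天', '天', '气', '很', '好']
print(pieces)
print(chars)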
3. Use CBOW and Skip-gram to train our word vectors
This part is more difficult
(1) First, the CBOW word vector training code:
cbow.py
#coding=gbk
import os
import jieba
import torch
from torch import nn, optim
from torch.autograd import Variable
import torch.nn.functional as F
import sentencepiece as spm
import matplotlib.pyplot as plt

loss_list = []
sp = spm.SentencePieceProcessor()
sp.Load(r"C:\Users\gaoxing\source\repos\wordvec_to_TextCNN\wordvec_to_TextCNN\m.model")  # Load the trained sentencepiece model
path = r"D:\data\1018\word2vecdata.txt"
learning_rate = 0.001
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
epochs = 100
embedding_dim = 100
windows_size = 30        # Note: get_data_corpus below actually uses a fixed +-2 context, so this value has no effect
use_sentencepiece = 0    # 0: unicode character segmentation, 1: sentencepiece segmentation

def read_file(path):
    fp = open(path, encoding='utf8')
    text = fp.readlines()
    fp.close()
    return text

def cut_words(text):
    # Segment every line and build the word -> index dictionary
    dict_index = {}
    index = 0
    words_list = []
    for line in text:
        line = line.replace('“', '')   # remove quotation marks (full-width and ASCII)
        line = line.replace('”', '')
        line = line.replace('"', '')
        line = line.replace('.', '')
        line = line.replace('\n', '')
        line = line.replace(' ', '')
        words_cut = line.split(',')
        for strz in words_cut:
            if use_sentencepiece:
                words_l = sp.EncodeAsPieces(strz)
            else:
                words_l = [i for i in strz]
            for word in words_l:
                if word not in dict_index.keys():
                    dict_index[word] = index
                    index = index + 1
            if len(words_l) > 0:
                words_list.append(words_l)
    return words_list, dict_index

def get_data_corpus(words_list, window_size):
    # Build [target, context...] groups with a fixed +-2 context window
    data_corpus = []
    for words in words_list:
        if len(words) < 2:
            continue
        for index in range(len(words)):
            l = []
            target = words[index]
            l.append(target)
            try:
                l.append(words[index + 1])
                l.append(words[index + 2])
            except:
                pass
            try:
                # Note: negative indices wrap around in Python, so the first words
                # of a sentence pick up "context" from the end of the sentence
                l.append(words[index - 1])
                l.append(words[index - 2])
            except:
                pass
            data_corpus.append(l)
    return data_corpus

text = read_file(path)
words_list, dict_index = cut_words(text)
data_corpus = get_data_corpus(words_list, windows_size)

class CBOW(nn.Module):
    def __init__(self, vocab_size, embedding_dim):
        super(CBOW, self).__init__()
        self.embeddings = nn.Embedding(vocab_size, embedding_dim)
        # self.proj = nn.Linear(embedding_dim, vocab_size)
        self.output = nn.Linear(embedding_dim, vocab_size)

    def forward(self, inputs):
        # Sum the context word embeddings, then predict the target word
        embeds = sum(self.embeddings(inputs)).view(1, -1)
        # out = F.relu(self.proj(embeds))
        out = self.output(embeds)
        nll_prob = F.log_softmax(out, dim=-1)
        return nll_prob

length = len(dict_index.keys())
print("length", length)

data_final = []
for words in data_corpus[0:1000]:   # only the first 1000 samples are used for training
    target_vector = torch.zeros(length)
    context_id = []
    if len(words) == 5:             # keep only samples with a full context (two words on each side)
        target_vector[dict_index[words[0]]] = 1
        for i in words[1:]:
            context_id.append(dict_index[i])
        data_final.append([target_vector, context_id])

model = CBOW(length, embedding_dim).to(device)
loss_function = nn.NLLLoss()
optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9)
losses = []
for epoch in range(epochs):
    total_loss = 0
    for data in data_final:
        # NLLLoss expects the index of the target word, so the one-hot target is converted back to its index
        target = data[0].argmax().unsqueeze(0).to(device)
        context = torch.tensor(data[1]).to(device)
        model.zero_grad()                 # Clear gradients
        train_predict = model(context)    # Forward pass: [1, vocab_size] log-probabilities
        loss = loss_function(train_predict, target)
        loss.backward()                   # Backpropagation
        optimizer.step()                  # Update parameters
        total_loss += loss.item()
    print("loss ", total_loss)
    loss_list.append(total_loss)
    losses.append(total_loss)

# Save and reload the whole model
torch.save(model, r'D:\data\1018\cbow_emb_uni.pt')
path = r'D:\data\1018\cbow_emb_uni.pt'
model = torch.load(path)

print(type(model.state_dict()))           # state_dict() returns an OrderedDict
for param_tensor in model.state_dict():   # iterating a dict yields its keys
    print(param_tensor, '\t', model.state_dict()[param_tensor].size())

# Here the output layer weight [vocab_size, embedding_dim] is used as the word vectors;
# model.state_dict()['embeddings.weight'] would be the more conventional choice.
embedings = model.state_dict()['output.weight']

fp = open(r'D:\data\1018\cbow_emb_uni.txt', 'w', encoding='utf8')
for word in dict_index.keys():
    print(word, dict_index[word])
    ls = list(embedings[dict_index[word]])
    ls = [float(i) for i in ls]
    ls = [str(i) for i in ls]
    ls = ' '.join(ls)
    fp.write(word + ' ' + ls + '\n')
fp.close()

plt.plot(loss_list, label='uni-cbow-loss')
plt.legend()
plt.title('loss-epoch')
plt.show()
os.system("pause")
Attached here is our training loss curve chart:
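As a quick sanity check of the saved vectors, here is a minimal sketch of my own (assuming the cbow_emb_uni.txt format written above: one word per line followed by its space-separated vector) that lists a word's nearest neighbours by cosine similarity:

import numpy as np

def load_vectors(path):
    # One line per word: "<word> v1 v2 ... v100"
    vecs = {}
    with open(path, encoding='utf8') as f:
        for line in f:
            parts = line.rstrip('\n').split(' ')
            if len(parts) < 2:
                continue
            vecs[parts[0]] = np.array([float(x) for x in parts[1:]])
    return vecs

def nearest(word, vecs, topn=5):
    # Cosine similarity between `word` and every other word in the vocabulary
    v = vecs[word]
    sims = {}
    for w, u in vecs.items():
        if w == word:
            continue
        sims[w] = float(np.dot(v, u) / (np.linalg.norm(v) * np.linalg.norm(u) + 1e-8))
    return sorted(sims.items(), key=lambda kv: kv[1], reverse=True)[:topn]

vecs = load_vectors(r'D:\data\1018\cbow_emb_uni.txt')
print(nearest('好', vecs))  # '好' is just an example; use any word that appears in the vocabulary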
(2) Next, the skip-gram word vector training.
Before training, the text has to be segmented. In the CBOW script above, segmentation and training happen in one script; for skip-gram, segmentation is done as a separate preprocessing step. The code is as follows:
data_precess_skip.py
#encoding=gbk
import re
import jieba
import sentencepiece as spm

sp = spm.SentencePieceProcessor()
sp.Load(r"C:\Users\gaoxing\source\repos\wordvec_to_TextCNN\wordvec_to_TextCNN\m.model")  # Load the trained model
stopwords = []
use_sentencepiece = 1

def get_stop_words():
    # Note: this function is not called below, so `stopwords` stays empty unless you call it yourself
    file_object = open(r'D:\work\10-5\use_data\stopwords.txt', encoding='utf-8')
    stop_words = []
    for line in file_object.readlines():
        line = line[:-1]
        line = line.strip()
        stopwords.append(line)
    return stop_words

f1 = open(r'D:\data\1018\word2vecdata.txt', 'r', encoding='utf-8', errors='ignore')
f2 = open(r'D:\data\1018\word2vecdata_sp.txt', 'w', encoding='utf-8')
line = f1.readline()
count = 540
while line and count:
    line = line.strip()       # Remove leading and trailing spaces
    if line.isspace():        # Skip empty lines
        line = f1.readline()
    # line = re.findall('[一-龥]+', line)  # Keep only Chinese characters (remove punctuation)
    line = "".join(line)
    count = count - 1
    if use_sentencepiece:
        seg_list = sp.EncodeAsPieces(line)
    else:
        seg_list = [i for i in line]
    outStr = ""
    for word in seg_list:
        if word not in stopwords:  # Remove stop words
            outStr += word
            outStr += " "
    if outStr:                     # Do not write empty lines
        outStr = outStr.strip() + '\n'
        f2.writelines(outStr)
    line = f1.readline()
f1.close()
f2.close()
Then the model building code:
skip_gram.py
#encoding=gbk
import torch
import torch.nn as nn
import torch.nn.functional as F

class SkipGramModel(nn.Module):
    def __init__(self, emb_size, emb_dimension):
        super(SkipGramModel, self).__init__()
        self.emb_size = emb_size
        self.emb_dimension = emb_dimension
        self.u_embeddings = nn.Embedding(emb_size, emb_dimension, sparse=True)  # Center-word embedding matrix
        self.v_embeddings = nn.Embedding(emb_size, emb_dimension, sparse=True)  # Context-word embedding matrix
        self.init_emb()

    def init_emb(self):
        initrange = 0.5 / self.emb_dimension
        self.u_embeddings.weight.data.uniform_(-initrange, initrange)  # Initialize the center-word embeddings
        self.v_embeddings.weight.data.uniform_(-0, 0)                  # Context embeddings start at zero

    def forward(self, pos_u, pos_v, neg_v):
        emb_u = self.u_embeddings(pos_u)                   # [batch_size, emb_dimension]
        emb_v = self.v_embeddings(pos_v)                   # [batch_size, emb_dimension]
        score = torch.mul(emb_u, emb_v).squeeze()          # [batch_size, emb_dimension]
        score = torch.sum(score, dim=1)                    # [batch_size]
        score = F.logsigmoid(score)                        # [batch_size]
        neg_emb_v = self.v_embeddings(neg_v)               # [batch_size, k, emb_dimension]
        neg_score = torch.bmm(neg_emb_v, emb_u.unsqueeze(2)).squeeze()  # [batch_size, k]
        neg_score = F.logsigmoid(-1 * neg_score)           # [batch_size, k]
        # L = log sigmoid(Xu.T * θv) + Σ_neg(v) [log sigmoid(-Xu.T * θneg(v))]
        return -1 * (torch.sum(score) + torch.sum(neg_score))

    def save_embedding(self, id2word, file_name, use_cuda):
        # Save the center-word embedding matrix as the word vectors
        embedding = self.u_embeddings.weight.cpu().data.numpy()
        # embedding_u = self.u_embeddings.weight.cpu().data.numpy()
        # embedding_v = self.v_embeddings.weight.cpu().data.numpy()
        # embedding = (embedding_u + embedding_v) / 2
        fout = open(file_name, 'w', encoding="utf-8")
        fout.write('%d %d\n' % (len(id2word), self.emb_dimension))
        for wid, w in id2word.items():
            e = embedding[wid]
            e = ' '.join(map(lambda x: str(x), e))
            fout.write('%s %s\n' % (w, e))
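For reference, the value returned by forward() is the standard negative-sampling objective. For a batch of positive pairs (u, v), each with k negative samples n_1, ..., n_k, and using the same notation as the comment in the code (x_u for the center-word vector, θ_v for the context-word vector), the loss is:

L = -\sum_{(u,v)} \Big[ \log\sigma\big(\mathbf{x}_u^{\top}\boldsymbol{\theta}_v\big) + \sum_{i=1}^{k} \log\sigma\big(-\mathbf{x}_u^{\top}\boldsymbol{\theta}_{n_i}\big) \Big]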
Negative sampling code:
input_data.py
#encoding=gbk
import math
import numpy
from collections import deque
from numpy import random

numpy.random.seed(6)

class InputData:
    def __init__(self, file_name, min_count):
        self.input_file_name = file_name
        self.get_words(min_count)
        self.word_pair_catch = deque()  # deque used as a queue of positive sample pairs
        self.init_sample_table()        # Build the negative-sampling table
        print('Word Count: %d' % len(self.word2id))
        print("Sentence_Count:", self.sentence_count)
        print("Sentence_Length:", self.sentence_length)

    def get_words(self, min_count):
        # Drop low-frequency words and build the id <-> word mappings
        self.input_file = open(self.input_file_name, encoding="utf-8")
        self.sentence_length = 0
        self.sentence_count = 0
        word_frequency = dict()
        for line in self.input_file:
            self.sentence_count += 1
            line = line.strip().split(' ')  # strip() removes leading/trailing whitespace, split(' ') splits on spaces
            self.sentence_length += len(line)
            for w in line:
                try:
                    word_frequency[w] += 1
                except:
                    word_frequency[w] = 1
        self.word2id = dict()
        self.id2word = dict()
        wid = 0
        self.word_frequency = dict()
        for w, c in word_frequency.items():  # items() returns the (key, value) pairs
            if c < min_count:
                self.sentence_length -= c
                continue
            self.word2id[w] = wid
            self.id2word[wid] = w
            self.word_frequency[wid] = c
            wid += 1
        self.word_count = len(self.word2id)

    def subsampling(self, corpus, word2id_freq):
        # Subsampling of frequent words to improve training.
        # discard() is random, so the result differs on every call;
        # the more frequent a word is, the more likely it is to be dropped.
        def discard(word_id):
            return random.uniform(0, 1) < 1 - math.sqrt(
                1e-5 / word2id_freq[word_id] * len(corpus))
        corpus = [word for word in corpus if not discard(word)]
        return corpus

    def init_sample_table(self):
        # Build the negative-sampling table
        self.sample_table = []
        sample_table_size = 1e8  # 10**8
        pow_frequency = numpy.array(list(self.word_frequency.values())) ** 0.75  # Sampling formula
        words_pow = sum(pow_frequency)  # Normalization constant Z
        ratio = pow_frequency / words_pow
        count = numpy.round(ratio * sample_table_size)  # Number of table slots for each word
        for wid, c in enumerate(count):  # Fill the table according to the estimated counts
            self.sample_table += [wid] * int(c)
        self.sample_table = numpy.array(self.sample_table)
        numpy.random.shuffle(self.sample_table)  # Shuffle the sampling table
        # self.sample_table = self.subsampling(self.sample_table, self.word_frequency)  # Resampling

    def get_batch_pairs(self, batch_size, window_size):
        # Get positive samples
        while len(self.word_pair_catch) < batch_size:
            # When the queue holds fewer than batch_size pairs, read more data
            sentence = self.input_file.readline()
            if sentence is None or sentence == '':
                self.input_file = open(self.input_file_name, encoding="utf-8")
                sentence = self.input_file.readline()
            word_ids = []
            for word in sentence.strip().split(' '):
                try:
                    word_ids.append(self.word2id[word])
                except:
                    continue
            for i, u in enumerate(word_ids):  # Slide a window over the sentence
                for j, v in enumerate(
                        word_ids[max(i - window_size, 0):i + window_size]):  # Context words
                    assert u < self.word_count
                    assert v < self.word_count
                    if i == j:  # Skip when the context word is the center word itself
                        continue
                    self.word_pair_catch.append((u, v))  # Add the positive pair (u, v) to the queue
        batch_pairs = []
        for _ in range(batch_size):  # Return batch_size positive pairs
            batch_pairs.append(self.word_pair_catch.popleft())
        return batch_pairs

    def get_neg_v_neg_sampling(self, pos_word_pair, count):
        # Get negative samples by drawing from the sampling table
        neg_v = numpy.random.choice(
            self.sample_table, size=(len(pos_word_pair), count)).tolist()
        return neg_v

    def evaluate_pair_count(self, window_size):
        # Estimate the number of positive pairs in the data, used to set the number of batches
        return self.sentence_length * (2 * window_size - 1) - (
            self.sentence_count - 1) * (1 + window_size) * window_size
skip-gram word vector training code:
#encoding=gbk
from input_data import InputData
from skip_gram import SkipGramModel
from torch.autograd import Variable
import torch
import torch.optim as optim
from tqdm import tqdm
from gensim import utils
import matplotlib.pyplot as plt

loss_list = []

class Word2Vec(utils.SaveLoad):
    def __init__(self,
                 input_file_name,
                 output_file_name,
                 emb_dimension=100,   # Word embedding dimension
                 batch_size=50,       # Batch size
                 window_size=50,      # Context window
                 iteration=1,
                 initial_lr=0.025,
                 k=5,                 # Number of negative samples
                 min_count=15):       # Minimum frequency for a word to be kept
        self.data = InputData(input_file_name, min_count)
        self.output_file_name = output_file_name
        self.emb_size = len(self.data.word2id)  # Number of words
        self.emb_dimension = emb_dimension
        self.batch_size = batch_size
        self.window_size = window_size
        self.iteration = iteration
        self.initial_lr = initial_lr
        self.k = k
        self.skip_gram_model = SkipGramModel(self.emb_size, self.emb_dimension)
        self.use_cuda = torch.cuda.is_available()
        if self.use_cuda:
            self.skip_gram_model.cuda()
        self.optimizer = optim.SGD(self.skip_gram_model.parameters(), lr=self.initial_lr)

    def train(self):
        pair_count = self.data.evaluate_pair_count(self.window_size)  # Number of sample pairs
        batch_count = self.iteration * pair_count / self.batch_size   # Number of batches
        process_bar = tqdm(range(int(batch_count)))                   # Progress bar
        for i in process_bar:
            pos_pairs = self.data.get_batch_pairs(self.batch_size, self.window_size)  # Positive samples
            neg_v = self.data.get_neg_v_neg_sampling(pos_pairs, self.k)               # Negative samples
            pos_u = [pair[0] for pair in pos_pairs]
            pos_v = [pair[1] for pair in pos_pairs]
            # Wrap the inputs for backpropagation
            pos_u = Variable(torch.LongTensor(pos_u))
            pos_v = Variable(torch.LongTensor(pos_v))
            neg_v = Variable(torch.LongTensor(neg_v))
            if self.use_cuda:
                pos_u = pos_u.cuda()
                pos_v = pos_v.cuda()
                neg_v = neg_v.cuda()
            self.optimizer.zero_grad()                                # Zero the gradients
            loss = self.skip_gram_model.forward(pos_u, pos_v, neg_v)  # Forward pass
            loss.backward()                                           # Backpropagation
            self.optimizer.step()                                     # Update the parameters
            process_bar.set_description("Loss: %0.8f, lr: %0.6f" %
                                        (loss.data.item(),
                                         self.optimizer.param_groups[0]['lr']))
            loss_list.append(loss.data.item())
            if i * self.batch_size % 100000 == 0:  # Decay the learning rate as training progresses
                lr = self.initial_lr * (1.0 - 1.0 * i / batch_count)
                for param_group in self.optimizer.param_groups:
                    param_group['lr'] = lr
        plt.plot(loss_list, label='sp-skg-loss')
        plt.legend()
        plt.title('loss-epoch')
        plt.show()
        self.skip_gram_model.save_embedding(  # Save the word vectors
            self.data.id2word, self.output_file_name, self.use_cuda)

    def save(self, *args, **kwargs):  # Save the model
        super(Word2Vec, self).save(*args, **kwargs)

if __name__ == '__main__':
    w2v = Word2Vec(r"D:\data\1018\word2vecdata_p.txt",
                   r"D:\data\1018\skipgram_e.txt")
    w2v.train()
Attached here is our training loss curve chart:
4. Use the trained word vector to replace the initial word vector of the TextCNN model and apply it to the text classification task
Before replacing the word vectors, we first need to load them. The code is as follows:
#encoding=gbk
import torch
import torch.nn as nn
import torch.nn.functional as F
import os

def get_dict():
    # Load the word vectors saved in step 3 into a {word: FloatTensor} dictionary
    f1 = open(r'D:\data\1018\cbow_emb_sp.txt', 'r', encoding='utf-8', errors='ignore')
    lines = f1.readlines()
    f1.close()
    word_to_em_dict = {}
    for line in lines:
        # print(line)
        li = line.split()
        print(li)
        data = [float(i) for i in li[1:]]
        word_to_em_dict[li[0]] = torch.FloatTensor(data)
    print(word_to_em_dict)
    return word_to_em_dict
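The TextCNN code below looks up the pretrained vector for every word inside forward(). As an alternative (a sketch of my own, not the original code), the whole weight matrix can be built once from word_to_em_dict and handed to nn.Embedding.from_pretrained, so the lookup goes through the embedding layer itself; word2idx and embedding_size here refer to the variables defined in the TextCNN script below:

import torch
import torch.nn as nn

def build_embedding_layer(word_to_em_dict, word2idx, embedding_size=100, freeze=True):
    # One row per vocabulary word; words without a pretrained vector stay all-zero.
    weight = torch.zeros(len(word2idx), embedding_size)
    for word, idx in word2idx.items():
        vec = word_to_em_dict.get(word)
        if vec is not None:
            weight[idx] = vec
    # freeze=True keeps the pretrained vectors fixed; set freeze=False to fine-tune them.
    return nn.Embedding.from_pretrained(weight, freeze=freeze)

# Usage inside TextCNN.__init__ (hypothetical):
# self.W = build_embedding_layer(word_to_em_dict, word2idx, embedding_size)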
Finally, our TextCNN model training code:
#coding=gbk
from cgi import test
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "2, 3, 4,5"
from get_embeding import get_dict
import torch
import numpy as np
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as Data
import torch.nn.functional as F
from get_data import get_data
import matplotlib.pyplot as plt

dtype = torch.FloatTensor
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

sentences, labels, setences_test, label_test = get_data()
print(sentences, labels)
# sentences = ["i love you", "he loves me", "she likes baseball", "i hate you", "sorry for that", "this is awful"]
# labels = [1, 1, 1, 0, 0, 0]  # 1 is good, 0 is not good.

num_classes = len(set(labels))
classnum = 3
embedding_size = 100
batch_size = 10
sequence_length = 50
epochs = 50

word_list = " ".join(sentences).split()
word_list2 = " ".join(setences_test).split()
vocab = list(set(word_list + word_list2))
word2idx = {w: i for i, w in enumerate(vocab)}
idx2word = {i: w for i, w in enumerate(vocab)}
vocab_size = len(vocab)

def make_data(sentences, labels):
    # Convert each sentence to a fixed-length list of word ids (pad with 0 / truncate to sequence_length)
    inputs = []
    for sen in sentences:
        l = [word2idx[n] for n in sen.split()]
        if len(l) < sequence_length:
            length = len(l)
            for i in range(sequence_length - length):
                l.append(0)
            inputs.append(l)
        else:
            inputs.append(l[0:sequence_length])
    targets = []
    for out in labels:
        targets.append(out)
    return inputs, targets

input_batch, target_batch = make_data(sentences, labels)
input_batch = torch.LongTensor(input_batch)
target_batch = torch.LongTensor(target_batch)
print("*" * 100)
print(input_batch.size(), target_batch.size())
dataset = Data.TensorDataset(input_batch, target_batch)
loader = Data.DataLoader(dataset, batch_size, True)

word_to_em_dict = get_dict()

class TextCNN(nn.Module):
    def __init__(self):
        super(TextCNN, self).__init__()
        self.W = nn.Embedding(vocab_size, embedding_size)
        output_channel = 3
        self.conv = nn.Sequential(
            # input channel, output channel, kernel height (n-gram size) and width (embedding_size)
            nn.Conv2d(1, output_channel, kernel_size=(10, embedding_size)),
            nn.ReLU(),
            nn.MaxPool2d((10, 1)))
        self.fc = nn.Linear(12, num_classes)  # 3 channels * 4 pooled positions = 12

    def forward(self, X):
        '''
        X: [batch_size, sequence_length]
        '''
        batch_size = X.shape[0]
        embedding_X = self.W(X)  # [batch_size, sequence_length, embedding_size]; immediately replaced below
        # Replace the randomly initialized embeddings with the pretrained word2vec vectors
        p1 = 0
        temp_emb = torch.zeros(batch_size, sequence_length, embedding_size)
        for secequence in X:
            p2 = 0
            for id in secequence:
                word = idx2word[int(id)]
                try:
                    emd = word_to_em_dict[word]
                except:
                    # Words without a pretrained vector get a zero vector
                    emd = torch.zeros(embedding_size)
                temp_emb[p1][p2] = emd
                p2 = p2 + 1
            p1 = p1 + 1
        embedding_X = temp_emb.unsqueeze(1).to(X.device)  # add channel(=1): [batch, 1, sequence_length, embedding_size]
        conved = self.conv(embedding_X)         # [batch_size, output_channel, 4, 1]
        flatten = conved.view(batch_size, -1)   # [batch_size, output_channel * 4]
        output = self.fc(flatten)
        return output

model = TextCNN().to(device)
criterion = nn.CrossEntropyLoss().to(device)
optimizer = optim.Adam(model.parameters(), lr=1e-3)
loss_list = []

# Training
for epoch in range(epochs):
    loss_add = 0
    count = 0
    for batch_x, batch_y in loader:
        batch_x, batch_y = batch_x.to(device), batch_y.to(device)
        pred = model(batch_x)
        loss = criterion(pred, batch_y)
        loss_add = loss_add + loss
        count = count + 1
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    if (epoch + 1) % 5 == 0:
        print('Epoch:', '%04d' % (epoch + 1), 'loss =', '{:.6f}'.format(loss))
    loss_list.append((loss_add / count).item())

# Test
input_batch, target_batch = make_data(setences_test, label_test)
input_batch = torch.LongTensor(input_batch)
target_batch = torch.LongTensor(target_batch)
print("*" * 100)
dataset = Data.TensorDataset(input_batch, target_batch)
loader = Data.DataLoader(dataset, batch_size, True)

test_loss = 0
correct = 0
total = 0
target_num = torch.zeros((1, classnum))
predict_num = torch.zeros((1, classnum))
acc_num = torch.zeros((1, classnum))
for batch_x, batch_y in loader:
    batch_x, batch_y = batch_x.to(device), batch_y.to(device)
    pred = model(batch_x)
    loss = criterion(pred, batch_y)
    print('Epoch:', '%04d' % (epoch + 1), 'loss =', '{:.6f}'.format(loss))
    test_loss += loss
    _, predicted = torch.max(pred.data, 1)
    total += batch_y.size(0)
    correct += predicted.eq(batch_y.data).cpu().sum()
    pre_mask = torch.zeros(pred.size()).scatter_(1, predicted.cpu().view(-1, 1), 1.)
    predict_num += pre_mask.sum(0)
    tar_mask = torch.zeros(pred.size()).scatter_(1, batch_y.data.cpu().view(-1, 1), 1.)
    target_num += tar_mask.sum(0)
    acc_mask = pre_mask * tar_mask
    acc_num += acc_mask.sum(0)

recall = acc_num / target_num
precision = acc_num / predict_num
F1 = 2 * recall * precision / (recall + precision)
accuracy = acc_num.sum(1) / target_num.sum(1)
recall = (recall.numpy()[0] * 100).round(3)
precision = (precision.numpy()[0] * 100).round(3)
F1 = (F1.numpy()[0] * 100).round(3)
accuracy = (accuracy.numpy()[0] * 100).round(3)

# Macro-average over the 3 classes, printed in an easy-to-copy format
p = sum(precision) / 3
r = sum(recall) / 3
F = sum(F1) / 3
print('recall:', r)
print('precision:', p)
print('F1:', F)
print('accuracy', accuracy)

plt.plot(loss_list, label='TextCNN_sp')
plt.legend()
plt.title('loss-epoch')
plt.show()
Attached here is our loss curve: