Implementing a Transformer-based rumor detection system
In a Transformer, in addition to the word embedding, a positional embedding is needed to indicate where each word appears in the sentence. Because the Transformer does not use a recurrent (RNN) structure and instead attends to all positions at once, it has no built-in notion of word order, which is very important for NLP. The Transformer therefore uses positional embeddings to encode the relative or absolute position of each word in the sequence.
For our classification task we only need the Transformer encoder; the decoder is not needed. The encoder can be regarded as a semantic extractor: we feed the entire sentence into it, and after the attention mechanism and the MLP it produces one encoding vector per word. Thanks to attention, each encoding vector takes into account the influence of every other word on it. We can therefore regard the final output of the whole encoder as the encoding of the entire sentence, containing its semantic information, and feed these vectors into a fully connected network for classification.
The encoder produces one vector per time step, i.e. as many vectors as there are words in the sequence. Before feeding them to the fully connected network they must be reduced to a single vector, which can be done in one of the following ways:
-
Take the output vector at a single designated position (the choice can be arbitrary), since the vector at each position already takes into account the contribution of all other positions
-
Take the average of the encoding vectors of all words to form a single encoding vector, and feed it to the output layer
-
Concatenate the encoding vectors of all words into one long encoding vector and feed it to the output layer; however, this makes the vector dimension very high, namely sequence length * embedding dimension
This project uses the mean vector
Positional encoding code:
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal positional encodings to a batch-first input.

    The encoding table is computed once in ``__init__``:
    ``PE(pos, 2i) = sin(pos / 10000^(2i / d_model))`` and
    ``PE(pos, 2i + 1) = cos(pos / 10000^(2i / d_model))``.

    Args:
        d_model: Size of the embedding vector of each token.
        dropout: Dropout probability applied after adding the encoding.
        max_len: Number of positions in the precomputed table. Inputs must
            not be longer than this along dimension 1.
    """

    def __init__(self, d_model, dropout=0.1, max_len=128):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model)
        )
        # PE(pos, 2i): even columns.
        pe[:, 0::2] = torch.sin(position * div_term)
        # PE(pos, 2i + 1): odd columns. For an odd d_model there is one odd
        # column fewer than even columns, so trim the frequencies to match.
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])
        # Leading batch dimension of size 1 so the table broadcasts over x.
        pe = pe.unsqueeze(0)
        # A buffer is saved in the state_dict and moved with the module, but
        # it is not a parameter and therefore never updated by the optimizer.
        self.register_buffer("pe", pe)

    def forward(self, x):
        """Return ``dropout(x + PE)``.

        Args:
            x: Tensor of shape [batch, sequence length, d_model]. The table
                is sliced with ``x.size(1)``, so dimension 1 must be the
                sequence dimension.

        Raises:
            ValueError: If the sequence is longer than the precomputed table.
        """
        seq_len = x.size(1)
        if seq_len > self.pe.size(1):
            raise ValueError(
                "sequence length %d exceeds max_len %d"
                % (seq_len, self.pe.size(1))
            )
        return self.dropout(x + self.pe[:, :seq_len])
# Demo: how the 0::2 and 1::2 column slices used by the positional encoding
# split a matrix into its even-indexed and odd-indexed columns.
pe = torch.arange(1, 51).reshape(5, 10)
for view in (pe, pe[:, 0::2], pe[:, 1::2]):
    print(view)
exit()
### Output:
# tensor([[ 1,  2,  3,  4,  5,  6,  7,  8,  9, 10],
#         [11, 12, 13, 14, 15, 16, 17, 18, 19, 20],
#         [21, 22, 23, 24, 25, 26, 27, 28, 29, 30],
#         [31, 32, 33, 34, 35, 36, 37, 38, 39, 40],
#         [41, 42, 43, 44, 45, 46, 47, 48, 49, 50]])
# tensor([[ 1,  3,  5,  7,  9],
#         [11, 13, 15, 17, 19],
#         [21, 23, 25, 27, 29],
#         [31, 33, 35, 37, 39],
#         [41, 43, 45, 47, 49]])
# tensor([[ 2,  4,  6,  8, 10],
#         [12, 14, 16, 18, 20],
#         [22, 24, 26, 28, 30],
#         [32, 34, 36, 38, 40],
#         [42, 44, 46, 48, 50]])
Only the encoder is used, built as a stack of encoder layers (the original Transformer stacks 6; the code below defaults to num_layers=3)
class Transformer(nn.Module):
    """Encoder-only Transformer classifier with mean pooling.

    Token indices are embedded, positional encodings are added, the result
    is passed through a stack of ``nn.TransformerEncoderLayer`` and the
    per-token outputs are averaged before the final linear classifier.

    Args:
        vocab_size: Number of rows of the embedding table; every input index
            must be smaller than this value.
        embedding_dim: Size of each token embedding (must be divisible by
            ``num_head``).
        num_class: Number of output categories.
        feedforward_dim: Hidden size of the feed-forward sub-layer.
        num_head: Number of attention heads.
        num_layers: Number of stacked encoder layers.
        dropout: Dropout probability.
        max_len: Longest sequence the positional encoding supports.
    """

    def __init__(self, vocab_size, embedding_dim, num_class,
                 feedforward_dim=256, num_head=2, num_layers=3,
                 dropout=0.1, max_len=128):
        super().__init__()
        self.max_len = max_len
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.positional_encoding = PositionalEncoding(embedding_dim, dropout, max_len)
        self.encoder_layer = nn.TransformerEncoderLayer(
            embedding_dim, num_head, feedforward_dim, dropout
        )
        self.transformer = nn.TransformerEncoder(self.encoder_layer, num_layers)
        self.fc = nn.Linear(embedding_dim, num_class)

    def forward(self, x):
        """Classify a batch of index sequences.

        Args:
            x: Integer tensor of shape [batch, sequence length].

        Returns:
            Tensor of shape [batch, number of categories] with the logits.

        Raises:
            ValueError: If the sequence is longer than ``max_len``.
        """
        if x.size(1) > self.max_len:
            raise ValueError(
                "sequence length %d exceeds max_len %d" % (x.size(1), self.max_len)
            )
        # [batch, sequence length, embedding vector dimension]
        x = self.embedding(x)
        # PositionalEncoding slices its table with x.size(1), so it must be
        # applied while dimension 1 is still the sequence dimension.
        # Transposing first would index the encodings by batch position.
        x = self.positional_encoding(x)
        # The encoder layers are created with the default batch_first=False
        # and therefore expect [sequence length, batch, embedding dimension].
        x = x.transpose(0, 1)
        x = self.transformer(x)
        # Mean over the sequence dimension -> [batch, embedding dimension].
        x = x.mean(dim=0)
        # [batch, number of categories]
        return self.fc(x)
Full code:
import copy
import math
import pickle

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.sequence import pad_sequences
from torch import optim
from torch.utils.data import TensorDataset
from torchnet import meter
from tqdm import tqdm

# Sequence length (number of characters per sample) fed to the model.
input_shape = 180


def load_data(file_path, input_shape=20, word_dictionary=None, label_dictionary=None):
    """Load a tab-separated dataset and encode it as padded index sequences.

    The file must contain a ``text`` and a ``label`` column. Each text is
    split into characters and every character is mapped to an integer
    index; index 0 is reserved for padding.

    Args:
        file_path: Path of the TSV file.
        input_shape: Length every sequence is padded (with 0, at the end) or
            truncated (pad_sequences default) to.
        word_dictionary: Optional existing character -> index mapping. When
            given it is reused (characters missing from it are encoded as 0)
            and ``word_dict.pk`` is not rewritten. When omitted, the mapping
            is built from this file and pickled to ``word_dict.pk``.
        label_dictionary: Optional existing label -> index mapping, handled
            like ``word_dictionary`` (pickled to ``label_dict.pk`` when built).

    Returns:
        Tuple ``(x, y, output_dictionary, vocab_size, label_size,
        inverse_word_dictionary)`` where ``x`` is the padded index matrix,
        ``y`` an array of shape [samples, 1] with label indices,
        ``output_dictionary`` maps label index -> label and
        ``inverse_word_dictionary`` maps character index -> character.

    Raises:
        KeyError: If ``label_dictionary`` is given and a label of the file is
            missing from it.
    """
    df = pd.read_csv(file_path, sep='\t')

    if word_dictionary is None:
        characters = set(''.join(df['text'].unique()))
        # Sort so that the indices do not depend on set iteration order.
        word_dictionary = {word: i + 1 for i, word in enumerate(sorted(characters))}
        with open('word_dict.pk', 'wb') as f:
            pickle.dump(word_dictionary, f)
    inverse_word_dictionary = {i: word for word, i in word_dictionary.items()}

    if label_dictionary is None:
        labels = list(df['label'].unique())
        label_dictionary = {label: i for i, label in enumerate(labels)}
        with open('label_dict.pk', 'wb') as f:
            pickle.dump(label_dictionary, f)
    output_dictionary = {i: label for label, i in label_dictionary.items()}

    vocab_size = len(word_dictionary)
    label_size = len(label_dictionary)

    # Characters unknown to a reused dictionary share index 0 with padding.
    x = [[word_dictionary.get(word, 0) for word in sent] for sent in df['text']]
    x = pad_sequences(maxlen=input_shape, sequences=x, padding='post', value=0)
    y = np.array([[label_dictionary[label]] for label in df['label']])

    return x, y, output_dictionary, vocab_size, label_size, inverse_word_dictionary


class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal positional encodings to a batch-first input.

    Args:
        d_model: Size of the embedding vector of each token.
        dropout: Dropout probability applied after adding the encoding.
        max_len: Number of positions in the precomputed table.
    """

    def __init__(self, d_model, dropout=0.1, max_len=128):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model)
        )
        # PE(pos, 2i): even columns.
        pe[:, 0::2] = torch.sin(position * div_term)
        # PE(pos, 2i + 1): odd columns (one fewer when d_model is odd).
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])
        # Leading batch dimension of size 1 so the table broadcasts over x.
        pe = pe.unsqueeze(0)
        # Saved with the model, but not a trainable parameter.
        self.register_buffer("pe", pe)

    def forward(self, x):
        """Return ``dropout(x + PE)`` for x of shape [batch, seq, d_model]."""
        seq_len = x.size(1)
        if seq_len > self.pe.size(1):
            raise ValueError(
                "sequence length %d exceeds max_len %d"
                % (seq_len, self.pe.size(1))
            )
        return self.dropout(x + self.pe[:, :seq_len])


class Transformer(nn.Module):
    """Encoder-only Transformer classifier with mean pooling.

    Args:
        vocab_size: Number of rows of the embedding table; every input index
            must be smaller than this value.
        embedding_dim: Size of each token embedding.
        num_class: Number of output categories.
        feedforward_dim: Hidden size of the feed-forward sub-layer.
        num_head: Number of attention heads.
        num_layers: Number of stacked encoder layers.
        dropout: Dropout probability.
        max_len: Longest sequence the positional encoding supports.
    """

    def __init__(self, vocab_size, embedding_dim, num_class,
                 feedforward_dim=256, num_head=2, num_layers=3,
                 dropout=0.1, max_len=128):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.positional_encoding = PositionalEncoding(embedding_dim, dropout, max_len)
        self.encoder_layer = nn.TransformerEncoderLayer(
            embedding_dim, num_head, feedforward_dim, dropout
        )
        self.transformer = nn.TransformerEncoder(self.encoder_layer, num_layers)
        self.fc = nn.Linear(embedding_dim, num_class)

    def forward(self, x):
        """Map indices of shape [batch, seq] to logits of shape [batch, num_class]."""
        # [batch, sequence length, embedding vector dimension]
        x = self.embedding(x)
        # The positional encoding is batch-first, so add it before the
        # transpose; otherwise it would be indexed by batch position.
        x = self.positional_encoding(x)
        # The encoder expects [sequence length, batch, embedding dimension].
        x = x.transpose(0, 1)
        x = self.transformer(x)
        # Mean over the sequence dimension -> [batch, embedding dimension].
        x = x.mean(dim=0)
        return self.fc(x)


x_train, y_train, output_dictionary_train, vocab_size_train, label_size, inverse_word_dictionary_train = load_data(
    "./data/train.tsv", input_shape
)

# The model is trained with the training vocabulary, so every other input
# (test set, inference sentence) must be encoded with exactly this mapping.
word2idx = {word: idx for idx, word in inverse_word_dictionary_train.items()}
label2idx = {label: idx for idx, label in output_dictionary_train.items()}

x_test, y_test, output_dictionary_test, vocab_size_test, label_size, inverse_word_dictionary_test = load_data(
    "./data/test.tsv", input_shape, word_dictionary=word2idx, label_dictionary=label2idx
)

x_train = torch.from_numpy(x_train).long()
y_train = torch.from_numpy(y_train).long()
x_test = torch.from_numpy(x_test).long()
y_test = torch.from_numpy(y_test).long()

train_data = TensorDataset(x_train, y_train)
test_data = TensorDataset(x_test, y_test)

batch_size = 32
train_loader = torch.utils.data.DataLoader(train_data, batch_size, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_data, batch_size, shuffle=False)

embedding_dim = 20  # The size of the embedding vector formed by each character
output_dim = 2
# Character indices run from 1 to vocab_size_train and 0 is the padding
# index, so the embedding table needs vocab_size_train + 1 rows.
num_embeddings = vocab_size_train + 1

model = Transformer(num_embeddings, embedding_dim, output_dim, max_len=input_shape)
optimizer = optim.Adam(model.parameters(), lr=0.003)
criterion = nn.CrossEntropyLoss()

loss_meter = meter.AverageValueMeter()

best_acc = 0  # Best training accuracy seen so far
best_model = None  # Copy of the parameters that reached best_acc

epochs = 1
for epoch in range(epochs):
    model.train()
    epoch_acc_count = 0  # Correctly classified training samples
    train_count = 0  # Training samples seen in this epoch
    loss_meter.reset()

    train_bar = tqdm(train_loader)
    for batch_x, batch_y in train_bar:
        x_input = batch_x.long().contiguous()
        targets = batch_y.long().view(-1)

        optimizer.zero_grad()
        output_ = model(x_input)
        loss = criterion(output_, targets)
        loss.backward()
        optimizer.step()

        loss_meter.add(loss.item())
        epoch_acc_count += (output_.argmax(dim=1) == targets).sum().item()
        train_count += len(batch_x)

    epoch_acc = epoch_acc_count / train_count

    print("【EPOCH: 】%s" % str(epoch + 1))
    print("The training loss is %s" % (str(loss_meter.mean)))
    print("The training accuracy is %s" % (str(epoch_acc * 100)[:5]) + '%')

    # Report accuracy on the held-out test set (dropout disabled).
    model.eval()
    test_correct = 0
    test_total = 0
    with torch.no_grad():
        for batch_x, batch_y in test_loader:
            predictions = model(batch_x.long()).argmax(dim=1)
            test_correct += (predictions == batch_y.long().view(-1)).sum().item()
            test_total += len(batch_x)
    if test_total:
        print("The test accuracy is %s" % (str(test_correct / test_total * 100)[:5]) + '%')

    if epoch_acc > best_acc:
        best_acc = epoch_acc
        # state_dict() returns references to the live tensors; copy them so
        # later epochs do not silently overwrite the "best" parameters.
        best_model = copy.deepcopy(model.state_dict())

# Save the optimal model parameters at the end of training.
if epochs > 0:
    torch.save(best_model if best_model is not None else model.state_dict(), './best_model.pkl')

# NOTE(review): assumes the raw labels of the dataset are 0 / 1 — confirm.
label_dict = {0: "Not a rumor", 1: "Rumor"}

# Sentence used for testing.
sent = (
    "Changchun in the early morning, the lost child was found, and the stolen car was also found, "
    "but the car thief did not find it. "
    "It seems that the 50th anniversary of learning from Comrade Lei Feng is still an effective day."
)

try:
    # Convert every character into its index in the training vocabulary.
    x = [[word2idx[word] for word in sent]]
except KeyError as err:
    print("There are Chinese characters in the sentence you entered that are not in the vocabulary, please re-enter!")
    print("The word not in the vocabulary is: %s." % err)
else:
    # If the length is less than input_shape, fill with 0.
    x = pad_sequences(maxlen=input_shape, sequences=x, padding='post', value=0)
    x = torch.from_numpy(x)

    model_path = './best_model.pkl'
    model = Transformer(num_embeddings, embedding_dim, output_dim, max_len=input_shape)
    model.load_state_dict(torch.load(model_path, map_location='cpu'))
    model.eval()  # Disable dropout for a deterministic prediction

    with torch.no_grad():
        y_pred = model(x.long())

    # Translate the predicted class index back to the raw dataset label.
    raw_label = output_dictionary_train[y_pred.argmax().item()]
    print('Input statement: %s' % sent)
    print('Rumor detection result: %s' % label_dict.get(raw_label, raw_label))