Using Python to Implement Naive Bayes Spam Email Classification

1. Experiment purpose

The aim of this experiment is to use the Naive Bayes algorithm to classify spam emails, and in doing so to understand and master the following:
Understand the basic principles and application scenarios of the Naive Bayes algorithm.
Learn how to preprocess text data, including removing punctuation, converting to lowercase, tokenizing, and more.
Understand the concept of feature extraction and master common feature extraction methods, such as the bag-of-words model and TF-IDF.
Learn to implement a Naive Bayes classifier using Python machine learning libraries such as scikit-learn (a minimal sketch follows at the end of this section).
Learn how to evaluate the performance of a classifier using metrics such as precision, recall, and F1 score.
Practice using the trained classifier to classify and predict new, unseen emails.
By completing this experiment, you will be able to apply the Naive Bayes algorithm to text classification problems, especially spam email classification. The technique is widely used in practice: it filters out junk mail automatically and so improves the efficiency of everyday work. The exercise will also deepen your understanding of machine learning algorithms and text processing techniques, laying a foundation for exploring more complex machine learning problems.
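
The fourth objective above mentions scikit-learn. For reference, here is a minimal sketch of the whole pipeline using scikit-learn's bag-of-words vectorizer and multinomial Naive Bayes; the four-email corpus is a made-up placeholder, and the rest of this report builds the classifier from scratch instead.

from sklearn.feature_extraction.text import CountVectorizer
from sklearn.model_selection import train_test_split
from sklearn.naive_bayes import MultinomialNB
from sklearn.metrics import classification_report

# Placeholder corpus: 1 = spam, 0 = ham
texts = ["win a free prize now", "limited time offer click here",
         "meeting at ten tomorrow", "please review the attached report"]
labels = [1, 1, 0, 0]

# Bag-of-words features; swap in TfidfVectorizer for TF-IDF weighting
vectorizer = CountVectorizer(lowercase=True)
features = vectorizer.fit_transform(texts)

X_train, X_test, y_train, y_test = train_test_split(
    features, labels, test_size=0.5, random_state=0, stratify=labels)
clf = MultinomialNB()  # multinomial NB with Laplace smoothing (alpha=1.0 by default)
clf.fit(X_train, y_train)
print(classification_report(y_test, clf.predict(X_test)))  # precision, recall, F1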

2. Equipment and environment

Jupyter notebook
Python=3.9

3. Experimental principles
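
Naive Bayes is a probabilistic classifier built on Bayes' theorem plus a "naive" assumption that words occur independently of one another given the class. For an email containing the words w1, ..., wn and a class c (spam or ham), Bayes' theorem gives

P(c | w1, ..., wn) ∝ P(c) * P(w1 | c) * ... * P(wn | c)

The denominator P(w1, ..., wn) is the same for both classes, so it can be dropped when comparing them. Multiplying many small probabilities underflows floating-point numbers, so the comparison is done in log space instead:

score(c) = log P(c) + log P(w1 | c) + ... + log P(wn | c)

and the email is assigned to whichever class scores higher. The conditional probabilities are estimated from training-set word frequencies with Laplace (add-one) smoothing, so that a word never seen in a class does not force the whole product to zero:

P(w | c) = (count(w, c) + 1) / (total words in c + |V|)

where |V| is the size of the vocabulary. This is exactly what the fit and predict methods in the complete code below implement.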

4. Experiment content

Step 1: Prepare the dataset
First, we need to prepare a dataset for training and testing the Naive Bayes classifier: a collection of email texts, each labeled as either “spam” or “ham” (not spam). This report uses the Enron email dataset, which the code below reads from disk.
Step 2: Data preprocessing
Before training and testing, we need to preprocess the data into a form the Naive Bayes algorithm can consume. This typically includes tokenizing the text, converting it into numerical form, removing stop words, stemming, and so on (see the sketch below).
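The complete code at the end of this report handles only punctuation removal, lowercasing, and tokenization; stop-word removal and stemming are omitted there. As an illustration, here is a small sketch of those extra steps using NLTK (an assumed extra dependency that the main code does not use):

import re
import string
from nltk.corpus import stopwords   # assumes nltk plus nltk.download('stopwords')
from nltk.stem import PorterStemmer

def preprocess(text):
    # Strip punctuation, lowercase, and split into words
    text = text.translate(str.maketrans("", "", string.punctuation)).lower()
    tokens = re.split(r"\W+", text)
    # Drop stop words and reduce the remaining words to their stems
    stops = set(stopwords.words("english"))
    stemmer = PorterStemmer()
    return [stemmer.stem(t) for t in tokens if t and t not in stops]

print(preprocess("Winning prizes is easy: click HERE!"))  # e.g. ['win', 'prize', 'easi', 'click']
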
Step 3: Train the model
Using the prepared training set, we can train the Naive Bayes classifier. Training amounts to calculating the prior probabilities of “spam” and “ham”, as well as the conditional probability of each word given each category.
Step 4: Test the model
The test set is an independent dataset used to evaluate model performance. We test the trained Naive Bayes classifier on the test set and compute its accuracy, precision, recall, F1 score, and other metrics (a sketch follows this step).
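The final code in this report prints plain accuracy only; precision, recall, and F1 can be computed from the same pred/true lists, for example with scikit-learn's metrics module (a sketch, assuming scikit-learn is installed; the variable names match the final code):

from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

def report(true, pred):
    print("accuracy :", accuracy_score(true, pred))
    print("precision:", precision_score(true, pred))  # of the emails flagged as spam, how many really are spam
    print("recall   :", recall_score(true, pred))     # of the actual spam, how much was caught
    print("f1       :", f1_score(true, pred))         # harmonic mean of precision and recall
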
Step 5: Model Application
After training and testing are complete, we can apply the trained Naive Bayes classifier to new, unseen email text and classify it as “spam” or “ham” based on its content.

5. Analysis of experimental results

A spam detector was built with the Naive Bayes algorithm, trained on the training set, and used to predict labels for the test set; finally, the prediction accuracy was computed. As the output of the code below shows, the accuracy is 0.98, which means the spam detector performs very well.

Complete code and comments

See the attached resource for the dataset.

# Packages that need to be imported
import os
import re
import string
import math
# Read in the data

# Path to the unpacked Enron dataset (adjust to your own machine)
DATA_DIR = r'D:\D\Download\360 Safe Browser Download\enron Email dataset\enron'
target_names = ['ham', 'spam']
def get_data(DATA_DIR):
    # The corpus is split across subfolders enron1 .. enron6, each with 'spam' and 'ham' subdirectories
    subfolders = ['enron%d' % i for i in range(1, 7)]
    data = []
    target = []
    for subfolder in subfolders:
        #spam
        spam_files = os.listdir(os.path.join(DATA_DIR, subfolder, 'spam'))
        for spam_file in spam_files:
            with open(os.path.join(DATA_DIR, subfolder, 'spam', spam_file), encoding="latin-1") as f:
                data.append(f.read())
                target.append(1)
        #ham
        ham_files = os.listdir(os.path.join(DATA_DIR, subfolder, 'ham'))
        for ham_file in ham_files:
            with open(os.path.join(DATA_DIR, subfolder, 'ham', ham_file), encoding="latin-1") as f:
                data.append(f.read())
                target.append(0)
    return data, target
 
X, y = get_data(DATA_DIR)  # Read data
# All email contents and labels are read in: the texts end up in X and the labels in y, where 1 marks spam and 0 marks ham (normal email).

# Define a class to preprocess the data
class SpamDetector_1(object):
    """Implementation of Naive Bayes for binary classification"""
    # Remove punctuation
    def clean(self, s):
        translator = str.maketrans("", "", string.punctuation)
        return s.translate(translator)
    # Split the text into words
    def tokenize(self, text):
        text = self.clean(text).lower()
        return re.split(r"\W+", text)
    # Count how many times each word appears
    def get_word_counts(self, words):
        word_counts = {}
        for word in words:
            word_counts[word] = word_counts.get(word, 0.0) + 1.0
        return word_counts

# Before running the actual algorithm, fit() must compute three things:
# the (log) class priors, i.e. P(spam) and P(ham);
# the vocabulary (every word that appears in either class, needed for Laplace smoothing);
# and the per-class word frequencies, i.e. how often each word appears in spam and in ham.
class SpamDetector_2(SpamDetector_1):
    # X: data, Y: target labels (1 = spam, 0 = ham)
    def fit(self, X, Y):
        self.num_messages = {}
        self.log_class_priors = {}
        self.word_counts = {}
        # A set that will hold every word that occurs in the training data
        self.vocab = set()
        # Count the number of spam and ham emails
        self.num_messages['spam'] = sum(1 for label in Y if label == 1)
        self.num_messages['ham'] = sum(1 for label in Y if label == 0)

        # Log prior probabilities: the proportion of spam and of ham among all emails
        self.log_class_priors['spam'] = math.log(
            self.num_messages['spam'] / (self.num_messages['spam'] + self.num_messages['ham']))
        self.log_class_priors['ham'] = math.log(
            self.num_messages['ham'] / (self.num_messages['spam'] + self.num_messages['ham']))

        self.word_counts['spam'] = {}
        self.word_counts['ham'] = {}

        for x, y in zip(X, Y):
            c = 'spam' if y == 1 else 'ham'
            # Dictionary of words and their counts within this single email
            counts = self.get_word_counts(self.tokenize(x))
            for word, count in counts.items():
                if word not in self.vocab:
                    self.vocab.add(word)  # self.vocab ends up containing the words of all emails
                # Accumulate per-class word frequencies, i.e. how often each word
                # appears in spam and in ham ('c' is the class key, 'spam' or 'ham')
                if word not in self.word_counts[c]:
                    self.word_counts[c][word] = 0.0
                self.word_counts[c][word] += count
# You can use the statements below to check that fit() ran correctly: if so,
# log_class_priors['spam'] should be about -0.6776 and log_class_priors['ham'] about -0.7089.
# We use the emails after the 100th as the training set and the first 100 emails as the test set.
MNB = SpamDetector_2()
MNB.fit(X[100:], y[100:])

#print("log_class_priors of spam", MNB.log_class_priors['spam']) #-0.6776
#print("log_class_priors of ham", MNB.log_class_priors['ham']) #-0.7089
# Define a class SpamDetector to evaluate the test set. The main idea is to compare
# log P(spam) + sum(log P(word|spam)) against log P(ham) + sum(log P(word|ham))
# and label the email with whichever class scores higher.
class SpamDetector(SpamDetector_2):
    def predict(self, X):
        result = []
        # Iterate over all test emails
        for x in X:
            counts = self.get_word_counts(self.tokenize(x))  # words and their counts in this email
            # Start each score from the log prior, i.e. log P(spam) and log P(ham);
            # the prior is added once per email, not once per word
            spam_score = self.log_class_priors['spam']
            ham_score = self.log_class_priors['ham']
            for word, _ in counts.items():
                # Words never seen during training are skipped
                if word not in self.vocab:
                    continue
                # Laplace-smoothed log P(word|spam) and log P(word|ham):
                # (count in class + 1) / (total words in class + vocabulary size).
                # Using .get(word, 0.0) also covers words seen in only one of the two classes.
                log_w_given_spam = math.log(
                    (self.word_counts['spam'].get(word, 0.0) + 1) /
                    (sum(self.word_counts['spam'].values()) + len(self.vocab)))
                log_w_given_ham = math.log(
                    (self.word_counts['ham'].get(word, 0.0) + 1) /
                    (sum(self.word_counts['ham'].values()) + len(self.vocab)))
                # Accumulate log P(content|spam) and log P(content|ham)
                spam_score += log_w_given_spam
                ham_score += log_w_given_ham
            # Predict: spam (1) if spam_score > ham_score, otherwise ham (0)
            if spam_score > ham_score:
                result.append(1)
            else:
                result.append(0)
        return result
 
MNB = SpamDetector()
MNB.fit(X[100:], y[100:])
pred = MNB.predict(X[:100])
true = y[:100]
 
accuracy = 0
for i in range(100):
    if pred[i] == true[i]:
        accuracy += 1
print(accuracy / 100)  # 0.98
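
To round off Step 5, the trained detector can be applied directly to new text; the email below is invented purely for illustration:

# Classify a brand-new email (the text is made up for illustration)
new_email = "Congratulations! You have won a free prize, click here to claim it."
print("spam" if MNB.predict([new_email])[0] == 1 else "ham")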