Automated testing – verifying email content

Scene

Many business scenarios involve sending emails. These emails are mostly sent automatically, and their content is very important. Manually checking in every regression test whether each email was sent, whether it was sent at the right time, and whether its content is correct is too much work, so it is worth adding these checks to the automated tests.

Tools

  • Python (install the dependency packages with: pip install --upgrade google-api-python-client google-auth-httplib2 google-auth-oauthlib)

  • gmail

    To access Gmail, first go to console.cloud.google.com and create access credentials (as shown in the figure), then download the resulting JSON file and save it as credentials.json.

Implementation code

In the code below, DateFormat and get_path are custom helpers: DateFormat produces the time values and get_path locates a file.

from __future__ import print_function

import base64
import logging
import os.path
import time
from datetime import datetime

from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

from utils.date_format import DateFormat
from utils.get_file_path import get_path
# OAuth scopes requested from Gmail. The cached token.json is tied to the
# scopes it was created with, so delete token.json after changing this list.
SCOPES = [
    'https://www.googleapis.com/auth/gmail.readonly',
    'https://www.googleapis.com/auth/gmail.modify',
]


class GmailSearch:
    """Search the authorized user's Gmail inbox and return a message body.

    Credentials are loaded (or obtained through the OAuth flow) when the
    object is created; ``search_email`` then polls the inbox with a Gmail
    search query built from keyword arguments.
    """

    def __init__(self):
        """Load cached credentials from token.json or run the OAuth flow.

        token.json stores the user's access and refresh tokens and is written
        automatically once authorization completes. Both token.json and
        credentials.json are located through ``get_path(<name>, 'utils')``.
        """
        self.creds = None
        file_token = get_path('token.json', 'utils')
        if os.path.exists(file_token):
            self.creds = Credentials.from_authorized_user_file(file_token, SCOPES)
        # If there are no (valid) credentials available, let the user log in.
        if not self.creds or not self.creds.valid:
            if self.creds and self.creds.expired and self.creds.refresh_token:
                self.creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file(
                    get_path('credentials.json', 'utils'), SCOPES)
                self.creds = flow.run_local_server(port=0)
            # Save the credentials for the next run
            with open(file_token, 'w') as token:
                token.write(self.creds.to_json())

    @staticmethod
    def _wants_unread(options):
        """Return whether the search targets unread mail (default: True)."""
        unread = options.get("unread")
        return True if unread is None else bool(unread)

    @staticmethod
    def _build_query(options):
        """Translate the keyword options into a Gmail search query string."""
        terms = ["in:inbox"]
        if GmailSearch._wants_unread(options):
            terms.append("is:unread")
        if options.get("attachment"):
            terms.append("has:attachment")
        if options.get("sender"):  # one address, or several separated by commas
            terms.append(f'from:({options.get("sender")})')
        if options.get("to"):  # one address, or several separated by commas
            terms.append(f'to:({options.get("to")})')
        if options.get("subject"):
            terms.append(f'subject:({options.get("subject")})')
        if options.get("before"):
            terms.append(f'before:{options.get("before")}')
        else:
            # Default upper bound: DateFormat.from_current_minutes(10),
            # presumably "now + 10 minutes" -- confirm against the helper.
            terms.append(f'before:{DateFormat.from_current_minutes(10)}')
        if options.get("after"):
            terms.append(f'after:{options.get("after")}')
        else:
            # Default lower bound: DateFormat.from_current_minutes(-10),
            # presumably "now - 10 minutes" -- confirm against the helper.
            terms.append(f'after:{DateFormat.from_current_minutes(-10)}')
        return " ".join(terms)

    @staticmethod
    def _find_body_data(payload):
        """Return the first non-empty ``body.data`` found depth-first, or None."""
        for part in payload.get('parts') or []:
            found = GmailSearch._find_body_data(part)
            if found:
                return found
        return (payload.get('body') or {}).get('data')

    @staticmethod
    def _decode_body(payload) -> str:
        """Decode the message payload's base64url body data to UTF-8 text."""
        data = GmailSearch._find_body_data(payload)
        if not data:
            return ""
        # Gmail delivers URL-safe base64 ('-' and '_' alphabet); restore any
        # stripped '=' padding so the decoder accepts every input length.
        padded = data + "=" * (-len(data) % 4)
        return base64.urlsafe_b64decode(padded).decode('utf-8')

    def search_email(self, **kwargs):
        """Poll the inbox for a matching email and return its body text.

        Each attempt waits ``interval`` seconds, runs the query, and stops at
        the first attempt that yields a message. The first message listed by
        the API is used; when ``unread`` is in effect it is then marked read.

        :param kwargs: supported keys
            subject: text to look for in the subject
            sender: sender address(es), used as ``from:(...)``
            to: recipient address(es), used as ``to:(...)``
            attachment: truthy to require an attachment
            unread: bool, default True; when true only unread mail is
                searched and the returned message is marked as read
            before, after: bounds of the time window (e.g. '2023/10/3');
                when omitted they come from DateFormat.from_current_minutes
                with +10 and -10 respectively
            interval: seconds to wait before each attempt (default 10)
            count: number of attempts (default 3)
        :return: the decoded body of the message; the string
            "no email found" when every attempt came back empty (or count
            is 0 or negative); None when the Gmail API raised HttpError
        """
        count = kwargs.get("count")
        if count is None:
            count = 3
        interval = kwargs.get("interval") or 10
        mark_read = self._wants_unread(kwargs)

        remaining = count
        while remaining > 0:
            # Rebuilt on every attempt so the default time window is
            # re-evaluated each time, as it was per retry before.
            query = self._build_query(kwargs)
            time.sleep(interval)
            logging.info(datetime.now().strftime("%H:%M:%S"))
            logging.info(str(remaining - 1))
            try:
                # Call the Gmail API
                service = build('gmail', 'v1', credentials=self.creds)
                results = service.users().messages().list(userId='me', q=query).execute()
                messages = results.get("messages")
                if messages:
                    message_id = messages[0]['id']
                    email = service.users().messages().get(userId='me', id=message_id).execute()
                    body = self._decode_body(email['payload'])
                    if mark_read:
                        logging.info("mark read")
                        service.users().messages().modify(
                            userId='me', id=message_id,
                            body={'removeLabelIds': ['UNREAD']}).execute()
                    return body
            except HttpError as error:
                # API failures are not retried; the caller receives None.
                logging.error(f'An error occurred: {error}')
                return None
            logging.info('No email found, continue to search')
            remaining -= 1
        return "no email found"


# test=GmailSearch()
# test.search_email(before='2023/10/3', after='2023/9/25', to="someone@example.com", unread=True)

Reference documentation: Gmail API documentation