Scene
There are many scenarios for sending emails in business. The emails are basically sent automatically, and the content of the email is very important. Whether the email is sent or not, and whether the sending time is correct, the workload of each regression test is too large. So consider adding this part to automated testing
Tools
-
python (Install dependency packages
pip install --upgrade google-api-python-client google-auth-httplib2 google-auth-oauthlib
) -
gmail
To access gmail, first go to console.cloud.google.com to create access credentials, as shown in the figure, download and obtain a json file credentials.json
Implementation code
Among them, DateFormat and get_path are custom methods, which are to get the time and get the file respectively.
from __future__ import print_function import base64 importos.path import logging import time from utils.get_file_path import get_path from google.auth.transport.requests import Request from google.oauth2.credentials import Credentials from google_auth_oauthlib.flow import InstalledAppFlow from googleapiclient.discovery import build from googleapiclient.errors import HttpError from utils.date_format import DateFormat from datetime import datetime # If modifying these scopes, delete the file token.json. SCOPES = ['https://www.googleapis.com/auth/gmail.readonly', 'https://www.googleapis.com/auth/gmail.modify'] class GmailSearch: def __init__(self): """ get token.json. """ self.creds = None # The file token.json stores the user's access and refresh tokens, and is # created automatically when the authorization flow completes for the first # time. file_token = get_path('token.json', 'utils') if os.path.exists(file_token): self.creds = Credentials.from_authorized_user_file(file_token, SCOPES) # If there are no (valid) credentials available, let the user log in. if not self.creds or not self.creds.valid: if self.creds and self.creds.expired and self.creds.refresh_token: self.creds.refresh(Request()) else: flow = InstalledAppFlow.from_client_secrets_file( get_path('credentials.json', 'utils'), SCOPES) self.creds = flow.run_local_server(port=0) # Save the credentials for the next run with open(file_token, 'w') as token: token.write(self.creds.to_json()) def search_email(self, **kwargs): ''' search email by keywords (from, sender, subject, before, after, unread, interval) :param kwargs: By default, unread emails 10 minutes before and after the current time are queried. If not found for the first time, it will be queried once every 10 seconds by default, for a total of three times. If found, it will be marked as read. Query subject parameters: subject Sender parameters: sender Whether it has been read: unread (bool type) Query time period parameters: before, after Set query interval: interval Set the number of queries: count :return: the email, if no result is found, return none ''' count = kwargs.get("count") if kwargs.get("count") is not None else 3 if count == 0: return "no email found" search_str = ["in:inbox"] unread = False if kwargs.get("unread") else True if unread: search_str.append("is:unread") if kwargs.get("attachment"): search_str.append("has:attachment") if kwargs.get("sender"): # value like [email protected], [email protected] search_str.append(f'from:({<!-- -->kwargs.get("sender")})') if kwargs.get("to"): # value like [email protected], [email protected] search_str.append(f'to:({<!-- -->kwargs.get("to")})') if kwargs.get("subject"): search_str.append(f'subject:({<!-- -->kwargs.get("subject")})') if kwargs.get("before"): search_str.append(f'before:{<!-- -->kwargs.get("before")}') else: # default value is current + 10.minutes search_str.append(f'before:{<!-- -->DateFormat.from_current_minutes(10)}') if kwargs.get("after"): search_str.append(f'after:{<!-- -->kwargs.get("after")}') else: # default value is current - 10.minutes search_str.append(f'after:{<!-- -->DateFormat.from_current_minutes(-10)}') interval = kwargs.get("interval") if kwargs.get("interval") else 10 query = " ".join(search_str) unread = kwargs.get("unread") time.sleep(interval) logging.info(datetime.now().strftime("%H:%M:%S")) logging.info(str(count - 1)) try: # Call the Gmail API service = build('gmail', 'v1', credentials=self.creds) results = service.users().messages().list(userId='me', q=query).execute() messages = results.get("messages") if not messages: logging.info('No email found, continue to search') kwargs.update({<!-- -->"count": count-1}) return self.search_email_content(**kwargs) # get the last email, mark read, return email body # body = [] last_email = service.users().messages().get(userId='me', id=messages[0]['id']).execute() payload = last_email['payload'] # internalDate = last_email['internalDate'] # The Body of the message is in Encrypted format. So, we have to decode it. # Get the data and decode it with base 64 decoder. parts = payload.get('parts')[0]['body'] if payload.get('parts') else payload.get('body') data = parts['data'] data = data.replace("-", " + ").replace("_", "/") decoded_data = base64.b64decode(data) body = str(decoded_data, 'utf-8') # mark read if unread: logging.info("mark read"); service.users().messages().modify(userId='me', id=messages[0]['id'], body={<!-- -->'removeLabelIds': ['UNREAD']}).execute() return body except HttpError as error: logging.info(f'An error occurred: {<!-- -->error}') # test=GmailSearch() # test.search_email(before='2023/10/3', after='2023/9/25', to="[email protected]", unread=True)
Reference documentation: Gmail api documentation