Asynchronously send logs to a remote server in Python

Background

The most common way to use logging in Python is to write logs to the console and to files, and the logging module provides handler classes that make this convenient. Sometimes, however, we need something else, such as sending logs to a remote server or writing them directly to a database. How can that be done?

StreamHandler and FileHandler

# -*- coding: utf-8 -*-
"""
--------------------------------------------------
 File Name: logger
 Description:
 Author: yangyanxing
 date: 2020/9/23
--------------------------------------------------
"""
import logging
import sys
import os
# Initialize logger
logger = logging.getLogger("yyx")
logger.setLevel(logging.DEBUG)
# Set log format
fmt = logging.Formatter('[%(asctime)s] [%(levelname)s] %(message)s', '%Y-%m-%d %H:%M:%S')
# Add cmd handler
cmd_handler = logging.StreamHandler(sys.stdout)
cmd_handler.setLevel(logging.DEBUG)
cmd_handler.setFormatter(fmt)
# Add file handler
logpath = os.path.join(os.getcwd(), 'debug.log')
file_handler = logging.FileHandler(logpath)
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(fmt)
# Add cmd and file handler to logger
logger.addHandler(cmd_handler)
logger.addHandler(file_handler)
logger.debug("The weather is good today")

First we initialize a logger and set its level to DEBUG, then create cmd_handler and file_handler, and finally add both to the logger. Running the script prints the following line to the console and also appends it to debug.log:

[2020-09-23 10:45:56] [DEBUG] The weather is good today

Add HTTPHandler

# Add an httphandler
import logging.handlers
http_handler = logging.handlers.HTTPHandler(r"127.0.0.1:1987", '/api/log/get')
http_handler.setLevel(logging.DEBUG)
http_handler.setFormatter(fmt)
logger.addHandler(http_handler)
logger.debug("The weather is good today")

As a result, the server receives a lot of information:

{
  'name': [b'yyx'],
  'msg': [b'\xe4\xbb\x8a\xe5\xa4\xa9\xe5\xa4\xa9\xe6\xb0\x94\xe4\xb8\x8d\xe9\x94\x99'],
  'args': [b'()'],
  'levelname': [b'DEBUG'],
  'levelno': [b'10'],
  'pathname': [b'I:/workplace/yangyanxing/test/loger.py'],
  'filename': [b'loger.py'],
  'module': [b'loger'],
  'exc_info': [b'None'],
  'exc_text': [b'None'],
  'stack_info': [b'None'],
  'lineno': [b'41'],
  'funcName': [b'<module>'],
  'created': [b'1600831054.8881223'],
  'msecs': [b'888.1223201751709'],
  'relativeCreated': [b'22.99976348876953'],
  'thread': [b'14876'],
  'threadName': [b'MainThread'],
  'processName': [b'MainProcess'],
  'process': [b'8648'],
  'message': [b'\xe4\xbb\x8a\xe5\xa4\xa9\xe5\xa4\xa9\xe6\xb0\x94\xe4\xb8\x8d\xe9\x94\x99'],
  'asctime': [b'2020-09-23 11:17:34']
}

That is a lot of information, but it is not what we want. We only want a line similar to:

[2020-09-23 10:45:56][DEBUG] The weather is good today

logging.handlers.HTTPHandler simply sends every attribute of the log record to the server and leaves it to the server to organize the content. That leaves two options. The first is to change the server code so that it rebuilds the log line from the fields it receives. The second is to write our own handler class that formats the log content on the client before sending it to the server.

We use the second option because it is more flexible: the server only records what it receives, and the client decides what content to send.
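
For comparison, the first option could look roughly like the sketch below. It assumes the server hands over the form fields as a dict of lists of bytes, as in the dump above, and that the asctime, levelname and message fields are present. The helper name rebuild_line is hypothetical.

```python
def rebuild_line(form):
    """Rebuild '[asctime] [levelname] message' from the fields HTTPHandler sent.

    Assumption: `form` maps each field name to a list of bytes values,
    like the dump shown above. `rebuild_line` is a hypothetical helper.
    """
    def field(name):
        # Take the first value of the field and decode it as UTF-8
        return form[name][0].decode("utf-8")

    return "[{}] [{}] {}".format(field("asctime"), field("levelname"), field("message"))


sample = {
    "asctime": [b"2020-09-23 11:17:34"],
    "levelname": [b"DEBUG"],
    "message": [b"The weather is good today"],
}
print(rebuild_line(sample))
```

The drawback is the one already mentioned: every change to the log format has to be made on the server.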

We need to define a new handler class. Taking logging.handlers.HTTPHandler as a reference, we write a CustomHandler class that overrides the emit method:

import urllib.parse
import requests

class CustomHandler(logging.Handler):
  def __init__(self, host, uri, method="POST"):
    logging.Handler.__init__(self)
    self.url = "%s/%s" % (host, uri)
    method = method.upper()
    if method not in ["GET", "POST"]:
      raise ValueError("method must be GET or POST")
    self.method = method

  def emit(self, record):
    '''
    Override emit: format the record with the handler's formatter
    and send the resulting text to the server.
    :param record: the LogRecord to send
    :return:
    '''
    msg = self.format(record)
    if self.method == "GET":
      # "%c" accepts exactly one character, so the separator is "&" or "?"
      if (self.url.find("?") >= 0):
        sep = '&'
      else:
        sep = '?'
      url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": msg}))
      requests.get(url, timeout=1)
    else:
      # requests computes Content-Length from the encoded body itself
      headers = {
        "Content-type": "application/x-www-form-urlencoded",
      }
      requests.post(self.url, data={'log': msg}, headers=headers, timeout=1)

The line msg = self.format(record) formats the record according to the formatter set on the handler and returns the resulting text. The server now receives:

{'log': [b'[2020-09-23 11:39:45] [DEBUG] \xe4\xbb\x8a\xe5\xa4\xa9\xe5\xa4\xa9\xe6\xb0\x94\xe4\xb8\x8d\xe9\x94\x99']}

Decoding the bytes then gives the line we want:

[2020-09-23 11:43:50] [DEBUG] The weather is good today
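
A minimal sketch of that conversion, assuming the server exposes the log field as raw bytes and the client sent UTF-8 (the variable names are illustrative):

```python
# Raw value of the 'log' field as received by the server (copied from the dump above)
raw = b'[2020-09-23 11:39:45] [DEBUG] \xe4\xbb\x8a\xe5\xa4\xa9\xe5\xa4\xa9\xe6\xb0\x94\xe4\xb8\x8d\xe9\x94\x99'

# Decode the bytes as UTF-8 to get readable text
line = raw.decode("utf-8")
print(line)
```

The escaped bytes in the dump are the UTF-8 form of the message exactly as it was sent, so the decoded text appears in the language the message was logged in.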

Send remote logs asynchronously

Now consider a slow server. Suppose the server-side handler for the log request takes 5 seconds to respond:

async def post(self):
  print(self.getParam('log'))
  await asyncio.sleep(5)
  self.write({"msg": 'ok'})

With this delay on the server, we log two messages in a row:

logger.debug("The weather is good today")
logger.debug("It's sunny")

The resulting output is

[2020-09-23 11:47:33] [DEBUG] The weather is good today
[2020-09-23 11:47:38] [DEBUG] It's sunny

Here lies the problem: the two lines are 5 seconds apart, because each logging call waits for the server's response. Something that was only meant to be a log has become a burden that drags down the entire script, so remote log writing needs to be handled asynchronously.
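
The blocking behaviour can be reproduced without a server. In the sketch below, SlowHandler is a hypothetical stand-in for the HTTP handler: its emit sleeps for 0.2 seconds to simulate a slow response, and each logging call does not return until emit has finished.

```python
import logging
import time


class SlowHandler(logging.Handler):
    """Hypothetical handler that simulates a slow remote server."""

    def emit(self, record):
        time.sleep(0.2)  # stands in for the time spent waiting on the server


demo_logger = logging.getLogger("blocking_demo")
demo_logger.setLevel(logging.DEBUG)
demo_logger.propagate = False  # keep the demo isolated from the root logger
demo_logger.addHandler(SlowHandler())

start = time.perf_counter()
demo_logger.debug("The weather is good today")
demo_logger.debug("It's sunny")
elapsed = time.perf_counter() - start

# Both calls waited for emit, so the total is roughly 2 x 0.2 seconds or more
print("two log calls took {:.2f}s".format(elapsed))
```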

1. Use multithreading

import threading

# Method of CustomHandler: send each request in a new thread
def emit(self, record):
  msg = self.format(record)
  if self.method == "GET":
    if (self.url.find("?") >= 0):
      sep = '&'
    else:
      sep = '?'
    url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": msg}))
    t = threading.Thread(target=requests.get, args=(url,))
    t.start()
  else:
    headers = {
      "Content-type": "application/x-www-form-urlencoded",
    }
    t = threading.Thread(target=requests.post, args=(self.url,),
                         kwargs={"data": {'log': msg}, "headers": headers})
    t.start()

2. Use a thread pool

Starting a new thread for every log message is wasteful when many messages are logged. Python's concurrent.futures module provides the ThreadPoolExecutor and ProcessPoolExecutor classes, a thread pool and a process pool. A pool creates a fixed number of workers up front and lets them run the submitted functions, so a new thread does not have to be created for every call.

from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=1) # Initialize a thread pool with only one thread
executor.submit(fn, *args, **kwargs) # Submit the function to the thread pool
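
As a small illustration of submit, the sketch below passes positional and keyword arguments through the pool and reads the result from the returned Future. The function send and its arguments are made up for the example.

```python
from concurrent.futures import ThreadPoolExecutor


def send(url, data=None, timeout=None):
    # Hypothetical stand-in for requests.post; it only echoes its arguments
    return "sent {} to {} (timeout={})".format(data, url, timeout)


pool = ThreadPoolExecutor(max_workers=1)

# submit(fn, *args, **kwargs) schedules fn(*args, **kwargs) on a pool thread
future = pool.submit(send, "http://127.0.0.1:1987/api/log/get", data={"log": "hello"}, timeout=6)

result = future.result()  # blocks until the call has finished
print(result)

pool.shutdown(wait=True)  # wait for queued work and release the thread
```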

executor = ThreadPoolExecutor(max_workers=1)

# Method of CustomHandler: hand each request to the thread pool
def emit(self, record):
  msg = self.format(record)
  if self.method == "GET":
    if (self.url.find("?") >= 0):
      sep = '&'
    else:
      sep = '?'
    url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": msg}))
    executor.submit(requests.get, url, timeout=6)
  else:
    headers = {
      "Content-type": "application/x-www-form-urlencoded",
    }
    executor.submit(requests.post, self.url, data={'log': msg}, headers=headers, timeout=6)
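
To check that the pooled version no longer blocks the caller, the following sketch replaces the HTTP request with a hypothetical slow_send function that sleeps for 0.2 seconds. PooledHandler and slow_send are illustrative names, not part of the code above.

```python
import logging
import time
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=1)
delivered = []


def slow_send(msg):
    # Hypothetical stand-in for requests.post against a slow server
    time.sleep(0.2)
    delivered.append(msg)


class PooledHandler(logging.Handler):
    def emit(self, record):
        # Format in the calling thread, send in the pool thread
        pool.submit(slow_send, self.format(record))


demo_logger = logging.getLogger("pool_demo")
demo_logger.setLevel(logging.DEBUG)
demo_logger.propagate = False  # keep the demo isolated from the root logger
demo_logger.addHandler(PooledHandler())

start = time.perf_counter()
demo_logger.debug("The weather is good today")
demo_logger.debug("It's sunny")
call_time = time.perf_counter() - start  # time spent inside the logging calls

pool.shutdown(wait=True)  # wait until the queued messages have been sent
print("logging calls took {:.3f}s, delivered: {}".format(call_time, delivered))
```

With max_workers=1 the messages are sent one at a time, in the order they were logged.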

3. Use the asynchronous aiohttp library to send requests

import aiohttp

class CustomHandler(logging.Handler):
  def __init__(self, host, uri, method="POST"):
    logging.Handler.__init__(self)
    self.url = "%s/%s" % (host, uri)
    method = method.upper()
    if method not in ["GET", "POST"]:
      raise ValueError("method must be GET or POST")
    self.method = method

  async def emit(self, record):
    msg = self.format(record)
    timeout = aiohttp.ClientTimeout(total=6)
    if self.method == "GET":
      if (self.url.find("?") >= 0):
        sep = '&'
      else:
        sep = '?'
      url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": msg}))
      async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.get(url) as resp:
          print(await resp.text())
    else:
      headers = {
        "Content-type": "application/x-www-form-urlencoded",
      }
      async with aiohttp.ClientSession(timeout=timeout, headers=headers) as session:
        async with session.post(self.url, data={'log': msg}) as resp:
          print(await resp.text())

Running this code produces a warning, and the log is never sent:

C:\Python37\lib\logging\__init__.py:894: RuntimeWarning: coroutine 'CustomHandler.emit' was never awaited
  self.emit(record)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

The reason is that async with session.post(...) can only be used inside a function declared with async def, so emit was changed to async def. That turns emit into a coroutine function: calling it returns a coroutine object instead of running the body. A coroutine object only runs when it is awaited or scheduled on an event loop, but the logging module calls self.emit(record) as an ordinary function and nothing in the script awaits it, hence the message coroutine 'CustomHandler.emit' was never awaited.
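
The effect can be seen in isolation. In this sketch, emit is a hypothetical stand-alone coroutine function; calling it like a normal function, which is what logging does, only creates a coroutine object and does not run the body.

```python
import asyncio
import inspect

sent = []


async def emit(msg):
    # Hypothetical stand-in for the async emit method
    sent.append(msg)


coro = emit("The weather is good today")  # called like a normal function
is_coroutine = inspect.iscoroutine(coro)
sent_before = list(sent)
print(is_coroutine, sent_before)  # the body has not run yet

asyncio.run(coro)  # driving the coroutine on an event loop runs the body
print(sent)
```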

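Since emit is now a coroutine function, an obvious next attempt is to await the logging calls themselves:

async def main():
  await logger.debug("The weather is good today")
  await logger.debug("It's sunny")
loop = asyncio.get_event_loop()
loop.run_until_complete(main())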

Execution still reports an error

raise TypeError('An asyncio.Future, a coroutine or an awaitable is '

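There seems to be no solution: we want to send with an asynchronous library, but there is nowhere to put the await. logger.debug() is an ordinary function that returns None, so awaiting it cannot work. The way out is to run the coroutine without await, by registering it as a task on an event loop, as the following example shows.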

import asyncio

async def test(n):
  while n > 0:
    await asyncio.sleep(1)
    print("test {}".format(n))
    n -= 1
  return n

async def test2(n):
  while n > 0:
    await asyncio.sleep(1)
    print("test2 {}".format(n))
    n -= 1

def stoploop(task):
  print("End of execution, task n is {}".format(task.result()))
  loop.stop()

loop = asyncio.get_event_loop()
task = loop.create_task(test(5))
task.add_done_callback(stoploop)
task2 = loop.create_task(test2(3))
loop.run_forever()

Note that the code above never uses await to run the coroutines directly. Instead, each coroutine is registered on an event loop object with create_task(), and calling the loop's run_forever() lets the loop execute them. The output is:

test 5
test2 3
test 4
test2 2
test 3
test2 1
test 2
test 1
End of execution, task n is 0

As the output shows, tasks created on the event loop object run once the loop executes run_forever(). If loop.run_forever() is not called, the coroutines registered on the loop are never executed, as in the following variant:

import time

loop = asyncio.get_event_loop()
task = loop.create_task(test(5))
task.add_done_callback(stoploop)
task2 = loop.create_task(test2(3))
time.sleep(5)  # nothing is printed: the loop is never started
# loop.run_forever()

Applying the same idea to the handler, the aiohttp request moves into a separate coroutine, submit(), and emit() stays an ordinary method that only registers that coroutine on the event loop:

import asyncio
import urllib.parse
import aiohttp

loop = asyncio.get_event_loop()

class CustomHandler(logging.Handler):
  def __init__(self, host, uri, method="POST"):
    logging.Handler.__init__(self)
    self.url = "%s/%s" % (host, uri)
    method = method.upper()
    if method not in ["GET", "POST"]:
      raise ValueError("method must be GET or POST")
    self.method = method

  # Use aiohttp to encapsulate the sending data function
  async def submit(self, data):
    timeout = aiohttp.ClientTimeout(total=6)
    if self.method == "GET":
      if self.url.find("?") >= 0:
        sep = '&'
      else:
        sep = '?'
      url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": data}))
      async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.get(url) as resp:
          print(await resp.text())
    else:
      headers = {
        "Content-type": "application/x-www-form-urlencoded",
      }
      async with aiohttp.ClientSession(timeout=timeout, headers=headers) as session:
        async with session.post(self.url, data={'log': data}) as resp:
          print(await resp.text())
    return True

  def emit(self, record):
    msg = self.format(record)
    # Register the coroutine on the loop; it runs once the loop is running
    loop.create_task(self.submit(msg))

# Add an httphandler
http_handler = CustomHandler(r"http://127.0.0.1:1987", 'api/log/get')
http_handler.setLevel(logging.DEBUG)
http_handler.setFormatter(fmt)
logger.addHandler(http_handler)
logger.debug("The weather is good today")
logger.debug("It's sunny")
loop.run_forever()

One thing to note about this approach is that loop.run_forever() blocks until the loop is stopped, so loop.stop() has to be called somewhere. It can be called in the done callback of a task, as stoploop does in the earlier example.
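
One way to do that is sketched below: the handler keeps track of the sends that are still pending and stops the loop from the done callback of the last one. TaskHandler and fake_submit are hypothetical names, and fake_submit uses asyncio.sleep in place of the aiohttp request so that the sketch runs without a server.

```python
import asyncio
import logging

loop = asyncio.new_event_loop()
delivered = []


async def fake_submit(msg):
    # Hypothetical stand-in for the aiohttp request
    await asyncio.sleep(0.1)
    delivered.append(msg)


class TaskHandler(logging.Handler):
    def __init__(self):
        super().__init__()
        self.pending = set()  # tasks that have not finished yet

    def emit(self, record):
        task = loop.create_task(fake_submit(self.format(record)))
        self.pending.add(task)
        task.add_done_callback(self._on_done)

    def _on_done(self, task):
        self.pending.discard(task)
        if not self.pending:
            loop.stop()  # last send finished, so run_forever() can return


handler = TaskHandler()
demo_logger = logging.getLogger("loop_demo")
demo_logger.setLevel(logging.DEBUG)
demo_logger.propagate = False  # keep the demo isolated from the root logger
demo_logger.addHandler(handler)

demo_logger.debug("The weather is good today")
demo_logger.debug("It's sunny")

loop.run_forever()  # returns once _on_done has called loop.stop()
loop.close()
print(delivered)
```

This suits a script that logs its messages before starting the loop. A long-running program would more likely stop the loop as part of its own shutdown.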
