Muduo source code analysis: AsyncLogging asynchronous log class

Introduction

AsyncLogging is muduo’s log. If the program directly writes logs to files, it may be blocked. The muduo front-end has designed two BufferPtrs, namely currentBuffer_ and nextBuffer_, and a vector (buffers_) that stores BufferPtr.

Multiple front-end threads write data to currentBuffer_. When currentBuffer_ is full, put it into buffers_ and notify the back-end thread to read. The front-end thread replaces currentBuffer_ and nextBuffer_ and continues to write currentBuffer_.
The backend also has 2 BufferPtr, newBuffer1 and newBuffer2, and a BufferVector(buffersToWrite). After receiving the front-end notification, the back-end thread uses buffersToWrite and buffers_ to exchange, and uses newBuffer1 and newBuffer2 to return the front-end currentBuffer_ and nextBuffer_, and then writes the log to the file.

Muduo log files only provide writing to local files

Backend thread writing conditions:

  1. After the front-end thread buffer is written, the back-end thread is notified through the condition variable to write.
  2. Timeout, muduo sets the default time to 3 seconds (the third parameter flushInterval_ of the AsyncLogging constructor),

Source code analysis

AsyncLogging.h

// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (chenshuo at chenshuo dot com)

#ifndef MUDUO_BASE_ASYNCLOGGING_H
#define MUDUO_BASE_ASYNCLOGGING_H

#include "muduo/base/BlockingQueue.h"
#include "muduo/base/BoundedBlockingQueue.h"
#include "muduo/base/CountDownLatch.h"
#include "muduo/base/Mutex.h"
#include "muduo/base/Thread.h"
#include "muduo/base/LogStream.h"

#include <atomic>
#include <vector>

namespace muduo
{<!-- -->

class AsyncLogging : noncopyable
{<!-- -->
 public:

  AsyncLogging(const string & basename,
               off_t rollSize,
               int flushInterval = 3);

  ~AsyncLogging()
  {<!-- -->
    if (running_)
    {<!-- -->
      stop();
    }
  }
  
  void append(const char* logline, int len);

  void start()
  {<!-- -->
    running_ = true;
    thread_.start();
    latch_.wait(); // Ensure that the backend thread has started running
  }

  void stop() NO_THREAD_SAFETY_ANALYSIS
  {<!-- -->
    running_ = false;
    cond_.notify();
    thread_.join();
  }

 private:

  void threadFunc();

  typedef muduo::detail::FixedBuffer<muduo::detail::kLargeBuffer> Buffer;
  typedef std::vector<std::unique_ptr<Buffer>> BufferVector;
  typedef BufferVector::value_type BufferPtr;

  //Storage timeout variable
  const int flushInterval_;
  //Backend thread start flag
  std::atomic<bool> running_;
  //Log file name
  const string basename_;
  //Reserved log size
  const off_t rollSize_;
  //Call the backend writing thread
  muduo::Thread thread_;
  //This variable ensures that the backend log writing thread is already running
  muduo::CountDownLatch latch_;
  
  //Condition variables and mutex locks are used to ensure thread synchronization between front-end threads and back-end threads.
  muduo::MutexLock mutex_;
  muduo::Condition cond_ GUARDED_BY(mutex_);
    
  //The front-end thread is currently writing to the buffer
  BufferPtr currentBuffer_ GUARDED_BY(mutex_);
  //The next backup buffer of the front-end thread
  BufferPtr nextBuffer_ GUARDED_BY(mutex_);
  //Filled buffer queue with write file
  BufferVector buffers_ GUARDED_BY(mutex_);
};

} // namespace muduo

#endif // MUDUO_BASE_ASYNCLOGGING_H



AsyncLogging.cc

Front-end thread

The front end calls AsyncLogging::append() when generating a log message. In this function, if the remaining space in the current buffer (currentBuff_) is large enough, the log message will be copied (appended) directly to the current buffer. This is the most common situation.
Copying a log message here does not cause much overhead. The rest of the front-end and back-end code is not copied, but a simple pointer exchange. Otherwise, it means that the current buffer is full, send it (move it into) buffers, and try to move (move) another prepared buffer (nextBuffer_) as the current buffer, and then append the log message and notify (wake up) The end starts writing log data.
In the above two cases, there are no time-consuming operations within the critical section, and the running time is constant.
If the front-end writing speed is too fast and both buffers are used up at once, then a new buffer has to be allocated as the current buffer. This is a rare situation.

Backend thread

First prepare two free buffers for exchange in the critical section.
In the critical section, wait for the condition to be triggered. There are two conditions here: one is timeout, and the other is that the front end fills one or more buffers.
Note that this is an unconventional usage of conditionvariable. It does not use a while loop, and the waiting time has an upper limit. When the “condition” is met, first move the current buffer (currentBuffe_) into buffers_, and immediately move the free newBuffer1 to the current buffer.
Note that this entire code is within the critical section, so there will not be any race conditions. Next, buffer_ is exchanged with buffersToWrite. The following code can safely access buffersToWrite outside the critical section and write the log data to the file.
The last thing done in the critical section is to replace nextBuffer with newBuffer2, so that the front end always has a prepared buffer for deployment. nextBuffer_ can reduce the probability of memory allocation in the front-end critical section and shorten the length of the front-end critical section. Note that there are no time-consuming operations in the critical section of the backend, and the running time is constant. The buffers in buffersToWrite will be refilled with newBuffer1 and newBuffer2, so that there will be two free buffers available to replace the current buffer and preliminary buffer of the front end during the next execution.
Finally, these four buffers will all be filled with 0 when the program starts. This can avoid page faults causing unstable performance when the program warms up.

// Use of this source code is governed by a BSD-style license
// that can be found in the License file.
//
// Author: Shuo Chen (chenshuo at chenshuo dot com)
 
#include "muduo/base/AsyncLogging.h"
#include "muduo/base/LogFile.h"
#include "muduo/base/Timestamp.h"
 
#include <stdio.h>
 
using namespace muduo;
 
//Asynchronous log
AsyncLogging::AsyncLogging(const string & amp; basename,
                           off_t rollSize,
                           int flushInterval)
  : flushInterval_(flushInterval),//Backend thread timeout variable, the default is 3s
    running_(false),//Backend thread startup flag
    basename_(basename),//Set the output log file name
    rollSize_(rollSize),//Reserved log size
    thread_(std::bind( & amp;AsyncLogging::threadFunc, this), "Logging"),//The thread that executes the asynchronous logger
    latch_(1),//The function of this variable is to ensure that the back-end thread has entered
    mutex_(),
    cond_(mutex_),
    currentBuffer_(new Buffer), //Current buffer
    nextBuffer_(new Buffer),//prepare buffer
    buffers_()//buffer queue
{<!-- -->
  currentBuffer_->bzero(); //Clear
  nextBuffer_->bzero();//Clear
  buffers_.reserve(16);//Set the buffer queue size to 16
}
 
//All LOG_* will eventually call the append function
void AsyncLogging::append(const char* logline, int len)
{<!-- -->
  muduo::MutexLockGuard lock(mutex_);
  if (currentBuffer_->avail() > len) // If there is still space in the current buffer, add it to the current log
  {<!-- -->
    currentBuffer_->append(logline, len);//Call the append of vector
  }
  else
  {<!-- -->
    //Add the used buffer to the buffer vector
    buffers_.push_back(std::move(currentBuffer_));
 
    if (nextBuffer_) // Reset the current buffer
    {<!-- -->
      currentBuffer_ = std::move(nextBuffer_);
    }
    else
    {<!-- -->
      currentBuffer_.reset(new Buffer); // Rarely happens
      //If the front-end writing speed is too fast and both buffers are used up at once, then a new buffer has to be allocated as the current buffer. This is a rare situation.
    }
    currentBuffer_->append(logline, len);
 
    //Notify the log thread that there is data to write
    cond_.notify();
  }
}
 
void AsyncLogging::threadFunc() // Function called by the thread, mainly used to periodically flush data to the log file
{<!-- -->
  assert(running_ == true);
  latch_.countDown();
  LogFile output(basename_, rollSize_, false);//Open the log file
  BufferPtr newBuffer1(new Buffer); //These two are buffers for background threads
  BufferPtr newBuffer2(new Buffer);
  newBuffer1->bzero();//clear
  newBuffer2->bzero();//clear
  BufferVector buffersToWrite; //Used to swap with the buffers_ of the foreground thread
  buffersToWrite.reserve(16); //Reserve space
  while (running_)
  {<!-- -->
    assert(newBuffer1 & amp; & amp; newBuffer1->length() == 0);
    assert(newBuffer2 & amp; & amp; newBuffer2->length() == 0);
    assert(buffersToWrite.empty());
 
    {<!-- -->
      muduo::MutexLockGuard lock(mutex_);
//If the buffer is empty, it means that there is no data to be written to the file, then wait for the specified time (default three seconds)
      if (buffers_.empty()) // unusual usage!
      {<!-- -->
        cond_.waitForSeconds(flushInterval_);
      }
 
//No matter why cond wakes up, currentBuffer_ must be placed in buffers_.
      //If it wakes up because the time is up, then currentBuffer_ is not full yet, and it must be written to LogFile at this time.
      //If a front buffer is already full, then a front buffer has been placed in buffers_ in the foreground thread
      //Now. At this time, you still need to put currentBuffer_ into buffers_ (note that the front and back placements are different buffers.
      //Because in the foreground thread, currentBuffer_ has been replaced by the buffer pointed to by nextBuffer_)
      buffers_.push_back(std::move(currentBuffer_)); //currentBuffer_ is the current buffer
 
/*---Return a buffer---*/ // Convert the new buffer to the current buffer
      currentBuffer_ = std::move(newBuffer1);
//Exchange buffers_ with new unused buffersToWrite, and write the data in buffers_ to LogFile in the asynchronous thread
      buffersToWrite.swap(buffers_);//Internal pointer exchange, not copying
      if (!nextBuffer_)
      {<!-- -->
        nextBuffer_ = std::move(newBuffer2);/*-----Return the second one if necessary----*/
      }
    }
 
    assert(!buffersToWrite.empty());
 
// If the number of buffers in the buffer list to be written to the file is greater than 25, delete the excess data.
    // Message accumulation
    //The front-end falls into an infinite loop and desperately sends log messages, exceeding the back-end's processing capabilities.
    //This is a typical situation where the production speed exceeds the consumption speed, which will cause data to accumulate in the memory.
    //In severe cases, performance problems may occur (insufficient available memory),
    //Or the program crashes (failed to allocate memory)
    if (buffersToWrite.size() > 25)
    {<!-- -->
      char buf[256];
      snprintf(buf, sizeof buf, "Dropped log messages at %s, %zd larger buffers\\
",
               Timestamp::now().toFormattedString().c_str(),
               buffersToWrite.size()-2);
      fputs(buf, stderr);
      output.append(buf, static_cast<int>(strlen(buf)));
 
// Throw away excess logs to free up memory, leaving only two buffers
      buffersToWrite.erase(buffersToWrite.begin() + 2, buffersToWrite.end());
    }
 
//Write buffersToWrite data to the log
    for (const auto & amp; buffer : buffersToWrite)
    {<!-- -->
      // FIXME: use unbuffered stdio FILE ? or use ::writev ?
      output.append(buffer->data(), buffer->length());
    }
 
//Resize buffersToWrite
    if (buffersToWrite.size() > 2)
    {<!-- -->
      // drop non-bzero-ed buffers, avoid trashing
      buffersToWrite.resize(2);
    }
 
    if (!newBuffer1)
    {<!-- -->
      assert(!buffersToWrite.empty());
// Pop one from buffersToWrite as newBuffer1
      newBuffer1 = std::move(buffersToWrite.back());
      buffersToWrite.pop_back();
      newBuffer1->reset();//Clean up newBuffer1
    }
 
//The front buffer is returned by newBuffer1 2. Now return the buffersToWrite buffer to the background buffer
    if (!newBuffer2)
    {<!-- -->
      assert(!buffersToWrite.empty());
      newBuffer2 = std::move(buffersToWrite.back());
      buffersToWrite.pop_back();
      newBuffer2->reset();
    }
 
    buffersToWrite.clear();
    output.flush();//Refresh the log file (write)
  }
  output.flush();
}
 

What to do if log messages accumulate

What will happen if the front end falls into an infinite loop and desperately sends log messages, exceeding the processing (output) capacity of the back end? For synchronous logs, this is not a problem, because blocking IO naturally limits the writing speed of the front end and acts as a throttle. But for asynchronous logs, this is a typical problem where the production speed is higher than the consumption speed, which will cause data to accumulate in the memory. In severe cases, it will cause performance problems (insufficient available memory) or program crash (failure to allocate memory).
The method for muduo log library to deal with log accumulation is very simple: directly throw away the redundant log buffer to free up memory. This can prevent the log library itself from causing program failures and is a self-protection measure.