How to ensure data consistency in a distributed system?

Introduction

Now that distributed systems and microservice architectures are popular, failed calls between services have become the norm. How to handle these failures and how to keep data consistent are unavoidable questions in microservice design. The solutions differ from one business scenario to another. Common methods are:

  1. Blocking retry;

  2. Traditional 2PC and 3PC transactions;

  3. Use queues and background asynchronous processing;

  4. TCC compensating transactions;

  5. Local message table (async guaranteed);

  6. MQ transactions.

This article focuses on the other items. Plenty of material on traditional 2PC and 3PC transactions is already available online, so they are not repeated here.

Blocking retry

In microservice architecture, blocking retry is a common method. Pseudocode example:

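m := db.Insert(sql)

err := postWithRetry("B-Service", m)

// postWithRetry POSTs body to url, trying up to three times.
func postWithRetry(url string, body interface{}) error {
  var err error
  for i := 0; i < 3; i++ {
    _, err = request.POST(url, body)
    if err == nil {
      return nil
    }
    log.Print(err)
  }
  return err // all three attempts failed
}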

As above, when the API request to service B fails, the call is attempted up to three times. If all three attempts fail, the error is logged, and execution either continues or returns the error to the caller. This approach brings the following problems:

  1. The call to service B succeeds, but because of a network timeout the current service believes it failed and retries. Service B then ends up with duplicate records.

  2. The call to service B fails. Because service B is unavailable, the call still fails after three attempts, and the record that the current service inserted into the DB beforehand becomes dirty data.

  3. Retrying increases the latency seen by the upstream caller. If the downstream service is under heavy load, retrying amplifies the pressure on it.

The first problem is solved by making the API of service B idempotent.

The second problem can be handled by a scheduled background job that corrects the data, but this is not a good method.

The third problem is the price that blocking retry necessarily pays for better consistency and availability.

Blocking retry is suitable for scenarios where the business is not sensitive to consistency requirements. If there are requirements for data consistency, additional mechanisms must be introduced to solve it.
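To illustrate the fix for the first problem, here is a minimal sketch of an idempotent receiver on the service B side. It assumes the caller sends a unique request ID with every call and reuses it on retries; the Store type, its fields, and the ID format are hypothetical, and an in-memory map stands in for service B's database.

```go
package main

import (
	"fmt"
	"sync"
)

// Store is a hypothetical stand-in for service B's database.
type Store struct {
	mu      sync.Mutex
	seen    map[string]bool // request IDs that were already processed
	records []string        // business records created so far
}

func NewStore() *Store {
	return &Store{seen: make(map[string]bool)}
}

// Create applies a request at most once per requestID. A retry that
// carries the same ID is acknowledged without inserting a second record.
func (s *Store) Create(requestID, payload string) (created bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.seen[requestID] {
		return false
	}
	s.seen[requestID] = true
	s.records = append(s.records, payload)
	return true
}

func main() {
	b := NewStore()
	// The caller sends the same request three times, as a retry loop might.
	for i := 0; i < 3; i++ {
		b.Create("order-1001", "deduct 1 item")
	}
	fmt.Println(len(b.records)) // prints 1
}
```

In a real service the ID check and the insert would need to share one database transaction or rely on a unique index, so that two concurrent retries cannot both pass the check.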

Asynchronous Queue

As the solution evolves, introducing a queue is a common and better approach. Example:

m := db.Insert(sql)

err := mq.Publish("B-Service-topic",m)

After the current service writes the data to the DB, it pushes a message to MQ, and an independent service consumes the message and runs the business logic. MQ is much more stable than an ordinary business service, but pushing a message to it can still fail, for example because of network problems or because the current service goes down. The result is the same problem as with blocking retry: the DB write succeeds, but the push fails.

In theory, any code in a distributed system that calls multiple services is exposed to this situation. Over a long enough period of operation, call failures are certain to occur. This is one of the difficulties of distributed system design.

TCC Compensating Transactions

When transactional guarantees are required and the services cannot easily be decoupled, TCC compensating transactions are a better choice.

TCC splits each service call into 2 phases and 3 operations:

  • Phase 1, Try operation: Check the business resources and reserve them, for example by checking the inventory and reserving stock.

  • Phase 2, Confirm operation: Commit the reservation made by the Try operation. For example, turn the reserved stock into an actual deduction.

  • Phase 2, Cancel operation: After a Try operation fails, release the reserved resources. For example, add the reserved stock back.

TCC requires each service to implement an API for each of the three operations. An operation that took a single call before the service adopted TCC now takes two phases and three operations.
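The three operations above can be sketched for a single participant as follows. This is a minimal in-memory illustration, not a production design: the Inventory type, its fields, and the use of a transaction ID as the reservation key are assumptions made for the example.

```go
package main

import (
	"errors"
	"fmt"
)

// Inventory is a hypothetical in-memory TCC participant.
type Inventory struct {
	available int            // stock that can still be reserved
	reserved  map[string]int // txID -> quantity held by Try
}

func NewInventory(stock int) *Inventory {
	return &Inventory{available: stock, reserved: make(map[string]int)}
}

// Try checks the stock and reserves qty for the transaction.
func (inv *Inventory) Try(txID string, qty int) error {
	if qty > inv.available {
		return errors.New("insufficient stock")
	}
	inv.available -= qty
	inv.reserved[txID] = qty
	return nil
}

// Confirm turns the reservation into a final deduction.
func (inv *Inventory) Confirm(txID string) {
	delete(inv.reserved, txID)
}

// Cancel returns the reserved quantity to the available stock.
func (inv *Inventory) Cancel(txID string) {
	inv.available += inv.reserved[txID]
	delete(inv.reserved, txID)
}

func main() {
	inv := NewInventory(10)
	_ = inv.Try("tx-1", 3)
	inv.Cancel("tx-1") // the 3 reserved items are available again
	_ = inv.Try("tx-2", 4)
	inv.Confirm("tx-2") // the 4 reserved items are gone for good
	fmt.Println(inv.available) // prints 6
}
```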

For example, a mall application needs to call inventory service A, amount service B, and points service C, as shown in the following pseudocode:

m := db.Insert(sql)
_, aErr := A.Try(m)
_, bErr := B.Try(m)
_, cErr := C.Try(m)
if aErr != nil || bErr != nil || cErr != nil {
  // At least one Try failed: release every reservation.
  A.Cancel()
  B.Cancel()
  C.Cancel()
} else {
  // All reservations succeeded: commit them.
  A.Confirm()
  B.Confirm()
  C.Confirm()
}

The code calls the Try APIs of services A, B, and C to check and reserve resources. If all of them succeed, it calls Confirm on each service to commit the reservations. If any Try fails, for example the one on service C, it calls the Cancel APIs of A, B, and C to release their reservations.

TCC solves the problem of data consistency across multiple services and multiple databases in a distributed system. However, the TCC method still has problems that need attention in practice, including the call failures discussed in the previous sections.
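The flow described above (try everything, then either confirm everything or cancel everything) can be sketched as a small coordinator. The Participant interface, the Run function, and the fake type used to drive it are all hypothetical names for this illustration. Unlike the pseudocode, this sketch stops at the first failed Try, and it ignores Confirm and Cancel errors, which a real coordinator would have to handle.

```go
package main

import (
	"errors"
	"fmt"
)

// Participant is the hypothetical three-operation API each service exposes.
type Participant interface {
	Try(txID string) error
	Confirm(txID string) error
	Cancel(txID string) error
}

// Run calls Try on each participant in order. If any Try fails, it cancels
// all participants, including the failed one, because the caller cannot
// know whether that Try took effect. Otherwise it confirms all of them.
func Run(txID string, ps ...Participant) error {
	for _, p := range ps {
		if err := p.Try(txID); err != nil {
			for _, q := range ps {
				_ = q.Cancel(txID) // errors ignored in this sketch
			}
			return err
		}
	}
	for _, p := range ps {
		_ = p.Confirm(txID) // errors ignored in this sketch
	}
	return nil
}

// fake records the calls it receives; failTry makes Try return an error.
type fake struct {
	failTry bool
	calls   []string
}

func (f *fake) Try(string) error {
	f.calls = append(f.calls, "try")
	if f.failTry {
		return errors.New("try failed")
	}
	return nil
}

func (f *fake) Confirm(string) error {
	f.calls = append(f.calls, "confirm")
	return nil
}

func (f *fake) Cancel(string) error {
	f.calls = append(f.calls, "cancel")
	return nil
}

func main() {
	a, b, c := &fake{}, &fake{}, &fake{failTry: true}
	err := Run("tx-1", a, b, c)
	fmt.Println(err, a.calls, c.calls) // prints: try failed [try cancel] [try cancel]
}
```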

Empty release

If the C.Try() call in the code above genuinely fails, the C.Cancel() call that follows releases a resource that was never locked. The call is still necessary, because the current service cannot tell whether the failed call actually locked C's resource. If Cancel were skipped and Try had in fact succeeded but reported a failure because of a network problem, C's resource would stay locked and never be released.

Empty releases occur often in production environments. A service that implements the TCC transaction API should therefore support them.
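One way to support empty releases is to make Cancel succeed quietly when it finds no reservation for the transaction. The sketch below assumes reservations are keyed by a transaction ID; the Points type and its fields are hypothetical.

```go
package main

import "fmt"

// Points is a hypothetical participant whose Cancel tolerates empty releases.
type Points struct {
	balance int
	held    map[string]int // txID -> points reserved by Try
}

func NewPoints(balance int) *Points {
	return &Points{balance: balance, held: make(map[string]int)}
}

// Try reserves n points for the transaction.
func (p *Points) Try(txID string, n int) {
	p.balance -= n
	p.held[txID] = n
}

// Cancel releases the reservation made by Try. If no reservation exists
// for txID, this is an empty release: nothing changes and nil is still
// returned, so the caller does not treat it as a failure and keep retrying.
func (p *Points) Cancel(txID string) error {
	n, ok := p.held[txID]
	if !ok {
		return nil // empty release
	}
	p.balance += n
	delete(p.held, txID)
	return nil
}

func main() {
	p := NewPoints(100)
	fmt.Println(p.Cancel("tx-unknown"), p.balance) // prints: <nil> 100
	p.Try("tx-1", 30)
	fmt.Println(p.Cancel("tx-1"), p.balance) // prints: <nil> 100
}
```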

Timing

In the above code, if C.Try() fails, C.Cancel() is called next. Because of network conditions, the C.Cancel() request may reach service C first and the C.Try() request later. The Cancel is then an empty release, and the late Try locks C's resources, which are never released.

So service C should reject a Try() that arrives after the resource has been released. In practice, a unique transaction ID can be used to tell a first Try() from a Try() that arrives after the release.
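A minimal sketch of this rule, assuming every Try and Cancel carries the same unique transaction ID: the participant remembers which IDs were cancelled and refuses a Try for any of them. The Account type and ErrCancelled are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrCancelled is returned when a Try arrives after its Cancel.
var ErrCancelled = errors.New("transaction already cancelled")

// Account is a hypothetical participant that remembers cancelled
// transaction IDs so that a late Try cannot lock resources again.
type Account struct {
	balance   int
	held      map[string]int  // txID -> amount reserved by Try
	cancelled map[string]bool // txIDs for which Cancel was received
}

func NewAccount(balance int) *Account {
	return &Account{
		balance:   balance,
		held:      make(map[string]int),
		cancelled: make(map[string]bool),
	}
}

// Try reserves the amount unless the transaction was already cancelled.
func (a *Account) Try(txID string, amount int) error {
	if a.cancelled[txID] {
		return ErrCancelled
	}
	a.balance -= amount
	a.held[txID] = amount
	return nil
}

// Cancel records the cancellation and releases any existing reservation.
func (a *Account) Cancel(txID string) {
	a.cancelled[txID] = true
	a.balance += a.held[txID]
	delete(a.held, txID)
}

func main() {
	acct := NewAccount(100)
	acct.Cancel("tx-1")         // Cancel overtakes Try on the network
	err := acct.Try("tx-1", 40) // the delayed Try arrives afterwards
	fmt.Println(err, acct.balance) // prints: transaction already cancelled 100
}
```

The set of cancelled IDs grows without bound in this sketch; a real service would expire old entries.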

Call failure

Calls to Cancel and Confirm can fail too, most commonly because of network problems.

A failed Cancel() or Confirm() leaves the resource locked and never released. Common solutions include:

  1. Blocking retry. The same problems remain, such as the caller going down or the call failing every time.

  2. Write to a log or a queue, and let a separate asynchronous service process it automatically or with manual intervention. Problems still remain, because the write to the log or queue can itself fail.

In theory, any two pieces of code that do not run atomically within one transaction have an intermediate state between them, and wherever an intermediate state exists, failure is possible.

Local message table

The local message table was originally proposed by eBay. It keeps the message table in the same database as the business data tables, so that a single local transaction can cover both.

Concretely, a message row is inserted in the same local transaction that inserts the business data. The subsequent operations then run. If they succeed, the message is deleted; if they fail, the message is kept, and an asynchronous listener picks it up and retries until it succeeds.
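The write-then-delete cycle just described can be sketched with an in-memory stand-in for the database. Everything here is hypothetical: the DB type, InsertOrder, and Deliver are illustration names, and a single function call takes the place of a real local transaction over two tables.

```go
package main

import (
	"errors"
	"fmt"
)

var errUnavailable = errors.New("B-Service unavailable")

// DB is an in-memory stand-in for one database holding both tables.
type DB struct {
	orders   []string       // the business table
	messages map[int]string // the local message table: id -> payload
	nextID   int
}

func NewDB() *DB { return &DB{messages: make(map[int]string)} }

// InsertOrder stores the order and its message in one step, standing in
// for a single local transaction that covers both tables.
func (db *DB) InsertOrder(order string) int {
	db.nextID++
	db.orders = append(db.orders, order)
	db.messages[db.nextID] = order
	return db.nextID
}

// Deliver runs the follow-up operation for one message. The message is
// deleted only on success, so a failed delivery stays in the table and
// can be retried later.
func (db *DB) Deliver(id int, send func(payload string) error) error {
	payload, ok := db.messages[id]
	if !ok {
		return nil // already delivered and deleted
	}
	if err := send(payload); err != nil {
		return err
	}
	delete(db.messages, id)
	return nil
}

func main() {
	db := NewDB()
	id := db.InsertOrder("order-1")
	down := func(string) error { return errUnavailable }
	up := func(string) error { return nil }

	_ = db.Deliver(id, down)
	fmt.Println(len(db.messages)) // prints 1: the message waits for a retry
	_ = db.Deliver(id, up)
	fmt.Println(len(db.messages)) // prints 0: delivered and deleted
}
```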

Local message tables are a good idea and can be used in many ways:

Combined with MQ

Example pseudocode:

messageTx := tc.NewTransaction("order")
messageTxSql := messageTx.TryPlan("content")

m, err := db.InsertTx(sql, messageTxSql)
if err != nil {
  return err
}

aErr := mq.Publish("B-Service-topic", m)
if aErr != nil { // Push to MQ failed
  messageTx.Confirm() // Update message status to confirm
} else {
  messageTx.Cancel() // Delete message
}

// Asynchronously process the confirm message and continue pushing
func OnMessage(task *Task) {
  err := mq.Publish("B-Service-topic", task.Value())
  if err == nil {
    task.Cancel() // Delete message
  }
}

In the above code, messageTxSql is the SQL statement that inserts a row into the local message table:

insert into `tcc_async_task` (`uid`,`name`,`value`,`status`) values (?,?,?,?)

It is executed in the same transaction as the business SQL, so the two either succeed together or fail together.

Once the transaction commits, the message is pushed to the queue. If the push succeeds, messageTx.Cancel() is called to delete the local message; if the push fails, the message is marked as confirm. The local message table has two statuses, try and confirm, and OnMessage can listen for either of them to initiate a retry.

The local transaction guarantees that the message and the business data are written to the database together. Whether the process later goes down or the push fails because of the network, the asynchronous listener can pick the message up, which ensures that it is eventually pushed to MQ.

MQ guarantees delivery to the consumer service. With MQ's QoS policy, the consumer service either processes the message or passes it on to the next business queue, thereby preserving the integrity of the transaction.
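The asynchronous retry side of this scheme might look like the sketch below: a relay that walks the message table, republishes every message in either status, and deletes the ones MQ accepts. The Table, Message, and Relay names are hypothetical and are not the API of the library used in the pseudocode; the publish function is passed in so the example needs no real MQ.

```go
package main

import (
	"errors"
	"fmt"
)

var errMQDown = errors.New("mq unavailable")

type Status string

const (
	StatusTry     Status = "try"     // written with the business data
	StatusConfirm Status = "confirm" // a push was attempted and failed
)

// Message is one row of the local message table.
type Message struct {
	Value  string
	Status Status
}

// Table is a hypothetical in-memory local message table keyed by message ID.
type Table map[string]*Message

// Relay republishes every stored message, whatever its status, and
// deletes the ones that MQ accepts. Messages whose publish fails stay in
// the table for the next run. It returns the number of messages left.
func Relay(t Table, publish func(value string) error) int {
	for id, m := range t {
		if err := publish(m.Value); err == nil {
			delete(t, id) // deleting during range is safe in Go
		}
	}
	return len(t)
}

func main() {
	table := Table{
		"1": &Message{Value: "order-1", Status: StatusTry},
		"2": &Message{Value: "order-2", Status: StatusConfirm},
	}
	calls := 0
	flaky := func(string) error { // fails twice, then recovers
		calls++
		if calls <= 2 {
			return errMQDown
		}
		return nil
	}
	fmt.Println(Relay(table, flaky)) // prints 2: both pushes failed
	fmt.Println(Relay(table, flaky)) // prints 0: both pushes succeeded
}
```

Because a message can be published more than once (for example when the delete fails after a successful push), the consumer still has to be idempotent.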

Combined with service calls

Example pseudocode:

messageTx := tc.NewTransaction("order")
messageTxSql := messageTx.TryPlan("content")

body, err := db.InsertTx(sql, messageTxSql)
if err != nil {
  return err
}

aErr := request.POST("B-Service", body)
if aErr != nil { // Failed to call B-Service
  messageTx.Confirm() // Update message status to confirm
} else {
  messageTx.Cancel() // Delete message
}

// Asynchronously process the confirm or try message and continue to call B-Service
func OnMessage(task *Task) {
  // request.POST("B-Service", body)
}

This example combines the local message table with a direct call to another service, without MQ. Asynchronous retry backed by a local message table keeps the message reliable, solves the problems caused by blocking retry, and is relatively common in daily development.

If there is no local DB write, you can write only to the local message table and handle the message in OnMessage in the same way:

messageTx := tc.NewTransaction("order")
messageTx.Try("content")
aErr := request.POST("B-Service", body)
// ....

Message expiration

Configure handlers for the try and confirm messages of the local message table:

TCC.SetTryHandler(OnTryMessage)
TCC.SetConfirmHandler(OnConfirmMessage)

In the message processing function, it is necessary to determine whether the current message task has existed for too long. For example, if it still fails after retrying for an hour, consider sending emails, text messages, log alerts, etc. to allow manual intervention.

func OnConfirmMessage(task *tcc.Task) {
  if time.Since(task.CreatedAt) > time.Hour {
    task.Cancel() // Delete this message and stop retrying.
    // doSomeThing(): raise an alarm for manual intervention
    return
  }
}

In the Try handler, it is also necessary to check whether the message is too new. A message in the try status may have only just been created, with its confirm or delete step still pending. Retrying it would repeat normal business logic, which means that calls about to succeed would be retried as well. To avoid this, check the creation time of the message and skip it if it was created very recently.
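The two age checks, too old for the confirm handler and too new for the try handler, can be folded into one small decision function. This is a sketch under assumptions: Decide, the Action values, and the minAge and maxAge thresholds are hypothetical and not part of the library configured above. A handler would call it with the time elapsed since the message was created.

```go
package main

import (
	"fmt"
	"time"
)

type Action string

const (
	Skip  Action = "skip"  // too new: the original request may still be running
	Retry Action = "retry" // old enough to be considered stuck
	Alert Action = "alert" // retried for too long: hand over to a human
)

// Decide picks an action from the age of a message. minAge and maxAge are
// hypothetical tuning knobs chosen by the service owner.
func Decide(age, minAge, maxAge time.Duration) Action {
	switch {
	case age < minAge:
		return Skip
	case age > maxAge:
		return Alert
	default:
		return Retry
	}
}

func main() {
	ages := []time.Duration{2 * time.Second, 5 * time.Minute, 2 * time.Hour}
	for _, age := range ages {
		fmt.Println(Decide(age, 30*time.Second, time.Hour))
	}
	// prints skip, retry, alert on separate lines
}
```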

The retry mechanism relies on the business logic of the downstream API being idempotent. Skipping this check would therefore still work, but the design should avoid interfering with normal requests where it can.

Independent message service

The independent message service is an upgraded version of the local message table: the table is split out into a service of its own. Before any other operation, a message is added to the message service. If the subsequent operations succeed, the message is deleted; if they fail, the message is marked as confirm.

Asynchronous logic then listens for the messages and handles them, essentially as with the local message table. However, adding a message to the message service cannot share a transaction with the local operations, so a message can be added successfully while the operation that follows fails. Such a message is useless.

Consider the following scenario:

err := request.POST("Message-Service", body)
if err != nil {
  return err
}
aErr := request.POST("B-Service", body)
if aErr != nil {
  return aErr
}

To deal with such a useless message, the message service has to check whether the operation behind the message actually succeeded. If it did not, the message is deleted; otherwise the subsequent logic continues. Compared with the try and confirm statuses of the local message table, the message service therefore has an additional prepare status in front.
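One reading of the prepare status is sketched below: every message starts in prepare, and a check-back step asks the business service whether the operation behind it succeeded, deleting the message if not and moving it on otherwise. The MessageService type, the CheckBack method, and the succeeded callback are hypothetical, and the mapping onto the confirm status is an assumption of this sketch.

```go
package main

import "fmt"

type State string

const (
	Prepare State = "prepare" // stored before the business operation runs
	Confirm State = "confirm" // business succeeded, follow-up call still owed
)

// MessageService is a hypothetical in-memory independent message service.
type MessageService struct {
	states map[string]State // message ID -> current state
}

func NewMessageService() *MessageService {
	return &MessageService{states: make(map[string]State)}
}

// Add stores a new message in the prepare state.
func (s *MessageService) Add(id string) { s.states[id] = Prepare }

// CheckBack resolves messages stuck in prepare by asking the business
// service whether the operation behind each message actually succeeded.
// Useless messages are deleted; the rest move on to confirm for retrying.
func (s *MessageService) CheckBack(succeeded func(id string) bool) {
	for id, st := range s.states {
		if st != Prepare {
			continue
		}
		if succeeded(id) {
			s.states[id] = Confirm
		} else {
			delete(s.states, id)
		}
	}
}

func main() {
	svc := NewMessageService()
	svc.Add("m1") // the business operation behind m1 succeeded
	svc.Add("m2") // the business operation behind m2 failed
	svc.CheckBack(func(id string) bool { return id == "m1" })
	fmt.Println(len(svc.states), svc.states["m1"]) // prints: 1 confirm
}
```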

MQ Transaction

Some MQ implementations, such as RocketMQ, support transactions. An MQ transaction can be regarded as a specific implementation of the independent message service, with the same logic.

Before any other operation, a message is delivered to MQ. If the subsequent operations succeed, Confirm commits the message; if they fail, Cancel deletes it. MQ transactions also have a prepare status, which requires MQ's consumption-side processing logic to confirm whether the business operation succeeded.

Summary

From the perspective of distributed system practice, in order to ensure data consistency, additional mechanisms must be introduced.

The advantage of TCC is that it works at the business service layer, does not depend on a specific database, is not coupled to a specific framework, and allows flexible resource lock granularity, which makes it well suited to microservice scenarios. The disadvantage is that each service must implement 3 APIs, which is a large and intrusive change to the business code, and the various failure cases all have to be handled. It is difficult for developers to cover every case completely. A mature framework, such as Alibaba's Fescar (since renamed Seata), can greatly reduce the cost.

The advantages of the local message table are that it is simple, does not require other services to change, works well with both service calls and MQ, and is practical in most business scenarios. The disadvantage is that the local database gains an extra message table that is coupled with the business tables. The local message table examples in this article come from a library written by the author. Interested readers can refer to https://github.com/mushroomsir/tcc

The advantage of MQ transactions and the independent message service is that transaction handling is factored out into a shared service, so that individual services avoid coupling a message table to their business tables and avoid the extra processing complexity that comes with it. The disadvantages are that very few MQs support transactions, and that calling an API to add a message before every operation increases the overall call latency, which is redundant overhead in the majority of cases where the call simply succeeds.
