How do you design a distributed, highly available reconciliation system that can handle tens of millions of records?


From: juejin.cn/post/7259949655583506490

1 Background

Online business volume is growing by the day: daily orders now exceed ten million, fund flows are large, and fund security has become a key concern. To ensure the accuracy of every transaction and protect the interests of the business, strict RD coding logic is not enough on its own; order flows must also be checked daily, or even hourly, and abnormal situations handled promptly.

Faced with tens of millions of orders, manual reconciliation is clearly infeasible. Building a reconciliation system therefore became inevitable: it provides a basis for fund security, saves the company operations and maintenance manpower, and makes the data more visible.

This system now covers 100% of the reconciliation between the aggregation channel gateway and external channels. It reconciled Alipay's hundred-million-level order volume during the Spring Festival Gala and handles ten-million-level order volumes for daily AC projects, with a reconciliation accuracy of six nines, saving the company 2~3 headcount.

2 Introduction

The reconciliation module is one of the core functions of a payment system. Different businesses call for different reconciliation models, but they all run into the following problems:

  • Massive data. Given the current order volume of aggregate payment, the reconciliation system must handle tens of millions of records;

  • How to handle orders with abnormal differences such as day-cut orders, over-accounted and under-accounted orders;

  • Inconsistencies in bill format, bill download time, download method, and so on.

To address these problems, and drawing on the characteristics of the financial aggregate payment system, this article designs a distributed, highly available reconciliation system that can handle tens of millions of records, using the decoupling properties of the Kafka message queue to eliminate strong dependencies between the system's modules.

The article introduces the reconciliation system from three angles. First, an overall introduction to the system's design, walking through each module's implementation and the design patterns used. Second, the system's version iterations: why iteration was necessary, and the pitfalls encountered along the way. Third, a summary of the current version's characteristics and ideas for the next round of optimization.

3 System Design

System structure diagram

Figure 1 is the overall structure diagram of the reconciliation system. It is divided into six modules: file download, file parsing and push, platform data acquisition and push, reconciliation execution, reconciliation result statistics, and intermediate state. Each module is responsible for its own function.


Figure 1 General structure diagram of the reconciliation system

Figure 2 is the state transition diagram the reconciliation system implements with Kafka. Each module exists independently; state transitions between them are driven by Kafka messages, with status updates and message sending handled by the intermediate UpdateReconStatus class. This design not only pipelines the reconciliation process but also uses the message middleware's properties to achieve retries and decoupling between modules.


Figure 2 Reconciliation system state transition diagram
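The transition mechanism above can be sketched as a simple state machine. This is an illustrative sketch only: the class, the ReconStatus values, and the success-gated advance rule are assumptions modeled on the description, not the system's actual code.

```java
// Hypothetical sketch of the Kafka-driven pipeline states, one per module
// in Figure 1. Names are illustrative assumptions.
import java.util.EnumMap;
import java.util.Map;

public class ReconStateMachine {
    public enum ReconStatus {
        FILE_DOWNLOADED, FILE_PARSED, PLATFORM_DATA_PUSHED,
        RECON_EXECUTED, RESULT_COUNTED, FINISHED
    }

    private static final Map<ReconStatus, ReconStatus> NEXT = new EnumMap<>(ReconStatus.class);
    static {
        NEXT.put(ReconStatus.FILE_DOWNLOADED, ReconStatus.FILE_PARSED);
        NEXT.put(ReconStatus.FILE_PARSED, ReconStatus.PLATFORM_DATA_PUSHED);
        NEXT.put(ReconStatus.PLATFORM_DATA_PUSHED, ReconStatus.RECON_EXECUTED);
        NEXT.put(ReconStatus.RECON_EXECUTED, ReconStatus.RESULT_COUNTED);
        NEXT.put(ReconStatus.RESULT_COUNTED, ReconStatus.FINISHED);
    }

    // Only advance when the current step succeeded; otherwise stay on the
    // current state and let the re-delivered Kafka message retry the step.
    public static ReconStatus advance(ReconStatus current, boolean stepSucceeded) {
        if (!stepSucceeded || !NEXT.containsKey(current)) {
            return current;
        }
        return NEXT.get(current);
    }
}
```

A failed step leaves the state unchanged, which is what makes message re-delivery act as a retry.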

In order to better understand the implementation process of each module, each module will be explained in turn below.

File download module

Design

The file download module downloads bills from the various external channels. Aggregate payment, as the name suggests, integrates the capabilities of many third-party institutions, including payment-industry leaders such as Alipay and WeChat. The diversity of payment channels leads to diversity in bill downloads, so supporting multiple download modes in a pluggable way became the focus of this module's design.

Weighing the available Java design patterns, this module adopts the interface pattern, which fits the object-oriented design philosophy and lets new channels be plugged in quickly. The implementation class diagram is shown in Figure 3 (only part of the class diagram is shown).


Figure 3 File download implementation class diagram

Let’s take the Alipay reconciliation file download method as an example to explain the implementation process in detail.

Implementation

From the Alipay interface documentation, the download method is HTTPS and the file is a zipped .csv. Accordingly, the system is implemented as follows (only part of the code is shown). Because the Kafka middleware and the intermediate-state module already provide retries at the system level, the module itself needs no retry mechanism; the same holds for subsequent modules.

public interface BillFetcher {
    // ReconTaskMessage is the Kafka task message;
    // FetcherConsumer customizes how the downloaded bill is processed
    String[] fetch(ReconTaskMessage message, FetcherConsumer consumer) throws IOException;
}

@Component
public class AlipayFetcher implements BillFetcher {

    public AlipayFetcher(@Autowired BillDownloadService billDownloadService) {
        Security.addProvider(new BouncyCastleProvider());
        // register this implementation under the ALIPAY download way
        billDownloadService.register(BillFetchWay.ALIPAY, this);
    }
    ...
    @Override
    public String[] fetch(ReconTaskMessage message, FetcherConsumer consumer) throws IOException {
        // "map" holds the channel configuration carried by the task message
        String appId = map.getString("appId");
        String privateKey = getConvertedPrivateKey(map.getString("privateKey"));
        String alipayPublicKey = getPublicKey(map.getString("publicKey"), appId);
        String signType = map.getString("signType");
        String url = "https://openapi.alipay.com/gateway.do";
        String format = "json";
        String charset = "utf-8";
        String billDate = DateFormatUtils.format(message.getBillDate(), DateTimeConstants.BILL_DATE_PATTERN);
        String notExists = "isp.bill_not_exist";
        String fileContentType = "application/oct-stream";
        String contentTypeAttr = "Content-Type";
        // instantiate the client
        AlipayClient alipayClient = new DefaultAlipayClient(url, appId, privateKey, format, charset, alipayPublicKey, signType);
        // Instantiate the request class for the specific API; the class name matches the interface being called
        AlipayDataDataserviceBillDownloadurlQueryRequest request = new AlipayDataDataserviceBillDownloadurlQueryRequest();
        // "trade" is the merchant's business bill based on Alipay transactions;
        // "signcustomer" is the account bill based on balance income/expenditure and other fund changes
        request.setBizContent("{" +
                "\"bill_type\":\"trade\"," +
                "\"bill_date\":\"" + billDate + "\"" +
                "}");
        AlipayDataDataserviceBillDownloadurlQueryResponse response = alipayClient.execute(request);
        if (response.isSuccess()) {
            // fetch the reconciliation file from the returned download URL and
            // stream it into the target directory
            ...
            System.out.println("Call successful");
        } else {
            System.out.println("Call failed");
        }
    }
}

Specific steps:

  • Override the constructor to register the implementation class into a map, so the right class can be looked up by download method;

  • Implement the fetch interface: construct the request parameters, call Alipay, parse the response, stream the file into the target directory, and handle any exceptions along the way.
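The constructor-registration step above can be sketched in isolation. BillDownloadRegistry and the narrowed fetch signature here are illustrative assumptions, not the system's actual BillDownloadService API:

```java
// Sketch of the registration pattern: each fetcher registers itself in a map
// keyed by download channel, so new channels plug in without touching the
// dispatch code. Names are illustrative assumptions.
import java.util.HashMap;
import java.util.Map;

public class BillDownloadRegistry {
    public interface BillFetcher {
        String[] fetch(String billDate);
    }

    private final Map<String, BillFetcher> fetchers = new HashMap<>();

    // Called from each fetcher's constructor, mirroring AlipayFetcher above.
    public void register(String channel, BillFetcher fetcher) {
        fetchers.put(channel, fetcher);
    }

    // Dispatch by channel; unknown channels fail fast.
    public String[] download(String channel, String billDate) {
        BillFetcher fetcher = fetchers.get(channel);
        if (fetcher == null) {
            throw new IllegalArgumentException("no fetcher for channel: " + channel);
        }
        return fetcher.fetch(billDate);
    }
}
```

Adding a new channel then means writing one implementation class and one register call; the dispatch code never changes.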

File parsing and pushing module

Design

As mentioned earlier, aggregate payment faces many external channels, so the diversity of reconciliation files is self-evident: WeChat uses txt, Alipay uses csv, and the bill contents differ from channel to channel as well. Resolving these differences between channel bills is the key issue this module must address.

After researching existing reconciliation systems, this system adopts the interface pattern plus RDF (structured text file) tooling. The interface pattern handles the multiple bill formats and keeps the mechanism pluggable, while the RDF tool component standardizes bills quickly and keeps the operation simple and easy to understand.

The specific implementation class diagram is shown in Figure 4 (only part of the class diagram is shown).


Figure 4 File standardization implementation class diagram

Let’s take Alipay reconciliation file analysis as an example to explain the implementation process in detail.

Implementation

Based on Alipay's bill format, an RDF standard template is defined in advance; subsequent parsing reads each line of the reconciliation file into a corresponding entity class according to the template. Note that the template's fields must correspond one-to-one with the bill data: the entity class may have more fields than the bill, but it must include every bill field. The interface is defined as follows:

public interface BillConverter<T> {
    // Whether this converter matches the given bill
    boolean match(String channelType, String name);
    // Convert the original reconciliation file (as a stream) into the standard format for Hive
    void convertBill(InputStream sourceFile, ConverterConsumer<T> consumer) throws IOException;
    // Convert the original reconciliation file (from a local path) into the standard format for Hive
    void convertBill(String localPath, ConverterConsumer<T> consumer) throws IOException;
}

The specific implementation steps are shown in Figure 5:


Figure 5 File parsing flow chart

1. Define the RDF standard template. The following is the template for Alipay business flow details; the field names in the body section must match the entity class's field names.

{
  "head": [
    "title|Alipay business details query|Required",
    "merchantId|Account|Required",
    "billDate|StartDate|Required",
    "line|Business details list|Required",
    "header|header|Required"
  ],
  "body": [
    "channelNo|Alipay transaction number",
    "merchantNo|Merchant order number",
    "businessType|business type",
    "production|product name",
    "createTime|Creation time|Date:yyyy-MM-dd HH:mm:ss",
    "finishTime|Complete time|Date:yyyy-MM-dd HH:mm:ss",
    "storeNo|store number",
    "storeName|store name",
    "operator|operator",
    "terminalNo|terminal number",
    "account|other party's account",
    "orderAmount|order amount|BigDecimal",
    "actualReceipt|Merchant actual receipt|BigDecimal",
    "alipayRedPacket|Alipay Red Packet|BigDecimal",
    "jiFenBao|jifenbao|BigDecimal",
    "alipayPreferential|Alipay Preferential|BigDecimal",
    "merchantPreferential|Merchant Preferential|BigDecimal",
    "cancelAfterVerificationAmount|coupon verification amount|BigDecimal",
    "ticketName|ticket name",
    "merchantRedPacket|Merchant red envelope consumption amount|BigDecimal",
    "cardAmount|card consumption amount|BigDecimal",
    "refundOrRequestNo|Refund batch number/request number",
    "fee|Service fee|BigDecimal",
    "feeSplitting|Splitting|BigDecimal",
    "remark|note",
    "merchantIdNo|Merchant Identification Number"
  ],
  "tail": [
    "line|End of business details list|Required",
    "tradeSummary|Trade summary|Required",
    "refundSummary|Refund Total|Required",
    "exportTime|Export time|Required"
  ],
  "protocol": "alib",
  "columnSplit":","
}

2. Implement the interface's getChannelType and match methods, which together determine which Converter class handles a given bill. For Alipay bills the implementation is:

@Override
public String getChannelType() {
    return ChannelType.ALI.name();
}
@Override
public boolean match(String channelType, String name) {
    return name.endsWith(".csv.zip");
}

3. Implement the convertBill method of the interface to complete bill standardization;

@Override
public void convertBill(String path, ConverterConsumer<ChannelBillPojo> consumer) throws IOException {
    FileConfig config = new FileConfig(path, "rdf/alipay-business.json", new StorageConfig("nas"));
    config.setFileEncoding("UTF-8");
    FileReader fileReader = FileFactory.createReader(config);
    AlipayBusinessConverter.AlipayBusinessPojo row;
    try {
        while (null != (row = fileReader.readRow(AlipayBusinessConverter.AlipayBusinessPojo.class))) {
            convert(row, consumer);
        }
        ...
    } // catch/finally (exception handling, reader cleanup) omitted
}

4. Push standardized bills to Hive.

Platform data acquisition and push module

Platform data is generally pulled from the database. When the data volume is small, queries put little pressure on the database; but at large volumes, such as e-commerce with over a million transactions a day, querying the database directly is inadvisable: it is inefficient and can even crash the database and affect online transactions. This comes up again in the version iterations below.

Therefore, platform data is extracted from Hive; the transaction data only needs to be synchronized to a Hive table in advance, which is both efficient and safer. Because the Hive tables and their schemas differ, the data collector Collector class also adopts the interface pattern. The Collector interface is defined as follows:

public interface DataCollector {
    void collect(OutputStream os) throws IOException;
}
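As a sketch of what an implementation of this interface might look like, assuming the rows are already in memory and CSV lines are the target format for the Hive load (the class, field layout, and row source are illustrative assumptions):

```java
// Minimal collector sketch: stream rows out as CSV so they can be bulk-loaded
// into Hive. In practice the source would page through a result set rather
// than hold rows in memory.
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;

public class CsvOrderCollector {
    public interface DataCollector {
        void collect(OutputStream os) throws IOException;
    }

    public static DataCollector forRows(List<String[]> rows) {
        return os -> {
            for (String[] row : rows) {
                // One CSV line per platform order, e.g. orderNo,amount,status
                os.write((String.join(",", row) + "\n").getBytes(StandardCharsets.UTF_8));
            }
        };
    }
}
```

Writing to an OutputStream keeps the collector agnostic about the destination: a local file, an upload stream, or a test buffer all work unchanged.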

According to the current implementation of the platform data collector, the class diagram can be obtained as shown in Figure 6.


Figure 6 Platform data collector implementation class diagram

Execute reconciliation module

This module executes the Hive commands. Once all platform bills and channel bills have been pushed to Hive, it leverages Hive's efficiency on big data to run a full outer join SQL and store the results in a designated Hive table for the result-statistics step. The reconciliation SQL can be tailored to business needs; if you want the full outer join SQL used by this system, feel free to contact me.
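The article does not publish its actual SQL, but a minimal full outer join over two hypothetical tables (platform_bill and channel_bill, with assumed columns) might look like this sketch:

```sql
-- Hypothetical table and column names; the real schema differs.
-- Rows present on only one side come back with NULLs on the other side,
-- which is what the statistics module classifies as differences.
INSERT OVERWRITE TABLE recon_result PARTITION (bill_date = '${bill_date}')
SELECT
    p.order_no AS platform_order_no,
    c.order_no AS channel_order_no,
    p.amount   AS platform_amount,
    c.amount   AS channel_amount,
    p.status   AS platform_status,
    c.status   AS channel_status
FROM (SELECT * FROM platform_bill WHERE bill_date = '${bill_date}') p
FULL OUTER JOIN (SELECT * FROM channel_bill WHERE bill_date = '${bill_date}') c
    ON p.order_no = c.order_no;
```

Filtering each side in a subquery before the join keeps partition pruning effective, which matters at this data volume.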

Reconciliation result statistics module

After the reconciliation task runs successfully, the full-join output must be tallied, focusing on differences such as inconsistent amounts, inconsistent statuses, day-cut orders, under-accounted orders (no record on the platform, but a record on the channel) and over-accounted orders (a record on the platform, but none on the channel). For each case, the system proceeds as follows:

  • Inconsistent amount: the front-end page shows the reason for the discrepancy, to be verified manually;

  • Inconsistent status: for refund orders, query the platform's refund table; if a record exists and the amounts agree, the orders are considered equal and no difference is shown. Otherwise the reason for the difference is shown on the front-end page and verified manually;

  • Day-cut: when a platform order succeeded but no channel order exists, the platform order's creation time is used to judge whether this may be a day-cut order. If so, the order is written to a buffer file; after statistics finish, the buffer file is uploaded to the Hive day-cut table, and that data is reloaded the next day to reconcile across days. When the platform has no order but the channel does, the platform database is checked for a genuine difference; if one exists, it is shown on the front-end page and verified manually.

  • Under-accounted: currently determined mainly by querying the platform database; confirmed differences are shown on the front-end page and verified manually.

  • Over-accounted: this may also be a day-cut, so the day-cut check runs first. If the order falls outside the day-cut window, the difference is shown on the front-end page and checked manually.
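The classification logic above can be sketched as a single decision function. The class, the day-cut hour threshold, and the use of amounts in cents are all illustrative assumptions, not the system's actual rules:

```java
// Sketch of difference classification over one full-join row. A record missing
// on one side is first checked against an assumed day-cut window (created in
// the last hour of the day) before being flagged as a real difference.
public class DiffClassifier {
    public enum DiffType {
        CONSISTENT, AMOUNT_MISMATCH, STATUS_MISMATCH,
        POSSIBLE_DAY_CUT, PLATFORM_ONLY, CHANNEL_ONLY
    }

    // Null amount on a side means the full join produced no row for that side.
    // hourOfCreation is the platform order's creation hour (0-23).
    public static DiffType classify(Long platformCents, String platformStatus,
                                    Long channelCents, String channelStatus,
                                    int hourOfCreation) {
        if (platformCents != null && channelCents == null) {
            // platform has the order, channel does not: possibly a day-cut
            return hourOfCreation >= 23 ? DiffType.POSSIBLE_DAY_CUT : DiffType.PLATFORM_ONLY;
        }
        if (platformCents == null && channelCents != null) {
            return DiffType.CHANNEL_ONLY;
        }
        if (!platformCents.equals(channelCents)) {
            return DiffType.AMOUNT_MISMATCH;
        }
        if (!platformStatus.equals(channelStatus)) {
            return DiffType.STATUS_MISMATCH;
        }
        return DiffType.CONSISTENT;
    }
}
```

POSSIBLE_DAY_CUT rows would go to the buffer file for next-day re-reconciliation; the other difference types surface on the front-end page.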

Intermediate module

The intermediate state module handles state transitions between modules. It uses Kafka plus a check on whether the state was updated to implement message retransmission and reconciliation status updates. To move from one state to the next, the current state must have succeeded; only then does the reconciliation process advance.

This intermediate-state design not only solves the retry problem but also concentrates the database operations in one place, which fits the modular design: each module sticks to its own duty. Retries are not unlimited; the current limit is 3. If a step still fails after 3 retries, a Lark notification is sent and manual intervention is required.
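The bounded-retry-then-alert behavior described above can be sketched as follows; the class and the alert hook are illustrative, standing in for the Kafka re-delivery and Lark notification mechanisms:

```java
// Sketch of the intermediate module's bounded retry: attempt the step up to
// MAX_RETRIES times, then raise an alert for manual handling. The alert
// Runnable is a placeholder for the Lark notification mentioned above.
import java.util.function.Supplier;

public class BoundedRetry {
    public static final int MAX_RETRIES = 3;

    // Returns true if the step eventually succeeded, false if it was handed
    // off to manual intervention after exhausting retries.
    public static boolean runWithRetry(Supplier<Boolean> step, Runnable alert) {
        for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
            if (Boolean.TRUE.equals(step.get())) {
                return true;
            }
        }
        alert.run(); // e.g. send a Lark notification for manual intervention
        return false;
    }
}
```

In the real system the "loop" is driven by Kafka re-delivery rather than an in-process for loop, but the cap-then-alert shape is the same.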

In short, reconciliation is not inherently complicated. It requires care, a deep understanding of the business, choosing the right processing method, and continuously iterating and optimizing the system for each business.

4 Version Iterations

A system's design is heavily shaped by the scale of the business. For financial aggregate payment, order volume has grown by several orders of magnitude, and along the way the reconciliation system's problems were steadily exposed, making optimization and improvement inevitable. From the original design to the present, the system has gone through roughly three stages: initial, transitional, and current.

Initial version (v1.0)

The initial version automated reconciliation for the aggregation channels. During the 2018 Spring Festival activities in particular, it provided an important guarantee for fund security, reconciling aggregation against Laohezhong, Alipay, WeChat and other channels. But as the financial business grew and Douyin e-commerce rose rapidly, the system's shortcomings surfaced: more reconciliation task failures (especially on large data volumes), abnormal difference results being displayed, and low reconciliation efficiency. Continued analysis revealed the following problems:

  • System files were placed in the temporary directory tmp, which the TCE platform cleans periodically; as a result, pushing files to Hive would report the file as missing, especially for reconciliation tasks with large data volumes;

  • Kafka message backlogs interrupted the reconciliation process, mainly because new channels and reconciliation tasks kept being added; meanwhile the Hive execution queue was a shared queue, and most reconciliation processes stalled for lack of resources;

  • Abnormal difference results were displayed mainly because queries had no retry mechanism: when a timeout or other exception occurred during a query, spurious differences appeared. A too-short query date span also produced spurious differences.

Transitional version (v2.0)

Taking into account the shortcomings of the initial version of the reconciliation system and the urgency of the reconciliation function, the initial version was transitionally optimized to initially implement the reconciliation function for large amounts of data, while also improving the accuracy of the difference results. Compared with the initial version, this version mainly has the following optimizations:

  • The file storage directory was changed from the temporary directory to a directory under the service, preventing large files from being reclaimed; files are deleted only after they are uploaded to Hive;

  • A dedicated execution queue was applied for, solving the problem of the reconciliation process stalling due to insufficient resources;

  • A retry mechanism was added to queries and the query date span was widened, fixing the spurious difference results and improving their accuracy.

The transitional version focuses on solving the obvious problems of the initial version, but does not completely solve some potential problems, such as low code fault tolerance, slow manual response after abnormal reconciliation tasks, low reconciliation efficiency, and low database security.

Current version (v3.0)

The purpose of the optimization of the current version is to achieve the “three highs” of the reconciliation system, which are high efficiency, high accuracy (six nines) and high stability.

  • High efficiency: the main issues were slow platform data acquisition and database safety. This logic was optimized by switching data acquisition from the database to the far more efficient Hive; the data only needs to be synchronized to a Hive table in advance.

  • High accuracy: the difference-processing logic was optimized and further refined, difference-result alarms were added, and the reasons for differences shown on the front-end pages were made more specific.

  • High stability: RDF processing was optimized and fallback logic added for exceptions in reconciliation files, improving fault tolerance; alarms were added when a reconciliation task fails or exceeds the retry threshold, speeding up manual response; and rate limiting was added to database operations such as order lookups, preventing database crashes.
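The database rate limiting in the last point could take many forms; one simple sketch is a semaphore that caps concurrent verification queries. The class and the notion of returning a fallback when no permit is free are assumptions for illustration:

```java
// Sketch of rate limiting the difference-verification DB lookups: a semaphore
// caps how many queries run concurrently; excess calls get a fallback instead
// of piling load onto the database.
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

public class DbQueryLimiter {
    private final Semaphore permits;

    public DbQueryLimiter(int maxConcurrent) {
        this.permits = new Semaphore(maxConcurrent);
    }

    // Run the query only if a permit is free; otherwise return the fallback
    // and let the caller retry later.
    public <T> T query(Supplier<T> dbCall, T fallback) {
        if (!permits.tryAcquire()) {
            return fallback;
        }
        try {
            return dbCall.get();
        } finally {
            permits.release();
        }
    }
}
```

Shedding load with a fallback (rather than blocking) keeps the reconciliation pipeline moving even when the database is under pressure.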

The version iteration process is summarized below. I hope readers can avoid these pitfalls, especially when handling large files.

(Figure: summary of the version iteration process)

5 Summary

The reconciliation system model is closely tied to the business: different businesses yield different models. Still, the overall architecture of most reconciliation systems varies little; the main difference lies in how each module is implemented.

I hope the reconciliation system introduced in this article offers readers design ideas and saves them from repeated pitfalls. Anyone interested in reconciliation systems is welcome to chat with the financial payment team, dig into the details together, and propose optimizations, such as improving the full outer join strategy.
