Design practice for a highly available distributed reconciliation system handling tens of millions of orders

Programmer Hei Ge 2023-11-10 15:55 Published in Hunan

Background

Online business volume is growing day by day. Daily orders now exceed tens of millions and capital flows are large, so fund security has become a key concern. To ensure the accuracy of every transaction and protect the interests of the business, rigorous logic in the RD code is not enough: daily or even hourly order flows must also be checked, and anomalies handled promptly. With tens of millions of orders, manual reconciliation is not feasible, so building a reconciliation system became inevitable. It provides a basis for fund security, saves the company operation and maintenance manpower, and makes the data more visible. The system now covers 100% of the reconciliation business between the aggregation channel gateway and external channels. It completed reconciliation of Alipay's billion-level order volume during the Spring Festival Gala and handles the billion-level daily order volume of the AC project, with reconciliation accuracy reaching six nines and a saving of two to three people's effort for the company.

Introduction

The reconciliation module is one of the core functions of the payment system. Different business designs have different reconciliation models, but they all encounter the following problems:

  • Massive data. Judging from the current order volume of aggregate payment, the reconciliation system needs to handle tens of millions of records;

  • How to handle orders with abnormal differences such as daily cuts (orders that fall on different sides of the day cutoff on the platform and the channel), over-accounts and under-accounts;

  • Inconsistencies in bill format, bill download time, download method, etc.

To address these problems, and drawing on the characteristics of the financial aggregate payment system, this article designs a distributed, highly available reconciliation system that can handle tens of millions of records, and uses the decoupling provided by the Kafka message queue to remove the strong dependencies between the system's modules. The article covers three aspects. First, it introduces the overall design of the reconciliation system, then each module's implementation and the design patterns used along the way. Second, it describes the version iterations of the reconciliation system: why they were necessary and the "pits" encountered during the process. Third, it summarizes the characteristics of the current version and proposes ideas for the next round of optimization.

System design

System structure diagram

Figure 1 is the general structure diagram of the reconciliation system, which is divided into six modules, namely file download module, file parsing and push module, platform data acquisition and push module, reconciliation execution module, reconciliation result statistics module and intermediate state module. Each module is responsible for its own functions.

Figure 1 General structure diagram of the reconciliation system

Figure 2 is the state transition diagram that the reconciliation system implements with Kafka. Each module runs independently. The modules move the system from one state to the next through the Kafka message middleware, and the intermediate UpdateReconStatus class handles status updates and message sending. This design implements pipelined reconciliation and also uses the characteristics of message middleware to achieve retries and decoupling between modules.

Figure 2 Reconciliation system state transition diagram

In order to better understand the implementation process of each module, each module will be explained in turn below.

File download module
Design

The file download module downloads bills from the various external channels. Aggregate payment integrates the capabilities of many third-party institutions, including payment industry leaders such as Alipay and WeChat. The diversity of payment channels leads to diversity in how bills are downloaded, so the focus of this module's design is a multi-mode, pluggable file download capability. After analyzing the characteristics of Java design patterns, this module adopts the interface pattern, which fits object-oriented design and allows new channels to be connected quickly. The implementation class diagram is shown in Figure 3 (only part of the class diagram is shown).

Figure 3 File download implementation class diagram

Let’s take the Alipay reconciliation file download method as an example to explain the implementation process in detail.

Implementation

According to the Alipay interface documentation, the current download method is HTTPS and the file is a .csv file in a compressed package. Based on these conditions, the system is implemented as follows (only part of the code is shown). Because Kafka and the intermediate state module already provide retries at the system level, the download code itself does not need a retry mechanism. The same holds for the subsequent modules.

public interface BillFetcher {
    // ReconTaskMessage is the Kafka message;
    // FetcherConsumer is the custom processing applied after the bill is downloaded
    String[] fetch(ReconTaskMessage message, FetcherConsumer consumer) throws IOException;
}

@Component
public class AlipayFetcher implements BillFetcher {

    public AlipayFetcher(@Autowired BillDownloadService billDownloadService) {
        Security.addProvider(new BouncyCastleProvider());
        // Register this implementation under its download method
        billDownloadService.register(BillFetchWay.ALIPAY, this);
    }
    ...
    @Override
    public String[] fetch(ReconTaskMessage message, FetcherConsumer consumer) throws IOException {
        String appId = map.getString("appId");
        String privateKey = getConvertedPrivateKey(map.getString("privateKey"));
        String alipayPublicKey = getPublicKey(map.getString("publicKey"), appId);
        String signType = map.getString("signType");
        String url = "https://openapi.alipay.com/gateway.do";
        String format = "json";
        String charset = "utf-8";
        String billDate = DateFormatUtils.format(message.getBillDate(), DateTimeConstants.BILL_DATE_PATTERN);
        String notExists = "isp.bill_not_exist";
        String fileContentType = "application/oct-stream";
        String contentTypeAttr = "Content-Type";
        // Instantiate the client
        AlipayClient alipayClient = new DefaultAlipayClient(url, appId, privateKey, format, charset, alipayPublicKey, signType);
        // Instantiate the request class for the specific API; the class name corresponds to the interface name
        AlipayDataDataserviceBillDownloadurlQueryRequest request = new AlipayDataDataserviceBillDownloadurlQueryRequest();
        // trade: the merchant's business bill based on Alipay transaction receipts
        // signcustomer: the account bill based on fund changes such as income and expenditure of the merchant's Alipay balance
        request.setBizContent("{" +
                "\"bill_type\":\"trade\"," +
                "\"bill_date\":\"" + billDate + "\"" +
                "}");
        AlipayDataDataserviceBillDownloadurlQueryResponse response = alipayClient.execute(request);
        if (response.isSuccess()) {
            // Download the reconciliation file from the returned address and stream it into the specified directory
            ...
            System.out.println("Call successful");
        } else {
            System.out.println("Call failed");
        }
    }
}

Specific steps:

  1. Override the constructor to register the implementation class in a map, so that the implementation can be looked up by its download method;

  2. Implement the fetch interface, which includes constructing the request parameters, calling Alipay, parsing the response, streaming the file into the corresponding directory, and handling exceptions along the way.
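
The registration in step 1 can be sketched without Spring or the Alipay SDK. This is a minimal illustration, not the system's actual code: the fetch signature is simplified to take only a bill date, FakeAlipayFetcher and the file path are hypothetical, and the download method of BillDownloadService is an assumed name for the lookup side of the map.

```java
import java.util.EnumMap;
import java.util.Map;

// Simplified stand-ins for the article's types, reduced to what the sketch needs.
enum BillFetchWay { ALIPAY, WECHAT }

interface BillFetcher {
    // Returns the local paths of the bill files downloaded for the given bill date.
    String[] fetch(String billDate);
}

// Registry that maps each download method to its fetcher (the "map" in step 1).
class BillDownloadService {
    private final Map<BillFetchWay, BillFetcher> fetchers = new EnumMap<>(BillFetchWay.class);

    void register(BillFetchWay way, BillFetcher fetcher) {
        fetchers.put(way, fetcher);
    }

    // Hypothetical lookup method: picks the fetcher registered for the download method.
    String[] download(BillFetchWay way, String billDate) {
        BillFetcher fetcher = fetchers.get(way);
        if (fetcher == null) {
            throw new IllegalArgumentException("No fetcher registered for " + way);
        }
        return fetcher.fetch(billDate);
    }
}

// A fetcher registers itself when constructed, as AlipayFetcher does in its constructor.
class FakeAlipayFetcher implements BillFetcher {
    FakeAlipayFetcher(BillDownloadService service) {
        service.register(BillFetchWay.ALIPAY, this);
    }

    @Override
    public String[] fetch(String billDate) {
        // A real implementation would call the channel and stream the file to disk.
        // The path below is made up for illustration.
        return new String[] {"/data/bills/alipay-" + billDate + ".csv.zip"};
    }
}
```

With this shape, connecting a new channel only requires a new BillFetcher implementation that registers itself. The calling code does not change.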

File parsing and pushing module
Design

As mentioned earlier, aggregate payment faces many different external channels, so the reconciliation files are naturally diverse. For example, WeChat uses txt format and Alipay uses csv format, and the bill content also differs from channel to channel. Resolving these differences between channels is the key issue for this module. After studying existing reconciliation systems, this system adopts the interface pattern combined with RDF (structured text files). The interface pattern handles the multiple bill formats and provides a pluggable mechanism, while the RDF tool component standardizes bills quickly and keeps the operation simple and easy to understand. The implementation class diagram is shown in Figure 4 (only part of the class diagram is shown).

Figure 4 File standardization implementation class diagram

Let’s take Alipay reconciliation file analysis as an example to explain the implementation process in detail.

Implementation

Based on Alipay's bill format, the RDF standard template is defined in advance. Bill parsing then converts each line of the reconciliation file into the corresponding entity class according to the template. Note that the fields of the standard template must correspond one-to-one with the bill data. The entity class may have more fields than the bill, but it must include every bill field. The interface is defined as follows:

public interface BillConverter<T> {
    // Whether this converter can be used for the given bill
    boolean match(String channelType, String name);
    // Convert the original reconciliation file (read from a stream) for loading into Hive
    void convertBill(InputStream sourceFile, ConverterConsumer<T> consumer) throws IOException;
    // Convert the original reconciliation file (read from a local path) for loading into Hive
    void convertBill(String localPath, ConverterConsumer<T> consumer) throws IOException;
}

The specific implementation steps are shown in Figure 5:

Figure 5 File parsing flow chart

  1. Define the RDF standard template. The following is the template for Alipay business flow details. The field names in the body structure must match the field names of the entity class.

{
  "head": [
    "title|Alipay business details query|Required",
    "merchantId|Account|Required",
    "billDate|Start Date|Required",
    "line|Business Details List|Required",
    "header|header|Required"
  ],
  "body": [
    "channelNo|Alipay transaction number",
    "merchantNo|Merchant order number",
    "businessType|Business type",
    "production|Product name",
    "createTime|Creation time|Date:yyyy-MM-dd HH:mm:ss",
    "finishTime|Complete time|Date:yyyy-MM-dd HH:mm:ss",
    "storeNo|store number",
    "storeName|store name",
    "operator|Operator",
    "terminalNo|Terminal number",
    "account|Other account",
    "orderAmount|Order amount|BigDecimal",
    "actualReceipt|Merchant actual receipt|BigDecimal",
    "alipayRedPacket|Alipay red envelope|BigDecimal",
    "jiFenBao|jifenbao|BigDecimal",
    "alipayPreferential|Alipay discount|BigDecimal",
    "merchantPreferential|merchant discount|BigDecimal",
    "cancelAfterVerificationAmount|ticket cancellation amount|BigDecimal",
    "ticketName|ticket name",
    "merchantRedPacket|Merchant red envelope consumption amount|BigDecimal",
    "cardAmount|Card consumption amount|BigDecimal",
    "refundOrRequestNo|Refund batch number/request number",
    "fee|Service fee|BigDecimal",
    "feeSplitting|Profit sharing|BigDecimal",
    "remark|Remarks",
    "merchantIdNo|Merchant Identification Number"
  ],
  "tail": [
    "line|End of business details list|Required",
    "tradeSummary|Transaction total|Required",
    "refundSummary|Refund total|Required",
    "exportTime|Export time|Required"
  ],
  "protocol": "alib",
  "columnSplit": ","
}

  2. Implement the getChannelType and match methods of the interface. These two methods determine which Convert class is used. To match Alipay bills, the implementation is:

@Override
public String getChannelType() {
    return ChannelType.ALI.name();
}

@Override
public boolean match(String channelType, String name) {
    return name.endsWith(".csv.zip");
}

  3. Implement the convertBill method of the interface to complete bill standardization;

@Override
public void convertBill(String path, ConverterConsumer<ChannelBillPojo> consumer) throws IOException {
    FileConfig config = new FileConfig(path, "rdf/alipay-business.json", new StorageConfig("nas"));
    config.setFileEncoding("UTF-8");
    FileReader fileReader = FileFactory.createReader(config);
    AlipayBusinessConverter.AlipayBusinessPojo row;
    try {
        // Read the bill row by row and hand each row to the consumer
        while (null != (row = fileReader.readRow(AlipayBusinessConverter.AlipayBusinessPojo.class))) {
            convert(row, consumer);
        }
        ...
}

  4. Push standardized bills to Hive.
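
To make the role of the template concrete, here is a minimal sketch of how "name|description|type" definitions from the body array could drive the parsing of one bill line. It does not use the RDF tool component and is not its API. ColumnDef and TemplateRowParser are hypothetical names, and only the BigDecimal type is converted. Every other type, including Date, is kept as a string.

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// One column definition from the template, e.g. "orderAmount|Order amount|BigDecimal".
class ColumnDef {
    final String name;
    final String type; // "String" when the definition omits the type

    ColumnDef(String name, String type) {
        this.name = name;
        this.type = type;
    }

    static ColumnDef parse(String definition) {
        String[] parts = definition.split("\\|");
        String type = parts.length >= 3 ? parts[2].trim() : "String";
        return new ColumnDef(parts[0].trim(), type);
    }
}

class TemplateRowParser {
    private final List<ColumnDef> columns = new ArrayList<>();
    private final String columnSplit;

    TemplateRowParser(List<String> bodyDefinitions, String columnSplit) {
        for (String definition : bodyDefinitions) {
            columns.add(ColumnDef.parse(definition));
        }
        this.columnSplit = columnSplit;
    }

    // Maps one bill line to field name -> value, in template order.
    Map<String, Object> parseRow(String line) {
        String[] cells = line.split(Pattern.quote(columnSplit), -1);
        if (cells.length != columns.size()) {
            // The template must correspond one-to-one with the bill columns.
            throw new IllegalArgumentException(
                "Expected " + columns.size() + " columns but got " + cells.length);
        }
        Map<String, Object> row = new LinkedHashMap<>();
        for (int i = 0; i < cells.length; i++) {
            ColumnDef column = columns.get(i);
            String cell = cells[i].trim();
            row.put(column.name, "BigDecimal".equals(column.type) ? new BigDecimal(cell) : cell);
        }
        return row;
    }
}
```

The column count check mirrors the one-to-one requirement: a line that does not match the template is rejected rather than loaded with shifted fields.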

Platform data acquisition and push module

Platform data is generally obtained from the database. When the data volume is small, querying puts little pressure on the database. When the volume is large, as with e-commerce transactions exceeding 1 million per day, querying the database directly is not advisable: it is inefficient and can easily bring the database down and affect online transactions. This problem shows up again in the version iterations described later. The platform data is therefore extracted from Hive, which only requires the transaction data to be synchronized to a Hive table in advance. This is both more efficient and safer. Because the source Hive tables and their table structures differ, the data collector (Collector) also adopts the interface pattern. The Collector interface is defined as follows:

public interface DataCollector {
    void collect(OutputStream os) throws IOException;
}

According to the current implementation of the platform data collector, the class diagram can be obtained as shown in Figure 6.

Figure 6 Platform data collector implementation class diagram
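
As an illustration of what one collector might look like, the following sketch writes order rows to the output stream as tab-separated lines. It is an assumption about the shape of an implementation, not one of the classes in Figure 6. OrderRowCollector and CollectorDemo are hypothetical, and the in-memory rows stand in for the result of a Hive query.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

interface DataCollector {
    void collect(OutputStream os) throws IOException;
}

// Hypothetical collector: the rows stand in for the result of a Hive query.
class OrderRowCollector implements DataCollector {
    private final List<String[]> rows;

    OrderRowCollector(List<String[]> rows) {
        this.rows = rows;
    }

    @Override
    public void collect(OutputStream os) throws IOException {
        for (String[] row : rows) {
            // One tab-separated line per order, so the output can be loaded as a delimited file.
            os.write((String.join("\t", row) + "\n").getBytes(StandardCharsets.UTF_8));
        }
        os.flush();
    }
}

class CollectorDemo {
    // Runs a collector against an in-memory stream and returns what it wrote.
    static String collectToString(DataCollector collector) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try {
            collector.collect(buffer);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
    }
}
```

Because the interface only exposes an OutputStream, the caller decides where the data goes (a local file, a buffer, an upload stream) and each collector only decides what to extract.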

Reconciliation execution module

This module executes the Hive commands. Once all platform bills and channel bills have been pushed to Hive, it uses Hive's efficiency on large data sets to run a full join SQL statement and stores the result in a specified Hive table, which the reconciliation result statistics module then uses. The reconciliation SQL can be tailored to business needs. If you would like to know the full join SQL used by this system, feel free to contact me.
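
The article does not give the SQL, so the following is only an in-memory illustration of what a full join produces, written in Java rather than Hive SQL. JoinedOrder and FullJoinSketch are hypothetical names, and each side is reduced to a map from order number to amount.

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

// One row of the join result. A null amount means that side has no record for the order.
class JoinedOrder {
    final String orderNo;
    final BigDecimal platformAmount;
    final BigDecimal channelAmount;

    JoinedOrder(String orderNo, BigDecimal platformAmount, BigDecimal channelAmount) {
        this.orderNo = orderNo;
        this.platformAmount = platformAmount;
        this.channelAmount = channelAmount;
    }
}

class FullJoinSketch {
    // Keeps every order number that appears on either side, like a full outer join on orderNo.
    static List<JoinedOrder> fullOuterJoin(Map<String, BigDecimal> platform,
                                           Map<String, BigDecimal> channel) {
        TreeSet<String> orderNos = new TreeSet<>(platform.keySet());
        orderNos.addAll(channel.keySet());
        List<JoinedOrder> joined = new ArrayList<>();
        for (String orderNo : orderNos) {
            joined.add(new JoinedOrder(orderNo, platform.get(orderNo), channel.get(orderNo)));
        }
        return joined;
    }
}
```

Rows where one side is null are the candidates for under-accounts, multiple accounts and daily cuts. Rows where both sides are present still need their amounts and statuses compared.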

Reconciliation result statistics module

After the reconciliation task executes successfully, the full join result needs to be summarized. The statistics focus on differences such as inconsistent amounts, inconsistent statuses, daily cuts, under-accounts (the channel has a record but the platform does not) and multiple accounts (the platform has a record but the channel does not). The system handles each case as follows:

  1. Inconsistent amounts: the front-end page displays the reason for the difference, which is then checked manually;

  2. Inconsistent status: for refund orders, query the platform's refund table. If the refund exists and the amounts match, the difference is not displayed. In all other cases, the reason for the difference is displayed on the front-end page and verified manually;

  3. Daily cut: when the platform order succeeded but the channel has no matching order, the creation time of the platform order is used to judge whether it may be a daily cut. If so, the order is written to a buffer file. After the statistics are complete, the buffer file is uploaded to the Hive daily-cut table, and this data is reloaded the next day to achieve cross-day reconciliation. When the channel has an order but the platform does not, the platform database is queried to confirm whether there is a difference. If there is, it is displayed on the front-end page and verified manually;

  4. Under-account: currently, the platform database is queried to determine whether there really is a difference. A confirmed difference is displayed on the front-end page and verified manually;

  5. Multiple accounts: this may be a daily cut, so the daily cut case is considered first. If the order is outside the daily cut range, the difference is displayed on the front-end page and checked manually.
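
The decision order in the five cases above can be sketched as a small classifier over one joined row. This is a simplification under stated assumptions: DiffType, OrderSide and DiffClassifier are hypothetical names, the daily cut test is reduced to "created within a configurable window before the end of the bill date", and the database lookups (refund table, platform order query) that the real system performs before confirming a difference are left out.

```java
import java.math.BigDecimal;
import java.time.Duration;
import java.time.LocalDateTime;

enum DiffType { MATCHED, AMOUNT_MISMATCH, STATUS_MISMATCH, DAILY_CUT, UNDER_ACCOUNT, MULTIPLE_ACCOUNT }

// One side (platform or channel) of a joined order. createTime is only needed for the platform side.
class OrderSide {
    final BigDecimal amount;
    final String status;
    final LocalDateTime createTime;

    OrderSide(BigDecimal amount, String status, LocalDateTime createTime) {
        this.amount = amount;
        this.status = status;
        this.createTime = createTime;
    }
}

class DiffClassifier {
    private final LocalDateTime dayEnd; // end of the bill date (exclusive)
    private final Duration cutWindow;   // assumed window before dayEnd that counts as a possible daily cut

    DiffClassifier(LocalDateTime dayEnd, Duration cutWindow) {
        this.dayEnd = dayEnd;
        this.cutWindow = cutWindow;
    }

    // Pass null for a side that has no record of the order.
    DiffType classify(OrderSide platform, OrderSide channel) {
        if (platform == null && channel == null) {
            throw new IllegalArgumentException("At least one side must be present");
        }
        if (platform == null) {
            return DiffType.UNDER_ACCOUNT; // channel has the order, platform does not
        }
        if (channel == null) {
            // Platform has the order, channel does not: consider the daily cut first.
            boolean nearCut = !platform.createTime.isBefore(dayEnd.minus(cutWindow));
            return nearCut ? DiffType.DAILY_CUT : DiffType.MULTIPLE_ACCOUNT;
        }
        if (platform.amount.compareTo(channel.amount) != 0) {
            return DiffType.AMOUNT_MISMATCH;
        }
        if (!platform.status.equals(channel.status)) {
            return DiffType.STATUS_MISMATCH;
        }
        return DiffType.MATCHED;
    }
}
```

Amounts are compared with compareTo rather than equals so that 10.0 and 10.00 count as the same amount.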

Intermediate state module

The intermediate state module handles state transitions between modules. It uses Kafka, together with a check on whether the status was updated, to implement message retransmission and reconciliation status updates. The current state must succeed before the reconciliation process moves on to the next state. The intermediate state design solves the retry problem and also concentrates the database operations in one place, which fits the modular design in which each module performs its own duties. Retries are not unlimited: the retry count is currently set to 3. If a task still fails after 3 retries, a Lark notification is sent and manual intervention is required.
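
A minimal sketch of this retry-then-alert behaviour follows. It is not the actual UpdateReconStatus class. An in-memory queue stands in for the Kafka topic, a list stands in for Lark notifications, and the stage names and their order are assumptions, since the real states are only shown in Figure 2.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

// Assumed pipeline order; DONE marks the end of the process.
enum ReconStage { DOWNLOAD, PARSE, COLLECT, EXECUTE, STATISTICS, DONE }

class ReconMessage {
    final ReconStage stage;
    final int retries; // how many times this stage has already been retried

    ReconMessage(ReconStage stage, int retries) {
        this.stage = stage;
        this.retries = retries;
    }
}

class UpdateReconStatusSketch {
    static final int MAX_RETRIES = 3;
    final Deque<ReconMessage> queue = new ArrayDeque<>(); // stands in for a Kafka topic
    final List<String> alerts = new ArrayList<>();        // stands in for Lark notifications

    // Called after a module has processed a message.
    void onResult(ReconMessage message, boolean success) {
        if (success) {
            // Only a successful stage moves the process to the next state.
            ReconStage next = ReconStage.values()[message.stage.ordinal() + 1];
            if (next != ReconStage.DONE) {
                queue.add(new ReconMessage(next, 0));
            }
        } else if (message.retries < MAX_RETRIES) {
            queue.add(new ReconMessage(message.stage, message.retries + 1)); // resend the same stage
        } else {
            alerts.add("Stage " + message.stage + " failed after " + MAX_RETRIES + " retries");
        }
    }

    // Drives one reconciliation task; the handler reports whether a stage succeeded.
    boolean run(Predicate<ReconMessage> handler) {
        boolean completed = false;
        queue.add(new ReconMessage(ReconStage.DOWNLOAD, 0));
        while (!queue.isEmpty()) {
            ReconMessage message = queue.poll();
            boolean success = handler.test(message);
            onResult(message, success);
            if (success && message.stage == ReconStage.STATISTICS) {
                completed = true;
            }
        }
        return completed;
    }
}
```

Keeping the retry count in the message means the modules themselves stay free of retry logic, which matches the point made for the download module.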

In short, reconciliation work is neither simple nor overly complicated. It requires us to be careful, to understand the business deeply, to choose appropriate processing methods, and to keep iterating and optimizing the system for different businesses.

Version iteration

The design of a system is greatly affected by the scale of the business. For financial aggregate payment, the order volume has grown by several orders of magnitude, and along the way problems in the reconciliation system kept surfacing, so optimizing and improving it was inevitable. From the initial design to the present, the system has gone through roughly three stages: the initial version, the transitional version and the current version.

Initial version (v1.0)

The initial version automated reconciliation for the aggregation channels once it went online. In particular, during the 2018 Spring Festival campaign it provided an important guarantee for fund security and reconciled the aggregation platform with channels such as Laohezhong, Alipay and WeChat. As the financial business developed and Douyin e-commerce grew rapidly, the reconciliation system gradually exposed its shortcomings: more reconciliation tasks failed (especially those with large data volumes), abnormal difference results were displayed, and reconciliation efficiency was low. Through continued analysis, we found the following problems:

  1. System files were placed in the temporary directory tmp, which the TCE platform cleans regularly. As a result, pushing files to Hive would report that the file could not be found, especially for reconciliation tasks with large data volumes;

  2. Kafka messages accumulated and interrupted the reconciliation process, mainly because new channels were added and reconciliation tasks increased. At the same time, the Hive execution queue was a shared queue, and most reconciliation processes got stuck waiting for resources;

  3. Abnormal difference results were displayed mainly because the queries had no retry mechanism, so an exception such as a timeout during a query produced an abnormal difference result. Some abnormal difference results were also caused by the daily cut span being too small.

Transitional version (v2.0)

Given the shortcomings of the initial version and the urgency of the reconciliation function, the initial version received a transitional round of optimization. This made reconciliation of large data volumes workable and also improved the accuracy of the difference results. Compared with the initial version, this version has the following main optimizations:

  1. The file storage directory was changed from the temporary directory to a directory under the service, so that large files are not cleaned up prematurely. Files are deleted after they are uploaded to Hive;

  2. A dedicated execution queue was requested, which solves the problem of the reconciliation process getting stuck for lack of resources;

  3. A retry mechanism was added to the queries and the daily cut query span was increased, which fixes the abnormal difference results and improves the accuracy of the difference results.

The transitional version focused on the obvious problems of the initial version, but left some latent problems unsolved, such as low fault tolerance in the code, slow manual response after a reconciliation task fails, low reconciliation efficiency, and weak protection of the database.

Current version (v3.0)

The goal of the current version is to achieve the "three highs" of the reconciliation system: high efficiency, high accuracy (six nines) and high stability.

For high efficiency, the main problems were that platform data was slow to obtain and that querying it put the database at risk. This logic was optimized by changing the data source from the original database to the more efficient Hive, which only requires the data to be synchronized to a Hive table in advance.

For high accuracy, we optimized the logic for processing reconciliation differences, refined the difference handling methods, added alarms for difference results, and made the causes of differences shown on the front-end pages more detailed.

For high stability, the RDF processing of reconciliation files was optimized and fallback logic was added to improve fault tolerance. An alarm is raised when a reconciliation task fails or exceeds the retry threshold, which speeds up manual response. Rate limiting was also added to database operations such as order lookups to prevent the database from crashing.
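
The article does not say how the rate limiting is implemented. As one possible shape, here is a fixed-window limiter that a caller could consult before each order lookup. FixedWindowLimiter is a hypothetical class, and the limit, window length and injected clock are assumptions made for the sketch.

```java
import java.util.function.LongSupplier;

// Allows at most maxPerWindow calls in each window of windowMillis milliseconds.
class FixedWindowLimiter {
    private final int maxPerWindow;
    private final long windowMillis;
    private final LongSupplier clock; // injected so the sketch can be exercised without sleeping
    private long windowStart;
    private int used;

    FixedWindowLimiter(int maxPerWindow, long windowMillis, LongSupplier clock) {
        this.maxPerWindow = maxPerWindow;
        this.windowMillis = windowMillis;
        this.clock = clock;
        this.windowStart = clock.getAsLong();
    }

    // Returns true if the caller may query the database now, false if it should wait or skip.
    synchronized boolean tryAcquire() {
        long now = clock.getAsLong();
        if (now - windowStart >= windowMillis) {
            // A new window starts: reset the counter.
            windowStart = now;
            used = 0;
        }
        if (used < maxPerWindow) {
            used++;
            return true;
        }
        return false;
    }
}
```

In production code the clock would typically be System::currentTimeMillis, and a rejected caller would back off and retry later rather than drop the lookup.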

The version iteration process is summarized in the figure below. I hope readers can avoid falling into the same pits, especially when processing large files.

(Figure: summary of the version iteration process)

Summary

The reconciliation system model is closely tied to the business, and different businesses will have different models. However, the overall architecture of most reconciliation systems varies little. The main difference lies in how each module is implemented. I hope the reconciliation system introduced here gives readers some design ideas and helps them avoid the same pitfalls.
