How can Suning run user portrait queries in seconds across its 600+ million members?

A user portrait is a labeled user model built from user characteristics, business scenarios, user behavior, and other information. Consumer user portraits, for example, fall into two categories of tags: attribute tags and behavior tags. Attribute tags are essentially fixed: the buyer’s gender, age group, membership level, consumption level, purchasing power, and so on. Behavior tags are variable: the buyer’s browsing, add-to-cart, purchase, and other behaviors.

After years of construction, Suning has built a complete user tag system covering retail, finance, sports, and other industries, along with a tag service platform that exposes rich tag data capabilities as intelligent middle-platform services for advertising, recommendation, and other scenarios.

As data volumes keep growing, how do we run second-level user portrait queries over billions of tag records belonging to 600+ million users? This article presents recent developments and architectural practice in user profiling technology. It introduces the tag platform we customized and built on ClickHouse, which achieves fast import of massive tag data and second-level portrait query and analysis, and provides a complete tag system covering everything from early-stage crowd screening to later-stage marketing strategy optimization.

1. Introduction to business scenarios

“Double 11” is here. Suppose 10 million home-appliance coupons need to be issued. We first filter out the eligible users based on tags, aiming at a crowd of roughly 10 million, and then run a portrait analysis on the selected crowd to check whether its characteristics meet expectations. If they do, the system generates a crowd package (a userid list) with one click and automatically publishes it for marketing.

Figure 1: Business process

1.1 Estimated number of people

Users select tags and the intersection and difference relationships between them to circle the qualified crowd, and the system estimates the crowd size in real time. For example, select:

Figure 2: Creating a crowd

The tag selection in the figure above means: users aged 25-36 with the “smart home” characteristic, excluding users who spent less than 10 yuan in the last 30 days. Expressed as a set operation, the formula is:

({User age 25-36} ∩ {Smart home crowd}) − {Spent less than 10 yuan in the last 30 days}

Technical difficulties include:

  • There are many crowd packages.
  • Each crowd package contains a very large number of users.
  • Calculation results must be returned in real time, which is hard.

1.2 Portrait Analysis

When the size of the selected crowd matches the planned number of coupons, a characteristic analysis of the crowd is required to check whether it meets the expected profile. An example of the structure of a user portrait table is as follows:


The filtered crowd package is joined with the user portrait table, and the associated portrait characteristics are analyzed in detail. Further historical analysis can also be performed on the portrait features.

Our previous solution stored user tags in a large wide table in ElasticSearch: one row per user, with a bundle of tag columns attached. Before inserting data into this wide table, we had to wait until all business data was ready, run the join, and only then insert the joined results into ES.

A common problem was that when one business party’s task was delayed, the join task that inserts into ES could not run, and operations personnel could not use the latest portrait data in time.

Modifying the document structure in ES is a relatively heavy operation, so modifying or deleting tags is time-consuming; ES’s multi-dimensional aggregation performance is relatively poor; and ES’s DSL syntax is unfriendly to developers. For these reasons, we replaced the tag storage engine, moving from ES to ClickHouse.

ClickHouse is an open-source columnar database that has attracted much attention in recent years, used mainly in the data analysis (OLAP) field. Its excellent query performance has won it broad industry favor, and major vendors have adopted it at scale. Suning Big Data introduced and adapted ClickHouse, encapsulating a rich set of Bitmap interfaces to support storage and analysis for the tag platform.

2. ClickHouse integrates Bitmap

We integrated RoaringBitmap into ClickHouse, implemented a set of Bitmap calculation functions, and contributed them to the open source community. Userids are compressed and stored as bitmaps, and the intersection and difference calculations between crowd packages are handed to highly efficient bitmap functions, which both saves space and speeds up queries.

Figure 3: ClickHouse integrated Bitmap

A complete set of calculation functions is implemented around the Bitmap object. A Bitmap object can be constructed in two ways: from the aggregate function groupBitmap, or from an Array object; a Bitmap object can also be converted back into an Array object. ClickHouse’s Array type comes with a large set of functions, which makes data processing easier. The middle part of the figure above shows the Bitmap calculation function set, which includes bit operation functions, aggregation functions, and evaluation functions, a fairly rich collection.
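As a small illustration (a minimal sketch using the documented ClickHouse Bitmap functions; the values are made up), constructing Bitmaps from Arrays and combining them looks like this:

-- Build Bitmaps from Arrays, combine them with bit-operation functions,
-- and evaluate them (the values are illustrative).
WITH
    bitmapBuild([1, 2, 3, 4, 5]) AS crowd_a,
    bitmapBuild([4, 5, 6, 7]) AS crowd_b
SELECT
    bitmapCardinality(bitmapAnd(crowd_a, crowd_b)) AS intersection_size,  -- 2
    bitmapCardinality(bitmapOr(crowd_a, crowd_b))  AS union_size,         -- 7
    bitmapToArray(bitmapAndnot(crowd_a, crowd_b))  AS difference_array;   -- [1, 2, 3]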

3. New architecture based on ClickHouse

3.1 Architecture Introduction

The architecture diagram is as follows:

Figure 4: Tag architecture

ClickHouse Manager is our self-developed ClickHouse management platform, which is responsible for cluster management, metadata management and node load coordination.

Spark tasks are responsible for generating and importing tag data. As soon as a business party’s task completes, tag-generate is called to produce a tag data file and store it on HDFS; the SQL statement that imports from HDFS into ClickHouse is then executed, which completes tag production. Tag production runs concurrently: if one business party’s data is not ready, it does not affect tag production for the other businesses.

The user portrait platform queries tag data from the ClickHouse cluster through a Proxy. Before a query runs, the tag expression must be converted into SQL; we encapsulated this logic in a general conversion module called to-ch-sql, so the business layer can query ClickHouse essentially without modification.

3.2 Basic structure of tag data table

Compared with the ElasticSearch storage structure, we transposed the tag storage: instead of one row per user, each tag value corresponds to one record holding a Bitmap object, and the userid collection for that tag value is stored inside the Bitmap:

CREATE TABLE ch_label_string
(
    labelname String,                             -- label name
    labelvalue String,                            -- label value
    uv AggregateFunction(groupBitmap, UInt64)     -- userid collection
)
ENGINE = AggregatingMergeTree()
PARTITION BY labelname
ORDER BY (labelname, labelvalue)
SETTINGS index_granularity = 128;

The uv field is a Bitmap-typed field that stores integer userids, with each userid represented by a single bit.

The primary key index granularity (index_granularity) defaults to 8192; here it is set to 128. Because Bitmap objects occupy relatively large storage, the granularity should be lowered to a small value to reduce the read amplification caused by the sparse index.
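To illustrate how this table is read (a minimal sketch; the tag name and value are made up), counting the users under one tag value merges the stored Bitmap states and returns the cardinality:

-- Count users under one tag value (labelname/labelvalue are illustrative).
-- groupBitmapMerge combines the partial Bitmap states written by
-- AggregatingMergeTree and returns the cardinality as UInt64.
SELECT groupBitmapMerge(uv) AS user_count
FROM ch_label_string
WHERE labelname = 'age_range' AND labelvalue = '25-36';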

According to the data type of the tag value, it is divided into four types of tables:

  • String
  • Integer
  • Double
  • Date

The tag name serves as the partition key. This design makes adding or deleting tag data convenient: only the relevant partition needs to be modified, and partition management has dedicated SQL statements, which are easy to operate.
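For example (a sketch; the tag name is illustrative), refreshing one tag only touches its own partition:

-- Drop one tag's partition and re-import that tag only; other tags'
-- partitions are untouched ('age_range' is an illustrative tag name).
ALTER TABLE ch_label_string DROP PARTITION 'age_range';
-- ...then re-run the import job for that tag.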

3.3 Userid sharded storage

Tag data is sharded by userid at import time, so each machine stores only the tag data for its own userids.

Each machine imports its own shard of tag data independently, achieving parallel data import. In our environment, single-machine import performance is about 1.5 million records per second.
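As a hedged sketch of this import step (the HDFS path, file layout, and schema are illustrative, not our production setup), each shard can pull its own tag file with the hdfs() table function:

-- Single-shard import sketch: read this shard's tag file from HDFS and
-- aggregate userids into Bitmap states (path and schema are illustrative).
INSERT INTO ch_label_string
SELECT
    labelname,
    labelvalue,
    groupBitmapState(userid) AS uv
FROM hdfs('hdfs://namenode:8020/tags/shard_0/labels.csv', 'CSV',
          'labelname String, labelvalue String, userid UInt64')
GROUP BY labelname, labelvalue;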

When filtering crowds by tags, the SQL only needs to execute within each individual shard, and intermediate results never have to be shipped back to the query node.

The advantage is especially obvious for “estimated crowd size” calculations: each shard returns only the number of matching users, and the query node sums these counts and returns the total to the client, fully exploiting ClickHouse’s distributed parallel computing capability.

3.4 Query process

A with statement computes the Bitmap object of each crowd package, and Bitmap functions then calculate the intersections and differences.
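For example, the crowd formula from section 1.1 can be written as follows (a minimal sketch; the tag names and the use of the string-valued table are illustrative):

-- Sketch of ({age 25-36} ∩ {smart home}) − {spent <10 yuan in 30 days};
-- all tag names and values are illustrative.
WITH
    (
        SELECT groupBitmapMergeState(uv) FROM ch_label_string
        WHERE labelname = 'age_range' AND labelvalue = '25-36'
    ) AS users_age,
    (
        SELECT groupBitmapMergeState(uv) FROM ch_label_string
        WHERE labelname = 'smart_home' AND labelvalue = '1'
    ) AS users_home,
    (
        SELECT groupBitmapMergeState(uv) FROM ch_label_string
        WHERE labelname = 'spend_30d_lt10' AND labelvalue = '1'
    ) AS users_low
SELECT bitmapCardinality(
    bitmapAndnot(bitmapAnd(users_age, users_home), users_low)) AS crowd_size;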

When many tags are involved, the tag query SQL becomes complex, so we wrap it inside the SQL of a distributed proxy table. The distributed proxy table itself stores no data; its role is to identify which nodes to query, and the tag query SQL is then executed on the nodes it identifies.

The query results are then summarized through the distributed proxy table. This characteristic of ClickHouse distributed tables gives tag queries a colocate mechanism.

Figure 5: Query process

Sample SQL is as follows:

-- Local query agent
CREATE TABLE ch_agent_user
(
    agentname String
)
ENGINE = MergeTree()
PARTITION BY agentname
ORDER BY (agentname)
SETTINGS index_granularity = 8192;

-- Distributed proxy table
CREATE TABLE ch_agent_dist_user AS ch_agent_user
ENGINE = Distributed('cluster_test', 'test', 'ch_agent_user', cityHash64(agentname));

-- Query the number of users
SELECT sum(user_number) AS user_number
FROM ch_agent_dist_user
RIGHT JOIN
(
    WITH
        (
            SELECT groupBitmapMergeState(uv) AS users0
            FROM ch_label_string
            WHERE labelname = 'T'
        ) AS users0
    SELECT
        'agent' AS agentname,
        bitmapCardinality(users0) AS user_number
) USING (agentname) SETTINGS enable_scalar_subquery_optimization = 0;

The ch_agent_user table itself stores no data. When it is right-joined with the with-statement subquery, the query result is driven by the subquery’s result set, precisely because it is a right join.

Each node’s query result is returned to the query node, which performs the summary calculation. The setting enable_scalar_subquery_optimization = 0 means the with statement’s result is not cached as a scalar, so the statement must be executed on every node.

By default, ClickHouse caches the result of a with statement as a scalar, and the query node distributes that scalar to the other servers; when a node finds the scalar already present, it skips executing the with statement locally. We want the with statement to run on every node, so we set this parameter to 0.

3.5 User portrait

User portraits have relatively high performance requirements: the average query response time must not exceed 5 seconds. Users can circle any crowd on the interface and then run portrait analysis on the selected crowd in real time.

User portrait technology has undergone three architecture reconstructions:

①V1: Large wide table mode

The earliest solution was a portrait table with userid as the primary key and the portrait’s characteristic fields as the remaining columns; we ran an IN operation between the circled crowd and the portrait table, then a GROUP BY operation (a sketch follows the list below).

This design brings two serious problems:

  • Adding or deleting a characteristic field requires modifying the portrait table’s structure.
  • When the circled crowd is large, the GROUP BY runs over a huge record set and performs poorly.
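A sketch of the V1 query shape (all table and column names are made up for illustration):

-- V1 sketch: wide portrait table keyed by userid; circle the crowd with IN,
-- then GROUP BY the feature columns (all names are illustrative).
SELECT gender, age_range, count() AS user_count
FROM user_portrait_wide
WHERE userid IN (SELECT userid FROM crowd_package_123)
GROUP BY gender, age_range;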

②V2: Bitmap mode

The userid collection under each characteristic value is stored as a Bitmap object in a single record. A record looks like this:

The Bitmap object of the user-circled crowd is ANDed with the Bitmap objects in the portrait table, returning the portrait information of the selected crowd.

With this design, the performance requirement is basically met: the average time is under 5 seconds. For some very large crowds, however, portrait performance is still poor, around 10 seconds.

The portrait table does not hold many records, but its Bitmap fields must be deserialized from disk during calculation, and some Bitmap objects occupy hundreds of megabytes, which degrades performance.
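A hedged sketch of the V2 query shape (the crowd-package table and tag names are illustrative):

-- V2 sketch: AND the circled crowd's Bitmap with each feature-value Bitmap
-- (ch_crowd_package and the tag names are illustrative).
WITH
    (
        SELECT groupBitmapMergeState(uv)
        FROM ch_crowd_package
        WHERE package_id = '123'
    ) AS crowd
SELECT
    labelvalue AS feature_value,
    bitmapCardinality(bitmapAnd(crowd, groupBitmapMergeState(uv))) AS user_count
FROM user_portrait_bitmap
WHERE labelname = 'gender'
GROUP BY labelvalue;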

③V3: Join table engine mode

ClickHouse’s Join table engine keeps data resident in memory. When data is inserted, it is first written to memory and then flushed to a disk file; when the system restarts, the data is automatically loaded back into memory. The Join table engine is, in effect, a memory-resident table with persistence.

We store the portrait table’s data in the Join table engine so that its Bitmap fields stay resident in memory. When the circled crowd’s Bitmap object is ANDed with them, the calculation between two Bitmap objects that are already loaded in memory is very fast.
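A minimal sketch of the V3 table definition, assuming (as described above) that the Join engine holds the Bitmap column in memory; all names are illustrative:

-- V3 sketch: portrait table on the Join engine, so its Bitmap objects are
-- loaded once at startup and stay resident in memory (names are illustrative).
CREATE TABLE user_portrait_join
(
    labelname String,
    labelvalue String,
    uv AggregateFunction(groupBitmap, UInt64)
)
ENGINE = Join(ANY, LEFT, labelname, labelvalue);

Portrait queries then intersect Bitmaps exactly as in V2, but without deserializing them from disk on every query.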

With this optimization, the average query time dropped to 1 to 2 seconds, and portrait analysis of crowds in the tens of millions takes no more than 5 seconds.

4. Summary

Through the Bitmap functions integrated into ClickHouse and the application of the Join table engine, this series of architectural optimizations has greatly improved the tag platform’s data analysis capabilities.

The new architecture mainly has the following advantages:

  • Tag data can be constructed in parallel, speeding up tag data production.
  • HDFS files are imported into ClickHouse concurrently, so tag data becomes ready sooner.
  • The average response time for query requests is under 2 seconds, and complex queries finish within 5 seconds.
  • Quasi-real-time updates of tag data are supported.
  • Tag expressions and query SQL are friendlier to developers, improving the system’s maintainability.
  • Compared with the ElasticSearch deployment, half of the hardware resources are saved.

Future plans:

  • ClickHouse currently uses the 32-bit version of RoaringBitmap; we plan to add a 64-bit version.
  • ClickHouse’s query concurrency is relatively low; we plan to add a smarter cache layer.
  • Support offline generation of ClickHouse data files to further improve how quickly tags become ready.

References:
ClickHouse official website: https://clickhouse.tech/
ClickHouse Chinese community: http://www.clickhouse.com.cn/
Bitmap PR: https://github.com/ClickHouse/ClickHouse/pull/4207
