How to implement deduplication in Java? Is this showing off your skills?

Hello everyone, I am 3y. The deduplication logic has been refactored several times, and many shareholders said they could no longer follow it, so today I will walk through the code once more. austin supports two types of deduplication: content deduplication (the same content sent N times within N minutes) and frequency deduplication (the same channel sending N times within a day).

austin is an open-source Java message push platform that sends message types such as [email] [SMS] [WeChat service account] [WeChat applet] [Enterprise WeChat] [DingTalk].

  • gitee.com/zhongfuchen…[1]

  • github.com/ZhongFuChen…[2]

At the very beginning, my first version of the implementation looked like this:

public void duplication(TaskInfo taskInfo) {
    // Read the deduplication rules from the configuration center
    JSONObject property = JSON.parseObject(config.getProperty(DEDUPLICATION_RULE_KEY, AustinConstant.APOLLO_DEFAULT_VALUE_JSON_OBJECT));
    JSONObject contentDeduplication = property.getJSONObject(CONTENT_DEDUPLICATION);
    JSONObject frequencyDeduplication = property.getJSONObject(FREQUENCY_DEDUPLICATION);

    // Content deduplication: the time window and the threshold both come from the configuration
    DeduplicationParam contentParams = DeduplicationParam.builder()
        .deduplicationTime(contentDeduplication.getLong(TIME))
        .countNum(contentDeduplication.getInteger(NUM)).taskInfo(taskInfo)
        .anchorState(AnchorState.CONTENT_DEDUPLICATION)
        .build();
    contentDeduplicationService.deduplication(contentParams);

    // Frequency deduplication: the time window is the number of seconds left until the end of today
    Long seconds = (DateUtil.endOfDay(new Date()).getTime() - DateUtil.current()) / 1000;
    DeduplicationParam businessParams = DeduplicationParam.builder()
        .deduplicationTime(seconds)
        .countNum(frequencyDeduplication.getInteger(NUM)).taskInfo(taskInfo)
        .anchorState(AnchorState.RULE_DEDUPLICATION)
        .build();
    frequencyDeduplicationService.deduplication(businessParams);
}

It was very simple at that time. The main logic was written right in this entry method, and everyone should be able to understand it. Later, a colleague from Didi in the group chat said this kind of code was not good enough, because you could not tell at a glance what it did. So he submitted a pull request with a refactored version. The entry method became:

public void duplication(TaskInfo taskInfo) {
    // Read the deduplication rules from the configuration center
    String deduplication = config.getProperty(DeduplicationConstants.DEDUPLICATION_RULE_KEY, AustinConstant.APOLLO_DEFAULT_VALUE_JSON_OBJECT);

    // For each deduplication type, build its parameters and run the matching service
    DEDUPLICATION_LIST.forEach(
        key -> {
            DeduplicationParam deduplicationParam = builderFactory.select(key).build(deduplication, key);
            if (deduplicationParam != null) {
                deduplicationParam.setTaskInfo(taskInfo);
                DeduplicationService deduplicationService = findService(key + SERVICE);
                deduplicationService.deduplication(deduplicationParam);
            }
        }
    );
}

I guess his idea was to encapsulate both building the deduplication parameters and selecting the specific deduplication service, so that the outermost code looks very concise. Later I chatted with him again, and he described his design as follows: "Considering that other deduplication rules may be added in the future, I encapsulated the deduplication logic separately and refactored it with the strategy and template design patterns. After the refactoring the template code stays unchanged while supporting different deduplication strategies, which makes it more extensible, more robust, and simpler."

Really awesome.

I slightly changed the entry based on the above ideas, and the code finally evolved into this:

public void duplication(TaskInfo taskInfo) {
    // Read the deduplication rules from the configuration center
    String deduplicationConfig = config.getProperty(DEDUPLICATION_RULE_KEY, CommonConstant.EMPTY_JSON_OBJECT);

    // For each deduplication type, build its parameters and run the matching service
    List<Integer> deduplicationList = DeduplicationType.getDeduplicationList();
    for (Integer deduplicationType : deduplicationList) {
        DeduplicationParam deduplicationParam = deduplicationHolder.selectBuilder(deduplicationType).build(deduplicationConfig, taskInfo);
        if (Objects.nonNull(deduplicationParam)) {
            deduplicationHolder.selectService(deduplicationType).deduplication(deduplicationParam);
        }
    }
}

At this point, most people should still be following, right? Before going into the specific code, let's take a brief look at the code structure of the deduplication feature (it will help when reading the code later).

The logic of deduplication can be abstracted uniformly as: a threshold Y is reached within a time period X. Remember what I once said, the essence of deduplication is "business key" + "storage". The steps to implement deduplication can then be roughly divided into the following (I use Redis for storage here):

  • Get records from Redis by Key

  • Determine whether the record of the Key in Redis meets the conditions

  • If the condition is met, the message is filtered out; if it is not met, the record in Redis is updated.
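The three steps above can be sketched roughly as follows. This is a minimal illustration rather than austin's actual code: a HashMap stands in for Redis, and the names DedupSketch, Counter, and shouldFilter are hypothetical and exist only for this example.

```java
import java.util.HashMap;
import java.util.Map;

class DedupSketch {
    /** Hypothetical record kept per business key: hit count plus expiry time. */
    static final class Counter {
        int count;
        long expiresAtMillis;
    }

    // In-memory stand-in for Redis (assumption: real code would read and write Redis keys with a TTL).
    private final Map<String, Counter> store = new HashMap<>();

    /** Returns true when the message should be filtered out. */
    boolean shouldFilter(String businessKey, int threshold, long windowSeconds, long nowMillis) {
        // Step 1: get the record by key, treating an expired record as missing
        Counter counter = store.get(businessKey);
        if (counter != null && counter.expiresAtMillis <= nowMillis) {
            store.remove(businessKey);
            counter = null;
        }
        // Step 2: check whether the record has reached the threshold
        if (counter != null && counter.count >= threshold) {
            return true;
        }
        // Step 3: not filtered, so update the record (creating it with a TTL on first use)
        if (counter == null) {
            counter = new Counter();
            counter.expiresAtMillis = nowMillis + windowSeconds * 1000;
            store.put(businessKey, counter);
        }
        counter.count++;
        return false;
    }

    public static void main(String[] args) {
        DedupSketch dedup = new DedupSketch();
        System.out.println(dedup.shouldFilter("user1:hello", 1, 300, 0L));    // first send passes
        System.out.println(dedup.shouldFilter("user1:hello", 1, 300, 1000L)); // repeat inside the window is filtered
    }
}
```

The current time is passed in as a parameter only to keep the sketch easy to test; with Redis the expiry would be handled by the key's TTL instead.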

To make the deduplication parameters easy to adjust, I put both the time period X and the threshold Y into the configuration: {"deduplication_10":{"num":1, "time":300},"deduplication_20":{"num":5}}. There are currently two concrete deduplication implementations:

1. If the same user receives the same content within 5 minutes, it should be filtered out.

2. If the same user has received content from a certain channel 5 times in a day, it should be filtered out.

After getting the configuration from the configuration center, a Builder builds the DeduplicationParam for each of these two types, which is the following code:

DeduplicationParam deduplicationParam = deduplicationHolder.selectBuilder(deduplicationType).build(deduplicationConfig, taskInfo);
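The two rules differ mainly in where the time window comes from: the content rule reads both "num" and "time" from the configuration, while the frequency rule only has "num" and, as in the first version above, uses the seconds left until the end of the day. The sketch below illustrates that difference under simplifying assumptions: Param is a hypothetical stand-in for DeduplicationParam, the rule is passed as a plain Map instead of JSON, and java.time replaces the DateUtil helper.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Map;

class ParamSketch {
    /** Hypothetical, simplified stand-in for DeduplicationParam. */
    static final class Param {
        final long deduplicationTime; // window length in seconds
        final int countNum;           // threshold

        Param(long deduplicationTime, int countNum) {
            this.deduplicationTime = deduplicationTime;
            this.countNum = countNum;
        }
    }

    /** Content rule: both the window and the threshold come from the configuration. */
    static Param contentParam(Map<String, Integer> rule) {
        return new Param(rule.get("time"), rule.get("num"));
    }

    /** Frequency rule: threshold from the configuration, window is the time left until midnight. */
    static Param frequencyParam(Map<String, Integer> rule, LocalDateTime now) {
        LocalDateTime endOfDay = now.toLocalDate().plusDays(1).atStartOfDay();
        long seconds = Duration.between(now, endOfDay).getSeconds();
        return new Param(seconds, rule.get("num"));
    }

    public static void main(String[] args) {
        Param content = contentParam(Map.of("num", 1, "time", 300));
        Param frequency = frequencyParam(Map.of("num", 5), LocalDateTime.now());
        System.out.println(content.deduplicationTime + "s / " + content.countNum);
        System.out.println(frequency.deduplicationTime + "s / " + frequency.countNum);
    }
}
```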

Builder and DeduplicationService are both written in a similar way: each subclass specifies its type when it is initialized, the parent class receives that type in one place, and the instance is put into a Map for management.

These services are managed in one central place, which I named DeduplicationHolder:

/**
 * @author huskey
 * @date 2022/1/18
 */
@Service
public class DeduplicationHolder {

    private final Map<Integer, Builder> builderHolder = new HashMap<>(4);
    private final Map<Integer, DeduplicationService> serviceHolder = new HashMap<>(4);

    public Builder selectBuilder(Integer key) {
        return builderHolder.get(key);
    }

    public DeduplicationService selectService(Integer key) {
        return serviceHolder.get(key);
    }

    public void putBuilder(Integer key, Builder builder) {
        builderHolder.put(key, builder);
    }

    public void putService(Integer key, DeduplicationService service) {
        serviceHolder.put(key, service);
    }
}
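To illustrate how a subclass can hand its type to the parent class and end up in the holder's Map, here is a minimal sketch without Spring. Everything in it is hypothetical (Holder, AbstractService, register, and the two service classes); the type codes 10 and 20 are inferred from the configuration keys deduplication_10 and deduplication_20, and in a Spring project the registration call would more likely happen in an initialization hook than by hand.

```java
import java.util.HashMap;
import java.util.Map;

class HolderSketch {
    /** Simplified holder: type code -> service. */
    static final class Holder {
        private final Map<Integer, AbstractService> services = new HashMap<>(4);

        void putService(Integer key, AbstractService service) {
            services.put(key, service);
        }

        AbstractService selectService(Integer key) {
            return services.get(key);
        }
    }

    /** The parent class receives the type from the subclass and registers the instance. */
    abstract static class AbstractService {
        protected final Integer deduplicationType;

        AbstractService(Integer deduplicationType) {
            this.deduplicationType = deduplicationType;
        }

        // Assumption: called once after construction (for example from an init hook).
        void register(Holder holder) {
            holder.putService(deduplicationType, this);
        }

        abstract String name();
    }

    /** Each subclass only states its own type code. */
    static final class ContentService extends AbstractService {
        ContentService() { super(10); }
        @Override String name() { return "content"; }
    }

    static final class FrequencyService extends AbstractService {
        FrequencyService() { super(20); }
        @Override String name() { return "frequency"; }
    }

    public static void main(String[] args) {
        Holder holder = new Holder();
        new ContentService().register(holder);
        new FrequencyService().register(holder);
        System.out.println(holder.selectService(20).name()); // prints "frequency"
    }
}
```

With this arrangement the entry method never names a concrete service; adding a new rule means adding one subclass with a new type code.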

The business key mentioned earlier is built in the subclasses of AbstractDeduplicationService.
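The key-building code itself is not shown here, so the following is only a guess at its general shape and not austin's implementation. It assumes the content rule hashes the template, receiver, and content together, and that the frequency rule keys on receiver and channel; the class KeySketch, its parameters, and the "FRE_" prefix are all hypothetical.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

class KeySketch {
    /** Content rule (assumed): same template + receiver + content yields the same key. */
    static String contentKey(long templateId, String receiver, String content) {
        return md5Hex(templateId + receiver + content);
    }

    /** Frequency rule (assumed): one counter per receiver and channel. */
    static String frequencyKey(String receiver, int sendChannel) {
        return "FRE_" + receiver + "_" + sendChannel;
    }

    /** Hex-encoded MD5 using only the JDK. */
    static String md5Hex(String input) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            byte[] digest = md.digest(input.getBytes(StandardCharsets.UTF_8));
            return String.format("%032x", new BigInteger(1, digest));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 should be available on every Java platform", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(contentKey(1L, "user1", "hello"));
        System.out.println(frequencyKey("user1", 30)); // prints "FRE_user1_30"
    }
}
```

Hashing the content keeps the key short and fixed-length no matter how long the message body is.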

The specific deduplication logic is implemented under LimitService. "The same user has received content from a certain channel 5 times in a day" is handled in SimpleLimitService, implemented with mget and pipelineSetEX. "The same user receives the same content within 5 minutes" is handled in SlideWindowLimitService, implemented with a lua script.

The code of LimitService comes from @caolongxiu[3]'s pull request. I recommend reading the commits alongside it to learn more: gitee.com/zhongfuchen…[4]

1. Frequency deduplication uses ordinary counting, which limits the number of messages sent per day.

2. Content deduplication uses the newly developed sliding-window deduplication based on the Redis zset, which can strictly control the frequency per unit of time.

3. Redis runs a lua script to guarantee atomicity and reduce network I/O overhead.

4. The Redis key is given a prefix for data isolation (the deduplication method may need to change dynamically later).

5. The specific rate-limiting deduplication method is extracted from DeduplicationService. DeduplicationService only needs to set the type of AbstractLimitService (the specific rate-limiting deduplication service) supplied through constructor injection to change the deduplication method dynamically.

6. The snowflake algorithm generates the unique member value of the zset, and the score is the current timestamp.

For sliding-window deduplication, new questions may arise: What is the logic of limit.lua? Why remove the data from before the time window? Why must the ARGV[4] parameter be unique? Why expire?

A: A sliding window guarantees deduplication of N times within N minutes. You can review sliding windows from TCP, or from the sliding-window problems you meet when practicing on LeetCode. So why remove the earlier data? Entries older than the window no longer count toward the threshold, so unless they are removed the count would include them and the zset would keep growing.

Why must ARGV[4] be unique? See the zadd command for details. As long as every member added to the window is unique, zadd never triggers an update of an existing member (I think this design is simpler), and the snowflake algorithm is a convenient way to produce the unique value.

Why expire? If a key is written only once and never touched again, it would very likely stay in Redis memory indefinitely. Setting expire avoids this situation.
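The limit.lua script is not reproduced in this article, so the sketch below only imitates the sliding-window idea in memory. It assumes the script follows the usual zset pattern (ZREMRANGEBYSCORE to drop old entries, ZCARD to count, ZADD to record, EXPIRE to clean up); the class SlidingWindowSketch, the counter used in place of a snowflake ID, and the method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

class SlidingWindowSketch {
    // In-memory stand-in for one Redis zset: member -> score (timestamp in milliseconds).
    private final Map<String, Long> zset = new HashMap<>();
    // Stand-in for a snowflake ID generator: all that matters is that members never repeat.
    private long nextId = 0;

    /** Returns true when the message should be filtered out. */
    boolean shouldFilter(long nowMillis, long windowMillis, int threshold) {
        // Like ZREMRANGEBYSCORE: drop entries that have slid out of the window
        long windowStart = nowMillis - windowMillis;
        zset.values().removeIf(score -> score <= windowStart);

        // Like ZCARD: count what is still inside the window
        if (zset.size() >= threshold) {
            return true;
        }

        // Like ZADD with a unique member and the current timestamp as score.
        // A repeated member would overwrite the old entry instead of adding a new one,
        // so the count would stop growing and the threshold might never be reached.
        zset.put("id-" + (nextId++), nowMillis);

        // A real script would also EXPIRE the key here so an idle key does not stay in memory.
        return false;
    }

    int size() {
        return zset.size();
    }

    public static void main(String[] args) {
        SlidingWindowSketch window = new SlidingWindowSketch();
        System.out.println(window.shouldFilter(0L, 1000L, 2));    // false
        System.out.println(window.shouldFilter(100L, 1000L, 2));  // false
        System.out.println(window.shouldFilter(200L, 1000L, 2));  // true, two entries already in the window
        System.out.println(window.shouldFilter(1050L, 1000L, 2)); // false, the entry at 0 has slid out
    }
}
```

In Redis these steps run inside a single lua script so that no other client can interleave between the count and the add.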

If you want to learn from a Java project, I highly recommend my project, the message push platform austin (8K stars). It can serve as a graduation project, can be used for campus recruitment, and shows how messages are pushed in a production environment. The platform pushes and sends message types such as [email] [SMS] [WeChat service account] [WeChat applet] [Enterprise WeChat] [DingTalk].

  • gitee.com/zhongfuchen…[5]

  • github.com/ZhongFuChen…[6]

Reference materials

[1] https://gitee.com/zhongfucheng/austin/

[2] https://github.com/ZhongFuCheng3y/austin

[3] https://gitee.com/caolongxiu

[4] https://gitee.com/zhongfucheng/austin/pulls/19

[5] https://gitee.com/zhongfucheng/austin/

[6] https://github.com/ZhongFuCheng3y/austin
