Kafka practice: hot data display

1 Real-time streaming computing

1.1 Concept

Streaming computing usually has strict real-time requirements. The target computation is generally defined first, and the computation logic is then applied to each piece of data as it arrives. To improve efficiency, incremental computation is used instead of full computation whenever possible. Batch computing works the other way round: the complete data set is gathered first and then processed in full.
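Here is a minimal JDK-only sketch of that difference. LikeCounter is a hypothetical class and is not part of the project. The incremental total folds each event into the previous result as it arrives. The full total rescans every stored event each time it is requested:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical LikeCounter: contrasts incremental and full computation of a running total
final class LikeCounter {
    // Every event is kept only so that the full (batch-style) computation can rescan it
    private final List<Integer> history = new ArrayList<>();
    // Total maintained incrementally, one O(1) update per event
    private long runningTotal = 0;

    void onEvent(int delta) {
        history.add(delta);
        runningTotal += delta; // incremental: fold the new event into the previous result
    }

    long incrementalTotal() {
        return runningTotal;
    }

    // Full computation: recompute from all events every time the result is requested
    long fullTotal() {
        long sum = 0;
        for (int d : history) {
            sum += d;
        }
        return sum;
    }
}
```

Both methods return the same value. The incremental version does constant work per event and never needs the history. That is why stream processors prefer it.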

1.2 Application scenarios

  • Log analysis
  • Large-screen dashboard statistics
  • Real-time public transit data
  • Real-time article score calculation

2 Kafka Streams

2.1 Overview

Kafka Streams is a feature introduced in Apache Kafka 0.10. It provides stream processing and analysis of data stored in Kafka.

The main characteristics of Kafka Streams are:

  • It is a simple, lightweight library that can be embedded in any Java application and packaged and deployed in any way.
  • It has no external dependencies other than Kafka itself.

3 Practice: calculating hot articles in the app

3.1 Adding dependencies

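1) Add the following dependency to the pom file of the kafka-demo project created earlier. No version is given, so it is presumably managed by the Spring Boot parent POM: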

```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <exclusions>
        <exclusion>
            <artifactId>connect-json</artifactId>
            <groupId>org.apache.kafka</groupId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
    </exclusions>
</dependency>
```

2) Add the Kafka configuration to the Nacos configuration of the message-sending microservice:

```yaml
spring:
  application:
    name: leadnews-behavior
  kafka:
    bootstrap-servers: 192.168.200.130:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
```

The configuration mainly sets the broker address, the number of producer retries, and the serializers for message keys and values.
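The YAML above corresponds roughly to the following producer properties in the plain Kafka client. This is a minimal sketch. The class name ProducerSettings and the method producerProps are hypothetical. The property keys are the standard Kafka producer configuration names.

```java
import java.util.Properties;

// Hypothetical helper: plain Kafka producer settings equivalent to the Spring Boot YAML above
final class ProducerSettings {
    static Properties producerProps() {
        Properties props = new Properties();
        // spring.kafka.bootstrap-servers: broker address
        props.setProperty("bootstrap.servers", "192.168.200.130:9092");
        // spring.kafka.producer.retries: how many times a failed send is retried
        props.setProperty("retries", "10");
        // spring.kafka.producer.key-serializer / value-serializer: keys and values are sent as strings
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
}
```

With the plain client, such a Properties object would be passed to `new KafkaProducer<String, String>(props)`. With Spring Boot, the equivalent configuration is built from the YAML and used by the auto-configured KafkaTemplate.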

3.2 Send behavior events to Kafka for stream processing

1) Call kafkaTemplate.send to send a message for each like and view behavior:

```java
// Like behavior: send the message to the score topic for aggregation
// (mess is an UpdateArticleMess built as in the view example below, with type LIKES)
kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC, JSON.toJSONString(mess));

// View behavior: set the type to VIEWS, then send the message for aggregation
UpdateArticleMess mess = new UpdateArticleMess();
mess.setArticleId(dto.getArticleId());
mess.setType(UpdateArticleMess.UpdateArticleType.VIEWS);
mess.setAdd(1);
kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC, JSON.toJSONString(mess));
```

2) Define the message wrapper class UpdateArticleMess:

```java
package com.heima.model.mess;

import lombok.Data;

@Data
public class UpdateArticleMess {

    /**
     * Type of the article field being modified
     */
    private UpdateArticleType type;
    /**
     * Article ID
     */
    private Long articleId;
    /**
     * Increment of the modified data; can be positive or negative
     */
    private Integer add;

    public enum UpdateArticleType {
        COLLECTION, COMMENT, LIKES, VIEWS;
    }
}
```

The enum lists the behavior types: collection, comment, like, and view. The aggregation step later switches on it to decide which counter to update.
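The stream's map step encodes each message as TYPE:increment. The aggregation step then turns that text back into the enum with valueOf before switching on it. Here is a minimal sketch of that round trip using a local copy of the enum. BehaviorCodec and its methods are hypothetical helpers and are not part of the project.

```java
// Hypothetical helpers; the nested enum mirrors UpdateArticleMess.UpdateArticleType
final class BehaviorCodec {
    enum UpdateArticleType { COLLECTION, COMMENT, LIKES, VIEWS }

    // Same format the stream's map step produces: type name, a colon, then the increment
    static String encode(UpdateArticleType type, int add) {
        return type.name() + ":" + add;
    }

    // valueOf turns the text back into the enum constant (throws if the name is unknown)
    static UpdateArticleType decodeType(String encoded) {
        return UpdateArticleType.valueOf(encoded.split(":")[0]);
    }

    static int decodeAdd(String encoded) {
        return Integer.parseInt(encoded.split(":")[1]);
    }

    // Which ArticleVisitStreamMess counter the aggregation would update for this message
    static String counterFor(String encoded) {
        switch (decodeType(encoded)) {
            case COLLECTION: return "collect";
            case COMMENT:    return "comment";
            case LIKES:      return "like";
            case VIEWS:      return "view";
            default: throw new IllegalArgumentException(encoded);
        }
    }
}
```

For example, `encode(UpdateArticleType.LIKES, -1)` yields `"LIKES:-1"`, and `counterFor("VIEWS:1")` selects the view counter.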

3.3 Use Kafka Streams to receive messages in real time and aggregate them

1) Define the entity class that carries the aggregated counts:

```java
package com.heima.model.mess;

import lombok.Data;

@Data
public class ArticleVisitStreamMess {
    /**
     * Article ID
     */
    private Long articleId;
    /**
     * Views
     */
    private int view;
    /**
     * Collections
     */
    private int collect;
    /**
     * Comments
     */
    private int comment;
    /**
     * Likes
     */
    private int like;
}
```

2) Add the topic constants to the constants class:

```java
package com.heima.common.constants;

public class HotArticleConstants {

    public static final String HOT_ARTICLE_SCORE_TOPIC = "hot.article.score.topic";
    public static final String HOT_ARTICLE_INCR_HANDLE_TOPIC = "hot.article.incr.handle.topic";
}
```

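HOT_ARTICLE_SCORE_TOPIC is the topic that receives the raw behavior messages (likes, views, and so on). HOT_ARTICLE_INCR_HANDLE_TOPIC receives the aggregated results. Its consumer updates the article's score and the hot-article data cached in Redis for the article's own channel and for the default page.

3) Define the stream that receives and aggregates the messages: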

```java
package com.heima.article.stream;

import com.alibaba.fastjson.JSON;
import com.heima.common.constants.HotArticleConstants;
import com.heima.model.mess.ArticleVisitStreamMess;
import com.heima.model.mess.UpdateArticleMess;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.time.Duration;

@Configuration
@Slf4j
public class HotArticleStreamHandler {

    @Bean
    public KStream<String, String> kStream(StreamsBuilder streamsBuilder) {
        // Receive the behavior messages
        KStream<String, String> stream = streamsBuilder.stream(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC);
        // Aggregate the stream
        stream.map((key, value) -> {
                    UpdateArticleMess mess = JSON.parseObject(value, UpdateArticleMess.class);
                    // Re-key the message: key = article ID (e.g. 1234343434), value = TYPE:increment (e.g. LIKES:1)
                    return new KeyValue<>(mess.getArticleId().toString(), mess.getType().name() + ":" + mess.getAdd());
                })
                // Group by article ID
                .groupBy((key, value) -> key)
                // 10-second tumbling time window (TimeWindows.of is deprecated in Kafka 3.x in favor of ofSizeWithNoGrace)
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                // Custom aggregation
                .aggregate(new Initializer<String>() {
                    /**
                     * Initializer: returns the starting aggregate value for each article and window
                     */
                    @Override
                    public String apply() {
                        return "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0";
                    }
                }, new Aggregator<String, String, String>() {
                    /**
                     * The actual aggregation: adds the incoming increment to the current aggregate value
                     */
                    @Override
                    public String apply(String key, String value, String aggValue) {
                        if (StringUtils.isBlank(value)) {
                            return aggValue;
                        }
                        // Parse the current aggregate, i.e. the totals computed so far within this time window
                        String[] aggAry = aggValue.split(",");
                        int col = 0, com = 0, lik = 0, vie = 0;
                        for (String agg : aggAry) {
                            String[] split = agg.split(":");
                            switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])) {
                                case COLLECTION:
                                    col = Integer.parseInt(split[1]);
                                    break;
                                case COMMENT:
                                    com = Integer.parseInt(split[1]);
                                    break;
                                case LIKES:
                                    lik = Integer.parseInt(split[1]);
                                    break;
                                case VIEWS:
                                    vie = Integer.parseInt(split[1]);
                                    break;
                            }
                        }
                        // Accumulate the increment carried by the current message
                        String[] valAry = value.split(":");
                        switch (UpdateArticleMess.UpdateArticleType.valueOf(valAry[0])) {
                            case COLLECTION:
                                col += Integer.parseInt(valAry[1]);
                                break;
                            case COMMENT:
                                com += Integer.parseInt(valAry[1]);
                                break;
                            case LIKES:
                                lik += Integer.parseInt(valAry[1]);
                                break;
                            case VIEWS:
                                vie += Integer.parseInt(valAry[1]);
                                break;
                        }

                        String formatStr = String.format("COLLECTION:%d,COMMENT:%d,LIKES:%d,VIEWS:%d", col, com, lik, vie);
                        log.info("Article id: {}", key);
                        log.info("Message processing result within the current time window: {}", formatStr);
                        return formatStr;
                    }
                }, Materialized.as("hot-atricle-stream-count-001"))
                .toStream()
                // Convert each windowed result into an ArticleVisitStreamMess JSON string keyed by article ID
                .map((key, value) -> new KeyValue<>(key.key().toString(), formatObj(key.key().toString(), value)))
                // Send the aggregated result
                .to(HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC);

        return stream;
    }

    /**
     * Format the value of the aggregated message
     * @param articleId article ID
     * @param value aggregate string, e.g. COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0
     * @return ArticleVisitStreamMess serialized as JSON
     */
    public String formatObj(String articleId, String value) {
        ArticleVisitStreamMess mess = new ArticleVisitStreamMess();
        mess.setArticleId(Long.valueOf(articleId));
        // value format: COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0
        String[] valAry = value.split(",");
        for (String val : valAry) {
            String[] split = val.split(":");
            switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])) {
                case COLLECTION:
                    mess.setCollect(Integer.parseInt(split[1]));
                    break;
                case COMMENT:
                    mess.setComment(Integer.parseInt(split[1]));
                    break;
                case LIKES:
                    mess.setLike(Integer.parseInt(split[1]));
                    break;
                case VIEWS:
                    mess.setView(Integer.parseInt(split[1]));
                    break;
            }
        }
        log.info("The result after aggregated message processing is: {}", JSON.toJSONString(mess));
        return JSON.toJSONString(mess);
    }
}
```

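The aggregation logic lives in two apply methods. Initializer.apply() supplies the starting value "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0" for each article and window. Aggregator.apply() adds each incoming increment to the matching counter, so increments of the same behavior type within a 10-second window are summed. Before the result is sent, formatObj converts it into an ArticleVisitStreamMess JSON string that includes the article ID. That JSON string is what the consumer receives.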
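The stream above groups by article ID, puts records into 10-second tumbling windows, and folds each increment into per-type counters. Below is a minimal JDK-only simulation of that behavior. It assumes non-negative millisecond timestamps. WindowedCounts, add, and get are hypothetical names, and a HashMap stands in for the state store that Kafka Streams materializes.

```java
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;

// Hypothetical simulation of groupBy(article ID) + 10-second tumbling window + aggregate
final class WindowedCounts {
    // Mirrors UpdateArticleMess.UpdateArticleType
    enum UpdateArticleType { COLLECTION, COMMENT, LIKES, VIEWS }

    static final long WINDOW_MS = 10_000L;

    // Tumbling windows are aligned to the epoch: start = timestamp rounded down to a multiple of the size
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_MS);
    }

    // State keyed by "articleId@windowStart", one counter per behavior type
    private final Map<String, EnumMap<UpdateArticleType, Integer>> state = new HashMap<>();

    void add(long articleId, long timestampMs, UpdateArticleType type, int add) {
        String key = articleId + "@" + windowStart(timestampMs);
        // Initializer role: every counter starts at 0 the first time an article appears in a window
        EnumMap<UpdateArticleType, Integer> counts = state.computeIfAbsent(key, k -> {
            EnumMap<UpdateArticleType, Integer> init = new EnumMap<>(UpdateArticleType.class);
            for (UpdateArticleType t : UpdateArticleType.values()) {
                init.put(t, 0);
            }
            return init;
        });
        // Aggregator role: add the increment to the counter for this behavior type
        counts.merge(type, add, Integer::sum);
    }

    int get(long articleId, long timestampMs, UpdateArticleType type) {
        EnumMap<UpdateArticleType, Integer> counts = state.get(articleId + "@" + windowStart(timestampMs));
        return counts == null ? 0 : counts.get(type);
    }
}
```

Keying the state by article ID plus window start mirrors the Windowed<String> key that toStream() emits.

The real stream differs in one respect. Kafka Streams forwards updated aggregates downstream as they change, and each update carries the running window total rather than the latest increment. Record caching reduces, but does not eliminate, repeated updates for the same window. Kafka Streams' `suppress` operator is the usual tool for emitting only final window results.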

3.4 The consumer listens for the aggregated messages and updates the cache

1) Listen on the aggregated-result topic:

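```java
import com.alibaba.fastjson.JSON;
import com.heima.common.constants.HotArticleConstants;
import com.heima.model.mess.ArticleVisitStreamMess;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ArticleIncrHandleListener {

    @Autowired
    private ApArticleService apArticleService;

    // Consume the aggregated results produced by the stream
    @KafkaListener(topics = HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC)
    public void onMessage(String mess) {
        if (StringUtils.isNotBlank(mess)) {
            ArticleVisitStreamMess articleVisitStreamMess = JSON.parseObject(mess, ArticleVisitStreamMess.class);
            // Update the article's counts and score in MySQL and the hot-article data in Redis
            apArticleService.updateScore(articleVisitStreamMess);
        }
    }
}
```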

2) updateScore updates the article's counts and score in MySQL and the hot-article data in Redis.

Update MySQL:

```java
private ApArticle updateArticle(ArticleVisitStreamMess mess) {
    ApArticle apArticle = getById(mess.getArticleId());
    // Treat a null count as 0, then add the increment. The parentheses matter:
    // without them the increment is dropped whenever the stored count is null.
    apArticle.setCollection((apArticle.getCollection() == null ? 0 : apArticle.getCollection()) + mess.getCollect());
    apArticle.setComment((apArticle.getComment() == null ? 0 : apArticle.getComment()) + mess.getComment());
    apArticle.setLikes((apArticle.getLikes() == null ? 0 : apArticle.getLikes()) + mess.getLike());
    apArticle.setViews((apArticle.getViews() == null ? 0 : apArticle.getViews()) + mess.getView());
    updateById(apArticle);
    return apArticle;
}
```

Update the hot-article data in Redis for the article's channel and for the default page:

```java
/**
 * Update the article's counts and score, and update the hot-article data in the cache
 * @param mess aggregated counts for one article
 */
@Override
public void updateScore(ArticleVisitStreamMess mess) {
    //1. Update the number of reads, likes, collections, and comments on the article
    ApArticle apArticle = updateArticle(mess);
    //2. Calculate the score of the article
    Integer score = computeScore(apArticle);
    score = score * 3;

    //3. Replace the hot data of the channel corresponding to the current article
    replaceDataToRedis(apArticle, score, ArticleConstants.HOT_ARTICLE_FIRST_PAGE + apArticle.getChannelId());

    //4. Replace the hot data corresponding to the recommendation
    replaceDataToRedis(apArticle, score, ArticleConstants.HOT_ARTICLE_FIRST_PAGE + ArticleConstants.DEFAULT_TAG);

}

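/**
 * Replace the article's entry in a cached hot-article list and write the list back to Redis
 * @param apArticle the updated article
 * @param score the article's new score
 * @param s the Redis key of the hot-article list
 */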
private void replaceDataToRedis(ApArticle apArticle, Integer score, String s) {
    String articleListStr = cacheService.get(s);
    if (StringUtils.isNotBlank(articleListStr)) {
        List<HotArticleVo> hotArticleVoList = JSON.parseArray(articleListStr, HotArticleVo.class);

        boolean flag = true;

        //If the article exists in the cache, only update the score
        for (HotArticleVo hotArticleVo : hotArticleVoList) {
            if (hotArticleVo.getId().equals(apArticle.getId())) {
                hotArticleVo.setScore(score);
                flag = false;
                break;
            }
        }

        //If it does not exist in the cache, query the piece of data with the smallest score in the cache and compare the scores. If the score of the current article is greater than the data in the cache, replace it.
        if (flag) {
            if (hotArticleVoList.size() >= 30) {
                hotArticleVoList = hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());
                HotArticleVo lastHot = hotArticleVoList.get(hotArticleVoList.size() - 1);
                if (lastHot.getScore() < score) {
                    hotArticleVoList.remove(lastHot);
                    HotArticleVo hot = new HotArticleVo();
                    BeanUtils.copyProperties(apArticle, hot);
                    hot.setScore(score);
                    hotArticleVoList.add(hot);
                }


            } else {
                HotArticleVo hot = new HotArticleVo();
                BeanUtils.copyProperties(apArticle, hot);
                hot.setScore(score);
                hotArticleVoList.add(hot);
            }
        }
        //Cache to redis
        hotArticleVoList = hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());
        cacheService.set(s, JSON.toJSONString(hotArticleVoList));

    }
}
```
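Without Redis and JSON, replaceDataToRedis is a bounded top-N update. Below is a minimal JDK-only sketch of that list logic, assuming a capacity of 30 as in the code above. HotListSketch, Hot, and upsert are hypothetical stand-ins for HotArticleVo and the method body.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch of the list logic in replaceDataToRedis (no Redis, no JSON)
final class HotListSketch {
    static final int CAPACITY = 30;

    static final class Hot {
        final long id;
        int score;

        Hot(long id, int score) {
            this.id = id;
            this.score = score;
        }
    }

    static List<Hot> upsert(List<Hot> cached, long id, int score) {
        List<Hot> list = new ArrayList<>(cached);
        Hot existing = null;
        for (Hot h : list) {
            if (h.id == id) {
                existing = h;
                break;
            }
        }
        if (existing != null) {
            // Already cached: only refresh its score (mutates the shared object, as the original does)
            existing.score = score;
        } else if (list.size() < CAPACITY) {
            // Room left: add the article
            list.add(new Hot(id, score));
        } else {
            // Full: replace the lowest-scoring entry only if the new score beats it
            Hot lowest = Collections.min(list, Comparator.comparingInt((Hot h) -> h.score));
            if (lowest.score < score) {
                list.remove(lowest);
                list.add(new Hot(id, score));
            }
        }
        // Keep the list ordered by score, highest first, as it would be written back
        list.sort(Comparator.comparingInt((Hot h) -> h.score).reversed());
        return list;
    }
}
```

`Collections.min` finds the lowest score in one pass, so the list does not need sorting before the eviction check. The final sort is still needed because the original stores the list sorted by score, highest first.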