1 Real-time streaming computing
1.1 Concept
Streaming computing generally has high real-time requirements. The target computation is usually defined first, and the computation logic is then applied to each piece of data as it arrives. To improve efficiency, incremental computation is used instead of full computation whenever possible. This is the opposite of batch processing, where the data is first collected in full and then processed as a whole.
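To make the difference between incremental and full computation concrete, here is a minimal plain-Java sketch. The class `LikeCounter` and its methods are hypothetical and not part of the project. It keeps the raw event history only so that the two approaches can be compared side by side.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: a like count maintained incrementally versus recomputed in full.
class LikeCounter {
    private final List<Integer> history = new ArrayList<>(); // every event seen so far (kept only for comparison)
    private long runningTotal = 0;                            // incrementally maintained result

    // Incremental computation: O(1) work per event, reusing the previous result.
    void onEvent(int delta) {
        history.add(delta);
        runningTotal += delta;
    }

    long incrementalTotal() {
        return runningTotal;
    }

    // Full computation: O(n) work, re-reads every event ever received.
    long fullTotal() {
        long sum = 0;
        for (int d : history) {
            sum += d;
        }
        return sum;
    }
}
```

Both methods return the same number; the streaming approach relies on the incremental one so that the cost per event stays constant as the history grows.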
1.2 Application scenarios
- Log analysis
- Statistics for large-screen dashboards
- Real-time public transit data
- Real-time article score calculation
2 Kafka Streams
2.1 Overview
Kafka Streams is a feature introduced in Apache Kafka 0.10. It provides stream processing and analysis of the data stored in Kafka.
Kafka Streams has the following characteristics:
- It is a simple, lightweight client library that can be embedded in any Java application and packaged and deployed in any way.
- No external dependencies except Kafka
3 Practice: calculating hot articles in the app
3.1 Add dependencies
1) Add the following dependency to the pom file of the kafka-demo project used earlier
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <exclusions>
        <exclusion>
            <artifactId>connect-json</artifactId>
            <groupId>org.apache.kafka</groupId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
    </exclusions>
</dependency>
2) Add the Kafka configuration to the Nacos configuration of the microservice that sends the messages
```yaml
spring:
  application:
    name: leadnews-behavior
  kafka:
    bootstrap-servers: 192.168.200.130:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
```
This mainly configures the broker address, the number of producer retries, and the serializers for the message key and value.
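For reference, the YAML above roughly corresponds to the following plain Kafka producer properties. This is a sketch assuming Spring Boot's usual mapping of `spring.kafka.*` settings onto the standard client keys. The class `BehaviorProducerProps` is hypothetical, and real code would normally take the keys from the `ProducerConfig` constants in kafka-clients rather than from string literals.

```java
import java.util.Properties;

// Hypothetical sketch: the raw producer settings implied by the Spring YAML configuration.
class BehaviorProducerProps {
    static final String STRING_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";

    static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.200.130:9092"); // broker address
        props.put("retries", "10");                             // resend attempts on transient send errors
        props.put("key.serializer", STRING_SERIALIZER);         // message key -> bytes
        props.put("value.serializer", STRING_SERIALIZER);       // message value (a JSON string) -> bytes
        return props;
    }
}
```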
3.2 Send user behavior (likes, views, etc.) to Kafka for stream processing
1) Call kafkaTemplate.send to send data
// Like behavior: mess is an UpdateArticleMess built like the view message below, with type LIKES; send it for aggregation
kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC, JSON.toJSONString(mess));

// View behavior: set the type to VIEWS and send the message for aggregation
UpdateArticleMess mess = new UpdateArticleMess();
mess.setArticleId(dto.getArticleId());
mess.setType(UpdateArticleMess.UpdateArticleType.VIEWS);
mess.setAdd(1);
kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC, JSON.toJSONString(mess));
2) Define the message class UpdateArticleMess
package com.heima.model.mess;

import lombok.Data;

@Data
public class UpdateArticleMess {

    /**
     * Which article field is being modified
     */
    private UpdateArticleType type;

    /**
     * Article ID
     */
    private Long articleId;

    /**
     * Increment of the modified data; can be positive or negative
     */
    private Integer add;

    public enum UpdateArticleType {
        COLLECTION, COMMENT, LIKES, VIEWS;
    }
}
The enum lists the behavior types (collection, comment, like, view). The aggregation step later uses it to decide which counter an incoming message should update.
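The enum-driven dispatch can be illustrated without Kafka. In this sketch, `ArticleEventType` and `EventTally` are hypothetical stand-ins: `ArticleEventType` mirrors `UpdateArticleMess.UpdateArticleType`, and the value format `TYPE:add` (for example `LIKES:1`) matches what the stream's map step builds from each message. `valueOf` selects the counter, so an unknown type name fails fast with an `IllegalArgumentException`.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical stand-in for UpdateArticleMess.UpdateArticleType.
enum ArticleEventType { COLLECTION, COMMENT, LIKES, VIEWS }

// Hypothetical sketch: one counter per behavior type, selected by the enum name.
class EventTally {
    private final Map<ArticleEventType, Integer> counts = new EnumMap<>(ArticleEventType.class);

    EventTally() {
        for (ArticleEventType t : ArticleEventType.values()) {
            counts.put(t, 0); // every counter starts at zero, like the stream's initializer
        }
    }

    // Applies one encoded event such as "LIKES:1" or "LIKES:-1".
    void apply(String encoded) {
        String[] parts = encoded.split(":");
        ArticleEventType type = ArticleEventType.valueOf(parts[0]); // throws for unknown names
        counts.merge(type, Integer.parseInt(parts[1]), Integer::sum);
    }

    int get(ArticleEventType type) {
        return counts.get(type);
    }
}
```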
3.3 Use Kafka Streams to receive messages in real time and aggregate them
1) Define the entity class that holds the aggregated counts
package com.heima.model.mess;

import lombok.Data;

@Data
public class ArticleVisitStreamMess {
    /** Article id */
    private Long articleId;
    /** Views */
    private int view;
    /** Collections */
    private int collect;
    /** Comments */
    private int comment;
    /** Likes */
    private int like;
}
2) Add the topic constants to the constants class
package com.heima.common.constants;

public class HotArticleConstants {
    public static final String HOT_ARTICLE_SCORE_TOPIC = "hot.article.score.topic";
    public static final String HOT_ARTICLE_INCR_HANDLE_TOPIC = "hot.article.incr.handle.topic";
}
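HOT_ARTICLE_SCORE_TOPIC is the topic the behavior service sends raw behavior messages to and the stream reads from. HOT_ARTICLE_INCR_HANDLE_TOPIC is the topic the stream writes its aggregated results to; the article service consumes it to update the database and the hot-article data in the Redis cache, both for the article's own channel and for the default (recommended) channel.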
3) Define stream, receive messages and aggregate them
package com.heima.article.stream;

import com.alibaba.fastjson.JSON;
import com.heima.common.constants.HotArticleConstants;
import com.heima.model.mess.ArticleVisitStreamMess;
import com.heima.model.mess.UpdateArticleMess;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.time.Duration;

@Configuration
@Slf4j
public class HotArticleStreamHandler {

    @Bean
    public KStream<String, String> kStream(StreamsBuilder streamsBuilder) {
        // Receive messages
        KStream<String, String> stream = streamsBuilder.stream(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC);
        // Aggregate the stream
        stream.map((key, value) -> {
                    UpdateArticleMess mess = JSON.parseObject(value, UpdateArticleMess.class);
                    // Re-key the message: key = article id (e.g. 1234343434), value = e.g. "LIKES:1"
                    return new KeyValue<>(mess.getArticleId().toString(), mess.getType().name() + ":" + mess.getAdd());
                })
                // Group by article id
                .groupBy((key, value) -> key)
                // 10-second time window (TimeWindows.of is deprecated since Kafka 3.0 in favor of ofSizeWithNoGrace)
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                // Custom aggregation
                .aggregate(new Initializer<String>() {
                    /**
                     * Initializer: the initial aggregate value for each article and window
                     */
                    @Override
                    public String apply() {
                        return "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0";
                    }
                }, new Aggregator<String, String, String>() {
                    /**
                     * The actual aggregation: folds one message value into the current aggregate
                     */
                    @Override
                    public String apply(String key, String value, String aggValue) {
                        if (StringUtils.isBlank(value)) {
                            return aggValue;
                        }
                        String[] aggAry = aggValue.split(",");
                        int col = 0, com = 0, lik = 0, vie = 0;
                        for (String agg : aggAry) {
                            String[] split = agg.split(":");
                            // Read the totals already accumulated in the current time window
                            switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])) {
                                case COLLECTION: col = Integer.parseInt(split[1]); break;
                                case COMMENT: com = Integer.parseInt(split[1]); break;
                                case LIKES: lik = Integer.parseInt(split[1]); break;
                                case VIEWS: vie = Integer.parseInt(split[1]); break;
                            }
                        }
                        // Add the increment carried by the incoming message
                        String[] valAry = value.split(":");
                        switch (UpdateArticleMess.UpdateArticleType.valueOf(valAry[0])) {
                            case COLLECTION: col += Integer.parseInt(valAry[1]); break;
                            case COMMENT: com += Integer.parseInt(valAry[1]); break;
                            case LIKES: lik += Integer.parseInt(valAry[1]); break;
                            case VIEWS: vie += Integer.parseInt(valAry[1]); break;
                        }
                        String formatStr = String.format("COLLECTION:%d,COMMENT:%d,LIKES:%d,VIEWS:%d", col, com, lik, vie);
                        log.info("Article id: {}", key);
                        log.info("Message processing result within the current time window: {}", formatStr);
                        return formatStr;
                    }
                }, Materialized.as("hot-atricle-stream-count-001"))
                .toStream()
                .map((key, value) -> new KeyValue<>(key.key(), formatObj(key.key(), value)))
                // Send the aggregated result
                .to(HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC);
        return stream;
    }

    /**
     * Convert the aggregated value string into an ArticleVisitStreamMess JSON string
     * @param articleId article id
     * @param value aggregated value, e.g. COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0
     * @return JSON string of the ArticleVisitStreamMess
     */
    public String formatObj(String articleId, String value) {
        ArticleVisitStreamMess mess = new ArticleVisitStreamMess();
        mess.setArticleId(Long.valueOf(articleId));
        String[] valAry = value.split(",");
        for (String val : valAry) {
            String[] split = val.split(":");
            switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])) {
                case COLLECTION: mess.setCollect(Integer.parseInt(split[1])); break;
                case COMMENT: mess.setComment(Integer.parseInt(split[1])); break;
                case LIKES: mess.setLike(Integer.parseInt(split[1])); break;
                case VIEWS: mess.setView(Integer.parseInt(split[1])); break;
            }
        }
        log.info("The result after aggregated message processing is: {}", JSON.toJSONString(mess));
        return JSON.toJSONString(mess);
    }
}
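The aggregation rules are defined mainly in the Aggregator: for each incoming message it parses the current window totals from the aggregate string, adds the increment carried by the message (for example LIKES:1), and returns the updated string, which formatObj then converts into an ArticleVisitStreamMess.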
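The `windowedBy(TimeWindows.of(Duration.ofSeconds(10)))` step scopes every aggregate to one article and one 10-second window. The plain-Java sketch below imitates that bucketing for a single counter, assuming tumbling windows aligned to the epoch (how Kafka Streams aligns `TimeWindows` when no advance interval is set). The class `TumblingWindowSketch` is hypothetical and ignores grace periods, late records, and negative timestamps.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of per-article, per-window aggregation with 10-second tumbling windows.
class TumblingWindowSketch {
    static final long WINDOW_MS = 10_000;

    // key: "articleId@windowStart", value: summed LIKES increments in that window
    private final Map<String, Integer> likesPerWindow = new TreeMap<>();

    // Start of the epoch-aligned window containing the timestamp (inclusive lower bound).
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_MS);
    }

    void onLike(long articleId, long timestampMs, int add) {
        String key = articleId + "@" + windowStart(timestampMs);
        likesPerWindow.merge(key, add, Integer::sum); // a new window starts from 0 again
    }

    Integer likes(long articleId, long windowStartMs) {
        return likesPerWindow.get(articleId + "@" + windowStartMs);
    }
}
```

Events at 1 s and 9.999 s fall into the window starting at 0, while an event at exactly 10 s opens the next window, so each window's total only covers the increments received during those 10 seconds.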
3.4 Consume the aggregated messages and update the cache
1) Define a listener for the aggregated results
@Component
@Slf4j
public class ArticleIncrHandleListener {

    @Autowired
    private ApArticleService apArticleService;

    @KafkaListener(topics = HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC)
    public void onMessage(String mess) {
        if (StringUtils.isNotBlank(mess)) {
            ArticleVisitStreamMess articleVisitStreamMess = JSON.parseObject(mess, ArticleVisitStreamMess.class);
            apArticleService.updateScore(articleVisitStreamMess);
        }
    }
}
2) updateScore recalculates the article score and updates MySQL and Redis
Update MySQL:
private ApArticle updateArticle(ArticleVisitStreamMess mess) {
    ApArticle apArticle = getById(mess.getArticleId());
    // Parenthesize the null check so a null count is treated as 0 and the increment is still added
    apArticle.setCollection((apArticle.getCollection() == null ? 0 : apArticle.getCollection()) + mess.getCollect());
    apArticle.setComment((apArticle.getComment() == null ? 0 : apArticle.getComment()) + mess.getComment());
    apArticle.setLikes((apArticle.getLikes() == null ? 0 : apArticle.getLikes()) + mess.getLike());
    apArticle.setViews((apArticle.getViews() == null ? 0 : apArticle.getViews()) + mess.getView());
    updateById(apArticle);
    return apArticle;
}
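The parentheses around the null checks matter. In Java the conditional operator `?:` has lower precedence than `+`, so `count == null ? 0 : count + delta` parses as `count == null ? 0 : (count + delta)` and silently drops the increment whenever the stored count is null. The hypothetical `NullSafeIncrement` class below shows both forms side by side.

```java
// Hypothetical sketch contrasting the two ways of writing a null-safe increment.
class NullSafeIncrement {
    // Parsed as: current == null ? 0 : (current + delta)
    // A null count becomes 0 and the increment is lost.
    static int unparenthesized(Integer current, int delta) {
        return current == null ? 0 : current + delta;
    }

    // Treat null as 0 first, then always add the increment.
    static int parenthesized(Integer current, int delta) {
        return (current == null ? 0 : current) + delta;
    }
}
```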
Then update the hot-article data in Redis, both for the article's own channel and for the default (recommended) channel:
```java
/**
 * Update the score of the article and update the hot article data in the cache
 * @param mess aggregated behavior counts for one article
 */
@Override
public void updateScore(ArticleVisitStreamMess mess) {
    // 1. Update the numbers of views, likes, collections, and comments of the article
    ApArticle apArticle = updateArticle(mess);
    // 2. Calculate the score of the article
    Integer score = computeScore(apArticle);
    score = score * 3;
    // 3. Replace the hot data of the channel the current article belongs to
    replaceDataToRedis(apArticle, score, ArticleConstants.HOT_ARTICLE_FIRST_PAGE + apArticle.getChannelId());
    // 4. Replace the hot data of the recommendation (default) channel
    replaceDataToRedis(apArticle, score, ArticleConstants.HOT_ARTICLE_FIRST_PAGE + ArticleConstants.DEFAULT_TAG);
}

/**
 * Replace the data and store it in Redis
 * @param apArticle the updated article
 * @param score the new score
 * @param s the Redis cache key
 */
private void replaceDataToRedis(ApArticle apArticle, Integer score, String s) {
    String articleListStr = cacheService.get(s);
    if (StringUtils.isNotBlank(articleListStr)) {
        List<HotArticleVo> hotArticleVoList = JSON.parseArray(articleListStr, HotArticleVo.class);
        boolean flag = true;
        // If the article already exists in the cache, only update its score
        for (HotArticleVo hotArticleVo : hotArticleVoList) {
            if (hotArticleVo.getId().equals(apArticle.getId())) {
                hotArticleVo.setScore(score);
                flag = false;
                break;
            }
        }
        // If it is not in the cache, find the cached entry with the lowest score;
        // if the current article scores higher, replace that entry with it
        if (flag) {
            if (hotArticleVoList.size() >= 30) {
                hotArticleVoList = hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());
                HotArticleVo lastHot = hotArticleVoList.get(hotArticleVoList.size() - 1);
                if (lastHot.getScore() < score) {
                    hotArticleVoList.remove(lastHot);
                    HotArticleVo hot = new HotArticleVo();
                    BeanUtils.copyProperties(apArticle, hot);
                    hot.setScore(score);
                    hotArticleVoList.add(hot);
                }
            } else {
                HotArticleVo hot = new HotArticleVo();
                BeanUtils.copyProperties(apArticle, hot);
                hot.setScore(score);
                hotArticleVoList.add(hot);
            }
        }
        // Sort by score and write back to Redis
        hotArticleVoList = hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());
        cacheService.set(s, JSON.toJSONString(hotArticleVoList));
    }
}
```
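The cache-replacement rule in `replaceDataToRedis` can be exercised on an in-memory list. In this sketch, `HotItem` is a hypothetical stand-in for `HotArticleVo` with only an id and a score, `HotListSketch.upsert` is a hypothetical helper, and `limit` plays the role of the hard-coded 30. As in the service code, an article already in the list only gets its score refreshed, a new article is added while there is room, and otherwise it replaces the lowest-scoring entry only if its score is higher. Redis itself is not modeled.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for HotArticleVo with only the fields the ranking needs.
class HotItem {
    final long id;
    final int score;

    HotItem(long id, int score) {
        this.id = id;
        this.score = score;
    }
}

class HotListSketch {
    // Returns a new list sorted by score (descending); the input list is not modified.
    // limit is assumed to be positive.
    static List<HotItem> upsert(List<HotItem> cached, long id, int score, int limit) {
        List<HotItem> list = new ArrayList<>(cached);
        boolean found = false;
        for (int i = 0; i < list.size(); i++) {
            if (list.get(i).id == id) {                 // already cached: only refresh the score
                list.set(i, new HotItem(id, score));
                found = true;
                break;
            }
        }
        if (!found) {
            if (list.size() >= limit) {
                // replace the lowest-scoring entry only if the new score is higher
                HotItem lowest = list.stream().min(Comparator.comparingInt(h -> h.score)).get();
                if (lowest.score < score) {
                    list.remove(lowest);
                    list.add(new HotItem(id, score));
                }
            } else {
                list.add(new HotItem(id, score));       // still room: just add it
            }
        }
        list.sort(Comparator.comparingInt((HotItem h) -> h.score).reversed());
        return list;
    }
}
```

Picking the minimum directly avoids the extra full sort before the comparison; the final sort keeps the cached list ordered by score as the service code does.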