Real-time comprehensive case based on Flume + Kafka + HBase + Flink + FineBI (4): Real-time computing requirements and technical solutions

Article directory

    • 16: Real-time computing requirements and technical solutions
    • 17: Basic introduction to Flink
    • 18: Code module construction
    • 19: Province parsing tool test
    • 20: Flink code interpretation
    • 21: Flink real-time computing test

16: Real-time computing requirements and technical solutions

  • Goal: Understand real-time computing requirements and technical solutions

  • Path

    • step1: Real-time computing requirements
    • step2: Technical solutions
  • Implementation

    • Real-time computing requirements

      • Real-time statistics of total message volume

        select count(*) from tbname;
        
      • Real-time statistics on the total volume of messages sent in each region

        select sender_area,count(*) from tbname group by sender_area;
        
      • Real-time statistics of the total number of messages received in each region

        select receiver_area,count(*) from tbname group by receiver_area;
        
      • Real-time statistics of the total number of messages sent by each user

        select sender_account,count(*) from tbname group by sender_account;
        
      • Real-time statistics of the total number of messages received by each user

        select receiver_account,count(*) from tbname group by receiver_account;
        

      • Build real-time statistical reports

    • Technical solutions

      • Real-time collection: Flume
      • Real-time storage: Kafka
      • Real-time computing: Flink
      • Real-time results: MySQL/Redis
      • Real-time reporting: FineBI/JavaWeb visualization


  • Summary

    • Understand real-time computing requirements and technology selection

17: Basic introduction to Flink

  • Goal: Understand the functions, features and application scenarios of Flink

  • Path

    • step1: Functions
    • step2: Features
    • step3: Applications
  • Implementation


    Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.
    
    • Function: It provides high-performance, stateful, distributed real-time computing over bounded or unbounded data streams on any common cluster platform.


      • Flink DataSet: batch processing of bounded data
      • Flink DataStream: real-time processing of unbounded data
      • Flink Table: structured data processing based on a DSL
      • Flink SQL: structured data processing based on SQL
      • Flink Gelly: Flink's graph computing library
      • Flink ML: Flink's machine learning library
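
      As a quick, self-contained illustration of the DataStream API listed above (the class name and the sample data below are purely illustrative, not part of this project), a minimal streaming word count might look like:

      import org.apache.flink.api.common.functions.FlatMapFunction;
      import org.apache.flink.api.java.tuple.Tuple2;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.util.Collector;

      public class WordCountSketch {
          public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.fromElements("flink kafka", "flink hbase")
                 //split each line into (word, 1) pairs
                 .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                     @Override
                     public void flatMap(String line, Collector<Tuple2<String, Long>> out) {
                         for (String w : line.split(" ")) {
                             out.collect(Tuple2.of(w, 1L));
                         }
                     }
                 })
                 //group by the word and keep a running count per word
                 .keyBy(t -> t.f0)
                 .sum(1)
                 .print();
              env.execute("WordCountSketch");
          }
      }

      The DataSet, Table, and SQL APIs follow the same pattern: build an environment, apply transformations, then execute.
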
    • Features

      • Supports high-throughput, low-latency, high-performance stream processing
      • Supports window operations based on event time
      • Supports exactly-once semantics for stateful computation
      • Supports highly flexible window operations based on time, count, sessions, or data-driven triggers
      • Supports a continuous streaming model with backpressure
      • Supports fault tolerance based on lightweight distributed snapshots
      • A single runtime supports both batch (as a special case of streaming) and stream processing
      • Implements its own memory management inside the JVM
      • Supports iterative computations
      • Supports automatic program optimization: avoids expensive operations such as shuffles and sorting where possible, and caches intermediate results when needed
    • Applications: all real-time and offline data computing scenarios

      • Data analysis applications: real-time data warehouse
        • Batch analytics
        • Streaming analytics
      • Event-driven applications
        • Fraud detection
        • Anomaly detection
        • Rule-based alerting
        • Business process monitoring
        • Web Applications (Social Networks)
      • Data pipeline ETL
        • Periodic ETL and Data Pipeline
      • Companies using Flink
        • Ali: https://developer.aliyun.com/article/72242
        • Tencent: https://mp.weixin.qq.com/s/tyq6ZdwsgiuXYGi-VR_6KA
        • Meituan: https://tech.meituan.com/2021/08/26/data-warehouse-in-meituan-waimai.html
        • Youzan: https://tech.youzan.com/flink-practice/
        • Oppo, iQiyi, Vipshop…
  • Summary

    • Understand the functions, features and application scenarios of Flink

18: Code module construction

  • Goal: Build the code modules in the development environment

  • Implementation

    • Import the code in the package into IDEA


    • flink package: Application class package, used to store actual application classes

      • MoMoFlinkCount: used to implement statistical calculations for each requirement

      • MySQLSink: used to write calculated results into MySQL

    • pojo package: Entity class package, used to store all entity classes

      • MoMoCountBean: used to encapsulate the results of statistical analysis

        private Integer id; //result id
        private Long moMoTotalCount; //Total number of messages
        private String moMoProvince; //Province
        private String moMoUsername; //User
        private Long moMo_MsgCount; //number of messages
        //Result type: 1-Total number of messages 2-Number of messages sent by each province 3-Number of messages received by each province 4-Number of messages sent by each user 5-Number of messages received by each user
        private String groupType;
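
        A compact sketch of the full bean for reference (only getMoMoProvince() and getMoMo_MsgCount() are confirmed by the sink code in section 20; the remaining accessors are assumed to follow the usual JavaBean pattern):

        public class MoMoCountBean {
            private Integer id;
            private Long moMoTotalCount;
            private String moMoProvince;
            private String moMoUsername;
            private Long moMo_MsgCount;
            private String groupType;

            public String getMoMoProvince() { return moMoProvince; }
            public void setMoMoProvince(String moMoProvince) { this.moMoProvince = moMoProvince; }
            public Long getMoMo_MsgCount() { return moMo_MsgCount; }
            public void setMoMo_MsgCount(Long moMo_MsgCount) { this.moMo_MsgCount = moMo_MsgCount; }
            //...getters and setters for id, moMoTotalCount, moMoUsername, and groupType follow the same pattern
        }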
        
    • utils package: tool package, used to store all tool classes

      • HttpClientUtils: Tool class used to resolve latitude and longitude coordinates into provinces

        public static String findByLatAndLng(String lat, String lng)
        
        • Parameters: latitude, longitude
        • Return value: province
  • Summary

    • Build the code modules in the development environment

19: Province parsing tool test

  • Goal: Understand the implementation of province parsing

  • Path

    • step1: basic design
    • step2: Register as a Baidu developer
    • step3: Test province parsing
  • Implementation

    • Basic design

      • Business scenario: Obtain the user’s country, province, and city information based on IP or latitude and longitude analysis
      • Option 1: Offline parsing library [local parsing, fast, inaccurate]
      • Option 2: Online parsing library [remote parsing, concurrency limit, accurate]
  • Register as a Baidu developer

    • Baidu map open platform: https://lbsyun.baidu.com/

    • Reverse geocoding: https://lbsyun.baidu.com/index.php?title=webapi/guide/webservice-geocoding-abroad

        https://api.map.baidu.com/reverse_geocoding/v3/?ak=yourak&output=json&coordtype=wgs84ll&location=31.225696563611,121.49884033194   //GET request
      
    • Register on the open platform and obtain the AK code: refer to “Appendix Three”

  • Test province parsing

    • Note: Change the AK code in the code to your own

      package bigdata.itcast.cn.momo.online.utils;

      import com.alibaba.fastjson.JSONObject;
      import org.apache.http.HttpEntity;
      import org.apache.http.client.methods.CloseableHttpResponse;
      import org.apache.http.client.methods.HttpGet;
      import org.apache.http.impl.client.CloseableHttpClient;
      import org.apache.http.impl.client.HttpClients;
      import org.apache.http.util.EntityUtils;

      import java.io.IOException;
      import java.util.Map;

      public class HttpClientUtils {

          //Pass in the latitude and longitude and return the province they belong to
          public static String findByLatAndLng(String lat, String lng){
              try {
                  CloseableHttpClient httpClient = HttpClients.createDefault();
                  String url = "http://api.map.baidu.com/reverse_geocoding/v3/?ak=l8hKKRCuX2zrRa93jneDrPmc2UspGatO&output=json&coordtype=wgs84ll&location=" + lat + "," + lng;
                  System.out.println(url);
                  //Build the GET request
                  HttpGet httpGet = new HttpGet(url);
                  //Execute the request and get the response
                  CloseableHttpResponse response = httpClient.execute(httpGet);
                  //Retrieve the response body
                  HttpEntity httpEntity = response.getEntity();
                  //Convert the body to a JSON string
                  String json = EntityUtils.toString(httpEntity);
                  //Extract the province from the JSON
                  Map<String,Object> result = JSONObject.parseObject(json, Map.class);
                  if(result.get("status").equals(0)){
                      Map<String,Object> resultMap = (Map<String,Object>)result.get("result");
                      resultMap = (Map<String, Object>) resultMap.get("addressComponent");
                      String province = (String) resultMap.get("province");
                      return province;
                  }

              } catch (IOException e) {
                  e.printStackTrace();
              }
              return null;
          }

          public static void main(String[] args) {
              //Test with a sample latitude and longitude
              String sf = findByLatAndLng("43.921297","124.655376");
              System.out.println(sf);
          }

      }
      
    


  • Summary

    • Understand the implementation of province parsing

20: Flink code interpretation

  • Goal: Understand the basic implementation of Flink code

  • Path

    • step1: Consume Kafka
    • step2: real-time statistical analysis
    • step3: Update the results to MySQL in real time
  • Implementation

    • Consume Kafka

      //Build the streaming execution environment
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      //Build the Kafka configuration
      Properties props = new Properties();
      props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
      props.setProperty("group.id", "momo2");
      //Build the consumer for the MOMO_MSG topic
      FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>("MOMO_MSG", new SimpleStringSchema(), props);
      //Load the consumer into Flink as a streaming source
      DataStreamSource<String> streamSource = env.addSource(kafkaConsumer);
      
    • Real-time statistical analysis

      //todo:3. Perform the conversion and statistics operations:
      //3.1: Count the total message volume
      countTotalMsg(streamSource);
      //3.2: Based on latitude and longitude, count the number of messages sent from each province
      countProvinceSenderMsg(streamSource);
      //3.3: Based on latitude and longitude, count the number of messages received in each province
      countProvinceReceiverMsg(streamSource);
      //3.4: Count the number of messages sent by each user
      countUserNameSenderMsg(streamSource);
      //3.5: Count the number of messages received by each user
      countUserNameReceiverMsg(streamSource);
      //5. Launch the Flink job
      env.execute("momoFlinkCount");
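
      The five count methods themselves are not shown in the excerpt above. As a rough sketch only (the message delimiter, the column positions of the sender's latitude/longitude, and the final sink are assumptions rather than the project's confirmed code), countProvinceSenderMsg could look like this:

      import bigdata.itcast.cn.momo.online.utils.HttpClientUtils;
      import org.apache.flink.api.common.functions.MapFunction;
      import org.apache.flink.api.java.tuple.Tuple2;
      import org.apache.flink.streaming.api.datastream.DataStreamSource;

      public class MoMoFlinkCountSketch {
          //Requirement 3.2: number of messages sent from each province (groupType 2)
          public static void countProvinceSenderMsg(DataStreamSource<String> streamSource) {
              streamSource
                  .map(new MapFunction<String, Tuple2<String, Long>>() {
                      @Override
                      public Tuple2<String, Long> map(String msg) throws Exception {
                          String[] cols = msg.split("\001");   //assumed field delimiter
                          //resolve the sender's latitude/longitude (assumed columns) into a province
                          String province = HttpClientUtils.findByLatAndLng(cols[4], cols[5]);
                          return Tuple2.of(province == null ? "unknown" : province, 1L);
                      }
                  })
                  .keyBy(t -> t.f0)   //group by province
                  .sum(1)             //running count of messages sent per province
                  .print();           //in the project, the result is wrapped in a MoMoCountBean and written via MySQLSink
          }
      }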
      
    • Update results to MySQL in real time

      streamOperator.addSink(new MySQLSink("2"));
      
      if (status.equals("2")){
          //Check whether a row for this province and groupType 2 already exists
          String sql = "select * from momo_count where momo_groupType = '2' and momo_province= '" + value.getMoMoProvince() + "' ";
          ResultSet resultSet = stat.executeQuery(sql);
          boolean flag = resultSet.next();
          if(flag) {
              //Row exists: update the running message count
              sql = "update momo_count set momo_msgcount= '" + value.getMoMo_MsgCount() + "' where momo_groupType = '2' and momo_province= '" + value.getMoMoProvince() + "' ";
          }else {
              //Row does not exist yet: insert it
              sql = "insert into momo_count(momo_province,momo_msgcount,momo_groupType) values ('" + value.getMoMoProvince() + "'," + value.getMoMo_MsgCount() + ",'2') ";
          }
          stat.executeUpdate(sql);
      }
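
      The fragment above is part of the sink's per-record logic. A minimal sketch of how the surrounding MySQLSink class could be structured (the JDBC URL, database name, and credentials are placeholders, not the project's confirmed values):

      import org.apache.flink.configuration.Configuration;
      import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

      import java.sql.Connection;
      import java.sql.DriverManager;
      import java.sql.Statement;

      public class MySQLSink extends RichSinkFunction<MoMoCountBean> {

          private final String status;   //which requirement this sink serves: "1" to "5"
          private Connection conn;
          private Statement stat;

          public MySQLSink(String status) {
              this.status = status;
          }

          @Override
          public void open(Configuration parameters) throws Exception {
              //open one JDBC connection per parallel sink instance (URL, user, and password are placeholders)
              conn = DriverManager.getConnection("jdbc:mysql://node1:3306/momo", "root", "123456");
              stat = conn.createStatement();
          }

          @Override
          public void invoke(MoMoCountBean value, Context context) throws Exception {
              //branch on the groupType/status and upsert into momo_count, e.g. the status "2" branch shown above
          }

          @Override
          public void close() throws Exception {
              if (stat != null) { stat.close(); }
              if (conn != null) { conn.close(); }
          }
      }

      Opening the connection once in open() and closing it in close() avoids creating a new JDBC connection for every record.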
      
  • Summary

    • Understand the basic implementation of Flink code

21: Flink real-time computing test

  • Goal: Implement Flink real-time analysis test

  • Path

    • step1: MySQL preparation
    • step2: run the test
  • Implementation

    • MySQL preparation

      • Find the SQL file


      • Run the SQL file to create the result database and tables


    • Run the test

      • Start the Flink program: run MoMoFlinkCount

      • Start the Flume program

        cd /export/server/flume-1.9.0-bin
        bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_kafka.properties -Dflume.root.logger=INFO,console
        
      • Start the simulated data generator

        java -jar /export/data/momo_init/MoMo_DataGen.jar \
        /export/data/momo_init/MoMo_Data.xlsx \
        /export/data/momo_data/ \
        100
        
      • Observe MySQL results


  • Summary

    • Implement Flink real-time analysis and testing
