Real-time comprehensive case based on Flume + Kafka + HBase + Flink + FineBI (2): Data source

Article directory

    • 04: Data source
    • 05: Technical architecture and technology selection
    • 06: Flume review and installation
    • 07: Development of Flume collection program

04: Data source

  • Objective: Understand the format of data sources and generate simulation data

  • Path

    • step1: data format
    • step2: data generation
  • Implementation

    • Data format

      Field                | Description                        | Example
      ---------------------|------------------------------------|---------------------------
      msg_time             | Message time                       | 2020/05/08 15:11:33
      sender_nickyname     | Sender's nickname                  | Gu Boyi
      sender_account       | Sender's account number            | 14747877194
      sender_sex           | Sender's gender                    | Male
      sender_ip            | Sender's IP                        | 48.147.134.255
      sender_os            | Sender's operating system          | Android 8.0
      sender_phone_type    | Sender's phone model               | Xiaomi Redmi K30
      sender_network       | Sender's network standard          | 4G
      sender_gps           | Sender's GPS coordinates           | 94.704577,36.247553
      receiver_nickyname   | Recipient's nickname               | Leyou
      receiver_ip          | Recipient's IP                     | 97.61.25.52
      receiver_account     | Recipient's account number         | 17832829395
      receiver_os          | Recipient's operating system       | IOS 10.0
      receiver_phone_type  | Recipient's phone model            | Apple iPhone 10
      receiver_network     | Recipient's network standard       | 4G
      receiver_gps         | Recipient's GPS coordinates        | 84.034145,41.423804
      receiver_sex         | Recipient's gender                 | Female
      msg_type             | Message type                       | TEXT
      distance             | Distance between the two parties   | 77.82KM
      message              | Message content                    | A melancholy crossing at the end of the world, the Cowherd and the Weaver Girl across the river. The Buddha bowed his head in front of the throne and only wished to spend a hundred years together.
    • Data generation

      • Create original file directory

        mkdir /export/data/momo_init
        
      • Upload simulation data program

        cd /export/data/momo_init
        rz
        


      • Create simulation data directory

        mkdir /export/data/momo_data
        
      • Run the program to generate data

        • Syntax

          java -jar /export/data/momo_init/MoMo_DataGen.jar <original data path> <simulated data output path> <interval between generated records, in ms>
          
        • Test: generate one record every 500 ms

          java -jar /export/data/momo_init/MoMo_DataGen.jar \
          /export/data/momo_init/MoMo_Data.xlsx \
          /export/data/momo_data/ \
          500
          
        • Result: the simulated data file MOMO_DATA.dat is generated; the field separator within each record is \001

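        • Verify (optional): as a quick sanity check, count the \001-separated fields in the first record (20 are expected per the format above) and preview it with the separators made visible

          # number of \001-separated fields in the first record (expect 20)
          head -1 /export/data/momo_data/MOMO_DATA.dat | awk -F'\001' '{print NF}'
          # preview the first record; cat -A renders \001 as ^A
          head -1 /export/data/momo_data/MOMO_DATA.dat | cat -A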

  • Summary

    • Understand the format of data sources and generate simulation data

05: Technical architecture and technology selection

  • Goal: Master the technical architecture and technology selection of the real-time case

  • Path

    • step1: requirements analysis
    • step2: technology selection
    • step3: technical architecture
  • Implementation

    • Requirements analysis

      • Offline storage and computation
        • Provide offline T+1 statistical analysis
        • Provide instant queries over offline data
      • Real-time storage and computation
        • Provide real-time statistical analysis
    • Technical Selection

      • Offline
        • Data collection: Flume
        • Offline storage: HBase
        • Offline analysis: Hive (complex computation)
        • Instant query: Phoenix (efficient queries)
      • Real-time
        • Data collection: Flume
        • Real-time storage: Kafka
        • Real-time computing: Flink
        • Real-time application: MySQL + FineBI or Redis + JavaWeb visualization
    • Technical Architecture

      (Technical architecture diagram)

      • Why not have Flume write directly to HBase? Why send everything to Kafka first and then forward it from Kafka to HBase?
        • To avoid high machine load caused by highly concurrent writes, decouple the architecture, and make the writes asynchronous and more efficient
        • To help ensure data consistency
  • Summary

    • Master the technical architecture and technology selection of the real-time case

06: Flume review and installation

  • Objective: Review the basic use of Flume and complete the Flume installation and test

  • Path

    • step1: Flume review
    • step2: Flume installation
    • step3: Flume test
  • Implementation

    • Flume review

      • Function: Monitor and collect data streams on files or network ports in real time
      • Scenario: Real-time collection of files
      • Development: two steps (a minimal sketch follows this list)
        • step1: First write a configuration file in properties format [K=V]
        • step2: Run that file with flume-ng
      • Components
        • Agent: one Agent is one Flume program
        • Source: monitors the data source, converts newly arriving data into individual Events, and puts the Event stream into the Channel
        • Channel: temporarily stores the Events sent by the Source so the Sink can retrieve them
        • Sink: pulls Events from the Channel and writes them to the destination
        • Event: the data object
          • header: a Map of key-value pairs [KV]
          • body: byte[]
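
      • Example: a minimal sketch of this two-step workflow, using Flume's built-in netcat source and logger sink (not part of the case itself; the path /tmp/netcat_logger.properties is arbitrary)

        vim /tmp/netcat_logger.properties

        # one Agent (a1) = Source + Channel + Sink
        a1.sources = s1
        a1.channels = c1
        a1.sinks = k1
        # netcat source: read lines from a TCP port, one Event per line
        a1.sources.s1.type = netcat
        a1.sources.s1.bind = localhost
        a1.sources.s1.port = 44444
        # memory channel: buffer Events in memory
        a1.channels.c1.type = memory
        # logger sink: print each Event (header + body) to the console
        a1.sinks.k1.type = logger
        # bind the source and sink to the channel
        a1.sources.s1.channels = c1
        a1.sinks.k1.channel = c1

        # run the Agent, then type lines into `nc localhost 44444` from another shell
        cd /export/server/flume-1.9.0-bin
        bin/flume-ng agent -c conf/ -n a1 -f /tmp/netcat_logger.properties -Dflume.root.logger=INFO,console
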
    • Flume installation

      • Upload installation package

        cd /export/software/
        rz
        


      • Unzip and install

        tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /export/server/
        cd /export/server
        mv apache-flume-1.9.0-bin flume-1.9.0-bin
        
      • Modify the configuration

        #Integrate HDFS and copy HDFS configuration files
        cd /export/server/flume-1.9.0-bin
        cp /export/server/hadoop/etc/hadoop/core-site.xml ./conf/
        #Modify Flume environment variables
        cd /export/server/flume-1.9.0-bin/conf/
        mv flume-env.sh.template flume-env.sh
        vim flume-env.sh
        
        #Modify line 22
        export JAVA_HOME=/export/server/jdk1.8.0_65
        #Modify line 34
        export HADOOP_HOME=/export/server/hadoop-3.3.0
        
      • Delete Flume’s own guava package and replace it with Hadoop’s

        cd /export/server/flume-1.9.0-bin
        rm -rf lib/guava-11.0.2.jar
        cp /export/server/hadoop/share/hadoop/common/lib/guava-27.0-jre.jar lib/
        
      • Create a directory

        cd /export/server/flume-1.9.0-bin
        #Program configuration file storage directory
        mkdir usercase
        #Taildir metadata storage directory
        mkdir position
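
      • Verify the installation (optional): a quick check; it should report Flume version 1.9.0

        cd /export/server/flume-1.9.0-bin
        bin/flume-ng version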
        
    • Flume test

      • Requirements: Collect chat data and write to HDFS

      • Analysis

        • Source: taildir: dynamically monitor multiple files to achieve real-time data collection
        • Channel: mem: cache data in memory
        • Sink: hdfs
      • Development

        vim /export/server/flume-1.9.0-bin/usercase/momo_mem_hdfs.properties
        
        # define a1
        a1.sources = s1
        a1.channels = c1
        a1.sinks = k1
        
        #define s1
        a1.sources.s1.type = TAILDIR
        # File where the Taildir source records its read positions (metadata)
        a1.sources.s1.positionFile = /export/server/flume-1.9.0-bin/position/taildir_momo_hdfs.json
        # Group the files to be monitored into file groups
        a1.sources.s1.filegroups = f1
        # Define file group f1: monitor all files in this directory
        a1.sources.s1.filegroups.f1 = /export/data/momo_data/.*
        # Add a KV pair (type=momo) to the header of every event collected from f1
        a1.sources.s1.headers.f1.type = momo
        a1.sources.s1.fileHeader = true
        
        #define c1
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 10000
        a1.channels.c1.transactionCapacity = 1000
        
        #define k1
        a1.sinks.k1.type = hdfs
        a1.sinks.k1.hdfs.path = /flume/momo/test/daystr=%Y-%m-%d
        a1.sinks.k1.hdfs.fileType = DataStream
        # Roll files by time interval; 0 disables it (usually disabled)
        a1.sinks.k1.hdfs.rollInterval = 0
        # Roll files by size in bytes; in production typically the byte count of roughly 120 ~ 125 MB (here only ~100 KB)
        a1.sinks.k1.hdfs.rollSize = 102400
        # Roll files by event count; 0 disables it (usually disabled)
        a1.sinks.k1.hdfs.rollCount = 0
        a1.sinks.k1.hdfs.filePrefix = momo
        a1.sinks.k1.hdfs.fileSuffix = .log
        a1.sinks.k1.hdfs.useLocalTimeStamp = true
        
        # Bind the source and sink to the channel
        a1.sources.s1.channels = c1
        a1.sinks.k1.channel = c1
        
      • Start HDFS

        start-dfs.sh
        
      • Run Flume

        cd /export/server/flume-1.9.0-bin
        bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_hdfs.properties -Dflume.root.logger=INFO,console
        
      • Run the simulated data generator

        java -jar /export/data/momo_init/MoMo_DataGen.jar \
        /export/data/momo_init/MoMo_Data.xlsx \
        /export/data/momo_data/ \
        100
        
      • View Results

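        The generated files can also be checked from the command line, using the HDFS sink path and the momo file prefix configured above:

        hdfs dfs -ls /flume/momo/test/
        hdfs dfs -cat /flume/momo/test/daystr=*/momo* | head -n 3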

  • Summary

    • Review the basic use of Flume and complete the Flume installation and test

07: Development of Flume collection program

  • Goal: Develop the Flume collection program for this case

  • Path

    • step1: requirements analysis
    • step2: program development
    • step3: test implementation
  • Implementation

    • Requirements analysis

      • Requirements: Collect chat data and write it to Kafka in real time

      • Source:taildir

      • Channel:mem

      • Sink: Kafka sink

        a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
        a1.sinks.k1.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092
        a1.sinks.k1.kafka.producer.acks = 1
        a1.sinks.k1.kafka.topic = mytopic
        a1.sinks.k1.kafka.flumeBatchSize = 20
        a1.sinks.k1.kafka.producer.linger.ms = 1
        a1.sinks.k1.kafka.producer.compression.type = snappy
        
    • Program Development

      vim /export/server/flume-1.9.0-bin/usercase/momo_mem_kafka.properties
      
      # define a1
      a1.sources = s1
      a1.channels = c1
      a1.sinks = k1
      
      #define s1
      a1.sources.s1.type = TAILDIR
      # File where the Taildir source records its read positions (metadata)
      a1.sources.s1.positionFile = /export/server/flume-1.9.0-bin/position/taildir_momo_kafka.json
      # Group the files to be monitored into file groups
      a1.sources.s1.filegroups = f1
      # Define file group f1: monitor all files in this directory
      a1.sources.s1.filegroups.f1 = /export/data/momo_data/.*
      # Add a KV pair (type=momo) to the header of every event collected from f1
      a1.sources.s1.headers.f1.type = momo
      a1.sources.s1.fileHeader = true
      
      #define c1
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 10000
      a1.channels.c1.transactionCapacity = 1000
      
      #define k1
      a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
      a1.sinks.k1.kafka.topic = MOMO_MSG
      a1.sinks.k1.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092
      a1.sinks.k1.kafka.flumeBatchSize = 10
      a1.sinks.k1.kafka.producer.acks = 1
      a1.sinks.k1.kafka.producer.linger.ms = 100
      
      # Bind the source and sink to the channel
      a1.sources.s1.channels = c1
      a1.sinks.k1.channel = c1
      
    • Test implementation

      • Start Kafka

        start-zk-all.sh
        start-kafka.sh
        
      • Create Topic

        kafka-topics.sh --create --topic MOMO_MSG --partitions 3 --replication-factor 2 --bootstrap-server node1:9092,node2:9092,node3:9092
        

        Note: older Kafka versions (e.g., kafka_2.11 builds) only support --zookeeper, which points at the ZooKeeper client port (2181), not the broker port:
        kafka-topics.sh --create --topic MOMO_MSG --partitions 3 --replication-factor 2 --zookeeper node1:2181
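
        The topic layout (partitions, replicas, leaders) can be verified with --describe:

        kafka-topics.sh --describe --topic MOMO_MSG --bootstrap-server node1:9092,node2:9092,node3:9092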

      • List topics

        kafka-topics.sh --list --bootstrap-server node1:9092,node2:9092,node3:9092
        
      • Start consumer

        kafka-console-consumer.sh --topic MOMO_MSG --bootstrap-server node1:9092,node2:9092,node3:9092
        
      • Start the Flume program

        cd /export/server/flume-1.9.0-bin
        bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_kafka.properties -Dflume.root.logger=INFO,console
        
      • Start the simulated data generator

        java -jar /export/data/momo_init/MoMo_DataGen.jar \
        /export/data/momo_init/MoMo_Data.xlsx \
        /export/data/momo_data/ \
        50
        
      • Observe the results
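
        The console consumer started earlier should print new records as the generator writes them. To re-read the topic from the beginning and spot-check a few records:

        kafka-console-consumer.sh --topic MOMO_MSG --from-beginning --max-messages 5 \
        --bootstrap-server node1:9092,node2:9092,node3:9092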

  • Summary

    • Develop the Flume collection program for this case