Real-time comprehensive case (3): offline analysis based on Flume + Kafka + Hbase + Flink + FineBI

Contents

    • 08: Offline analysis: Hbase table design and construction
    • 09: Offline analysis: Kafka consumer construction
    • 10: Offline analysis: Hbase connection construction
    • 11: Offline analysis: Construction of Rowkey
    • 12: Offline analysis: Put data column construction
    • 13: Offline analysis: storage run test
    • 14: Offline analysis: Hive association test
    • 15: Offline analysis: Phoenix association test

08: Offline analysis: Hbase table design and construction

  • Goal: Master the design of Hbase tables and the implementation of table creation

  • Path

    • step1: basic design
    • step2: Rowkey design
    • step3: Partition design
    • step4: Create table
  • Implementation

    • Basic design

      • Namespace: MOMO_CHAT

      • Table: MOMO_MSG

      • Family:C1

      • Qualifier: consistent with the field name in the data


    • Rowkey Design

      • Query requirements: Query chat records based on sender id + recipient id + message date

        • Sender account
        • Recipient account
        • Message time
      • Design principles: business relevance, uniqueness, bounded length, hashing, field combination

      • Design implementation

        • Hashing options: CRC, Hash, MD5, Murmur (MUR), truncated to an 8-, 16- or 32-character prefix
        • Rowkey = MD5Hash(sender account_recipient account_message time)[first 8 chars]_sender account_recipient account_message time (implemented in section 11 below)
        
    • Partition design

      • Rowkey prefix: MD5 encoding, consisting of letters and numbers
      • Data concurrency: high
      • Partition design: pre-split the table into multiple regions with HexStringSplit
    • Create table

      • Start Hbase: start-hbase.sh
      • Enter the client: hbase shell
      #Create the namespace
      create_namespace 'MOMO_CHAT'
      #Create the table: family C1 with GZ compression, 6 regions pre-split by HexStringSplit
      create 'MOMO_CHAT:MOMO_MSG', {NAME => "C1", COMPRESSION => "GZ"}, {NUMREGIONS => 6, SPLITALGO => 'HexStringSplit'}
      

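      For reference, a minimal sketch of the same DDL through the Java Admin API (an illustrative addition, assuming an HBase 2.x client on the classpath; the shell commands above are what this case actually uses):

      import org.apache.hadoop.hbase.HBaseConfiguration;
      import org.apache.hadoop.hbase.NamespaceDescriptor;
      import org.apache.hadoop.hbase.TableName;
      import org.apache.hadoop.hbase.client.*;
      import org.apache.hadoop.hbase.io.compress.Compression;
      import org.apache.hadoop.hbase.util.Bytes;
      import org.apache.hadoop.hbase.util.RegionSplitter;

      public class CreateMomoTable {
          public static void main(String[] args) throws Exception {
              try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
                   Admin admin = conn.getAdmin()) {
                  //Create the MOMO_CHAT namespace
                  admin.createNamespace(NamespaceDescriptor.create("MOMO_CHAT").build());
                  //Describe the table: one family C1 with GZ compression
                  TableDescriptor desc = TableDescriptorBuilder
                          .newBuilder(TableName.valueOf("MOMO_CHAT:MOMO_MSG"))
                          .setColumnFamily(ColumnFamilyDescriptorBuilder
                                  .newBuilder(Bytes.toBytes("C1"))
                                  .setCompressionType(Compression.Algorithm.GZ)
                                  .build())
                          .build();
                  //Pre-split into 6 regions with the same HexStringSplit algorithm as the shell DDL
                  byte[][] splits = new RegionSplitter.HexStringSplit().split(6);
                  admin.createTable(desc, splits);
              }
          }
      }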

  • Summary

    • Master the design of Hbase tables and the implementation of creating tables

09: Offline analysis: Kafka consumer construction

  • Goal: Develop offline consumers

  • Path

    • Overall implementation path

      //Entry point: start consuming Kafka and writing the data to Hbase
      public void main(){
          //step1: consume Kafka
          consumerKafka();
      }

      //Consume the Kafka data
      public void consumerKafka(){
          prop = new Properties()
          KafkaConsumer consumer = new KafkaConsumer(prop)
          consumer.subscribe("MOMO_MSG")
          ConsumerRecords records = consumer.poll
          //Process the records partition by partition
          //  each record carries: Topic, Partition, Offset, Key, Value
          //step2: write each value to Hbase
          writeToHbase(value)
          //Commit the offset of this partition
          commitSync(offset + 1)
      }

      //Write one value into Hbase
      public void writeToHbase(){
          //step1: build the connection
          //step2: build the Table object
          //step3: build the Put object
          //build the rowkey
          rowkey = getRowkey(value)
          Put put = new Put(rowkey)
          put.addColumn(...)   //one call per column
          table.put(put)
      }

      //Build the rowkey from the value
      public String getRowkey(){
          sender = value.getSender
          receiver = value.getReceiver
          time = value.getTime
          rowkey = MD5(sender_receiver_time).substring(0, 8) + "_" + sender + "_" + receiver + "_" + time
          return rowkey
      }
      
  • Implementation

    /**
     * Consume Kafka data and write the legal records to Hbase
     */
    private static void consumerKafkaToHbase() throws Exception {
        //Build the configuration object
        Properties props = new Properties();
        //Specify the broker addresses
        props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        //Specify the consumer group id
        props.setProperty("group.id", "momo");
        //Turn off automatic offset commits
        props.setProperty("enable.auto.commit", "false");
        //Specify the K and V deserializers
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //Build the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //Subscribe to the topic
        consumer.subscribe(Arrays.asList("MOMO_MSG"));
        //Keep pulling data
        while (true) {
            //Pull data from Kafka, waiting up to 100ms for a response; the returned
            //ConsumerRecords holds all KV records of this poll, like a collection
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            //step3: process the pulled data partition by partition
            Set<TopicPartition> partitions = records.partitions();//all partitions in this poll
            for (TopicPartition partition : partitions) {
                List<ConsumerRecord<String, String>> partRecords = records.records(partition);//all records of this partition
                long offset = 0;
                for (ConsumerRecord<String, String> record : partRecords) {
                    //Get topic, partition, offset, key and value
                    String topic = record.topic();
                    int part = record.partition();
                    offset = record.offset();
                    String key = record.key();
                    String value = record.value();
                    System.out.println(topic + "\t" + part + "\t" + offset + "\t" + key + "\t" + value);
                    //Write the value to Hbase if it is a complete 20-field record
                    if (value != null && !"".equals(value) && value.split("\001").length == 20) {
                        writeToHbase(value);
                    }
                }
                //Manually commit the offset of this partition
                Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(partition, new OffsetAndMetadata(offset + 1));
                consumer.commitSync(offsets);
            }
        }
    }
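
    A runnable entry point is not shown in the original; a minimal sketch (the class and imports around these excerpts are assumed) would just start the loop:

    public static void main(String[] args) throws Exception {
        //Start the consumer loop defined above
        consumerKafkaToHbase();
    }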
    
  • Summary

    • Develop the offline consumer

10: Offline analysis: Hbase connection construction

  • Goal: Realize the construction of Hbase connection

  • Implementation

    private static SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    private static Connection conn;
    private static Table table;
    private static TableName tableName = TableName.valueOf("MOMO_CHAT:MOMO_MSG");//table name
    private static byte[] family = Bytes.toBytes("C1");//column family

    //Static block: runs once when the class is loaded, so only one connection is
    //built; building a connection per write would hurt performance
    static {
        try {
            //Build the configuration object
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "node1:2181,node2:2181,node3:2181");
            //Build the connection
            conn = ConnectionFactory.createConnection(conf);
            //Get the table object
            table = conn.getTable(tableName);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
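
    The static block never releases the connection; as an illustrative addition (not in the original), a JVM shutdown hook can close the shared Table and Connection on exit:

    static {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                //Close the shared Table and Connection built above
                if (table != null) table.close();
                if (conn != null) conn.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }));
    }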
    
  • Summary

    • Implement the construction of Hbase connection

11: Offline analysis: Construction of Rowkey

  • Goal: Achieve the construction of Rowkey

  • Implementation

    private static String getMomoRowkey(String stime, String sender_accounter, String receiver_accounter) throws Exception {
        //Convert the message time to an epoch timestamp
        long time = format.parse(stime).getTime();
        String suffix = sender_accounter + "_" + receiver_accounter + "_" + time;
        //Build the 8-character MD5 prefix
        String prefix = MD5Hash.getMD5AsHex(Bytes.toBytes(suffix)).substring(0, 8);
        //Merge and return
        return prefix + "_" + suffix;
    }
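
    An illustrative call (the input values are made up; the 8-character MD5 prefix depends on the exact inputs):

    String rowkey = getMomoRowkey("2021-09-29 10:25:42", "13280256412", "15260978785");
    System.out.println(rowkey);
    //prints something like: 1f300e5d_13280256412_15260978785_1632882342000
    //i.e. 8-char MD5 prefix _ sender account _ receiver account _ epoch milliseconds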
    
  • Summary

    • Implement the construction of Rowkey

12: Offline analysis: Put data column construction

  • Goal: Construct the Put data column

  • Implementation

    put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("msg_time"),Bytes.toBytes(items[0]));
    put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_nickyname"),Bytes.toBytes(items[1]));
    put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_account"),Bytes.toBytes(items[2]));
    put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_sex"),Bytes.toBytes(items[3]));
    put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_ip"),Bytes.toBytes(items[4]));
    put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_os"),Bytes.toBytes(items[5]));
    put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_phone_type"),Bytes.toBytes(items[6]));
    put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_network"),Bytes.toBytes(items[7]));
    put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_gps"),Bytes.toBytes(items[8]));
    put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_nickyname"),Bytes.toBytes(items[9]));
    put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_ip"),Bytes.toBytes(items[10]));
    put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_account"),Bytes.toBytes(items[11]));
    put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_os"),Bytes.toBytes(items[12]));
    put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_phone_type"),Bytes.toBytes(items[13]));
    put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_network"),Bytes.toBytes(items[14]));
    put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_gps"),Bytes.toBytes(items[15]));
    put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_sex"),Bytes.toBytes(items[16]));
    put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("msg_type"),Bytes.toBytes(items[17]));
    put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("distance"),Bytes.toBytes(items[18]));
    put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("message"),Bytes.toBytes(items[19]));
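
    For context, a sketch of the method these calls sit in, following the pseudocode from section 09 (the method shape is assumed; only the addColumn calls above are from the original):

    //Write one \001-delimited message line into Hbase (sketch)
    private static void writeToHbase(String value) throws Exception {
        //Split the line into its 20 fields
        String[] items = value.split("\001");
        //msg_time, sender_account and receiver_account sit at indexes 0, 2 and 11
        String rowkey = getMomoRowkey(items[0], items[2], items[11]);
        Put put = new Put(Bytes.toBytes(rowkey));
        //...the twenty addColumn calls listed above...
        table.put(put);
    }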
    
  • Summary

    • Implement the construction of Put data columns

13: Offline analysis: storage run test

  • Goal: test that the consumer program reads Kafka data and continuously writes it to Hbase

  • Implementation

    • Start the consumer program

    • Start the Flume program

      cd /export/server/flume-1.9.0-bin
      bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_kafka.properties -Dflume.root.logger=INFO,console
      
    • Start the data simulator

      java -jar /export/data/momo_init/MoMo_DataGen.jar \
      /export/data/momo_init/MoMo_Data.xlsx \
      /export/data/momo_data/ \
      10
      
    • Observe the results in Hbase (e.g. run scan 'MOMO_CHAT:MOMO_MSG' in the hbase shell)


  • Summary

    • Verified that the consumer reads Kafka data and continuously writes it to Hbase

14: Offline analysis: Hive association test

  • Goal: Use Hive to associate with Hbase to achieve offline analysis

  • Path

    • step1: Association
    • step2: Query
  • Implementation

    • Start Hive and YARN

      start-yarn.sh
      hive-daemon.sh metastore
      hive-daemon.sh hiveserver2
      start-beeline.sh
      
    • Association

      create database MOMO_CHAT;
      use MOMO_CHAT;
      create external table if not exists MOMO_CHAT.MOMO_MSG (
        id string,
        msg_time string,
        sender_nickyname string,
        sender_account string,
        sender_sex string,
        sender_ip string,
        sender_os string,
        sender_phone_type string,
        sender_network string,
        sender_gps string,
        receiver_nickyname string,
        receiver_ip string,
        receiver_account string,
        receiver_os string,
        receiver_phone_type string,
        receiver_network string,
        receiver_gps string,
        receiver_sex string,
        msg_type string,
        distance string,
        message string
      ) stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
      with serdeproperties('hbase.columns.mapping'=':key,C1:msg_time,C1:sender_nickyname,
      C1:sender_account,C1:sender_sex,C1:sender_ip,C1:sender_os,C1:sender_phone_type,
      C1:sender_network,C1:sender_gps,C1:receiver_nickyname,C1:receiver_ip,C1:receiver_account,
      C1:receiver_os,C1:receiver_phone_type,C1:receiver_network,C1:receiver_gps,C1:receiver_sex,
      C1:msg_type,C1:distance,C1:message') tblproperties('hbase.table.name'='MOMO_CHAT:MOMO_MSG');
      
    • Analytical queries

      --Basic query
      select
        msg_time,sender_nickyname,receiver_nickyname,distance
      from momo_msg limit 10;
      
      --Query chat history: sender id + receiver id + date: 1f300e5d_13280256412_15260978785_1632888342000
      select
        *
      from momo_msg
      where sender_account='13280256412'
      and receiver_account='15260978785'
      and substr(msg_time,0,10) = '2021-09-29';
      
      --Count the number of messages per hour
      select
        substr(msg_time,0,13) as hour,
        count(*) as cnt
      from momo_msg
      group by substr(msg_time,0,13);
      
  • Summary

    • Use Hive to associate with Hbase to implement offline analysis

15: Offline analysis: Phoenix association test

  • Goal: Use Phoenix to associate with Hbase to achieve instant query

  • Path

    • step1: Association
    • step2: Query
  • Implementation

    • Start the Phoenix client

      cd /export/server/phoenix-5.0.0-HBase-2.0-bin/
      bin/sqlline.py node1:2181
      
    • Association

      create view if not exists MOMO_CHAT.MOMO_MSG (
        "id" varchar primary key,
        C1."msg_time" varchar,
        C1."sender_nickyname" varchar,
        C1."sender_account" varchar,
        C1."sender_sex" varchar,
        C1."sender_ip" varchar,
        C1."sender_os" varchar,
        C1."sender_phone_type" varchar,
        C1."sender_network" varchar,
        C1."sender_gps" varchar,
        C1."receiver_nickyname" varchar,
        C1."receiver_ip" varchar,
        C1."receiver_account" varchar,
        C1."receiver_os" varchar,
        C1."receiver_phone_type" varchar,
        C1."receiver_network" varchar,
        C1."receiver_gps" varchar,
        C1."receiver_sex" varchar,
        C1."msg_type" varchar,
        C1."distance" varchar,
        C1."message" varchar
      );
      
    • Instant query

      --Basic query
      select
        "id",c1."sender_account",c1."receiver_account"
      from momo_chat.momo_msg
      limit 10;
      
      --Query the number of messages sent by each sender
      select
        c1."sender_account" ,
        count(*) as cnt
      from momo_chat.momo_msg
      group by c1."sender_account";
      
      --Query the number of people chatting with each sender
      select
        c1."sender_account" ,
        count(distinct c1."receiver_account") as cnt
      from momo_chat.momo_msg
      group by c1."sender_account"
      order by cnt desc;
      
  • Summary

    • Use Phoenix to associate with Hbase to achieve instant query