Article directory
-
- 08: Offline analysis: Hbase table design and construction
- 09: Offline analysis: Kafka consumer construction
- 10: Offline analysis: Hbase connection construction
- 11: Offline analysis: Construction of Rowkey
- 12: Offline analysis: Put data column construction
- 13: Offline analysis: Storage run test
- 14: Offline analysis: Hive correlation test
- 15: Offline analysis: Phoenix correlation test
08: Offline analysis: Hbase table design and construction
-
Goal: Master the design of Hbase tables and the implementation of table creation
-
Path
- step1: basic design
- step2: Rowkey design
- step3: Partition design
- step4: Create table
-
Implementation
-
Basic design
-
Namespace: MOMO_CHAT
-
Table:MOMO_MSG
-
Family:C1
-
Qualifier: consistent with the field name in the data
-
-
Rowkey Design
-
Query requirements: Query chat records based on sender id + recipient id + message date
- Sender account
- Recipient account
- time
-
Design rules: business relevance, uniqueness, bounded length, hashing (salting), and combined fields
-
Design implementation
- Salting scheme: prefix the key with a hash of it, for example CRC, MD5, or MurmurHash
- => keep the first 8, 16, or 32 characters of the hash
MD5Hash[sender account_recipient account_message time => first 8 characters]_sender account_recipient account_message time
-
-
Partition design
- Rowkey prefix: an MD5 hex string, made up of the digits 0-9 and the letters a-f
- Data concurrency: high
- Partition design: use HexStringSplit to pre-split the table into multiple regions
-
Create table
- Start Hbase: start-hbase.sh
- Enter the client: hbase shell
# Create the namespace
create_namespace 'MOMO_CHAT'
# Create the table: column family C1 with GZ compression, pre-split into 6 regions
create 'MOMO_CHAT:MOMO_MSG', {NAME => "C1", COMPRESSION => "GZ"}, {NUMREGIONS => 6, SPLITALGO => 'HexStringSplit'}
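To see how a pre-split over hex prefixes lines up with the 8-character MD5 prefix of the Rowkey, the following is a minimal sketch that computes evenly spaced boundaries over the range 00000000 to ffffffff. It is an illustration only, not HBase's own code: the class name HexSplitSketch and both method names are hypothetical, and the exact boundaries HBase chooses should be checked in the HBase web UI after the table is created.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of the HBase API.
class HexSplitSketch {

    // Returns numRegions - 1 evenly spaced boundaries over the
    // 8-hex-digit key space 00000000..ffffffff.
    static List<String> splitPoints(int numRegions) {
        long range = 1L << 32;            // 16^8 possible 8-character prefixes
        long step = range / numRegions;   // width of each region (integer division)
        List<String> points = new ArrayList<>();
        for (int i = 1; i < numRegions; i++) {
            points.add(String.format("%08x", step * i));
        }
        return points;
    }

    // Index of the region a lowercase 8-character hex prefix falls into:
    // the number of boundaries that are less than or equal to the prefix.
    static int regionIndex(String prefix, List<String> points) {
        int index = 0;
        for (String point : points) {
            if (prefix.compareTo(point) >= 0) {
                index++;
            }
        }
        return index;
    }

    public static void main(String[] args) {
        List<String> points = splitPoints(6);
        System.out.println(points);
        System.out.println(regionIndex("1f300e5d", points));
    }
}
```

Because the prefix is a hash of the whole key, consecutive messages from the same sender land in different regions, which is what spreads the write load.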
-
-
Summary
- Master the design of Hbase tables and the implementation of creating tables
09: Offline analysis: Kafka consumer construction
-
Goal: Develop offline consumers
-
Path
-
Overall implementation path
// Pseudocode outline of the overall flow
// Entry point: consume Kafka and write the data to HBase
public void main() {
    // step1: consume Kafka
    consumerKafka();
}

// Consumes Kafka data
public void consumerKafka() {
    prop = new Properties()
    KafkaConsumer consumer = new KafkaConsumer(prop)
    consumer.subscribe("MOMO_MSG")
    ConsumerRecords records = consumer.poll
    // Consume and process the records of each partition: Topic, Partition, Offset, Key, Value
    // step2: write to HBase
    writeToHbase(value)
    // Commit the offset of this partition
    commitSync(offset + 1)
}

// Writes the value data to HBase
public void writeToHbase() {
    // step1: build the connection
    // step2: build the Table object
    // step3: build the Put object
    // Get the rowkey
    rowkey = getRowkey(value)
    Put put = new Put(rowkey)
    put.add each column
    table.put()
}

public String getRowkey() {
    value.getSender
    value.getReceiver
    value.getTime
    rowkey = MD5 + sender + receiverId + time
    return rowkey
}
-
-
Implementation
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

/**
 * Consumes Kafka data and writes valid records to HBase
 */
private static void consumerKafkaToHbase() throws Exception {
    // Build the configuration object
    Properties props = new Properties();
    // Specify the broker addresses
    props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
    // Specify the consumer group ID
    props.setProperty("group.id", "momo");
    // Turn off automatic offset commits
    props.setProperty("enable.auto.commit", "false");
    // Specify the key and value deserializers
    props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    // Build the consumer
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    // Subscribe to the topic
    consumer.subscribe(Arrays.asList("MOMO_MSG"));
    // Keep pulling data
    while (true) {
        // Pull data from Kafka, waiting up to 100 ms for records to arrive.
        // If nothing arrives in time, poll returns an empty batch and the loop polls again.
        // The pulled key-value records are held in the ConsumerRecords object, which behaves like a collection.
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        // Get all partitions that have data in this batch
        Set<TopicPartition> partitions = records.partitions();
        // Process the data of each partition
        for (TopicPartition partition : partitions) {
            // Get all the records of this partition
            List<ConsumerRecord<String, String>> partRecords = records.records(partition);
            long offset = 0;
            for (ConsumerRecord<String, String> record : partRecords) {
                String topic = record.topic();
                int part = record.partition();
                offset = record.offset();
                String key = record.key();
                String value = record.value();
                System.out.println(topic + "\t" + part + "\t" + offset + "\t" + key + "\t" + value);
                // Write the value to HBase only if it is non-empty and has exactly 20 fields
                if (value != null && !"".equals(value) && value.split("\001").length == 20) {
                    writeToHbase(value);
                }
            }
            // Manually commit the offset of this partition: the next offset to read is the last one processed + 1
            Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(partition, new OffsetAndMetadata(offset + 1));
            consumer.commitSync(offsets);
        }
    }
}
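The validity check in the consumer (non-null, non-empty, exactly 20 fields separated by \001) can be pulled out into a small helper so it is easy to test on its own. The sketch below is one possible variation, not the author's code: the class name RecordValidatorSketch and its constants are hypothetical. It also makes one deliberate change that should be treated as an assumption about the data: it calls split with a limit of -1, because Java's one-argument split drops trailing empty strings, so a record whose last field is blank would otherwise be counted as fewer than 20 fields.

```java
// Hypothetical helper for illustration; not part of the original program.
class RecordValidatorSketch {

    static final String DELIMITER = "\001";   // field separator used in the source data
    static final int EXPECTED_FIELDS = 20;    // number of fields in a valid message

    // Returns true only for non-empty records with exactly 20 fields.
    static boolean isValid(String value) {
        if (value == null || value.isEmpty()) {
            return false;
        }
        // A limit of -1 keeps trailing empty fields in the result.
        return value.split(DELIMITER, -1).length == EXPECTED_FIELDS;
    }

    public static void main(String[] args) {
        String record = String.join(DELIMITER, java.util.Collections.nCopies(20, "x"));
        System.out.println(isValid(record));
    }
}
```

If blank trailing fields should be rejected instead, dropping the -1 argument restores the behavior of the original condition.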
-
Summary
- Implement the development of offline consumers
10: Offline analysis: Hbase connection construction
-
Goal: Realize the construction of Hbase connection
-
Implementation
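import java.io.IOException;
import java.text.SimpleDateFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

private static SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
private static Connection conn;
private static Table table;
private static TableName tableName = TableName.valueOf("MOMO_CHAT:MOMO_MSG"); // Table name
private static byte[] family = Bytes.toBytes("C1"); // Column family

// Static initializer: runs once when the class is loaded, so only one connection is built
// instead of a new connection per record, which would hurt performance
static {
    try {
        // Build the configuration object
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "node1:2181,node2:2181,node3:2181");
        // Build the connection
        conn = ConnectionFactory.createConnection(conf);
        // Get the table object
        table = conn.getTable(tableName);
    } catch (IOException e) {
        e.printStackTrace();
    }
}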
-
Summary
- Implement the construction of Hbase connection
11: Offline analysis: Construction of Rowkey
-
Goal: Achieve the construction of Rowkey
-
Implementation
private static String getMomoRowkey(String stime, String sender_accounter, String receiver_accounter) throws Exception {
    // Convert the message time to a timestamp in milliseconds
    long time = format.parse(stime).getTime();
    String suffix = sender_accounter + "_" + receiver_accounter + "_" + time;
    // Build the MD5 prefix: the first 8 hex characters of the hash of the suffix
    String prefix = MD5Hash.getMD5AsHex(Bytes.toBytes(suffix)).substring(0, 8);
    // Merge and return
    return prefix + "_" + suffix;
}
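To experiment with the Rowkey layout without an HBase dependency, the same idea can be sketched with the JDK alone. This is an approximation under stated assumptions, not the method above: the class RowkeySketch and its methods are hypothetical, java.security.MessageDigest stands in for HBase's MD5Hash, java.time replaces SimpleDateFormat, and the time zone is passed in explicitly (the original uses the JVM default zone, so the resulting timestamp depends on where it runs).

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical stand-alone version for illustration; not the author's class.
class RowkeySketch {

    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Lowercase hex MD5 of the UTF-8 bytes of the text.
    static String md5Hex(String text) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(text.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // MD5 is required on every Java platform, so this is not expected.
            throw new IllegalStateException(e);
        }
    }

    // Layout: <first 8 hex chars of MD5(suffix)>_<sender>_<receiver>_<epoch millis>
    static String buildRowkey(String msgTime, String sender, String receiver, ZoneId zone) {
        long millis = LocalDateTime.parse(msgTime, FORMAT)
                .atZone(zone).toInstant().toEpochMilli();
        String suffix = sender + "_" + receiver + "_" + millis;
        return md5Hex(suffix).substring(0, 8) + "_" + suffix;
    }

    public static void main(String[] args) {
        System.out.println(buildRowkey("2021-09-29 12:05:42",
                "13280256412", "15260978785", ZoneOffset.ofHours(8)));
    }
}
```

The printed key has an 8-character hex prefix followed by the readable sender, receiver, and timestamp, so the original fields can still be recovered from the Rowkey.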
-
Summary
- Implement the construction of Rowkey
12: Offline analysis: Put data column construction
-
Goal: Construct the Put data column
-
Implementation
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("msg_time"),Bytes.toBytes(items[0]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_nickyname"),Bytes.toBytes(items[1]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_account"),Bytes.toBytes(items[2]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_sex"),Bytes.toBytes(items[3]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_ip"),Bytes.toBytes(items[4]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_os"),Bytes.toBytes(items[5]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_phone_type"),Bytes.toBytes(items[6]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_network"),Bytes.toBytes(items[7]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_gps"),Bytes.toBytes(items[8]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_nickyname"),Bytes.toBytes(items[9]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_ip"),Bytes.toBytes(items[10]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_account"),Bytes.toBytes(items[11]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_os"),Bytes.toBytes(items[12]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_phone_type"),Bytes.toBytes(items[13]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_network"),Bytes.toBytes(items[14]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_gps"),Bytes.toBytes(items[15]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_sex"),Bytes.toBytes(items[16]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("msg_type"),Bytes.toBytes(items[17]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("distance"),Bytes.toBytes(items[18]));
put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("message"),Bytes.toBytes(items[19]));
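The twenty addColumn calls follow a fixed pattern: field i of the \001-separated record goes into the qualifier at position i. A minimal sketch of that mapping, kept free of HBase classes so it can run anywhere, might look like the following. The class ColumnMappingSketch and its method are hypothetical; the qualifier order is copied from the calls above, and in the real program each map entry would become one put.addColumn call. The split limit of -1 is an assumption made here so that a blank last field is preserved.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of the original program.
class ColumnMappingSketch {

    // Qualifiers in the same order as the fields of one record.
    static final String[] QUALIFIERS = {
        "msg_time", "sender_nickyname", "sender_account", "sender_sex", "sender_ip",
        "sender_os", "sender_phone_type", "sender_network", "sender_gps",
        "receiver_nickyname", "receiver_ip", "receiver_account", "receiver_os",
        "receiver_phone_type", "receiver_network", "receiver_gps", "receiver_sex",
        "msg_type", "distance", "message"
    };

    // Splits one record and pairs each field with its qualifier, preserving order.
    static Map<String, String> toColumns(String value) {
        String[] items = value.split("\001", -1);
        if (items.length != QUALIFIERS.length) {
            throw new IllegalArgumentException(
                    "expected " + QUALIFIERS.length + " fields but got " + items.length);
        }
        Map<String, String> columns = new LinkedHashMap<>();
        for (int i = 0; i < QUALIFIERS.length; i++) {
            columns.put(QUALIFIERS[i], items[i]);
        }
        return columns;
    }

    public static void main(String[] args) {
        StringBuilder record = new StringBuilder();
        for (int i = 0; i < 20; i++) {
            if (i > 0) record.append('\001');
            record.append("f").append(i);
        }
        System.out.println(toColumns(record.toString()));
    }
}
```

Driving the columns from one array keeps the qualifier list in a single place, which helps when the same names must also appear in the Hive and Phoenix mappings.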
-
Summary
- Implement the construction of Put data columns
13: Offline analysis: Storage run test
-
Goal: Run a test that consumes Kafka data and writes it to HBase as it arrives
-
Implementation
-
Start the consumer program
-
Start the Flume program
cd /export/server/flume-1.9.0-bin
bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_kafka.properties -Dflume.root.logger=INFO,console
-
Start the simulated data generator
java -jar /export/data/momo_init/MoMo_DataGen.jar \
/export/data/momo_init/MoMo_Data.xlsx \
/export/data/momo_data/ \
10
-
Observe Hbase results
-
-
Summary
- Run a test that consumes Kafka data and writes it to HBase as it arrives
14: Offline analysis: Hive correlation test
-
Goal: Associate Hive with the HBase table to run offline analysis
-
Path
- step1: Association
- step2: Query
-
Implementation
-
Start Hive and YARN
start-yarn.sh
hive-daemon.sh metastore
hive-daemon.sh hiveserver2
start-beeline.sh
-
Association
create database MOMO_CHAT;
use MOMO_CHAT;
create external table if not exists MOMO_CHAT.MOMO_MSG (
  id string,
  msg_time string,
  sender_nickyname string,
  sender_account string,
  sender_sex string,
  sender_ip string,
  sender_os string,
  sender_phone_type string,
  sender_network string,
  sender_gps string,
  receiver_nickyname string,
  receiver_ip string,
  receiver_account string,
  receiver_os string,
  receiver_phone_type string,
  receiver_network string,
  receiver_gps string,
  receiver_sex string,
  msg_type string,
  distance string,
  message string
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties('hbase.columns.mapping'=':key,C1:msg_time,C1:sender_nickyname,C1:sender_account,C1:sender_sex,C1:sender_ip,C1:sender_os,C1:sender_phone_type,C1:sender_network,C1:sender_gps,C1:receiver_nickyname,C1:receiver_ip,C1:receiver_account,C1:receiver_os,C1:receiver_phone_type,C1:receiver_network,C1:receiver_gps,C1:receiver_sex,C1:msg_type,C1:distance,C1:message')
tblproperties('hbase.table.name'='MOMO_CHAT:MOMO_MSG');
-
Analytical queries
-- Basic query
select msg_time, sender_nickyname, receiver_nickyname, distance from momo_msg limit 10;

-- Query chat history by sender id + receiver id + date
-- Example rowkey: 1f300e5d_13280256412_15260978785_1632888342000
select * from momo_msg
where sender_account = '13280256412'
  and receiver_account = '15260978785'
  and substr(msg_time, 0, 10) = '2021-09-29';

-- Count the number of messages per hour
select substr(msg_time, 0, 13) as hour, count(*) as cnt
from momo_msg
group by substr(msg_time, 0, 13);
-
-
Summary
- Use Hive to associate with Hbase to implement offline analysis
15: Offline analysis: Phoenix correlation test
-
Goal: Associate Phoenix with the HBase table to run ad hoc queries
-
Path
- step1: Association
- step2: Query
-
Implementation
-
Start Phoenix
cd /export/server/phoenix-5.0.0-HBase-2.0-bin/
bin/sqlline.py node1:2181
-
Association
create view if not exists MOMO_CHAT.MOMO_MSG (
  "id" varchar primary key,
  C1."msg_time" varchar,
  C1."sender_nickyname" varchar,
  C1."sender_account" varchar,
  C1."sender_sex" varchar,
  C1."sender_ip" varchar,
  C1."sender_os" varchar,
  C1."sender_phone_type" varchar,
  C1."sender_network" varchar,
  C1."sender_gps" varchar,
  C1."receiver_nickyname" varchar,
  C1."receiver_ip" varchar,
  C1."receiver_account" varchar,
  C1."receiver_os" varchar,
  C1."receiver_phone_type" varchar,
  C1."receiver_network" varchar,
  C1."receiver_gps" varchar,
  C1."receiver_sex" varchar,
  C1."msg_type" varchar,
  C1."distance" varchar,
  C1."message" varchar
);
-
Instant query
-- Basic query
select "id", c1."sender_account", c1."receiver_account" from momo_chat.momo_msg limit 10;

-- Query the number of messages sent by each sender
select c1."sender_account", count(*) as cnt
from momo_chat.momo_msg
group by c1."sender_account";

-- Query the number of distinct people each sender chatted with
select c1."sender_account", count(distinct c1."receiver_account") as cnt
from momo_chat.momo_msg
group by c1."sender_account"
order by cnt desc;
-
-
Summary
- Use Phoenix to associate with Hbase to achieve instant query