Article directory
-
- 04: Data source
- 05: Technical architecture and technology selection
- 06: Flume review and installation
- 07: Development of Flume collection program
04: Data source
-
Objective: Understand the format of data sources and generate simulation data
-
Path
- step1: data format
- step2: data generation
-
Implementation
-
Data format
Each record contains 20 fields:

| Field | Description | Example |
| --- | --- | --- |
| msg_time | Message time | 2020/05/08 15:11:33 |
| sender_nickyname | Sender's nickname | Gu Boyi |
| sender_account | Sender's account number | 14747877194 |
| sender_sex | Sender's gender | Male |
| sender_ip | Sender's IP | 48.147.134.255 |
| sender_os | Sender's OS | Android 8.0 |
| sender_phone_type | Sender's phone model | Xiaomi Redmi K30 |
| sender_network | Sender's network type | 4G |
| sender_gps | Sender's GPS | 94.704577,36.247553 |
| receiver_nickyname | Recipient's nickname | Leyou |
| receiver_ip | Recipient's IP | 97.61.25.52 |
| receiver_account | Recipient's account number | 17832829395 |
| receiver_os | Recipient's OS | IOS 10.0 |
| receiver_phone_type | Recipient's phone model | Apple iPhone 10 |
| receiver_network | Recipient's network type | 4G |
| receiver_gps | Recipient's GPS | 84.034145,41.423804 |
| receiver_sex | Recipient's gender | Female |
| msg_type | Message type | TEXT |
| distance | Distance between the two parties | 77.82KM |
| message | Message content | A melancholy crossing at the end of the world, the Cowherd and the Weaver Girl across the river. The Buddha bowed his head in front of the throne and only wished to spend a hundred years together. |
-
Data generation
-
Create the original data directory
mkdir /export/data/momo_init
-
Upload the simulation data program
cd /export/data/momo_init
rz
-
Create the simulated data directory
mkdir /export/data/momo_data
-
Run the program to generate data
-
Syntax
java -jar /export/data/momo_init/MoMo_DataGen.jar <original data path> <simulated data path> <random generation interval in ms>
-
Test: generate one record every 500 ms
java -jar /export/data/momo_init/MoMo_DataGen.jar \
/export/data/momo_init/MoMo_Data.xlsx \
/export/data/momo_data/ \
500
-
Result: the simulation data file MOMO_DATA.dat is generated; within each record the field separator is \001
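A quick sanity check of the generated file (assuming the output path above):
# cat -A renders the non-printing \001 separator as ^A
head -1 /export/data/momo_data/MOMO_DATA.dat | cat -A
# count fields per record (should be 20, matching the format table)
head -3 /export/data/momo_data/MOMO_DATA.dat | awk -F'\001' '{print NF}'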
-
Summary
- Understand the format of data sources and generate simulation data
05: Technical architecture and technology selection
-
Goal: Master the technical architecture and technology selection of real-time cases
-
Path
- step1: demand analysis
- step2: technology selection
- step3: technical architecture
-
Implementation
-
Requirements analysis
- Offline storage and computation
- Provide offline T+1 statistical analysis
- Provide instant queries over offline data
- Real-time storage and computation
- Provide real-time statistical analysis
-
Technology selection
- Offline
- Data collection: Flume
- Offline storage: HBase
- Offline analysis: Hive (complex computation)
- Instant query: Phoenix (efficient queries)
- Real-time
- Data collection: Flume
- Real-time storage: Kafka
- Real-time computing: Flink
- Real-time application: MySQL + FineBI, or Redis + JavaWeb visualization
-
Technical Architecture
- Why not write Flume data directly to HBase, but instead send everything to Kafka and then transfer it from Kafka to HBase?
- Kafka absorbs highly concurrent writes, avoiding heavy load on the storage machines; it decouples the architecture and enables efficient asynchronous processing
- Kafka retains the data stream, so both the offline and real-time pipelines consume the same data, ensuring data consistency
-
Summary
- Master the technical architecture and technology selection of real-time cases
06: Flume review and installation
-
Objective: Review the basic use of Flume and complete the Flume installation and test
-
Path
- step1: Flume review
- step2: Flume installation
- step3: Flume test
-
Implementation
-
Flume review
- Function: Monitor and collect data streams on files or network ports in real time
- Scenario: Real-time collection of files
- Development
- step1: Write a configuration file (properties format, K=V pairs)
- step2: Run the agent with that configuration file
- Components
- Agent: an Agent is one Flume program
- Source: monitors the data source, converts its incoming data into individual Events, and puts the Event stream into the Channel
- Channel: temporarily stores the data sent by the Source for the Sink to retrieve
- Sink: pulls data from the Channel and writes it to the destination
- Event: represents one data record
- head: a Map collection [K,V]
- body: byte[]
-
Flume installation
-
Upload the installation package
cd /export/software/
rz
-
Unzip and install
tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /export/server/
cd /export/server
mv apache-flume-1.9.0-bin flume-1.9.0-bin
-
Modify the configuration
#Integrate with HDFS: copy the HDFS configuration file
cd /export/server/flume-1.9.0-bin
cp /export/server/hadoop/etc/hadoop/core-site.xml ./conf/
#Modify the Flume environment variables
cd /export/server/flume-1.9.0-bin/conf/
mv flume-env.sh.template flume-env.sh
vim flume-env.sh

#Modify line 22
export JAVA_HOME=/export/server/jdk1.8.0_65
#Modify line 34
export HADOOP_HOME=/export/server/hadoop-3.3.0
-
Delete Flume’s own guava package and replace it with Hadoop’s
cd /export/server/flume-1.9.0-bin
rm -rf lib/guava-11.0.2.jar
cp /export/server/hadoop/share/hadoop/common/lib/guava-27.0-jre.jar lib/
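Optionally verify the replacement (only Hadoop's guava-27.0-jre.jar should remain in lib/):
ls /export/server/flume-1.9.0-bin/lib/ | grep guava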
-
Create directories
cd /export/server/flume-1.9.0-bin
#Directory for program configuration files
mkdir usercase
#Directory for Taildir metadata
mkdir position
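Optionally confirm the installation by printing the Flume version:
cd /export/server/flume-1.9.0-bin
bin/flume-ng version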
-
Flume test
-
Requirements: Collect chat data and write to HDFS
-
Analysis
- Source: taildir: dynamically monitors multiple files for real-time collection
- Channel: mem: caches data in memory
- Sink: hdfs
-
Development
vim /export/server/flume-1.9.0-bin/usercase/momo_mem_hdfs.properties
# define a1
a1.sources = s1
a1.channels = c1
a1.sinks = k1
#define s1
a1.sources.s1.type = TAILDIR
#Specify the metadata record file
a1.sources.s1.positionFile = /export/server/flume-1.9.0-bin/position/taildir_momo_hdfs.json
#Group all data sources that need to be monitored
a1.sources.s1.filegroups = f1
#Define f1: monitor all files in the directory
a1.sources.s1.filegroups.f1 = /export/data/momo_data/.*
#Add a KV pair to the header of every event collected by f1
a1.sources.s1.headers.f1.type = momo
a1.sources.s1.fileHeader = true
#define c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
#define k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/momo/test/daystr=%Y-%m-%d
a1.sinks.k1.hdfs.fileType = DataStream
#Roll files by time interval; generally disabled (0)
a1.sinks.k1.hdfs.rollInterval = 0
#Roll files by size; in production usually the byte count of 120~125M (a small value is used here for testing)
a1.sinks.k1.hdfs.rollSize = 102400
#Roll files by event count; generally disabled (0)
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.filePrefix = momo
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#bind
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
-
Start HDFS
start-dfs.sh
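Optionally confirm that the HDFS daemons (NameNode/DataNode) are up, e.g. with jps:
jps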
-
Run Flume
cd /export/server/flume-1.9.0-bin
bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_hdfs.properties -Dflume.root.logger=INFO,console
-
Run the simulated data generator
java -jar /export/data/momo_init/MoMo_DataGen.jar \
/export/data/momo_init/MoMo_Data.xlsx \
/export/data/momo_data/ \
100
-
View Results
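One way to check, assuming the sink path configured above:
hdfs dfs -ls /flume/momo/test/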
-
Summary
- Review the basic use of Flume and complete the Flume installation and test
07: Development of Flume collection program
-
Goal: Develop the Flume collection program for the case
-
Path
- step1: demand analysis
- step2: program development
- step3: test implementation
-
Implementation
-
Requirements analysis
-
Requirements: Collect chat data and write it to Kafka in real time
-
Source: taildir
-
Channel: mem
-
Sink: Kafka sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.topic = mytopic
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
-
-
Program Development
vim /export/server/flume-1.9.0-bin/usercase/momo_mem_kafka.properties
# define a1
a1.sources = s1
a1.channels = c1
a1.sinks = k1
#define s1
a1.sources.s1.type = TAILDIR
#Specify the metadata record file
a1.sources.s1.positionFile = /export/server/flume-1.9.0-bin/position/taildir_momo_kafka.json
#Group all data sources that need to be monitored
a1.sources.s1.filegroups = f1
#Define f1: monitor all files in the directory
a1.sources.s1.filegroups.f1 = /export/data/momo_data/.*
#Add a KV pair to the header of every event collected by f1
a1.sources.s1.headers.f1.type = momo
a1.sources.s1.fileHeader = true
#define c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000
#define k1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = MOMO_MSG
a1.sinks.k1.kafka.bootstrap.servers = node1:9092,node2:9092,node3:9092
a1.sinks.k1.kafka.flumeBatchSize = 10
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 100
#bind
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
-
Test implementation
-
Start Kafka
start-zk-all.sh
start-kafka.sh
-
Create Topic
kafka-topics.sh --create --topic MOMO_MSG --partitions 3 --replication-factor 2 --bootstrap-server node1:9092,node2:9092,node3:9092
Note: older Kafka versions (e.g. kafka_2.11 builds) use --zookeeper instead, pointing at the ZooKeeper address:
kafka-topics.sh --create --topic MOMO_MSG --partitions 3 --replication-factor 2 --zookeeper node1:2181
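Optionally, verify the topic's partitions and replicas:
kafka-topics.sh --describe --topic MOMO_MSG --bootstrap-server node1:9092,node2:9092,node3:9092
-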
List topics
kafka-topics.sh --list --bootstrap-server node1:9092,node2:9092,node3:9092
-
Start consumer
kafka-console-consumer.sh --topic MOMO_MSG --bootstrap-server node1:9092,node2:9092,node3:9092
-
Start the Flume program
cd /export/server/flume-1.9.0-bin
bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_kafka.properties -Dflume.root.logger=INFO,console
-
Run the simulated data generator
java -jar /export/data/momo_init/MoMo_DataGen.jar \
/export/data/momo_init/MoMo_Data.xlsx \
/export/data/momo_data/ \
50
-
Observe the results
-
Summary
- Develop the Flume collection program for the case