Flume installation and deployment
1. Installation and deployment
(1) Upload apache-flume-1.10.1-bin.tar.gz to the /opt/software directory on the Linux host
(2) Extract apache-flume-1.10.1-bin.tar.gz into the /opt/module/ directory
[xxx@hadoop102 software]$ tar -zxf /opt/software/apache-flume-1.10.1-bin.tar.gz -C /opt/module/
(3) Rename apache-flume-1.10.1-bin to flume
[xxx@hadoop102 module]$ mv /opt/module/apache-flume-1.10.1-bin /opt/module/flume
(4) Edit the log4j2.xml configuration file in the conf directory to set the log file path
[xxx@hadoop102 conf]$ vim log4j2.xml
<?xml version="1.0" encoding="UTF-8"?> <!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. --> <Configuration status="ERROR"> <Properties> <Property name="LOG_DIR">/opt/module/flume/log</Property> </Properties> <Appenders> <Console name="Console" target="SYSTEM_ERR"> <PatternLayout pattern="%d (%t) [%p - %l] %m%n" /> </Console> <RollingFile name="LogFile" fileName="${LOG_DIR}/flume.log" filePattern="${LOG_DIR}/archive/flume.log.%d{yyyyMMdd}-%i"> <PatternLayout pattern="%d{dd MMM yyyy HH:mm:ss,SSS} %-5p [%t] (%C.%M:%L) %equals{%x}{[]}{} - %m%n" /> <Policies> <!-- Roll every night at midnight or when the file reaches 100MB --> <SizeBasedTriggeringPolicy size="100 MB"/> <CronTriggeringPolicy schedule="0 0 0 * * ?"/> </Policies> <DefaultRolloverStrategy min="1" max="20"> <Delete basePath="${LOG_DIR}/archive"> <!-- Nested conditions: the inner condition is only evaluated on files for which the outer conditions are true. --> <IfFileName glob="flume.log.*"> <!-- Only allow 1 GB of files to accumulate --> <IfAccumulatedFileSize exceeds="1 GB"/> </IfFileName> </Delete> </DefaultRolloverStrategy> </RollingFile> </Appenders> <Loggers> <Logger name="org.apache.flume.lifecycle" level="info"/> <Logger name="org.jboss" level="WARN"/> <Logger name="org.apache.avro.ipc.netty.NettyTransceiver" level="WARN"/> <Logger name="org.apache.hadoop" level="INFO"/> <Logger name="org.apache.hadoop.hive" level="ERROR"/> #Introduce console output to facilitate learning and viewing logs <Root level="INFO"> <AppenderRef ref="LogFile" /> <AppenderRef ref="Console" /> </Root> </Loggers> </Configuration>
2. Flume data transfer configuration files
1. Log collection and transmission
(1) file_to_kafka.conf
# Name the components on this agent
a1.sources = r1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*

# Use a channel which buffers events in Kafka
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false

# Bind the source to the channel (the Kafka channel needs no sink here)
a1.sources.r1.channels = c1
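A sketch of how this collection agent might be started, assuming the configuration above is saved as job/file_to_kafka.conf under the Flume home (the job directory is not part of the distribution; any path passed to -f works):

[xxx@hadoop102 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/file_to_kafka.conf

Here -n must match the agent name used in the file (a1), and -c points at the conf directory so the log4j2.xml configured earlier is picked up.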
(2) kafka_to_hdfs_log.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics = topic_log
a1.sources.r1.kafka.consumer.group.id = flume1
#a1.sources.r1.kafka.consumer.auto.offset.reset = earliest
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.interceptor.TimeStampInterceptor$MyBuilder

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.round = false
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

# Control the output file type
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

# Use a channel which buffers events on disk
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
The interceptor must be packaged as a jar and uploaded to flume/lib/interceptor before the agent is started; a sketch of its implementation follows.
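Flume does not ship this class; com.atguigu.gmall.interceptor.TimeStampInterceptor$MyBuilder has to be implemented and packaged by you. Below is a minimal sketch of what such an interceptor might look like, assuming each log line is a JSON object carrying an epoch-millisecond ts field, and using fastjson for parsing (an assumption; any JSON library bundled into the jar would do). Copying ts into the timestamp header lets the HDFS sink resolve %Y-%m-%d from the time the log was generated rather than the time it was consumed, so late-arriving data still lands in the correct day's directory.

package com.atguigu.gmall.interceptor;

import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;

public class TimeStampInterceptor implements Interceptor {

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        try {
            // The log body is assumed to be a JSON object such as {"ts":1654000000000,...}
            String body = new String(event.getBody(), StandardCharsets.UTF_8);
            JSONObject json = JSONObject.parseObject(body);
            // Write the event time into the header read by the HDFS sink's %Y-%m-%d escape
            event.getHeaders().put("timestamp", json.getString("ts"));
            return event;
        } catch (Exception e) {
            // Returning null drops malformed lines instead of failing the whole batch
            return null;
        }
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        Iterator<Event> iterator = events.iterator();
        while (iterator.hasNext()) {
            if (intercept(iterator.next()) == null) {
                iterator.remove();
            }
        }
        return events;
    }

    @Override
    public void close() {
    }

    // The nested class name must match the $MyBuilder suffix in the agent configuration
    public static class MyBuilder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new TimeStampInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}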
2. Business data transmission
(1) kafka_to_hdfs_db.conf
# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.sources.r1.kafka.topics = topic_db
a1.sources.r1.kafka.consumer.group.id = flume1
a1.sources.r1.kafka.consumer.auto.offset.reset = earliest
a1.sources.r1.setTopicHeader = true
a1.sources.r1.topicHeader = topic
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.interceptor.TableNameAndTimeStampInterceptor$MyBuilder

# Channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior2
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior2/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

# Sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/db/%{tableName}_inc/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = db
a1.sinks.k1.hdfs.round = false
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
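Like the collection agent, the two Kafka-to-HDFS agents can then be launched; a sketch assuming both files sit in job/ under the Flume home and should keep running after the shell exits (nohup and the output redirection are conveniences, not requirements):

[xxx@hadoop102 flume]$ nohup bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_log.conf >/dev/null 2>&1 &
[xxx@hadoop102 flume]$ nohup bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_db.conf >/dev/null 2>&1 &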