Flume installation and deployment

1. Installation and deployment

(1) Upload apache-flume-1.10.1-bin.tar.gz to the /opt/software directory on the Linux host
(2) Extract apache-flume-1.10.1-bin.tar.gz into the /opt/module/ directory

[xxx@hadoop102 software]$ tar -zxf /opt/software/apache-flume-1.10.1-bin.tar.gz -C /opt/module/

(3) Rename the apache-flume-1.10.1-bin directory to flume

[xxx@hadoop102 module]$ mv /opt/module/apache-flume-1.10.1-bin /opt/module/flume

(4) Edit the log4j2.xml configuration file in the conf directory: set the log directory (LOG_DIR) and also send log output to the console

[xxx@hadoop102 conf]$ vim log4j2.xml

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Flume log4j2 configuration.
  - Logs go to ${LOG_DIR}/flume.log, rolled daily at midnight or at 100 MB,
    with rolled files kept under ${LOG_DIR}/archive.
  - Root logs are also copied to the console (stderr).
-->
<Configuration status="ERROR">
  <Properties>
    <!-- Directory for flume.log and its archive/ subdirectory -->
    <Property name="LOG_DIR">/opt/module/flume/log</Property>
  </Properties>
  <Appenders>
    <Console name="Console" target="SYSTEM_ERR">
      <PatternLayout pattern="%d (%t) [%p - %l] %m%n" />
    </Console>
    <RollingFile name="LogFile" fileName="${LOG_DIR}/flume.log" filePattern="${LOG_DIR}/archive/flume.log.%d{yyyyMMdd}-%i">
      <PatternLayout pattern="%d{dd MMM yyyy HH:mm:ss,SSS} %-5p [%t] (%C.%M:%L) %equals{%x}{[]}{} - %m%n" />
      <Policies>
        <!-- Roll every night at midnight or when the file reaches 100MB -->
        <SizeBasedTriggeringPolicy size="100 MB"/>
        <CronTriggeringPolicy schedule="0 0 0 * * ?"/>
      </Policies>
      <DefaultRolloverStrategy min="1" max="20">
        <Delete basePath="${LOG_DIR}/archive">
          <!-- Nested conditions: the inner condition is only evaluated on files for which the outer conditions are true. -->
          <IfFileName glob="flume.log.*">
            <!-- Only allow 1 GB of files to accumulate -->
            <IfAccumulatedFileSize exceeds="1 GB"/>
          </IfFileName>
        </Delete>
      </DefaultRolloverStrategy>
    </RollingFile>
  </Appenders>

  <Loggers>
    <Logger name="org.apache.flume.lifecycle" level="info"/>
    <Logger name="org.jboss" level="WARN"/>
    <Logger name="org.apache.avro.ipc.netty.NettyTransceiver" level="WARN"/>
    <Logger name="org.apache.hadoop" level="INFO"/>
    <Logger name="org.apache.hadoop.hive" level="ERROR"/>
    <Root level="INFO">
      <AppenderRef ref="LogFile" />
      <!-- Also send root logs to the console to make them easy to view while learning -->
      <AppenderRef ref="Console" />
    </Root>
  </Loggers>
</Configuration>

2. Flume transport configuration files

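1. Log collection and transmission

This step uses two agents, each defined in its own configuration file. The configuration directly below tails the application log files into Kafka topic topic_log. A second agent then consumes topic_log and writes it to HDFS.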

# Agent a1 (files -> Kafka): tails the application log files and writes each
# line to Kafka topic "topic_log". No sink is declared because the Kafka
# channel itself delivers the events to Kafka.

# Name the components on this agent
a1.sources = r1
a1.channels = c1

# Describe/configure the source
# TAILDIR follows every file in group f1 and records the last read offset of
# each file in positionFile, so a restarted agent resumes where it stopped.
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
a1.sources.r1.filegroups = f1
# The file-name part of this path is a regular expression, not a shell glob
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*

# Use a Kafka channel: events are buffered in, and delivered to, Kafka topic topic_log
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.channels.c1.kafka.topic = topic_log
# Write only the raw event body (no Flume Avro wrapper/headers) so other
# consumers of topic_log receive plain log lines
a1.channels.c1.parseAsFlumeEvent = false

# Bind the source to the channel
a1.sources.r1.channels = c1
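
The following is a separate configuration file, for the agent that consumes Kafka topic topic_log and writes it to HDFS:
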
# Agent a1 (Kafka topic_log -> HDFS): consumes the log events from Kafka and
# stores them as gzip-compressed files in one HDFS directory per day.

# ---- Components ----
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# ---- Source: Kafka consumer on topic_log ----
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics = topic_log
a1.sources.r1.kafka.consumer.group.id = flume1
# A batch is written to the channel after 5000 messages or 2000 ms, whichever comes first
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
# Custom interceptor (its JAR must be on Flume's classpath). The %Y-%m-%d escape
# in hdfs.path needs a "timestamp" header; presumably this interceptor sets it
# from the event content - confirm against the interceptor source.
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.interceptor.TimeStampInterceptor$MyBuilder

# ---- Channel: durable file channel ----
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
# Seconds a put waits for free channel space
a1.channels.c1.keep-alive = 6

# ---- Sink: HDFS, one directory per day ----
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.round = false
# Roll a new file every 10 s or at 128 MB (134217728 bytes); never by event count
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
# Output format: gzip-compressed stream
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

# ---- Wiring ----
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

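Package the interceptor classes referenced in these configurations (com.atguigu.gmall.interceptor.*) into a JAR. Copy that JAR directly into /opt/module/flume/lib, or into a plugins.d/<plugin-name>/lib directory. A JAR placed in a subdirectory of lib, such as lib/interceptor, is not put on the agent's classpath.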

2. Business data transmission (Kafka topic_db → HDFS)

# Agent a1 (Kafka topic_db -> HDFS): consumes the business-database records from
# Kafka and writes them to one HDFS directory per table and per day.

## components
a1.sources = r1
a1.channels = c1
a1.sinks = k1

## source: Kafka consumer on topic_db
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
# A batch is written to the channel after 5000 messages or 2000 ms, whichever comes first
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
# List all three brokers (as the other agents do) so bootstrap still works if one is down
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics = topic_db
# NOTE(review): the topic_log agent also uses group id flume1. A dedicated id would
# keep the two agents' consumer-group rebalances independent. Changing it discards
# the committed offsets for this agent.
a1.sources.r1.kafka.consumer.group.id = flume1
# Put the source topic name into the "topic" event header
a1.sources.r1.setTopicHeader = true
a1.sources.r1.topicHeader = topic
# Custom interceptor (its JAR must be on Flume's classpath). %{tableName} and
# %Y-%m-%d in hdfs.path are filled from event headers, presumably "tableName" and
# "timestamp" set by this interceptor - confirm against the interceptor source.
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.interceptor.TableNameAndTimeStampInterceptor$MyBuilder

## channel: durable file channel (directories separate from the topic_log agent)
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior2
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior2
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

## sink: HDFS, one directory per table per day
a1.sinks.k1.type = hdfs
# %{tableName} is replaced by the value of the event header "tableName"
a1.sinks.k1.hdfs.path = /origin_data/gmall/db/%{tableName}_inc/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = db
a1.sinks.k1.hdfs.round = false
# Roll a new file every 10 s or at 128 MB (134217728 bytes); never by event count
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
# Output format: gzip-compressed stream
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

## Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1