Flume installation and deployment

1. Installation and deployment

(1) Upload apache-flume-1.10.1-bin.tar.gz to the /opt/software directory on the Linux host
(2) Extract apache-flume-1.10.1-bin.tar.gz into the /opt/module/ directory

[xxx@hadoop102 software]$ tar -zxf /opt/software/apache-flume-1.10.1-bin.tar.gz -C /opt/module/

(3) Rename apache-flume-1.10.1-bin to flume

[xxx@hadoop102 module]$ mv /opt/module/apache-flume-1.10.1-bin /opt/module/flume

(4) Edit the log4j2.xml configuration file in the conf directory and set the log file path

[xxx@hadoop102 conf]$ vim log4j2.xml

<?xml version="1.0" encoding="UTF-8"?>
<!--
 Licensed to the Apache Software Foundation (ASF) under one or more
 contributor license agreements. See the NOTICE file distributed with
 this work for additional information regarding copyright ownership.
 The ASF licenses this file to You under the Apache License, Version 2.0
 (the "License"); you may not use this file except in compliance with
 the License. You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-->
<Configuration status="ERROR">
  <Properties>
    <Property name="LOG_DIR">/opt/module/flume/log</Property>
  </Properties>
  <Appenders>
    <Console name="Console" target="SYSTEM_ERR">
      <PatternLayout pattern="%d (%t) [%p - %l] %m%n" />
    </Console>
    <RollingFile name="LogFile" fileName="${LOG_DIR}/flume.log" filePattern="${LOG_DIR}/archive/flume.log.%d{yyyyMMdd}-%i">
      <PatternLayout pattern="%d{dd MMM yyyy HH:mm:ss,SSS} %-5p [%t] (%C.%M:%L) %equals{%x}{[]}{} - %m%n" />
      <Policies>
        <!-- Roll every night at midnight or when the file reaches 100MB -->
        <SizeBasedTriggeringPolicy size="100 MB"/>
        <CronTriggeringPolicy schedule="0 0 0 * * ?"/>
      </Policies>
      <DefaultRolloverStrategy min="1" max="20">
        <Delete basePath="${LOG_DIR}/archive">
          <!-- Nested conditions: the inner condition is only evaluated on files for which the outer conditions are true. -->
          <IfFileName glob="flume.log.*">
            <!-- Only allow 1 GB of files to accumulate -->
            <IfAccumulatedFileSize exceeds="1 GB"/>
          </IfFileName>
        </Delete>
      </DefaultRolloverStrategy>
    </RollingFile>
  </Appenders>

  <Loggers>
    <Logger name="org.apache.flume.lifecycle" level="info"/>
    <Logger name="org.jboss" level="WARN"/>
    <Logger name="org.apache.avro.ipc.netty.NettyTransceiver" level="WARN"/>
    <Logger name="org.apache.hadoop" level="INFO"/>
    <Logger name="org.apache.hadoop.hive" level="ERROR"/>
    <!-- Add the Console appender reference so logs also go to the screen, which makes them easier to view while learning -->
    <Root level="INFO">
      <AppenderRef ref="LogFile" />
      <AppenderRef ref="Console" />
    </Root>
  </Loggers>

</Configuration>
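
As a quick sanity check after these steps, Flume can print its version; the command below assumes the current directory is the Flume home /opt/module/flume:

[xxx@hadoop102 flume]$ bin/flume-ng version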

2. Flume data-transfer configuration files

1. Log collection and transmission

(1) file_to_kafka.conf
# Name the components on this agent
a1.sources = r1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*


# Use a Kafka channel, which buffers events in a Kafka topic
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.channels.c1.kafka.topic = topic_log

a1.channels.c1.parseAsFlumeEvent = false


# Bind the source and sink to the channel
a1.sources.r1.channels = c1
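
This agent deliberately has no sink: the Kafka channel itself writes the events to the topic_log topic. One possible way to start it on the host that produces the application logs is shown below; the job/ directory is only an assumed location for the configuration file, and -n must match the agent name a1 used inside it:

[xxx@hadoop102 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/file_to_kafka.conf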

(2) kafka_to_hdfs_log.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics = topic_log
a1.sources.r1.kafka.consumer.group.id = flume1
#a1.sources.r1.kafka.consumer.auto.offset.reset=earliest
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.interceptor.TimeStampInterceptor$MyBuilder

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.round = false

a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

#Control output file type
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip
# Use a file channel, which buffers events on disk
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

The JAR containing the custom interceptor needs to be uploaded to flume/lib/interceptor before this agent is started.
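
The configuration above references com.atguigu.gmall.interceptor.TimeStampInterceptor$MyBuilder, which is not part of Flume itself. Below is a minimal sketch of what such an interceptor might look like, assuming each event body is a JSON log line that carries a millisecond ts field; it copies that value into the timestamp header, which the HDFS sink uses to resolve %Y-%m-%d in hdfs.path. The class actually packaged in the course JAR may parse the JSON differently (for example with a JSON library) and handle malformed records in its own way.

package com.atguigu.gmall.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TimeStampInterceptor implements Interceptor {

    // Assumption: every log line is JSON and carries a millisecond epoch field "ts".
    private static final Pattern TS_PATTERN = Pattern.compile("\"ts\"\\s*:\\s*(\\d+)");

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        String body = new String(event.getBody(), StandardCharsets.UTF_8);
        Matcher matcher = TS_PATTERN.matcher(body);
        if (!matcher.find()) {
            // Drop events without a usable timestamp so they cannot land in the wrong date partition.
            return null;
        }
        Map<String, String> headers = event.getHeaders();
        // The HDFS sink reads the "timestamp" header to expand %Y-%m-%d in hdfs.path.
        headers.put("timestamp", matcher.group(1));
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        Iterator<Event> iterator = events.iterator();
        while (iterator.hasNext()) {
            if (intercept(iterator.next()) == null) {
                iterator.remove();
            }
        }
        return events;
    }

    @Override
    public void close() {
    }

    // Inner builder class; its name must match the $MyBuilder suffix in the agent configuration.
    public static class MyBuilder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new TimeStampInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}

Once the JAR is in place, the agent is started in the same way as the collection agent, for example with bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_log.conf (the job/ path is again only an assumed location).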

2. Business data transmission

kafka_to_hdfs_db.conf
# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# Source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.sources.r1.kafka.topics = topic_db
a1.sources.r1.kafka.consumer.group.id = flume1
a1.sources.r1.kafka.consumer.auto.offset.reset=earliest
a1.sources.r1.setTopicHeader = true
a1.sources.r1.topicHeader = topic
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.interceptor.TableNameAndTimeStampInterceptor$MyBuilder
# Channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior2
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior2/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

# Sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/db/%{tableName}_inc/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = db
a1.sinks.k1.hdfs.round = false
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
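
The %{tableName} escape in hdfs.path is filled from an event header, so the TableNameAndTimeStampInterceptor referenced above is expected to set both a tableName header (taken from the table name in the change-log record) and a timestamp header, in the same spirit as the log interceptor sketched earlier. With that JAR deployed, the agent can be started in the usual way; the job/ path is again only an assumed location:

[xxx@hadoop102 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_db.conf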