Collecting txt data into Greenplum with a custom Flume sink

1 Flume Installation and Deployment

1.1 Official links

(1) Flume official website address: Welcome to Apache Flume – Apache Flume

(2) Document viewing address: Flume 1.11.0 User Guide – Apache Flume

(3) Download address: http://archive.apache.org/dist/flume/

1.2 Installation and Deployment

(1) Upload apache-flume-1.9.0-bin.tar.gz to the /app/local directory of IDC 53

(2) Extract apache-flume-1.9.0-bin.tar.gz into the /app/local directory

tar -zxvf /app/local/apache-flume-1.9.0-bin.tar.gz -C /app/local

(3) Rename apache-flume-1.9.0-bin to flume-1.9.0

mv /app/local/apache-flume-1.9.0-bin /app/local/flume-1.9.0

(4) Create a job directory and write the file-flume-gp6.conf configuration file in it

cd /app/local/flume-1.9.0/

mkdir job

cd job/

vim file-flume-gp6.conf

# Name the components of agent a1

a1.sources = r1

a1.sinks = k1

a1.channels = c1



# Configure data source and monitor text files

a1.sources.r1.type = TAILDIR

a1.sources.r1.filegroups = f1

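# Absolute path of the file to tail (no quotes; a regex is allowed in the file-name part only)

a1.sources.r1.filegroups.f1 = <absolute path of the file to monitor>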

a1.sources.r1.positionFile = /app/local/flume-1.9.0/taildir_position.json



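# Configure the sink to send data to Greenplum

# Fully qualified class name of the custom sink. Keep comments on their own line:
# a "#" after a value becomes part of the value in this file format.

a1.sinks.k1.type = org.example.GreenplumSink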

# Note: driver and batchSize are not read by the GreenplumSink code in section 2

a1.sinks.k1.driver = org.postgresql.Driver

# Format: jdbc:postgresql://<gp host>:5432/<database>?currentSchema=<schema>

a1.sinks.k1.url = jdbc:postgresql://10.180.59.53:5432/test?currentSchema=wytest

a1.sinks.k1.username = <gp username>

a1.sinks.k1.password = <gp password>

a1.sinks.k1.batchSize = 100

a1.sinks.k1.tablename = <table to write to, e.g. wytest.flume2gp>



#Configure channel

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100



# Bind the source and the sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1
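Flume's default file-based configuration provider loads this file with java.util.Properties, and that format treats # as a comment only when it starts a line. The stand-alone sketch below (the class name InlineCommentCheck is made up for illustration, and it does not run Flume) shows what java.util.Properties itself returns for an inline comment compared with a comment on its own line.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class InlineCommentCheck {
    // Parses config text the way java.util.Properties does and
    // returns the value stored for the given key.
    static String valueOf(String configText, String key) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(configText));
        } catch (IOException e) {
            // StringReader does not do real I/O; wrap just in case
            throw new UncheckedIOException(e);
        }
        return props.getProperty(key);
    }

    public static void main(String[] args) {
        String inline = "a1.sinks.k1.type = org.example.GreenplumSink #Custom sink class name\n";
        String separate = "# Custom sink class name\n"
                + "a1.sinks.k1.type = org.example.GreenplumSink\n";
        // Inline "#..." text stays in the value
        System.out.println("[" + valueOf(inline, "a1.sinks.k1.type") + "]");
        // A comment on its own line is skipped
        System.out.println("[" + valueOf(separate, "a1.sinks.k1.type") + "]");
    }
}
```

In the first case the value still ends with " #Custom sink class name", which is not a loadable class name.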

2 Custom sink code

2.1 Environment Setup

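(1) Create a new Maven project and add the postgresql and flume-ng-core dependencies to the pom file.

<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.9.0</version>
</dependency>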

(2) Create the target table in Greenplum, wytest.flume2gp (format: schema.table)

CREATE TABLE wytest.flume2gp (

id SERIAL PRIMARY KEY,

createtime VARCHAR(255),

content VARCHAR(4096)

);
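The content column is VARCHAR(4096), and PostgreSQL-based databases such as Greenplum reject a longer string with an error instead of cutting it. In the sink below, a failed insert rolls the transaction back, so the same event returns to the channel and is retried on the next process() call; a single over-long line can therefore stall the pipeline. Declaring content as TEXT removes the limit. If the column must stay bounded, a helper like the hypothetical truncate() below could be applied to the body before the insert. It counts Unicode code points, on the assumption that this matches how VARCHAR(n) counts characters (not bytes) for the data you collect.

```java
public class ColumnFit {
    // Hypothetical helper: keeps at most maxChars Unicode code points of s
    // so the value fits a VARCHAR(maxChars) column.
    static String truncate(String s, int maxChars) {
        if (s.codePointCount(0, s.length()) <= maxChars) {
            return s;
        }
        // offsetByCodePoints never cuts a surrogate pair in half
        int end = s.offsetByCodePoints(0, maxChars);
        return s.substring(0, end);
    }

    public static void main(String[] args) {
        System.out.println(truncate("abcdef", 4)); // abcd
        System.out.println(truncate("abc", 4));    // abc
    }
}
```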

2.2 Code Deployment

(3) Create a new GreenplumSink class

package org.example;

import org.apache.flume.*;

import org.apache.flume.conf.Configurable;

import org.apache.flume.sink.AbstractSink;


import java.sql.Connection;

import java.sql.DriverManager;

import java.sql.PreparedStatement;

import java.sql.SQLException;

import java.text.SimpleDateFormat;

import java.util.Date;

/**

 * Customize GreenplumSink

 */

public class GreenplumSink extends AbstractSink implements Configurable {

    private String url = "";

    private String username = "";

    private String password = "";

    private String tableName = "";

    Connection con = null;

    @Override
    public Status process() {
        Status status;
        // Get the channel and start a transaction on it
        Channel ch = getChannel();
        Transaction txn = ch.getTransaction();
        txn.begin();
        try {
            // take() returns null when the channel is empty
            Event event = ch.take();
            if (event != null && event.getBody().length > 0) {
                // Decode the event body
                String body = new String(event.getBody(), "UTF-8");
                System.out.println(body + "-------body");
                // Write to Greenplum; values are bound as parameters so that
                // quotes inside the body cannot break the SQL statement
                SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                String createtime = df.format(new Date());
                String sql = "insert into " + tableName + " (createtime, content) values (?, ?)";
                try (PreparedStatement stmt = con.prepareStatement(sql)) {
                    stmt.setString(1, createtime);
                    stmt.setString(2, body);
                    stmt.executeUpdate();
                }
                status = Status.READY;
            } else {
                // Nothing to write: back off so the sink does not spin on an empty channel
                status = Status.BACKOFF;
            }
            txn.commit();
        } catch (Throwable t) {
            System.out.println("txn.rollback()");
            txn.rollback();
            t.printStackTrace();
            status = Status.BACKOFF;
        } finally {
            txn.close();
        }
        return status;
    }

    /**

     * Get the parameters specified in the configuration file

     * @param context

     */

    @Override

    public void configure(Context context) {

        url = context.getString("url");
        username = context.getString("username");
        password = context.getString("password");
        tableName = context.getString("tablename");

    }
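    @Override
    public synchronized void start() {
        try {
            // Initialize the database connection (the JDBC driver jar must be in Flume's lib directory)
            con = DriverManager.getConnection(url, username, password);
            super.start();
            System.out.println("finish start");
        } catch (Exception ex) {
            // Fail fast instead of running a sink that has no connection
            throw new FlumeException("Could not connect to Greenplum at " + url, ex);
        }
    }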
    @Override
    public synchronized void stop() {
        try {
            // con is still null if start() could not connect
            if (con != null) {
                con.close();
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
        super.stop();
    }
}
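Pasting the body directly into the SQL text breaks the INSERT on any line that contains a single quote (for example it's) and lets log content inject SQL; binding values with PreparedStatement.setString() avoids both problems. Table and column names cannot be bound as parameters, so they still have to come from trusted configuration such as the tablename setting. The hypothetical helper below builds the placeholder SQL for any column list, which also fits the multi-column case later in this article.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class InsertSql {
    // Hypothetical helper: builds
    //   insert into <table> (c1, c2, ...) values (?, ?, ...)
    // Table and column names are concatenated as-is, so they must be trusted;
    // row values are bound afterwards with setString(index, value).
    static String build(String table, List<String> columns) {
        String cols = String.join(", ", columns);
        String marks = String.join(", ", Collections.nCopies(columns.size(), "?"));
        return "insert into " + table + " (" + cols + ") values (" + marks + ")";
    }

    public static void main(String[] args) {
        // Columns of wytest.flume2gp from section 2.1
        System.out.println(build("wytest.flume2gp", Arrays.asList("createtime", "content")));
        // prints: insert into wytest.flume2gp (createtime, content) values (?, ?)
    }
}
```

The sink as written also ignores the batchSize setting and writes one event per transaction. Taking up to batchSize events per transaction and sending them with PreparedStatement.addBatch() and executeBatch() is the usual JDBC way to batch, but that variant is not shown here.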

(4) Package the code into a jar (for example with mvn package) and upload it to Flume's lib directory

(5) Upload the PostgreSQL JDBC driver, postgresql-42.6.0.jar, to Flume's lib directory as well

3 Start Flume and verify that the write succeeds

1. Run the startup command from the Flume directory

cd /app/local/flume-1.9.0/

bin/flume-ng agent -c conf/ -n a1 -f job/file-flume-gp6.conf -Dflume.root.logger=INFO,console

2. Append a line to the monitored file and check that a new row appears in the Greenplum table

echo "test" >> <path of the file configured in a1.sources.r1.filegroups.f1>

New requirement: writing to separate columns

When the body taken from the channel holds several fields, split it in the custom sink code and write each field to its own column:

//Get the data in body
String body = new String(event.getBody(), "UTF-8");
System.out.println(body + "-------body");
// Split on the \001 (SOH) separator; limit -1 keeps trailing empty fields
String[] arr = body.split("\001", -1);
String slc = arr[0];
String time = arr[1];
String t1 = arr[2];
String t2 = arr[3];
//Save to Greenplum with bound parameters
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String createtime = df.format(new Date());
System.out.println(createtime);
String sql = "insert into " + tableName + " (createtime, slc, time, t1, t2) values (?, ?, ?, ?, ?)";
try (PreparedStatement stmt = con.prepareStatement(sql)) {
    stmt.setString(1, createtime);
    stmt.setString(2, slc);
    stmt.setString(3, time);
    stmt.setString(4, t1);
    stmt.setString(5, t2);
    stmt.executeUpdate();
}

The separator is special: it is Hive's default field delimiter, the control character \001 (Ctrl-A, ASCII SOH). In vim, type Ctrl+V followed by Ctrl+A; it shows up as a single blue ^A character. Notepad++ displays it as SOH, a plain text viewer shows it as a small box, and in Java code it is written as \001.
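Two details of String.split() matter here. Its argument is a regular expression, but \001 is not a regex metacharacter, so it matches the SOH character literally. Without a limit argument, split() drops trailing empty strings, so a line whose last field is empty yields fewer than four elements and arr[3] throws ArrayIndexOutOfBoundsException. The sketch below (the class name and sample values are illustrative only) compares split(SOH) with split(SOH, -1).

```java
import java.util.Arrays;

public class SohSplit {
    // Hive's default field delimiter: the single character \001 (SOH, Ctrl-A)
    static final String SOH = "\001";

    // Splits one line into fields; limit -1 keeps trailing empty fields
    static String[] fields(String line) {
        return line.split(SOH, -1);
    }

    public static void main(String[] args) {
        // Four fields, the last one (t2) empty
        String line = "slc1" + SOH + "2023-07-01" + SOH + "x" + SOH;
        System.out.println(Arrays.toString(line.split(SOH))); // [slc1, 2023-07-01, x]
        System.out.println(Arrays.toString(fields(line)));    // [slc1, 2023-07-01, x, ]
    }
}
```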

syntaxbug.com © 2021 All Rights Reserved.