1 Flume Installation and Deployment
1.1 Installation address
(1) Flume official website: https://flume.apache.org/
(2) Documentation: Flume 1.11.0 User Guide (https://flume.apache.org/releases/content/1.11.0/FlumeUserGuide.html)
(3) Download address: http://archive.apache.org/dist/flume/
1.2 Installation and Deployment
(1) Upload apache-flume-1.9.0-bin.tar.gz to the /app/local directory of the IDC 53 server
(2) Unzip apache-flume-1.9.0-bin.tar.gz to the /app/local directory
tar -zxvf /app/local/apache-flume-1.9.0-bin.tar.gz -C /app/local
(3) Rename apache-flume-1.9.0-bin to flume-1.9.0
mv /app/local/apache-flume-1.9.0-bin /app/local/flume-1.9.0
(4) Create a new job directory and deploy the file-flume-gp6.conf configuration file
cd flume-1.9.0/
mkdir job
cd job/
vim file-flume-gp6.conf
# Define the agent's components
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Configure the source: tail text files
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = "Path to read files"
a1.sources.r1.positionFile = /app/local/flume-1.9.0/taildir_position.json

# Configure the sink: send data to Greenplum
# (org.example.GreenplumSink is the custom sink class built in section 2)
a1.sinks.k1.type = org.example.GreenplumSink
a1.sinks.k1.driver = org.postgresql.Driver
# URL format: jdbc:postgresql://gp host:5432/database?currentSchema=schema name
a1.sinks.k1.url = jdbc:postgresql://10.180.59.53:5432/test?currentSchema=wytest
a1.sinks.k1.username = gp username
a1.sinks.k1.password = gp password
a1.sinks.k1.batchSize = 100
a1.sinks.k1.tablename = written table name

# Configure the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2 Custom sink code
2.1 Environment Deployment
(1) Create a new maven project and add postgresql and flume dependencies to the pom file.
<dependency>
    <groupId>org.postgresql</groupId>
    <artifactId>postgresql</artifactId>
    <version>42.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.9.0</version>
</dependency>
(2) Create the GP database table wytest.flume2gp (schema.table)
CREATE TABLE wytest.flume2gp (
id SERIAL PRIMARY KEY,
createtime VARCHAR(255),
content VARCHAR(4096)
);
2.2 Code Deployment
(3) Create a new GreenplumSink class
package org.example;

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Custom GreenplumSink: takes events from the channel and inserts
 * them into a Greenplum table over JDBC.
 */
public class GreenplumSink extends AbstractSink implements Configurable {

    private String url = "";
    private String username = "";
    private String password = "";
    private String tableName = "";
    private Connection con = null;

    @Override
    public Status process() {
        Status status;
        // Start a transaction on the channel
        Channel ch = getChannel();
        Transaction txn = ch.getTransaction();
        txn.begin();
        try {
            Event event = ch.take();
            // take() may return null when the channel is empty
            if (event != null && event.getBody().length > 0) {
                // Decode the event body
                String body = new String(event.getBody(), StandardCharsets.UTF_8);

                // Build the row: current timestamp + raw body
                SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                String createtime = df.format(new Date());

                // Parameterized insert: JDBC handles quoting of the values
                String sql = "insert into " + tableName
                        + " (createtime, content) values (?, ?)";
                PreparedStatement stmt = con.prepareStatement(sql);
                stmt.setString(1, createtime);
                stmt.setString(2, body);
                stmt.executeUpdate();
                stmt.close();
                status = Status.READY;
            } else {
                status = Status.BACKOFF;
            }
            txn.commit();
        } catch (Throwable t) {
            txn.rollback();
            t.printStackTrace();
            status = Status.BACKOFF;
        } finally {
            txn.close();
        }
        return status;
    }

    /**
     * Read the parameters specified in the configuration file.
     */
    @Override
    public void configure(Context context) {
        url = context.getString("url");
        username = context.getString("username");
        password = context.getString("password");
        tableName = context.getString("tablename");
    }

    @Override
    public synchronized void start() {
        try {
            // Initialize the database connection
            con = DriverManager.getConnection(url, username, password);
            super.start();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    @Override
    public synchronized void stop() {
        try {
            con.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
        super.stop();
    }
}
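The INSERT statement is the most error-prone piece of the sink: building it by string concatenation requires careful quoting of both the column list and the body text. A standalone sketch of a parameterized variant can be checked without a database (`buildInsertSql` is a hypothetical helper, not part of the sink above; the `values (?, ?)` form is one safe way to write the insert, since JDBC then handles escaping of the values):

```java
public class SinkSqlDemo {
    // Hypothetical helper: a parameterized form of the sink's INSERT.
    // Placeholders (?) let JDBC quote/escape the values, so bodies
    // containing quotes or backslashes cannot break the SQL.
    static String buildInsertSql(String tableName) {
        return "insert into " + tableName + " (createtime, content) values (?, ?)";
    }

    public static void main(String[] args) {
        // Table name as it would arrive from the a1.sinks.k1.tablename key
        System.out.println(buildInsertSql("wytest.flume2gp"));
    }
}
```

With this template the sink would call `con.prepareStatement(sql)`, then `stmt.setString(1, createtime)` and `stmt.setString(2, body)` before `executeUpdate()`.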
(4) Package the code into a jar and upload it to Flume's lib directory
(5) Upload postgresql-42.6.0.jar to the lib directory of flume
3 Start Flume and verify that writes succeed
(1) Run the startup command in the Flume directory
cd /app/local/flume-1.9.0/
bin/flume-ng agent -c conf/ -n a1 -f job/file-flume-gp6.conf -Dflume.root.logger=INFO,console
(2) Verify that the data enters the GP database
echo "test" >> file path
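The row that should land in the table for that echo can be previewed locally: the sink decodes the event body as UTF-8 and pairs it with a yyyy-MM-dd HH:mm:ss timestamp. A sketch of the same two steps, outside Flume:

```java
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;

public class VerifyRowDemo {
    public static void main(String[] args) {
        // The bytes Flume hands the sink for the line written by echo
        byte[] eventBody = "test".getBytes(StandardCharsets.UTF_8);

        // The same decoding and timestamping the sink performs
        String content = new String(eventBody, StandardCharsets.UTF_8);
        String createtime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());

        // Expected (createtime, content) row in wytest.flume2gp
        System.out.println(createtime + " | " + content);
    }
}
```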
4 New requirement (writing to different fields)
In the custom sink code, the body data obtained from the channel needs to be split into separate fields.
// Decode the event body
String body = new String(event.getBody(), StandardCharsets.UTF_8);

// Split on Hive's default field delimiter (\001)
String[] arr = body.split("\001");
String slc = arr[0];
String time = arr[1];
String t1 = arr[2];
String t2 = arr[3];

// Build the row: current timestamp + the four fields
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String createtime = df.format(new Date());

// Parameterized insert: JDBC handles quoting of the values
String sql = "insert into " + tableName
        + " (createtime, slc, \"time\", t1, t2) values (?, ?, ?, ?, ?)";
PreparedStatement stmt = con.prepareStatement(sql);
stmt.setString(1, createtime);
stmt.setString(2, slc);
stmt.setString(3, time);
stmt.setString(4, t1);
stmt.setString(5, t2);
stmt.executeUpdate();
stmt.close();
The separator is unusual: it is Hive's default field delimiter, the SOH control character (ASCII 0x01). In vim, enter it by typing Ctrl-V Ctrl-A; it displays as ^A in blue. Notepad++ shows it as SOH, a plain-text editor shows a small box, and in Java code it is written as "\001".
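Because the delimiter is invisible in most editors, it helps to confirm the split with a standalone Java check (the sample field values below are made up):

```java
public class SohSplitDemo {
    public static void main(String[] args) {
        // "\001" is the SOH control character (ASCII 0x01),
        // Hive's default field delimiter
        String body = "SLC-1" + "\001" + "2024-01-01 00:00:00"
                + "\001" + "value1" + "\001" + "value2";

        // split() takes a regex; "\001" matches the literal SOH character
        String[] arr = body.split("\001");
        for (String field : arr) {
            System.out.println(field);
        }
        // prints the four fields on separate lines
    }
}
```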