Background description:
The goal is to monitor unstructured files in specific directories on a Linux server in real time and upload them to HDFS as they are created, changed, or deleted.
Usage instructions:
The file monitoring is implemented with Apache Commons IO (its file alteration monitor classes).
Required pom.xml dependencies:
<dependencies>
    <!-- Hadoop client libraries (HDFS access) -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.0.0</version>
    </dependency>

    <!-- Apache Commons: Commons IO provides the file alteration monitor -->
    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
        <version>2.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.9</version>
    </dependency>

    <!-- Annotations and code generation -->
    <dependency>
        <groupId>com.google.code.findbugs</groupId>
        <artifactId>jsr305</artifactId>
        <version>1.3.9</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.4</version>
    </dependency>

    <!-- JSON libraries -->
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.28</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba.fastjson2</groupId>
        <artifactId>fastjson2</artifactId>
        <version>2.0.26</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/cn.hutool/hutool-all -->
    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-all</artifactId>
        <version>5.8.22</version>
    </dependency>
</dependencies>
/**
 * Uploads a local file to HDFS, replacing whatever already exists at the destination.
 *
 * <p>Failures are logged and not propagated, so a failed upload does not abort the caller.
 *
 * @param hdfsURI  URI of the target HDFS file system
 * @param username user the HDFS operations are performed as
 * @param srcPath  path of the local file to upload
 * @param newPath  destination path inside HDFS
 */
public static void copyFile2HDFS(URI hdfsURI, String username, String srcPath, String newPath) {
    try {
        Configuration conf = new Configuration();
        // try-with-resources guarantees the FileSystem handle is closed even when
        // exists/delete/copyFromLocalFile throws (previously it leaked on failure).
        try (FileSystem fs = FileSystem.get(hdfsURI, conf, username)) {
            Path src = new Path(srcPath);
            Path dst = new Path(newPath);
            // Remove any previous copy (recursively) so the upload replaces it.
            if (fs.exists(dst)) {
                fs.delete(dst, true);
            }
            fs.copyFromLocalFile(src, dst);
        }
        System.out.println("Upload Successfully!");
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Preserve the interrupt status for the calling thread.
            Thread.currentThread().interrupt();
        }
        // Log once, at error level, with the cause attached instead of printStackTrace().
        StaticLog.error(e, "Failed to copy file {} to {}: {}", srcPath, newPath, e.getMessage());
    }
}
public static String getHDFSPath(File file) { // Determine the file format, including video, pictures, text and audio, etc. You can modify it according to actual needs String fileName = file.getName(); String extension = fileName.substring(fileName.lastIndexOf(".") + 1).toLowerCase(); if (extension.equals("mp4") || extension.equals("avi") || extension.equals("mov")) { return "/data/shipin/" + file.getName(); } else if (extension.equals("jpg") || extension.equals("png")) { return "/data/txt/" + file.getName(); } else if (extension.equals("m4a") || extension.equals("wav")) { return "/data/yuyin/" + file.getName(); } else if (extension.equals("txt")) { return "/data/wenjian/" + file.getName(); } else { return "/data/" + file.getName(); } }
FileMonitorTest.java
// // Source code recreated from a .class file by IntelliJ IDEA // (powered by FernFlower decompiler) // package com.xxx.fileSync; import java.util.concurrent.TimeUnit; import org.apache.commons.io.filefilter.FileFilterUtils; import org.apache.commons.io.filefilter.IOFileFilter; import org.apache.commons.io.monitor.FileAlterationMonitor; import org.apache.commons.io.monitor.FileAlterationObserver; public class FileMonitorTest { public FileMonitorTest() { } public static void main(String[] arugs) throws Exception { String absolateDir = "/opt/xxxx"; long intervalTime = TimeUnit.SECONDS.toMillis(5L); new FileAlterationObserver(absolateDir, FileFilterUtils.and(new IOFileFilter[]{FileFilterUtils.fileFileFilter(), FileFilterUtils.suffixFileFilter(".success")})); FileAlterationObserver observer = new FileAlterationObserver(absolateDir); observer.addListener(new FileListener()); FileAlterationMonitor monitor = new FileAlterationMonitor(intervalTime, new FileAlterationObserver[]{observer}); monitor.start(); } }
FileListener.java (overrides the listener callback methods)
package com.xxx.fileSync;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import org.apache.commons.io.monitor.FileAlterationListenerAdaptor;
import org.apache.commons.io.monitor.FileAlterationObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Listener that mirrors file-system events from the watched directory to HDFS and to a
 * second local location.
 *
 * <p>Target paths come from {@code FileUtil.getHDFSPath} (HDFS) and
 * {@code FileUtil.getDestPath} (local mirror). Local I/O failures are logged and do not
 * propagate out of the callbacks.
 */
public class FileListener extends FileAlterationListenerAdaptor {
    private static final Logger log = LoggerFactory.getLogger(FileListener.class);

    /** HDFS endpoint (placeholder value; replace with the real name-node address). */
    URI uri = new URI("hdfs://xxxxx:802xx0");
    /** Most recently computed non-HDFS target path (directory path or local mirror path). */
    String newPath = "";
    /** Most recently computed HDFS target path. */
    String newHDFSPath = "";
    /** User the HDFS operations are performed as. */
    String userName = "root";

    /**
     * Creates the listener.
     *
     * @throws URISyntaxException if the configured HDFS URI cannot be parsed
     */
    public FileListener() throws URISyntaxException {
    }

    @Override
    public void onStart(FileAlterationObserver observer) {
        super.onStart(observer);
    }

    /**
     * Creates the matching directory under {@code /data/} in HDFS.
     *
     * @param directory the directory that was created locally
     */
    @Override
    public void onDirectoryCreate(File directory) {
        // A separator is required here; "/data" + name produced paths such as "/datafoo".
        this.newPath = "/data/" + directory.getName();
        System.out.println("File path:" + directory.getAbsolutePath() + "Folder creation:" + directory.getName());
        FileUtil.newDir2HDFS(this.uri, this.userName, this.newPath);
        log.info("[Created Directory] : {}", directory.getAbsolutePath());
    }

    @Override
    public void onDirectoryChange(File directory) {
        log.info("[Changed Directory] : {}", directory.getAbsolutePath());
    }

    @Override
    public void onDirectoryDelete(File directory) {
        log.info("[Deleted Directory] : {}", directory.getAbsolutePath());
    }

    /**
     * Uploads a newly created file to HDFS and copies it to the local mirror location.
     *
     * @param file the file that was created
     */
    @Override
    public void onFileCreate(File file) {
        log.info("[Created File] : {}", file.getAbsolutePath());
        this.newHDFSPath = FileUtil.getHDFSPath(file);
        this.newPath = FileUtil.getDestPath(file);
        System.out.println("Monitoring source file path:" + file.toPath());
        System.out.println("Monitoring source file path:" + file.getAbsolutePath() + "Target HDFS file creation:" + this.newHDFSPath);
        System.out.println("Monitoring source file path:" + file.getAbsolutePath() + "Target Linux file creation:" + this.newPath);
        FileUtil.copyFile2HDFS(this.uri, this.userName, file.getAbsolutePath(), this.newHDFSPath);
        copyToLocalMirror(file, this.newPath);
    }

    /**
     * Re-uploads a modified file to HDFS and refreshes the local mirror copy.
     *
     * @param file the file that was modified
     */
    @Override
    public void onFileChange(File file) {
        log.info("[Amended File] : {}", file.getAbsolutePath());
        // The HDFS upload must target the HDFS path (as in onFileCreate/onFileDelete),
        // not the local mirror path.
        this.newHDFSPath = FileUtil.getHDFSPath(file);
        this.newPath = FileUtil.getDestPath(file);
        FileUtil.copyFile2HDFS(this.uri, this.userName, file.getAbsolutePath(), this.newHDFSPath);
        copyToLocalMirror(file, this.newPath);
    }

    /**
     * Removes a deleted file from HDFS and from the local mirror location.
     *
     * @param file the file that was deleted
     */
    @Override
    public void onFileDelete(File file) {
        log.info("[Deleted File] : {}", file.getAbsolutePath());
        this.newHDFSPath = FileUtil.getHDFSPath(file);
        this.newPath = FileUtil.getDestPath(file);
        FileUtil.delFile2HDFS(this.uri, this.userName, this.newHDFSPath);
        try {
            // The mirror copy may never have been created; a missing file is not an error.
            Files.deleteIfExists(new File(this.newPath).toPath());
        } catch (IOException e) {
            log.error("Failed to delete local copy {}", this.newPath, e);
        }
    }

    @Override
    public void onStop(FileAlterationObserver observer) {
        super.onStop(observer);
    }

    /** Copies {@code source} to {@code destination}, replacing any existing file; logs on failure. */
    private void copyToLocalMirror(File source, String destination) {
        try {
            Files.copy(source.toPath(), new File(destination).toPath(), StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException e) {
            log.error("Failed to copy {} to {}", source.getAbsolutePath(), destination, e);
        }
    }
}
The knowledge points of the article match the official knowledge files, and you can further learn related knowledge. Java Skill TreeHomepageOverview 137777 people are learning the system