Using Java and FTP to monitor a directory on another server and read newly generated files

Background:

1. The file data lives on server A (Windows), where files are generated in a specified directory from time to time; the project application is deployed on server B (Linux).
2. The application on server B monitors the specified directory on server A; whenever a new file appears, it reads the file's contents and persists the data.
3. This article provides two things: how to install the FTP service on Windows, and the project source code. I hope it helps.

Four options were considered in total. After some trial and error the third option was adopted; the fourth option was never tried.

1. Read the file information with jsch.jar (JSch). This requires server A to accept SSH connections. Linux servers usually have SSH enabled by default and can be connected to directly, but Windows systems need an SSH server installed. Because server A in the on-site environment runs Windows 2003, this method was abandoned.
2. Take a roundabout approach: use a script to periodically copy the files from server A to server B (Linux), and then read them through jsch.jar. Monitoring from a script proved difficult, so this was abandoned as well.
3. Install the FTP service on server A, use an FTP client on server B, and use Java to monitor the directory and read the files as they are generated.
4. Share the specified directory on server A (so that it effectively becomes a directory on server B) and read from it directly. Since the third option worked, the fourth option was not tried.

Installing the FTP service on Windows

1. Enable the FTP service: Control Panel – Programs and Features – Turn Windows features on or off – check all the items highlighted in the red box in the screenshot – click OK

2. Create a new site:
Control Panel – Large Icons – Administrative Tools

IIS Manager

Websites – Add FTP site




The steps above install the FTP service on Windows. I demonstrated creating an anonymous site that anyone can access; you can instead create a dedicated user, so that a login is required before the site can be accessed.

Source code

Add this dependency

<!-- Apache Commons Net: the FTPClient and FTPFile classes used in the code below are expected to come from this library -->
<dependency>
   <groupId>commons-net</groupId>
    <artifactId>commons-net</artifactId>
    <version>3.6</version>
</dependency>

FileChangeData

/**
 * Describes one change detected in the monitored FTP directory and is handed to
 * {@link FileChangeEvent#change(FileChangeData)}.
 *
 * <p>Getters and setters are generated by Lombok's {@code @Data}.
 */
@Data
public class FileChangeData {

    /**
     * Remote file entry taken from the FTP listing; may be {@code null}
     * (for example when the file no longer exists on the server).
     * */
    private FTPFile ftpFile;

    /**
     * Kind of change that was detected (added, updated or deleted).
     * */
    private FileChangeType eventType;

    /**
     * Name of the file the change refers to.
     * */
    private String fileName;

    /**
     * File size in bytes as reported by the FTP listing.
     * */
    private Long fileSize;

    /**
     * Client used by {@link #getInputStream(String)} to download the file;
     * may be {@code null} if the producer of this object did not set it.
     * */
    private FTPClient ftpClient;

    /**
     * Opens a stream on the content of a remote file.
     *
     * <p>When a stream is returned, the caller has to close it and afterwards call
     * {@code completePendingCommand()} on the FTP client to finish the transfer.
     *
     * @param filePathName path of the remote file to read
     * @return the content stream, or {@code null} when the event is a deletion,
     *         no FTP client has been set, or the file cannot be opened
     * */
    public InputStream getInputStream(String filePathName) {
        // A deleted file has no content, and without a client there is nothing to read from.
        if (Objects.equals(eventType, FileChangeType.FILE_DELETED) || ftpClient == null) {
            return null;
        }

        try {
            return ftpClient.retrieveFileStream(filePathName);
        } catch (IOException ignored) {
            // Best effort by contract: an I/O failure is reported to the caller as "no stream".
            return null;
        }
    }
}

FileChangeEvent

public interface FileChangeEvent {

    /**
     * This method is triggered when the file changes
     * @param fileChangeData The file has changed
     * */
    @Function
    void change(FileChangeData fileChangeData) throws IOException;
}

FTPService

/**
 * Operations for logging in to an FTP server, listing its files and
 * watching a directory for file changes.
 */
public interface FTPService {

    /**
     * Connects to the FTP server and logs in.
     * @return FTPClient the connected client; implementations may return null
     *         when the connection cannot be established
     * */
    FTPClient login();

    /**
     * Logs out of the FTP server.
     * @return boolean whether the logout was successful
     * */
    boolean loginOut();

    /**
     * Gets the file list of the FTP directory.
     * @return FTPFile[] file list; implementations may return null when the listing fails
     * */
    FTPFile[] listFile();

    /**
     * Monitors folder changes and reports them to the given callback.
     * @param fileChangeEvent callback invoked for every detected file change
     * */
    void addListenerFileChange(FileChangeEvent fileChangeEvent);
}

ListenerChangeRunnable

/**
 * A {@link Runnable} file listener that can be asked to stop.
 */
public interface ListenerChangeRunnable extends Runnable {

    /**
     * Requests the listener to stop listening for files.
     * @return boolean whether the stop was successful
     * */
    boolean stopListener();
}

FTPServiceImpl

/**
 * {@link FTPService} implementation based on {@code FTPClient}.
 *
 * <p>The FTP client and the file listener are kept in {@link ThreadLocal}s, so
 * {@link #login()}, {@link #listFile()}, {@link #addListenerFileChange(FileChangeEvent)}
 * and {@link #loginOut()} have to be called from the same thread.
 */
@Service
public class FTPServiceImpl implements FTPService {

    @Autowired
    private FTPConfig ftpConfig;

    /** FTP client opened by {@link #login()} on the calling thread. */
    private final ThreadLocal<FTPClient> currentFTPClient;

    /** Listener started by {@link #addListenerFileChange(FileChangeEvent)} on the calling thread. */
    private final ThreadLocal<ListenerChangeRunnable> currentListener;

    public FTPServiceImpl() {
        this.currentFTPClient = new ThreadLocal<>();
        this.currentListener = new ThreadLocal<>();
    }

    /**
     * Connects to the configured server and logs in with the configured credentials.
     *
     * <p>If file names contain non-ASCII characters, a control encoding may have to be
     * set on the client before connecting.
     *
     * @return the logged-in client, or {@code null} if connecting failed or the
     *         credentials were rejected
     */
    @Override
    public FTPClient login() {
        FTPClient ftpClient = new FTPClient();
        try {
            ftpClient.connect(ftpConfig.getFtpIp(), ftpConfig.getFtpPort());
            // Rejected credentials are reported through the return value, not through an exception.
            if (!ftpClient.login(ftpConfig.getUsername(), ftpConfig.getPassword())) {
                disconnectQuietly(ftpClient);
                return null;
            }
            this.currentFTPClient.set(ftpClient);
            return ftpClient;
        } catch (Exception e) {
            // Do not leave a half-open connection behind.
            disconnectQuietly(ftpClient);
            return null;
        }
    }

    /**
     * Stops the listener started on this thread (if any), logs out and disconnects.
     *
     * @return {@code true} if the server confirmed the logout; {@code false} if there
     *         was no client for this thread or the logout failed
     */
    @Override
    public boolean loginOut() {
        // Stop polling first: the listener shares the client that is about to be closed.
        ListenerChangeRunnable listener = this.currentListener.get();
        if (listener != null) {
            listener.stopListener();
            this.currentListener.remove();
        }

        FTPClient ftpClient = this.currentFTPClient.get();
        if (ftpClient == null) {
            return Boolean.FALSE;
        }
        this.currentFTPClient.remove();
        try {
            return ftpClient.logout();
        } catch (Exception e) {
            return Boolean.FALSE;
        } finally {
            // Close the socket even when the logout command itself failed.
            disconnectQuietly(ftpClient);
        }
    }

    /**
     * Lists the files of the current working directory of this thread's client.
     *
     * @return the listing, or {@code null} if there is no client for this thread
     *         or the listing failed
     */
    @Override
    public FTPFile[] listFile() {
        FTPClient ftpClient = this.currentFTPClient.get();
        if (ftpClient == null) {
            return null;
        }
        try {
            return ftpClient.listFiles();
        } catch (Exception e) {
            return null;
        }
    }

    /**
     * Starts a background thread that polls the FTP directory and reports changes.
     * A listener previously started from this thread is stopped first, because both
     * would otherwise use the same client.
     *
     * @param fileChangeEvent callback invoked for every detected change
     * @throws IllegalArgumentException if {@code fileChangeEvent} is {@code null}
     * @throws IllegalStateException if {@link #login()} has not succeeded on this thread
     */
    @Override
    public void addListenerFileChange(FileChangeEvent fileChangeEvent) {
        if (fileChangeEvent == null) {
            throw new IllegalArgumentException("fileChangeEvent must not be null");
        }
        FTPClient ftpClient = this.currentFTPClient.get();
        if (ftpClient == null) {
            throw new IllegalStateException("login() must succeed on this thread before adding a listener");
        }

        ListenerChangeRunnable previous = this.currentListener.get();
        if (previous != null) {
            previous.stopListener();
        }

        ListenerFileChangeThreadRunnable listenerFileChangeThread = new ListenerFileChangeThreadRunnable(ftpClient, fileChangeEvent);
        this.currentListener.set(listenerFileChangeThread);
        new Thread(listenerFileChangeThread, "ftp-file-listener").start();
    }

    /** Closes the connection of the given client, ignoring any failure. */
    private void disconnectQuietly(FTPClient ftpClient) {
        try {
            if (ftpClient.isConnected()) {
                ftpClient.disconnect();
            }
        } catch (Exception ignored) {
            // Nothing useful can be done if closing the connection fails.
        }
    }
}

ListenerFileChangeThreadRunnable

@Slf4j
public class ListenerFileChangeThreadRunnable implements ListenerChangeRunnable {

    private final FTPClient ftpClient;

    private volatile boolean stop;

    private final Map<String, Long> fileMemory;

    private final FileChangeEvent fileChangeEvent;

    public ListenerFileChangeThreadRunnable(FTPClient ftpClient, FileChangeEvent fileChangeEvent) {
        this.ftpClient = ftpClient;
        this.fileChangeEvent = fileChangeEvent;
        this.fileMemory = new HashMap<>();
    }

    @Override
    public void run() {
        while (!stop) {
            try {
                FTPFile[] ftpFiles = ftpClient.listFiles();

                //Determine if the file is deleted
                if (fileMemory.size() > 0) {
                    Set<String> fileNames = new HashSet<>();
                    for (FTPFile ftpFile : ftpFiles) {
                        if (ftpFile.isDirectory()) {
                            log.info("Folder will not be judged for deletion");
                            continue;
                        }
                        fileNames.add(ftpFile.getName());
                    }
                    Set<Map.Entry<String, Long>> entries = fileMemory.entrySet();
                    for (Map.Entry<String, Long> map : entries) {
                        if (!fileNames.contains(map.getKey())) {
                            log.info("File {} was deleted", map.getKey());
                            FileChangeData fileChangeData = new FileChangeData();
                            fileChangeData.setEventType(FileChangeType.FILE_DELETED);
                            fileChangeData.setFileName(map.getKey());
                            fileChangeData.setFileSize(map.getValue());
                            fileMemory.remove(map.getKey());
                            entries.remove(map.getKey());
                            fileChangeEvent.change(fileChangeData);
                        }
                    }
                }
                //Determine whether the file has been changed or added
                for (FTPFile ftpFile: ftpFiles) {
                    //Determine whether it is a folder
                    if (ftpFile.isDirectory()) {
// log.info("{} does not monitor the file", ftpFile.getName());
                        continue;
                    }
                    FileChangeData fileChangeData = new FileChangeData();
                    fileChangeData.setFileName(ftpFile.getName());
// fileChangeData.setFileName("D:\ftptest\aaa" + ftpFile.getName());
                    fileChangeData.setFileSize(ftpFile.getSize());
                    fileChangeData.setFtpFile(ftpFile);
                    //Whether the file exists in the cache file list
                    if (fileMemory.containsKey(ftpFile.getName())) {
// log.info("File {} already exists in the memory, determine the size", ftpFile.getName());
                        if (!Objects.equals(fileMemory.get(ftpFile.getName()), ftpFile.getSize())) {
// log.info("File {} already exists in the memory and the size is inconsistent, update the cache operation", ftpFile.getName());
                            fileMemory.put(ftpFile.getName(), ftpFile.getSize());
                            fileChangeData.setEventType(FileChangeType.FILE_UPDATE);
                            fileChangeEvent.change(fileChangeData);
                        }
                        continue;
                    }
// log.info("File {} does not exist in memory for caching operation", ftpFile.getName());
                    fileMemory.put(ftpFile.getName(), ftpFile.getSize());
                    fileChangeData.setEventType(FileChangeType.FILE_ADD);
                    fileChangeEvent.change(fileChangeData);
                }
            } catch (Exception e) {
                continue;
                //throw new RuntimeException(e);
            }
            try {
                TimeUnit.SECONDS.sleep(20);
            } catch (InterruptedException e) {
                continue;
                //throw new RuntimeException(e);
            }
        }
    }

    @Override
    public boolean stopListener() {
        this.stop = Boolean.TRUE;
        this.fileMemory.clear();
        return this.stop;
    }
}

FileChangeType

/**
 * Kinds of change that can be reported for a file in the monitored directory.
 */
public enum FileChangeType {
    FILE_UPDATE(0, "File update"),
    FILE_ADD(1, "File added"),
    FILE_DELETED(2, "File deleted");

    /** Numeric code of the change type. */
    @Getter
    private final Integer type;

    /** Human-readable description of the change type. */
    @Getter
    private final String desc;

    FileChangeType(Integer type, String desc) {
        this.type = type;
        this.desc = desc;
    }
}

FTPConfig

/**
 * FTP connection settings read from the {@code ftp.*} properties.
 *
 * <p>The defaults of {@code ftp.ip}, {@code ftp.username} and {@code ftp.password} are
 * descriptive placeholders only, so these properties have to be configured with real values.
 */
@Data
@Configuration
public class FTPConfig {

    /** Host name or IP address of the FTP server. */
    @Value("${ftp.ip:ftp's ip}")
    private String ftpIp;

    /**
     * Port of the FTP server. The default has to be numeric because the value is
     * converted to an Integer; 21 is used when the property is absent.
     */
    @Value("${ftp.port:21}")
    private Integer ftpPort;

    /** User name of the FTP account. */
    @Value("${ftp.username:username created by ftp}")
    private String username;

    /** Password of the FTP account. */
    @Value("${ftp.password:username and password created by ftp}")
    private String password;
}

SendEmailApplicationTests

/**
 * Demo runner: logs in to the FTP server at application start-up and prints the
 * content of every file that is reported as changed.
 */
@Component
class SendEmailApplicationTests implements ApplicationRunner {
    @Autowired
    private FTPService ftpService;

    /**
     * Logs in and registers a listener that reads each changed file line by line
     * (decoded as GBK) into a list for further processing.
     */
    void ftpTest() {
        FTPClient ftpClient = ftpService.login();
        if (ftpClient == null) {
            System.out.println("FTP login failed, file monitoring was not started");
            return;
        }
        ftpService.addListenerFileChange(ftpFile -> {
            System.out.println(String.format("File %s has been changed, file change type %s", ftpFile.getFileName(), ftpFile.getEventType().getDesc()));
            InputStream inputStream = ftpClient.retrieveFileStream("/" + ftpFile.getFileName());
            // No stream is available when the file cannot be opened, e.g. because it was deleted.
            if (inputStream == null) {
                return;
            }
            List<String> listStr = new ArrayList<>();//The read data is in listStr
            try (InputStream in = inputStream;
                 BufferedReader reader = new BufferedReader(new InputStreamReader(in, "GBK"))) {
                String s;
                while ((s = reader.readLine()) != null) {
                    System.out.println("===================>" + s);
                    listStr.add(s);
                }
            } finally {
                // The transfer has to be completed after the stream is closed, even when
                // reading failed; otherwise the client cannot process further commands.
                if (!ftpClient.completePendingCommand()) {
                    System.out.println(String.format("Transfer of file %s did not complete successfully", ftpFile.getFileName()));
                }
            }
            //Process business logic
        });
    }

    /**
     * Starts the FTP monitoring once the application has started.
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        ftpTest();
    }
}