Communication (2): Real-time log monitoring over WebSocket

References:

Java: monitoring local log files and viewing them in real time (Develop Paper)

HTML5 – Detailed explanation of Web Socket usage (with samples)

Using WebSocket in Spring Boot ("Looking at the Milky Way" blog, CSDN)

java

Maven dependencies

 <!-- Spring Boot starter for WebSocket support. No <version> is given here; presumably it is managed by the Spring Boot parent/BOM (confirm in your pom). -->
 <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
FileMonitor
import lombok.extern.slf4j.Slf4j;

/**
 * Binds one WebSocket session to one log file and starts a background thread
 * that watches the file for that session.
 *
 * <p>Constructing an instance immediately starts the monitoring thread; there
 * is no separate {@code start()} call.
 */
@Slf4j
public class FileMonitor {

    /**
     * Polling interval in milliseconds used when the caller supplies none,
     * or supplies a value {@link Thread#sleep(long)} would reject.
     */
    private static final long DEFAULT_MONITOR_DELAY_MS = 500L;

    /**
     * Id of the WebSocket session this monitor is bound to
     */
    private final String sessionId;

    /**
     * Path of the monitored log file
     */
    private final String logPath;

    /**
     * Monitoring time interval, unit ms
     */
    private final Long monitorDelay;

    /**
     * Starts monitoring {@code logPath} for {@code sessionId} with the default
     * polling interval of 500 ms.
     *
     * @param sessionId id of the WebSocket session that receives the updates
     * @param logPath   path of the log file to watch
     */
    public FileMonitor(String sessionId, String logPath) {
        this(sessionId, logPath, DEFAULT_MONITOR_DELAY_MS);
    }

    /**
     * Starts monitoring {@code logPath} for {@code sessionId}.
     *
     * @param sessionId    id of the WebSocket session that receives the updates
     * @param logPath      path of the log file to watch
     * @param monitorDelay polling interval in ms; {@code null} or a negative
     *                     value falls back to the 500 ms default
     */
    public FileMonitor(String sessionId, String logPath, Long monitorDelay) {
        this.sessionId = sessionId;
        this.logPath = logPath;
        // A null delay would fail on unboxing and a negative one makes
        // Thread.sleep throw, either of which would kill the worker thread.
        this.monitorDelay = (monitorDelay == null || monitorDelay < 0)
                ? Long.valueOf(DEFAULT_MONITOR_DELAY_MS)
                : monitorDelay;
        startFileMonitor(this.monitorDelay);
    }

    /**
     * Creates and starts the worker thread that runs a {@link FileMonitorRunnable}.
     *
     * @param monitorDelay polling interval in ms handed to the runnable
     */
    private void startFileMonitor(Long monitorDelay) {
        Thread thread = new Thread(
                new FileMonitorRunnable(sessionId, logPath, monitorDelay),
                "log-monitor-" + sessionId);
        // Daemon, so a monitor that is still polling cannot keep the JVM alive on shutdown.
        thread.setDaemon(true);
        thread.start();
    }
}
FileMonitorRunnable
import com.alibaba.gts.flm.common.utils.ShellUtil;
import lombok.extern.slf4j.Slf4j;

import java.io.File;
import java.io.FileInputStream;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.StandardCharsets;

@Slf4j
public class FileMonitorRunnable implements Runnable {

    private ByteBuffer byteBuffer = ByteBuffer. allocate(1024 * 100);

    private CharBuffer charBuffer = CharBuffer. allocate(1024 * 50);

    private CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();

    private boolean isRunning = true;

    private String sessionId;

    private String logPath;

    private Long monitorDelay;

    public FileMonitorRunnable(String sessionId, String logPath, Long monitorDelay) {
        this.sessionId = sessionId;
        this. logPath = logPath;
        this.monitorDelay = monitorDelay;
    }

    @Override
    public void run() {
        File file = new File(logPath);
        FileChannel channel = null;
        try {
            channel = new FileInputStream(file).getChannel();
            channel. position(channel. size());
        } catch (Exception e) {
            log.info("Failed to monitor the file, check whether the path is correct");
            WebSocketUtils.endMonitor(sessionId);
            e.printStackTrace();
        }
        long lastModified = file. lastModified();
        // initial connection
        WebSocketUtils.sendMessageTo(sessionId, ShellUtil.exec("tail -n 100 " + logPath));
        // keep listening
        while (isRunning) {
            long now = file. lastModified();
            if (now != lastModified) {
                log.info("{}'s connection is monitoring {}'s file update via thread {}", sessionId, Thread.currentThread().getName(), logPath);
                String newContent = getNewContent(channel);
                WebSocketUtils.sendMessageTo(sessionId, newContent);
                lastModified = now;
            }
            try {
                Thread. sleep(monitorDelay);
            } catch (InterruptedException e) {
                e.printStackTrace();
                WebSocketUtils.endMonitor(sessionId);
            }
            isRunning = WebSocketUtils. currentSessionAlive(sessionId);
        }
    }

    private String getNewContent(FileChannel channel) {
        try {
            byteBuffer. clear();
            charBuffer. clear();
            int length = channel. read(byteBuffer);
            if (length != -1) {
                byteBuffer. flip();
                decoder.decode(byteBuffer, charBuffer, true);
                charBuffer. flip();
                return charBuffer.toString();
            } else {
                channel. position(channel. size());
            }
        } catch (Exception e) {
            e.printStackTrace();
            WebSocketUtils.endMonitor(sessionId);
        }
        return null;
    }
}
WebSocketConfig
import org.springframework.boot.web.servlet.ServletContextInitializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

import javax.servlet.ServletContext;
import javax.servlet.ServletException;

/**
 * Spring configuration class that enables WebSocket support.
 *
 * @author: xxt
 * @date: 2022/5/23 16:22
 */

@Configuration
public class WebSocketConfig implements ServletContextInitializer {

    /**
     * Exposes a {@link ServerEndpointExporter} bean so that classes annotated
     * with {@code @ServerEndpoint} are picked up as WebSocket endpoints.
     * When the application runs in an external Tomcat this configuration is
     * not needed.
     *
     * @return a new exporter instance
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }

    /**
     * Required by {@link ServletContextInitializer}; this configuration does
     * nothing at servlet-context startup.
     *
     * @param servletContext the servlet context being initialized (unused)
     */
    @Override
    public void onStartup(ServletContext servletContext) throws ServletException {
        // Intentionally empty.
    }

}
WebSocketSeverRealTimeLogs
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

/**
 * @author: xxt
 * @date: 2022/5/23 16:27
 * @Description: WebSocket operation class
 */
@ServerEndpoint("/websocket/realTimeLogs/{fileName}")
@Component
@Slf4j
public class WebSocketSeverRealTimeLogs {

    private Session session;

    /**
     * Establish a WebSocket connection
     */
    @OnOpen
    public void onOpen(Session session, @PathParam(value = "fileName") String fileName) {
        String sessionId = session. getId();
        log.info("WebSocket is establishing a connection, file: {}, session.id", fileName, sessionId);
        Session historySession = WebSocketUtils. getSession(sessionId);
        // historySession is not empty, indicating that someone has logged into the account, and the logged-in WebSocket object should be deleted
        if (historySession != null) {
            log.warn("The old connection is not empty, log out the old connection");
            WebSocketUtils. reduceSession(sessionId);
        }
        // establish connection
        WebSocketUtils. addSession(session);
        this.session = session;
        // TODO cannot directly pass the file path, it can be transferred after converting to base64
        WebSocketUtils.startMonitor(sessionId, "/Users/liyue/Workspace6/flm/logs/" + fileName);
    }

    /**
     * An error occurred
     *
     * @param throwable e
     */
    @OnError
    public void onError(Throwable throwable) {
        throwable. printStackTrace();
    }

    /**
     * Connection closed
     */
    @OnClose
    public void onClose() {
        WebSocketUtils.reduceSession(session.getId());
    }

    /**
     * Receive client messages
     *
     * @param message received message
     */
    @OnMessage
    public void onMessage(String message) {
        log.info("Received a message from the client: {}", message);
    }


}

WebSocketUtils
import lombok.extern.slf4j.Slf4j;

import javax. websocket. Session;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

@Slf4j
public class WebSocketUtils {

    /**
     * connected websocket
     */
    private static Map<String, Session> onlineSession = new HashMap<>();

    /**
     * Add user
     */
    public static void addSession(Session session) {
        onlineSession.put(session.getId(), session);
        log.info("The connection is established, sessionId: {}, the current online number is: {}", session.getId(), onlineSession.size());
    }

    /**
     * get user
     */
    public static Session getSession(String sessionId) {
        return onlineSession.get(sessionId);
    }

    /**
     * remove user
     */
    public static void reduceSession(String sessionId) {
        onlineSession. remove(sessionId);
        log.info("sessionId({}) is disconnected, the current online number is: {}", sessionId, onlineSession.size());
    }

    /**
     * Turn on monitoring
     * The essence is to monitor one thread
     */
    public static void startMonitor(String sessionId, String logPath) {
        Session session = onlineSession. get(sessionId);
        new FileMonitor(session. getId(), logPath);
    }

    /**
     * Turn off monitoring
     * When the session is closed, the corresponding thread will also be closed
     */
    public static void endMonitor(String sessionId) {
        Session session = onlineSession. get(sessionId);
        sendMessageTo(sessionId, "<error>ERROR The monitoring thread is abnormal!</error>");
        try {
            session. close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Send a message to a specified user
     *
     * @param sessionId
     * @param message
     */
    public static void sendMessageTo(String sessionId, String message) {
        Session session = onlineSession. get(sessionId);
        try {
            session.getBasicRemote().sendText(message);
        } catch (Exception e) {
            e.printStackTrace();
            endMonitor(sessionId);
        }
    }

    /**
     * Whether the session is online
     * Used to determine whether the thread is closed
     *
     * @param sessionId
     * @return
     */
    public static boolean currentSessionAlive(String sessionId) {
        return onlineSession. containsKey(sessionId);
    }

    /**
     * Group message
     */
    public static void sendAllMessage(String message) {
        log.info("Send message: {}", message);
        onlineSession. forEach((k, v) -> {
            try {
                v. getBasicRemote(). sendText(message);
            } catch (IOException e) {
                log.error("An error occurred in mass sending message:" + e.getMessage(), e);
            }
        });
    }
}

html

<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>hangge.com</title>
<style>
</style>
<script type="text/javascript">
// Input box holding the message to send (null when the page has no element with id "messageBox")
var messageBox;
// Read-only textarea that displays the log and status messages
var messageLog;
// Input box holding the WebSocket address
var wsAddress;
// WebSocket object
var socket;

// Page loaded: look up the elements the handlers below work with
window.onload = function() {
  messageBox = document.getElementById('messageBox');
  messageLog = document.getElementById('messageLog');
  wsAddress = document.getElementById('wsAddress');
};

// Append text to the log. messageLog is a <textarea>, so plain text is written
// to its value; HTML markup such as <br> would be shown literally.
function appendLog(text) {
  messageLog.value += text;
}

// Create a socket object and bind all events
function connect() {
  // Create socket object
  socket = new WebSocket(wsAddress.value);
  console.log(wsAddress.value);

  // Listen to all WebSocket events
  socket.onopen = connectionOpen;
  socket.onmessage = messageReceived;
  socket.onerror = errorOccurred;
  socket.onclose = connectionClosed;
}

// Disconnect button click
function disconnect() {
  // Nothing to close if Connect was never clicked
  if (socket) {
    socket.close();
  }
}

// Send message button click
function sendMessage() {
  if (!socket || !messageBox) {
    return;
  }
  // Get the data to send
  var message = messageBox.value;

  // Send message through socket
  socket.send(message);

  // Tell the user what just happened
  appendLog("\nSend: " + message);
}

// Connection established event response
function connectionOpen(e) {
  appendLog("\n--- Socket connection successful ---\n");
}

// Message received event response
function messageReceived(e) {
  appendLog(e.data);
  console.log(e.data);
}

// Error event response (the error event carries no message data)
function errorOccurred(e) {
  appendLog("\nAn error occurred\n");
}

// Connection close event response
function connectionClosed(e) {
  appendLog("\n--- Socket connection closed ---\n");
}
</script>
</head>
<body>
<input id="wsAddress" value="ws://127.0.0.1:8080/websocket/realTimeLogs/1.log" style="width: 400px;" />
<button onclick="connect()">Connect</button>
<button onclick="disconnect()">Disconnect</button>

<br />
<textarea id="messageLog" readonly cols="200" rows="100"></textarea>
</body>
</html>