Java network programming – asynchronous channels and asynchronous operation results

Since JDK 7, Java has provided the AsynchronousSocketChannel and AsynchronousServerSocketChannel classes, which represent asynchronous channels. They play the same roles as the SocketChannel and ServerSocketChannel classes. The difference is that some methods of an asynchronous channel always run in non-blocking mode: they return immediately with a Future object that will hold the result of the asynchronous operation.

The AsynchronousSocketChannel class has the following non-blocking methods:

// connect to the remote host
Future<Void> connect(SocketAddress remote);
// Read data from the channel and store it in ByteBuffer
// The Future object contains the actual number of bytes read from the channel
Future<Integer> read(ByteBuffer dst);
// Write the ByteBuffer data into the channel
// The Future object contains the number of bytes actually written to the channel
Future<Integer> write(ByteBuffer src);

The AsynchronousServerSocketChannel class has the following non-blocking methods:

// Accept client connection request
// The Future object contains the AsynchronousSocketChannel object created after the connection is successfully established
Future<AsynchronousSocketChannel> accept();
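
The four methods listed above can be exercised together on the loopback interface. The following is a minimal sketch rather than a definitive implementation: the class name LoopbackFutureDemo and the method roundTrip are made up for illustration, and the sketch assumes that binding a listener to an ephemeral loopback port is allowed on the machine running it.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical demo class, not part of the original example
final class LoopbackFutureDemo {

    /** Sends text from a client channel to an accepted channel and returns what was read. */
    static String roundTrip(String text) {
        try (AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open()) {
            // Port 0 asks the operating system for any free port
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            // accept() returns immediately; the Future completes when a client connects
            Future<AsynchronousSocketChannel> accepted = server.accept();

            try (AsynchronousSocketChannel client = AsynchronousSocketChannel.open()) {
                Future<Void> connected = client.connect(server.getLocalAddress());
                connected.get(); // block until the connection is established

                try (AsynchronousSocketChannel peer = accepted.get()) {
                    byte[] payload = text.getBytes(StandardCharsets.UTF_8);
                    ByteBuffer out = ByteBuffer.wrap(payload);
                    while (out.hasRemaining()) {
                        client.write(out).get(); // a single write may be partial
                    }

                    ByteBuffer in = ByteBuffer.allocate(payload.length);
                    while (in.hasRemaining()) {
                        if (peer.read(in).get() == -1) {
                            break; // the other side closed the connection
                        }
                    }
                    in.flip();
                    return StandardCharsets.UTF_8.decode(in).toString();
                }
            }
        } catch (IOException | ExecutionException e) {
            throw new IllegalStateException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello"));
    }
}
```

Each call to get() here blocks right away to keep the sketch short. In a real program the point of the Future is that other work can be done between starting the operation and collecting its result.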

With asynchronous channels, a program can start an asynchronous operation and carry on with other work while it runs, for example:

SocketAddress socketAddress = ...;
AsynchronousSocketChannel client = AsynchronousSocketChannel.open();

// Request a connection
Future<Void> connected = client.connect(socketAddress);
ByteBuffer byteBuffer = ByteBuffer.allocate(128);

// Perform other operations
//...

// Wait for the connection to complete
connected.get();

// Read data
Future<Integer> future = client.read(byteBuffer);

// Perform other operations
//...

// Wait for the read from the channel to complete
future.get();

byteBuffer.flip();
WritableByteChannel out = Channels.newChannel(System.out);
out.write(byteBuffer);
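
The get() calls above wait indefinitely. The Future interface also provides get(timeout, unit), which gives up after a bounded wait. The following is a minimal sketch, assuming a hypothetical helper named awaitRead; a CompletableFuture stands in for the result of a channel read so that the sketch runs without a network connection.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper class, not part of the original example
final class TimedGetSketch {

    /** Waits at most timeoutMillis for an asynchronous read and describes the outcome. */
    static String awaitRead(Future<Integer> pendingRead, long timeoutMillis) {
        try {
            int n = pendingRead.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return n == -1 ? "end of stream" : "read " + n + " bytes";
        } catch (TimeoutException e) {
            // The operation is still pending; cancel it to give up on the result.
            // For a real channel, cancel(true) may interrupt the I/O by closing the channel.
            pendingRead.cancel(true);
            return "timed out";
        } catch (ExecutionException e) {
            // The operation itself failed; the cause is the underlying exception
            return "failed: " + e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        // Stand-ins for channel reads: one already completed, one that never completes
        System.out.println(awaitRead(CompletableFuture.completedFuture(42), 100));
        System.out.println(awaitRead(new CompletableFuture<Integer>(), 100));
    }
}
```

With a real AsynchronousSocketChannel, the argument would be the Future returned by client.read(byteBuffer).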

The following example demonstrates the use of an asynchronous channel. It repeatedly reads a domain name entered by the user, tries to establish a connection to that host, and prints the time it took to connect. If the program cannot connect to the specified host, it prints an error message instead. If the user enters bye, the program ends.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.LinkedList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Represents the result of connecting to a host
class PingResult {
    
    InetSocketAddress address;
    long connectStart; // Time when the connection attempt started
    long connectFinish = 0; // Time when the connection succeeded
    String failure;
    Future<Void> connectResult; // Asynchronous result of the connect operation
    AsynchronousSocketChannel socketChannel;
    String host;
    final String ERROR = "Connection failed";
        
    PingResult(String host) {
        try {
            this.host = host;
            address = new InetSocketAddress(InetAddress.getByName(host), 80);
        } catch (IOException x) {
            failure = ERROR;
        }
    }
    
    // Print the result of connecting to a host
    public void print() {
        String result;
        if (failure != null) {
            // A recorded failure takes precedence over any timing
            result = failure;
        } else if (connectFinish != 0) {
            result = Long.toString(connectFinish - connectStart) + "ms";
        } else {
            result = "Timed out";
        }
        System.out.println("ping " + host + " result: " + result);
    }
}

public class PingClient {
    // Queue that stores all PingResult objects
    private LinkedList<PingResult> pingResults = new LinkedList<PingResult>();
    // Written by the input loop and read by the Printer thread
    volatile boolean shutdown = false;
    ExecutorService executorService;
    
    public PingClient() throws IOException {
        executorService = Executors.newFixedThreadPool(4);
        executorService.execute(new Printer());
        receivePingAddress();
    }
    
    public static void main(String args[]) throws IOException {
        new PingClient();
    }
    
    /* Read host addresses entered by the user and submit a PingHandler task to the thread pool for each one */
    public void receivePingAddress() {
        try {
            BufferedReader localReader = new BufferedReader(new InputStreamReader(System.in));
            String msg = null;
            // Read the host address entered by the user
            while ((msg = localReader.readLine()) != null) {
                if (msg.equals("bye")) {
                    shutdown = true;
                    executorService.shutdown();
                    break;
                }
                executorService.execute(new PingHandler(msg));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
    /* Try to connect to a specific host, create a PingResult object, and add it to the pingResults queue */
    public class PingHandler implements Runnable {
        String msg;
        public PingHandler(String msg) {
            this.msg = msg;
        }
        public void run() {
            if (!msg.equals("bye")) {
                PingResult pingResult = new PingResult(msg);
                AsynchronousSocketChannel socketChannel = null;
                try {
                    socketChannel = AsynchronousSocketChannel.open();
                    // Keep a reference so that the Printer task can close the channel
                    pingResult.socketChannel = socketChannel;
                    pingResult.connectStart = System.currentTimeMillis();
                    // connect() returns immediately; the Printer task waits on the Future
                    pingResult.connectResult = socketChannel.connect(pingResult.address);
                } catch (Exception x) {
                    if (socketChannel != null) {
                        try { socketChannel.close(); } catch (IOException e) {}
                    }
                    pingResult.failure = pingResult.ERROR;
                }
                // Publish the fully initialized PingResult and wake up the Printer task
                synchronized (pingResults) {
                    pingResults.add(pingResult);
                    pingResults.notify();
                }
            }
        }
    }
    
    /* Print the results of completed tasks in the pingResults queue */
    public class Printer implements Runnable {
        public void run() {
            PingResult pingResult = null;
            while (!shutdown) {
                synchronized (pingResults) {
                    while (!shutdown && pingResults.size() == 0) {
                        try {
                            pingResults.wait(100);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    if (shutdown && pingResults.size() == 0) break;
                    pingResult = pingResults.getFirst();
                    
                    try {
                        if (pingResult.connectResult != null) {
                            // Wait up to 500 ms for the connection to complete
                            pingResult.connectResult.get(500, TimeUnit.MILLISECONDS);
                            // Reached only if the connection succeeded
                            pingResult.connectFinish = System.currentTimeMillis();
                        }
                    } catch (Exception e) {
                        // Timeout, failed connection, or interruption
                        pingResult.failure = pingResult.ERROR;
                    }
                    
                    if ((pingResult.connectResult != null && pingResult.connectResult.isDone()) || pingResult.failure != null) {
                        pingResult.print();
                        pingResults.removeFirst();
                        try {
                            if (pingResult.socketChannel != null) {
                                pingResult.socketChannel.close();
                            }
                        } catch (IOException e) {}
                    }
                }
            }
        }
    }
}

The PingClient class defines two inner classes that represent specific tasks:

  • PingHandler: Tries to connect to the host address entered by the user through an asynchronous channel, creates a PingResult object that holds the asynchronous result of the connect operation, and adds it to the pingResults queue
  • Printer: Prints the results of completed tasks in the pingResults queue and removes each printed PingResult object from the queue
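
What the two tasks do together, starting a connect and later waiting on its Future with a time limit, can be condensed into a single method. The following is a rough sketch under stated assumptions: ConnectTimer, connectMillis and loopbackConnectMillis are hypothetical names, and a local listener stands in for the remote host so that no external network access is needed.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper class, not part of the original example
final class ConnectTimer {

    /** Returns the connect time in milliseconds, or -1 if the connection failed or timed out. */
    static long connectMillis(SocketAddress target, long timeoutMillis) {
        try (AsynchronousSocketChannel channel = AsynchronousSocketChannel.open()) {
            long start = System.nanoTime();
            Future<Void> pending = channel.connect(target);    // returns immediately
            pending.get(timeoutMillis, TimeUnit.MILLISECONDS); // wait for the result
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        } catch (IOException | ExecutionException | TimeoutException e) {
            return -1; // the channel is closed on the way out, abandoning a pending connect
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
    }

    /** Times a connection to a throwaway listener on the loopback interface. */
    static long loopbackConnectMillis() {
        try (AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            return connectMillis(server.getLocalAddress(), 500);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("connect took " + loopbackConnectMillis() + " ms");
    }
}
```

Unlike PingClient, this sketch waits for one connection at a time. The queue of PingResult objects in the example is what allows several connections to be in progress while the Printer task waits on the oldest one.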