“Beyond Limits – How to Use Netty to Process Large Data Efficiently?” – Master Netty Skills to Easily Handle Massive Data Processing!

1 Writing large data

Writing large chunks of data efficiently in an asynchronous framework is a special problem because the network may become saturated. Write operations are non-blocking, so a write returns immediately even if not all of the data has been written, and the ChannelFuture is notified once it completes. If you keep writing in this situation, you risk running out of memory. When writing large data, you therefore need to be prepared for a slow connection to the remote peer, which delays the release of memory.
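One way to picture the guard against this is a counter of bytes that are queued but not yet flushed, with the producer pausing above a threshold and resuming below a lower one. The following is a minimal plain-Java sketch of that idea. The class PendingWriteBudget and its thresholds are hypothetical and are not part of Netty's API; in Netty, the comparable signal is exposed through Channel.isWritable().

```java
/** Hypothetical helper: tracks queued bytes and toggles writability with hysteresis. */
final class PendingWriteBudget {
    private final long lowWaterMark;
    private final long highWaterMark;
    private long pendingBytes;
    private boolean writable = true;

    PendingWriteBudget(long lowWaterMark, long highWaterMark) {
        if (lowWaterMark < 0 || highWaterMark < lowWaterMark) {
            throw new IllegalArgumentException("expected 0 <= low <= high");
        }
        this.lowWaterMark = lowWaterMark;
        this.highWaterMark = highWaterMark;
    }

    /** Call when data has been queued for sending. */
    synchronized void onQueued(long bytes) {
        pendingBytes += bytes;
        if (pendingBytes > highWaterMark) {
            writable = false; // too much data in flight: the producer should pause
        }
    }

    /** Call when the transport reports that bytes have left the queue. */
    synchronized void onFlushed(long bytes) {
        pendingBytes = Math.max(0, pendingBytes - bytes);
        if (pendingBytes < lowWaterMark) {
            writable = true; // the queue has drained enough: the producer may resume
        }
    }

    synchronized boolean isWritable() {
        return writable;
    }

    synchronized long pendingBytes() {
        return pendingBytes;
    }
}
```

A producer would check isWritable() before each write and stop reading from its source while it returns false. The gap between the two thresholds keeps the state from flipping on every small write.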

Consider writing the contents of a file out to the network. The discussion of transports (see Section 4.2) mentioned NIO’s zero-copy feature, which eliminates the copying steps that move file contents from the file system to the network stack. All of this happens in the core of Netty, so all the application needs to do is use an implementation of the FileRegion interface, which Netty’s API documentation defines as “a region of a file that is sent via a Channel which supports zero-copy file transfer.”
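At the JDK level, the zero-copy path that NIO exposes is FileChannel.transferTo(). The plain-Java sketch below, which does not use Netty, is included only to make that mechanism concrete. The names ZeroCopySketch and transferFile are made up for this illustration, the target is assumed to be a blocking channel, and whether the operating system really avoids the intermediate copy depends on the platform and on the target being a socket channel.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class ZeroCopySketch {
    /** Sends the whole file to the target channel and returns the number of bytes moved. */
    static long transferFile(Path source, WritableByteChannel target) throws IOException {
        try (FileChannel in = FileChannel.open(source, StandardOpenOption.READ)) {
            long size = in.size();
            long position = 0;
            // transferTo() may move fewer bytes than requested, so loop until done.
            while (position < size) {
                long moved = in.transferTo(position, size - position, target);
                if (moved <= 0) {
                    break; // no progress, for example because the file was truncated
                }
                position += moved;
            }
            return position;
        }
    }
}
```

The caller can compare the return value with the file size to detect an incomplete transfer.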

Listing 11-11 shows how to use the zero-copy feature to transfer the contents of a file by creating a DefaultFileRegion from the FileInputStream and writing it to the Channel (you can even use io.netty.channel.ChannelProgressivePromise to get the progress of the transfer in real time).

package io.netty.example.cp11;

import io.netty.channel.*;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.io.File;
import java.io.FileInputStream;

/**
 * Listing 11.11 Using FileRegion to transfer file content
 */
public class FileRegionWriteHandler extends ChannelInboundHandlerAdapter {
    private static final Channel CHANNEL_FROM_SOMEWHERE = new NioSocketChannel();
    private static final File FILE_FROM_SOMEWHERE = new File("");

    @Override
    public void channelActive(final ChannelHandlerContext ctx) throws Exception {
        File file = FILE_FROM_SOMEWHERE; // get reference from somewhere
        Channel channel = CHANNEL_FROM_SOMEWHERE; // get reference from somewhere
        //...
        // Create a FileInputStream
        FileInputStream in = new FileInputStream(file);

        // Create a new DefaultFileRegion with the full length of the file
        FileRegion region = new DefaultFileRegion(in.getChannel(), 0, file.length());
        // Send the DefaultFileRegion and register a ChannelFutureListener
        channel.writeAndFlush(region).addListener(
                new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            // handle failure
                            Throwable cause = future.cause();
                            // Do something
                        }
                    }
                });
    }
}

This example applies only to the direct transfer of file content and does not include any processing of the data by the application. When data needs to be copied from the file system into user memory, ChunkedWriteHandler can be used, which supports asynchronous writing of large data streams without incurring large memory consumption.

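The key is the interface ChunkedInput<B>, where the type parameter B is the type returned by the readChunk() method. Netty provides four implementations of this interface, as listed in Table 11-7. Each one represents a data stream of indefinite length to be consumed by a ChunkedWriteHandler.

Table 11-7: Implementations of ChunkedInput

  • ChunkedFile: fetches data from a file chunk by chunk, for use when the platform does not support zero-copy or the data needs to be transformed
  • ChunkedNioFile: similar to ChunkedFile, but uses a FileChannel
  • ChunkedStream: transfers content chunk by chunk from an InputStream
  • ChunkedNioStream: transfers content chunk by chunk from a ReadableByteChannel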

Listing 11-12 illustrates the use of ChunkedStream, the implementation most often used in practice. The class shown is instantiated with a File and an SslContext. When initChannel() is called, it initializes the Channel with the chain of ChannelHandlers shown.

When the state of the Channel becomes active, the WriteStreamHandler will write the data from the file as a ChunkedStream chunk by chunk. Data will be encrypted by SslHandler before transmission.

package io.netty.example.cp11;

import io.netty.channel.*;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedStream;
import io.netty.handler.stream.ChunkedWriteHandler;

import java.io.File;
import java.io.FileInputStream;

/**
 * Listing 11.12 Using ChunkedStream to transfer file content
 */
public class ChunkedWriteHandlerInitializer extends ChannelInitializer<Channel> {

    private final File file;

    private final SslContext sslCtx;

    public ChunkedWriteHandlerInitializer(File file, SslContext sslCtx) {
        this.file = file;
        this.sslCtx = sslCtx;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // Add SslHandler to ChannelPipeline
        pipeline.addLast(new SslHandler(sslCtx.newEngine(ch.alloc())));
        // Add ChunkedWriteHandler to handle data passed in as ChunkedInput
        pipeline.addLast(new ChunkedWriteHandler());
        // Once the connection is established, WriteStreamHandler starts writing file data
        pipeline.addLast(new WriteStreamHandler());
    }

    public final class WriteStreamHandler extends ChannelInboundHandlerAdapter {

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            // When the connection is established, channelActive() will use ChunkedInput to write file data
            super.channelActive(ctx);
            ctx.writeAndFlush(new ChunkedStream(new FileInputStream(file)));
        }
    }
}

Chunk-by-chunk input

To use your own implementation of ChunkedInput, install a ChunkedWriteHandler in the ChannelPipeline.
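To make the chunk-by-chunk idea concrete, here is a minimal plain-Java sketch of the reading side: it pulls at most one fixed-size chunk from an InputStream per call, so only one chunk needs to be held in memory at a time. It does not implement Netty's ChunkedInput interface; the class ChunkReaderSketch, the method readChunk(InputStream, int), and the chunk size of 8192 bytes are assumptions chosen for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;

final class ChunkReaderSketch {
    /** Reads the next chunk of at most chunkSize bytes; returns null at end of input. */
    static byte[] readChunk(InputStream in, int chunkSize) throws IOException {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        byte[] buffer = new byte[chunkSize];
        int filled = 0;
        // read() may return fewer bytes than requested, so fill the chunk in a loop.
        while (filled < chunkSize) {
            int n = in.read(buffer, filled, chunkSize - filled);
            if (n < 0) {
                break; // end of stream
            }
            filled += n;
        }
        if (filled == 0) {
            return null;
        }
        return filled == chunkSize ? buffer : Arrays.copyOf(buffer, filled);
    }

    public static void main(String[] args) throws IOException {
        InputStream in = new ByteArrayInputStream(new byte[20000]);
        byte[] chunk;
        while ((chunk = readChunk(in, 8192)) != null) {
            // A real sender would hand the chunk to the transport here and
            // fetch the next one only after this one has been written.
            System.out.println("chunk of " + chunk.length + " bytes");
        }
    }
}
```

With a 20000-byte source and 8192-byte chunks, the loop yields two full chunks followed by a final chunk of 3616 bytes.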

This section discusses how to efficiently transfer files by using the zero-copy feature, and how to write large data without risking OOM by using ChunkedWriteHandler. The next section examines several ways to serialize POJOs.

2 Serializing data

The JDK provides ObjectOutputStream and ObjectInputStream for serializing and deserializing primitive data types and graphs of POJOs over the network. The API is not complex and can be applied to any object that implements java.io.Serializable, but it is not very efficient. This section looks at what Netty has to offer here.
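For reference, a round trip through the JDK API looks roughly like the sketch below. It uses in-memory byte arrays instead of a network connection, and the Greeting class is a hypothetical POJO introduced only for this example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class JdkSerializationSketch {
    /** Hypothetical POJO used only for illustration. */
    static final class Greeting implements Serializable {
        private static final long serialVersionUID = 1L;
        final String text;
        final int count;

        Greeting(String text, int count) {
            this.text = text;
            this.count = count;
        }
    }

    /** Serializes an object graph into a byte array. */
    static byte[] serialize(Serializable obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    /** Rebuilds the object graph from bytes produced by serialize(). */
    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] wire = serialize(new Greeting("hello", 3));
        Greeting copy = (Greeting) deserialize(wire);
        System.out.println(copy.text + " x" + copy.count + " took " + wire.length + " bytes");
    }
}
```

Printing the length of the serialized form is a quick way to see how much class metadata the JDK format adds around a small payload.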

2.1 JDK Serialization

If your application must interact with remote nodes that use ObjectOutputStream and ObjectInputStream, and compatibility is your main concern, then JDK serialization is the right choice. Table 11-8 lists the serialization classes that Netty provides for interoperating with the JDK:

The CompatibleObjectDecoder class has been deprecated in Netty 3.1 and does not exist in Netty 4.x: https://issues.jboss.org/browse/NETTY-136

If you are free to use external dependencies, JBoss Marshalling is a good fit: it is up to three times faster than JDK serialization and more compact. The overview on the JBoss Marshalling home page describes it as an alternative serialization API that fixes many of the problems found in the JDK serialization API while remaining compatible with java.io.Serializable and its related classes, and that adds several new tunable parameters and extra features, all of which are pluggable through factory configuration (such as externalizers, class/instance lookup tables, class resolution, and object replacement).

2.2 Serialization with JBoss Marshalling

Netty supports JBoss Marshalling through the two decoder/encoder pairs shown in Table 11-9:

  • The first pair is compatible with remote nodes that use only JDK serialization
  • The second pair provides maximum performance and is suitable for remote nodes that use JBoss Marshalling

Table 11-9: JBoss Marshalling Codec

Listing 11-13 shows how to use MarshallingDecoder and MarshallingEncoder. Again, it is mostly a matter of configuring the ChannelPipeline appropriately.

package io.netty.example.cp11;

import io.netty.channel.*;
import io.netty.handler.codec.marshalling.MarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallingDecoder;
import io.netty.handler.codec.marshalling.MarshallingEncoder;
import io.netty.handler.codec.marshalling.UnmarshallerProvider;

import java.io.Serializable;

/**
 * Listing 11.13 Using JBoss Marshalling
 */
public class MarshallingInitializer extends ChannelInitializer<Channel> {
    private final MarshallerProvider marshallerProvider;
    private final UnmarshallerProvider unmarshallerProvider;

    public MarshallingInitializer(UnmarshallerProvider unmarshallerProvider, MarshallerProvider marshallerProvider) {
        this.marshallerProvider = marshallerProvider;
        this.unmarshallerProvider = unmarshallerProvider;
    }

    @Override
    protected void initChannel(Channel channel) throws Exception {
        ChannelPipeline pipeline = channel.pipeline();
        // Add MarshallingDecoder to convert ByteBuf to POJO
        pipeline.addLast(new MarshallingDecoder(unmarshallerProvider));
        // Add MarshallingEncoder to convert POJO to ByteBuf
        pipeline.addLast(new MarshallingEncoder(marshallerProvider));
        // Add ObjectHandler to handle ordinary POJOs that implement the Serializable interface
        pipeline.addLast(new ObjectHandler());
    }

    public static final class ObjectHandler extends SimpleChannelInboundHandler<Serializable> {

        @Override
        public void channelRead0(ChannelHandlerContext channelHandlerContext, Serializable serializable) throws Exception {
            // Do something
        }
    }
}

2.3 Serialization via Protocol Buffers

Netty’s last serialization option is a codec for Protocol Buffers (https://protobuf.dev/), a data interchange format developed by Google and now open source. The source code is available at https://github.com/google/protobuf. Protocol Buffers encodes and decodes structured data compactly and efficiently. It has bindings for many programming languages, which makes it well suited to cross-language projects. Table 11-10 shows the ChannelHandler implementations that Netty provides to support protobuf.

Table 11-10: Protobuf codec

Using protobuf is just a matter of adding the correct ChannelHandler to the ChannelPipeline, as shown in Listing 11-14.

package io.netty.example.cp11;

import com.google.protobuf.MessageLite;
import io.netty.channel.*;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;

/**
 * Listing 11.14 Using protobuf
 */
public class ProtoBufInitializer extends ChannelInitializer<Channel> {
    private final MessageLite lite;

    public ProtoBufInitializer(MessageLite lite) {
        this.lite = lite;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // Add ProtobufVarint32FrameDecoder to separate inbound frames
        pipeline.addLast(new ProtobufVarint32FrameDecoder());
        // Add ProtobufVarint32LengthFieldPrepender before ProtobufEncoder so that
        // outbound messages carry the frame length the remote frame decoder expects
        pipeline.addLast(new ProtobufVarint32LengthFieldPrepender());
        // Add ProtobufEncoder to handle encoding of messages
        pipeline.addLast(new ProtobufEncoder());
        // Add ProtobufDecoder to decode messages
        pipeline.addLast(new ProtobufDecoder(lite));
        // Add ObjectHandler to handle the decoded messages
        pipeline.addLast(new ObjectHandler());
    }

    public static final class ObjectHandler extends SimpleChannelInboundHandler<Object> {

        @Override
        public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
            // Do something with the object
        }
    }
}
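The frame decoder in this pipeline splits the byte stream by reading a length prefix encoded as a Base 128 varint in front of each message. The plain-Java sketch below shows that framing scheme without Netty or the protobuf library; the class Varint32FramingSketch and its methods frame and unframe are hypothetical names for this illustration, not the implementation Netty uses.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

final class Varint32FramingSketch {
    /** Prepends the payload length, encoded as a Base 128 varint, to the payload. */
    static byte[] frame(byte[] payload) {
        ByteArrayOutputStream out = new ByteArrayOutputStream(payload.length + 5);
        int value = payload.length;
        // Emit 7 bits at a time, lowest group first; the high bit marks "more bytes follow".
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
        out.write(payload, 0, payload.length);
        return out.toByteArray();
    }

    /** Returns the next complete payload, or null (position unchanged) if more bytes are needed. */
    static byte[] unframe(ByteBuffer in) {
        in.mark();
        int length = 0;
        for (int shift = 0; shift < 35; shift += 7) {
            if (!in.hasRemaining()) {
                in.reset(); // the length prefix itself is incomplete
                return null;
            }
            byte b = in.get();
            length |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                if (length < 0) {
                    throw new IllegalArgumentException("negative frame length");
                }
                if (in.remaining() < length) {
                    in.reset(); // the body is incomplete: wait for more data
                    return null;
                }
                byte[] payload = new byte[length];
                in.get(payload);
                return payload;
            }
        }
        throw new IllegalArgumentException("malformed varint32 length prefix");
    }
}
```

Because unframe() resets the buffer position when a frame is incomplete, the caller can append newly received bytes and retry, which is the behavior a streaming frame decoder needs.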

This section explores the different serialization options supported by Netty-specific decoders and encoders: standard JDK serialization, JBoss Marshalling, and Google’s Protocol Buffers.

3 Summary

The codecs and various ChannelHandlers provided by Netty can be combined and extended to achieve a very wide range of processing scenarios. Furthermore, they are proven, robust components that have been used by many large systems.

We only cover the most common examples; Netty’s API documentation provides more comprehensive coverage.

The next chapter covers another advanced protocol, WebSocket, which was developed to improve the performance and responsiveness of web applications. Netty provides the tools you need to take advantage of its capabilities quickly and easily.