1 Writing large data
Writing large chunks of data efficiently is a special problem in asynchronous frameworks because of the possibility of network saturation. Because write operations are non-blocking, they return right away, even if not all of the data has been written out, and the ChannelFuture is notified when the write completes. If you keep writing when this happens, you risk running out of memory. So when writing large amounts of data, you need to be prepared for a slow connection to the remote peer, which delays the release of memory.
Consider writing the contents of a file to the network. The discussion of transports (see Section 4.2) mentioned NIO’s zero-copy feature, which eliminates the copying steps involved in moving file contents from the file system to the network stack. All of this happens inside Netty’s core, so the application only needs to use an implementation of the FileRegion interface, which Netty’s API documentation defines as “a region of a file that is sent via a Channel which supports zero-copy file transfer.”
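To make the zero-copy idea more concrete, here is a minimal JDK-only sketch of the underlying primitive, FileChannel.transferTo(), which asks the operating system to move bytes from a file straight to a target channel where the platform supports it. The class ZeroCopySketch and its methods are hypothetical names used only for illustration; the sketch assumes a blocking target channel and uses a temporary file as the target instead of a socket so that it runs on its own.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper for illustration; not part of Netty.
final class ZeroCopySketch {

    // Transfers the whole file to the target and returns the number of bytes moved.
    // Assumes a blocking target channel, so each call makes progress.
    static long transferFile(Path source, WritableByteChannel target) throws IOException {
        try (FileChannel in = FileChannel.open(source, StandardOpenOption.READ)) {
            long position = 0;
            long size = in.size();
            // transferTo() may move fewer bytes than requested, so loop until done.
            while (position < size) {
                position += in.transferTo(position, size - position, target);
            }
            return position;
        }
    }

    // Creates a temporary file of the given size, transfers it to a second
    // temporary file, and returns the number of bytes transferred.
    static long copyTempFile(int size) {
        try {
            Path src = Files.createTempFile("zero-copy-src", ".bin");
            Path dst = Files.createTempFile("zero-copy-dst", ".bin");
            try (FileChannel out = FileChannel.open(dst, StandardOpenOption.WRITE)) {
                Files.write(src, new byte[size]);
                return transferFile(src, out);
            } finally {
                Files.deleteIfExists(src);
                Files.deleteIfExists(dst);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("bytes transferred: " + copyTempFile(64 * 1024));
    }
}
```

In a Netty application you would not call transferTo() yourself; the point of the sketch is only that the file contents never pass through a byte array owned by the application.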
Listing 11-11 shows how to transfer a file’s contents using zero-copy by creating a DefaultFileRegion from a FileInputStream and writing it to a Channel. (You can also use io.netty.channel.ChannelProgressivePromise to track the progress of the transfer in real time.)
```java
package io.netty.example.cp11;

import io.netty.channel.*;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.io.File;
import java.io.FileInputStream;

/**
 * Listing 11.11 Using FileRegion to transfer file content
 */
public class FileRegionWriteHandler extends ChannelInboundHandlerAdapter {
    private static final Channel CHANNEL_FROM_SOMEWHERE = new NioSocketChannel();
    private static final File FILE_FROM_SOMEWHERE = new File("");

    @Override
    public void channelActive(final ChannelHandlerContext ctx) throws Exception {
        File file = FILE_FROM_SOMEWHERE; // get reference from somewhere
        Channel channel = CHANNEL_FROM_SOMEWHERE; // get reference from somewhere
        //...
        // Create a FileInputStream
        FileInputStream in = new FileInputStream(file);
        // Create a new DefaultFileRegion with the full length of the file
        FileRegion region = new DefaultFileRegion(in.getChannel(), 0, file.length());
        // Send the DefaultFileRegion and register a ChannelFutureListener
        channel.writeAndFlush(region).addListener(
            new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        // handle failure
                        Throwable cause = future.cause();
                        // Do something
                    }
                }
            });
    }
}
```
This example applies only to the direct transfer of file content and does not include any processing of the data by the application. When data needs to be copied from the file system into user memory, ChunkedWriteHandler can be used, which supports asynchronous writing of large data streams without incurring large memory consumption.
The key is the interface ChunkedInput<B>, where the type parameter B is the type returned by the readChunk() method. Netty provides four implementations of this interface (ChunkedFile, ChunkedNioFile, ChunkedStream, and ChunkedNioStream), as shown in Table 11-7:
Each of them represents a data stream of indefinite length to be consumed by a ChunkedWriteHandler.
Listing 11-12 illustrates the use of ChunkedStream, the implementation most often used in practice. The class shown is instantiated with a File and an SslContext. When initChannel() is called, it initializes the Channel with the chain of ChannelHandlers shown.
When the Channel becomes active, the WriteStreamHandler writes the data from the file chunk by chunk as a ChunkedStream. The data is encrypted by the SslHandler before transmission.
```java
package io.netty.example.cp11;

import io.netty.channel.*;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedStream;
import io.netty.handler.stream.ChunkedWriteHandler;

import java.io.File;
import java.io.FileInputStream;

/**
 * Listing 11.12 Using ChunkedStream to transfer file content
 */
public class ChunkedWriteHandlerInitializer extends ChannelInitializer<Channel> {
    private final File file;
    private final SslContext sslCtx;

    public ChunkedWriteHandlerInitializer(File file, SslContext sslCtx) {
        this.file = file;
        this.sslCtx = sslCtx;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // Add SslHandler to the ChannelPipeline
        pipeline.addLast(new SslHandler(sslCtx.newEngine(ch.alloc())));
        // Add ChunkedWriteHandler to handle data passed in as ChunkedInput
        pipeline.addLast(new ChunkedWriteHandler());
        // Once the connection is established, WriteStreamHandler starts writing file data
        pipeline.addLast(new WriteStreamHandler());
    }

    public final class WriteStreamHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            // When the connection is established, channelActive() writes
            // the file data as a ChunkedInput
            super.channelActive(ctx);
            ctx.writeAndFlush(new ChunkedStream(new FileInputStream(file)));
        }
    }
}
```
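As a rough illustration of why chunk-by-chunk writing keeps memory use bounded, the following JDK-only sketch reads an InputStream through a single reusable buffer, so no more than one chunk is held in memory at a time. This is not Netty’s implementation; the class ChunkedReadSketch, its methods, and the chunk size of 8192 bytes are assumptions chosen for the example.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration; not part of Netty.
final class ChunkedReadSketch {

    // Reads the stream chunk by chunk and returns the size of each chunk.
    static List<Integer> chunkSizes(InputStream in, int chunkSize) throws IOException {
        List<Integer> sizes = new ArrayList<>();
        byte[] chunk = new byte[chunkSize]; // one reusable buffer, whatever the stream length
        int n;
        // readNBytes() (Java 9+) fills the buffer unless the stream ends first,
        // and returns 0 once the end of the stream has been reached.
        while ((n = in.readNBytes(chunk, 0, chunk.length)) > 0) {
            // A real writer would hand the chunk to the transport here and
            // wait until it can accept more before reading the next one.
            sizes.add(n);
        }
        return sizes;
    }

    // Convenience wrapper: chunk sizes for an in-memory stream of totalBytes bytes.
    static int[] chunkSizesFor(int totalBytes, int chunkSize) {
        try (InputStream in = new ByteArrayInputStream(new byte[totalBytes])) {
            return chunkSizes(in, chunkSize).stream().mapToInt(Integer::intValue).toArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // 20,000 bytes in chunks of 8192: two full chunks and one partial chunk
        System.out.println(Arrays.toString(chunkSizesFor(20_000, 8192)));
    }
}
```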
Producing input chunk by chunk
To use your own implementation of ChunkedInput, install a ChunkedWriteHandler in the ChannelPipeline.
This section discussed how to transfer files efficiently by using the zero-copy feature, and how to write large data without risking an OutOfMemoryError by using ChunkedWriteHandler. The next section examines several ways to serialize POJOs.
2 Serializing data
The JDK provides ObjectOutputStream and ObjectInputStream for serializing and deserializing primitive data types and graphs of POJOs over the network. The API is simple and can be applied to any object that implements the java.io.Serializable interface, but it is not very efficient. In this section, we’ll see what Netty has to offer here.
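For reference, a minimal sketch of a JDK serialization round trip looks like the following. The class JdkSerializationSketch and the Greeting POJO are hypothetical names introduced for the example, and the bytes travel through an in-memory array instead of a network connection.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical example class; not part of Netty.
final class JdkSerializationSketch {

    // A simple POJO that opts in to JDK serialization.
    static final class Greeting implements Serializable {
        private static final long serialVersionUID = 1L;
        final String text;

        Greeting(String text) {
            this.text = text;
        }
    }

    // Serializes the object graph rooted at obj into a byte array.
    static byte[] serialize(Serializable obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    // Reconstructs an object from bytes produced by serialize().
    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    // Serializes a Greeting, deserializes it again, and returns the copied text.
    static String roundTrip(String text) {
        try {
            Greeting copy = (Greeting) deserialize(serialize(new Greeting(text)));
            return copy.text;
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello"));
    }
}
```

Note that the serialized form carries a stream header and class metadata in addition to the field values, which is one reason the format is less compact than the alternatives discussed in this section.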
2.1 JDK Serialization
If your application has to interact with remote peers that use ObjectOutputStream and ObjectInputStream, and compatibility is your primary concern, then JDK serialization is the right choice. Table 11-8 lists the serialization classes that Netty provides for interoperating with the JDK:
The CompatibleObjectDecoder class was deprecated in Netty 3.1 and is not present in Netty 4.x: https://issues.jboss.org/browse/NETTY-136
2.2 Serialization with JBoss Marshalling
If you are free to use external dependencies, JBoss Marshalling is ideal: it is up to 3 times faster than JDK serialization and more compact. The overview on the JBoss Marshalling homepage describes it as follows: JBoss Marshalling is an alternative serialization API that fixes many of the problems found in the JDK serialization API while remaining compatible with java.io.Serializable and its related classes, and adds several new tunable parameters and additional features, all of which are pluggable through factory configuration (such as externalizers, class/instance lookup tables, class resolution, and object replacement).
Netty supports JBoss Marshalling through the two decoder/encoder pairs shown in Table 11-9:
- The first pair is compatible with remote peers that use only JDK serialization
- The second pair provides maximum performance and is suitable for use with remote peers that use JBoss Marshalling
Listing 11-13 shows how to use MarshallingDecoder and MarshallingEncoder. Again, it is mostly a matter of configuring the ChannelPipeline appropriately.
```java
package io.netty.example.cp11;

import io.netty.channel.*;
import io.netty.handler.codec.marshalling.MarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallingDecoder;
import io.netty.handler.codec.marshalling.MarshallingEncoder;
import io.netty.handler.codec.marshalling.UnmarshallerProvider;

import java.io.Serializable;

/**
 * Listing 11.13 Using JBoss Marshalling
 */
public class MarshallingInitializer extends ChannelInitializer<Channel> {
    private final MarshallerProvider marshallerProvider;
    private final UnmarshallerProvider unmarshallerProvider;

    public MarshallingInitializer(UnmarshallerProvider unmarshallerProvider,
                                  MarshallerProvider marshallerProvider) {
        this.marshallerProvider = marshallerProvider;
        this.unmarshallerProvider = unmarshallerProvider;
    }

    @Override
    protected void initChannel(Channel channel) throws Exception {
        ChannelPipeline pipeline = channel.pipeline();
        // Add MarshallingDecoder to convert ByteBuf to POJO
        pipeline.addLast(new MarshallingDecoder(unmarshallerProvider));
        // Add MarshallingEncoder to convert POJO to ByteBuf
        pipeline.addLast(new MarshallingEncoder(marshallerProvider));
        // Add ObjectHandler to handle ordinary POJOs that implement the Serializable interface
        pipeline.addLast(new ObjectHandler());
    }

    public static final class ObjectHandler extends SimpleChannelInboundHandler<Serializable> {
        @Override
        public void channelRead0(ChannelHandlerContext channelHandlerContext,
                                 Serializable serializable) throws Exception {
            // Do something
        }
    }
}
```
2.3 Serialization via Protocol Buffers
The last of Netty’s serialization options is a codec that uses Protocol Buffers (https://protobuf.dev/), a data interchange format developed by Google and now open source. The source code can be found at https://github.com/google/protobuf. Protocol Buffers encodes and decodes structured data compactly and efficiently. It has bindings for many programming languages, which makes it a good fit for cross-language projects. Table 11-10 shows the ChannelHandler implementations that Netty provides for protobuf support.
Using protobuf is just a matter of adding the correct ChannelHandler to the ChannelPipeline, as shown in Listing 11-14.
```java
package io.netty.example.cp11;

import com.google.protobuf.MessageLite;
import io.netty.channel.*;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;

/**
 * Listing 11.14 Using protobuf
 */
public class ProtoBufInitializer extends ChannelInitializer<Channel> {
    private final MessageLite lite;

    public ProtoBufInitializer(MessageLite lite) {
        this.lite = lite;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // Add ProtobufVarint32FrameDecoder to separate frames
        pipeline.addLast(new ProtobufVarint32FrameDecoder());
        // Also need to add a corresponding ProtobufVarint32LengthFieldPrepender
        // before the current ProtobufEncoder to encode the frame length information
        // Add ProtobufEncoder to handle encoding of messages
        pipeline.addLast(new ProtobufEncoder());
        // Add ProtobufDecoder to decode messages
        pipeline.addLast(new ProtobufDecoder(lite));
        // Add ObjectHandler to handle decoded messages
        pipeline.addLast(new ObjectHandler());
    }

    public static final class ObjectHandler extends SimpleChannelInboundHandler<Object> {
        @Override
        public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
            // Do something with the object
        }
    }
}
```
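The frame decoder and length-field prepender mentioned in the listing rely on a varint32 length prefix: each frame starts with the payload length encoded in base-128, seven bits per byte, with the high bit marking that more bytes follow. The following JDK-only sketch shows that framing scheme under those assumptions. It is not Netty’s code, and the class Varint32FramingSketch and its methods are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

// Hypothetical helper for illustration; not part of Netty or protobuf.
final class Varint32FramingSketch {

    // Prepends the payload length, encoded as a base-128 varint, to the payload.
    static byte[] frame(byte[] payload) {
        ByteArrayOutputStream out = new ByteArrayOutputStream(payload.length + 5);
        int value = payload.length;
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80); // low 7 bits, continuation bit set
            value >>>= 7;
        }
        out.write(value); // final byte, continuation bit clear
        out.write(payload, 0, payload.length);
        return out.toByteArray();
    }

    // Returns the payload of the first complete frame in data,
    // or null if more bytes are needed to complete the frame.
    static byte[] unframe(byte[] data) {
        int length = 0;
        int shift = 0;
        int pos = 0;
        while (true) {
            if (shift > 28) {
                throw new IllegalArgumentException("malformed varint32");
            }
            if (pos >= data.length) {
                return null; // length prefix is incomplete
            }
            int b = data[pos++] & 0xFF;
            length |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                break;
            }
            shift += 7;
        }
        if (length < 0) {
            throw new IllegalArgumentException("negative frame length");
        }
        if (data.length - pos < length) {
            return null; // payload is incomplete
        }
        return Arrays.copyOfRange(data, pos, pos + length);
    }

    public static void main(String[] args) {
        byte[] framed = frame(new byte[300]);
        // 300 encodes as the two prefix bytes 0xAC 0x02, so the frame is 302 bytes long
        System.out.println(framed.length + " bytes, payload " + unframe(framed).length);
    }
}
```

Because the prefix grows only when the payload does, small messages pay a single byte of framing overhead.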
This section explores the different serialization options supported by Netty-specific decoders and encoders: standard JDK serialization, JBoss Marshalling, and Google’s Protocol Buffers.
3 Summary
The codecs and various ChannelHandlers provided by Netty can be combined and extended to achieve a very wide range of processing scenarios. Furthermore, they are proven, robust components that have been used by many large systems.
We only cover the most common examples; Netty’s API documentation provides more comprehensive coverage.
The next chapter covers another advanced protocol, WebSocket, which was developed to improve the performance and responsiveness of web applications. Netty provides the tools you’ll need to take advantage of its capabilities quickly and easily.