Learning the Netty framework: transports

Overview

Data flowing through a network always has the same type: bytes. How those bytes are moved depends on what we call the network transport. Users do not care about the details of the transport, only that the bytes are sent and received reliably.

If you have done network programming with the JDK, you may have found that when you need to support many concurrent connections and try to switch from a blocking to a non-blocking transport, you run into problems because the two APIs are completely different. Netty provides a common API on top of all its transports, which makes the switch much easier.

Traditional transports

This section implements the same application twice using only the JDK API: a blocking version (OIO) and a non-blocking version (NIO).

The blocking version is as follows:

import java.io.IOException;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class PlainOioServer {
    public void server(int port) throws IOException {
        // Bind the server to the specified port; try-with-resources closes it on exit
        try (ServerSocket socket = new ServerSocket(port)) {
            while (true) {
                // Block until a client connects
                final Socket clientSocket = socket.accept();
                System.out.println("Accepted connection from " + clientSocket);
                // Create a new thread to handle the connection
                new Thread(() -> {
                    try {
                        OutputStream out = clientSocket.getOutputStream();
                        // Write a message to the connected client
                        out.write("Hi\r\n".getBytes(StandardCharsets.UTF_8));
                        out.flush();
                    } catch (IOException e) {
                        e.printStackTrace();
                    } finally {
                        // Always close the connection, whether or not the write succeeded
                        try {
                            clientSocket.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                }).start();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

This code handles a moderate number of concurrent clients, since each connection gets its own thread. As the number of concurrent connections grows, you may decide to switch to non-blocking network programming, only to find that the non-blocking API is completely different.

The non-blocking version is as follows:

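import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;

public class PlainNioServer {
    public void server(int port) throws IOException {
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        ServerSocket ssocket = serverChannel.socket();
        InetSocketAddress address = new InetSocketAddress(port);
        // Bind the server to the selected port
        ssocket.bind(address);
        // Open a Selector to handle the Channels
        Selector selector = Selector.open();
        // Register the ServerSocketChannel with the Selector to accept connections
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        // Use an explicit charset instead of the platform default
        final ByteBuffer msg = ByteBuffer.wrap("Hi\r\n".getBytes(StandardCharsets.UTF_8));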
        while (true) {
            try {
                // Wait for new events to process; this call blocks until the next event arrives
                selector.select();
            } catch (IOException e) {
                e.printStackTrace();
                break;
            }
            Set<SelectionKey> readKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = readKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                try {
                    // Check if the event is a new connection that is ready to be accepted
                    if (key.isAcceptable()) {
                        ServerSocketChannel server = (ServerSocketChannel) key.channel();
                        SocketChannel client = server.accept();
                        client.configureBlocking(false);
                        // Accept the client and register it with the selector for writes only;
                        // this server never reads, so OP_READ is not requested
                        client.register(selector, SelectionKey.OP_WRITE, msg.duplicate());
                        System.out.println("Accepted connection from " + client);
                    }
                    // Check if the socket is ready to write data
                    if (key.isWritable()) {
                        SocketChannel client = (SocketChannel) key.channel();
                        ByteBuffer buffer = (ByteBuffer) key.attachment();
                        while (buffer.hasRemaining()) {
                            // Write data to the connected client; 0 means the send buffer is full
                            if (client.write(buffer) == 0) {
                                break;
                            }
                        }
                        // Close only once the whole message is written;
                        // otherwise wait for the next OP_WRITE event
                        if (!buffer.hasRemaining()) {
                            client.close();
                        }
                    }
                } catch (IOException exception) {
                    key.cancel();
                    try {
                        key.channel().close();
                    } catch (IOException cex) {
                        cex.printStackTrace();
                    }
                }
            }
        }
    }
}

As you can see, the blocking and non-blocking code are completely different, even though both servers do the same thing. Rewriting an entire application just to make it non-blocking would be a considerable effort.
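Both JDK servers above send the same greeting, so a single client can be used to check either one. The following is a minimal sketch and not part of the original servers: the class name GreetingClientDemo and its methods are made up for illustration, and demo() starts a throwaway one-shot server on an ephemeral loopback port only so that the example is self-contained.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical helper class, introduced only for this illustration
class GreetingClientDemo {

    /** Connects to host:port and returns the first line the server sends. */
    static String readGreeting(String host, int port) throws IOException {
        try (Socket socket = new Socket(host, port);
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))) {
            // readLine() strips the trailing line terminator
            return in.readLine();
        }
    }

    /** Runs a one-shot stand-in server on the loopback interface and reads its greeting. */
    static String demo() throws Exception {
        // Port 0 asks the OS for any free port
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
            Thread acceptor = new Thread(() -> {
                try (Socket client = server.accept();
                     OutputStream out = client.getOutputStream()) {
                    out.write("Hi\r\n".getBytes(StandardCharsets.UTF_8));
                    out.flush();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
            acceptor.start();
            String greeting = readGreeting(
                    server.getInetAddress().getHostAddress(), server.getLocalPort());
            acceptor.join();
            return greeting;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo());
    }
}
```

To check one of the real servers instead, start it on a port of your choice and call readGreeting with that host and port. The client code stays the same whichever server implementation is running, which is the property the server side lacks.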

Netty-based transports

The blocking version using Netty is as follows:

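import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.oio.OioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.oio.OioServerSocketChannel;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

public class NettyOioServer {
    public void server(int port) throws Exception {
        // Shared greeting; unreleasableBuffer prevents it from being freed after a write
        final ByteBuf buf = Unpooled.unreleasableBuffer(
                Unpooled.copiedBuffer("Hi\r\n", StandardCharsets.UTF_8));
        EventLoopGroup group = new OioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(group)
                    // Use the blocking (OIO) transport
                    .channel(OioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(port))
                    // Called once for every accepted connection
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                                // Greet as soon as the connection is active, like the JDK
                                // versions, instead of waiting for the client to send data
                                @Override
                                public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                    // Write the message, then close the connection once it is sent
                                    ctx.writeAndFlush(buf.duplicate())
                                            .addListener(ChannelFutureListener.CLOSE);
                                }
                            });
                        }
                    });
            // Bind the server and wait until the server channel is closed
            ChannelFuture f = b.bind().sync();
            f.channel().closeFuture().sync();
        } finally {
            // Release all resources
            group.shutdownGracefully().sync();
        }
    }
}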

The non-blocking version is almost identical to the blocking version. Only two lines change: the EventLoopGroup implementation and the channel class.

EventLoopGroup group = new NioEventLoopGroup();
b.group(group).channel(NioServerSocketChannel.class);

Transport API

The core of the transport API is the Channel interface, which is used for all IO operations. Each Channel is assigned a ChannelPipeline and a ChannelConfig. The ChannelConfig contains all configuration settings of the Channel. The ChannelPipeline holds all ChannelHandler instances that are applied to inbound and outbound data and events.

In addition to giving access to the assigned ChannelPipeline and ChannelConfig, Channel offers the following methods:

Method name      Description
eventLoop        Returns the EventLoop assigned to the Channel
pipeline         Returns the ChannelPipeline assigned to the Channel
isActive         Returns true if the Channel is active
localAddress     Returns the local SocketAddress
remoteAddress    Returns the remote SocketAddress
write            Writes data to the remote peer
flush            Flushes previously written data to the underlying transport
writeAndFlush    Equivalent to calling write() followed by flush()
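As a rough illustration of these methods, the sketch below writes a greeting through the Channel interface without opening a real socket. It assumes Netty 4.1 is on the classpath and uses EmbeddedChannel from the io.netty.channel.embedded package as a stand-in for a network channel. The class ChannelApiSketch and its methods greet and demo are hypothetical names introduced only for this example.

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.embedded.EmbeddedChannel;
import java.nio.charset.StandardCharsets;

// Hypothetical class, introduced only for this illustration (assumes Netty 4.1)
class ChannelApiSketch {

    /** Writes a greeting using only methods of the Channel interface. */
    static ChannelFuture greet(Channel channel) {
        if (!channel.isActive()) {
            throw new IllegalStateException("channel is not active");
        }
        ByteBuf msg = Unpooled.copiedBuffer("Hi\r\n", StandardCharsets.UTF_8);
        // write() followed by flush(), in a single call
        return channel.writeAndFlush(msg);
    }

    /** Sends the greeting through an EmbeddedChannel and returns what was written. */
    static String demo() {
        // No handlers in the pipeline: outbound data lands in the channel's outbound queue
        EmbeddedChannel channel = new EmbeddedChannel();
        try {
            greet(channel).syncUninterruptibly();
            // Fetch the message that would have gone out on the wire
            ByteBuf written = (ByteBuf) channel.readOutbound();
            try {
                return written.toString(StandardCharsets.UTF_8);
            } finally {
                written.release();
            }
        } finally {
            channel.finish();
        }
    }
}
```

Because greet depends only on the Channel interface, the same method could be handed a channel from any other transport. The method returns the ChannelFuture rather than a boolean because, on a real network channel, the write may complete later.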

Built-in transports

Netty ships with several transports that can be used out of the box. They do not all support the same protocols, so you must choose a transport that is compatible with the protocols your application uses.

Name        Package                        Description
NIO         io.netty.channel.socket.nio    Uses the java.nio.channels package as its foundation
Epoll       io.netty.channel.epoll         Uses JNI to drive epoll() and non-blocking IO; supports features available only on Linux, is faster than the NIO transport, and is fully non-blocking
OIO         io.netty.channel.socket.oio    Uses the java.net package as its foundation
Local       io.netty.channel.local         A local transport for communication within the VM through pipes
Embedded    io.netty.channel.embedded      An embedded transport that lets you use ChannelHandlers without a real network-based transport; mainly used for testing
