Netty初体验-TCP客户端和服务端

  |   0 评论   |   0 浏览

背景

这篇Netty的入门文章姗姗来迟。

初体验

依赖

		<!-- netty all -->
		<dependency>
			<groupId>io.netty</groupId>
			<artifactId>netty-all</artifactId>
			<version>4.1.45.Final</version>
		</dependency>

服务端

搭建一个Netty Server


import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * from https://netty.io/wiki/user-guide-for-4.x.html#wiki-h3-5
 * 
 */
public class MainNettyServer {

	/**
	 * Inbound handler that throws away everything it receives.
	 */
	public static class DiscardServerHandler extends ChannelInboundHandlerAdapter {

		/**
		 * Releases the received buffer without looking at its content.
		 *
		 * @param ctx context of this handler in the pipeline
		 * @param msg the inbound message; cast to {@link ByteBuf} unconditionally
		 */
		@Override
		public void channelRead(ChannelHandlerContext ctx, Object msg) {
			ByteBuf received = (ByteBuf) msg;
			received.release();
		}

		/**
		 * Prints the stack trace of the failure and closes the connection.
		 *
		 * @param ctx   context of this handler in the pipeline
		 * @param cause the exception that was raised
		 */
		@Override
		public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
			cause.printStackTrace();
			ctx.close();
		}
	}

	/**
	 * Starts the discard server on port 6061 and blocks until the server channel is closed.
	 *
	 * @param args ignored
	 * @throws InterruptedException if the thread is interrupted while waiting on bind or close
	 */
	public static void main(String[] args) throws InterruptedException {
		final int port = 6061;

		EventLoopGroup bossGroup = new NioEventLoopGroup();
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		try {
			ServerBootstrap bootstrap = new ServerBootstrap();
			bootstrap.group(bossGroup, workerGroup);
			bootstrap.channel(NioServerSocketChannel.class);
			// Every accepted connection gets its own DiscardServerHandler instance.
			bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
				@Override
				public void initChannel(SocketChannel ch) throws Exception {
					ch.pipeline().addLast(new DiscardServerHandler());
				}
			});
			bootstrap.option(ChannelOption.SO_BACKLOG, 128);
			bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);

			// Block until the listening socket is bound.
			ChannelFuture bound = bootstrap.bind(port).sync();

			// Block until the server channel is closed.
			bound.channel().closeFuture().sync();
		} finally {
			workerGroup.shutdownGracefully();
			bossGroup.shutdownGracefully();
		}
	}
}

测试:

% telnet localhost 6061
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello world
^]
telnet> Connection closed.

可见服务端已在本机的 6061 端口上监听并接受连接。

显示接收到的请求内容

修改上面例子中的DiscardServerHandler类,在控制台打印出接收到的内容(需要额外 import io.netty.util.ReferenceCountUtil),如下:

	/**
	 * Inbound handler that prints every received byte to standard output as a character and
	 * then releases the message.
	 */
	public static class DiscardServerHandler extends ChannelInboundHandlerAdapter {

		/**
		 * Prints the readable bytes of the message one character per byte, then releases it.
		 *
		 * @param ctx context of this handler in the pipeline
		 * @param msg the inbound message; cast to {@link ByteBuf} unconditionally
		 */
		@Override
		public void channelRead(ChannelHandlerContext ctx, Object msg) {
			ByteBuf in = (ByteBuf) msg;
			try {
				while (in.isReadable()) {
					// readUnsignedByte() keeps bytes >= 0x80 in the 0..255 range; casting the
					// signed readByte() result to char would sign-extend them to U+FF80..U+FFFF.
					System.out.print((char) in.readUnsignedByte());
				}
				// One flush per message is enough; flushing per byte only adds overhead.
				System.out.flush();
			} finally {
				// Release in finally so the message is released even if printing fails.
				ReferenceCountUtil.release(msg);
			}
		}

		/**
		 * Prints the stack trace of the failure and closes the connection.
		 *
		 * @param ctx   context of this handler in the pipeline
		 * @param cause the exception that was raised
		 */
		@Override
		public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
			cause.printStackTrace();
			ctx.close();
		}
	}

测试

% telnet localhost 6061
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello
world

回显用户输入的内容

修改上面例子中的DiscardServerHandler类,回写出接收到的内容,如下:

	/**
	 * Inbound handler that writes every received message straight back to the peer.
	 */
	public static class DiscardServerHandler extends ChannelInboundHandlerAdapter {

		/**
		 * Writes the received message back and flushes it.
		 *
		 * @param ctx context of this handler in the pipeline
		 * @param msg the inbound message, passed on unchanged
		 */
		@Override
		public void channelRead(ChannelHandlerContext ctx, Object msg) {
			// The message is not released here; it is handed to the outbound write instead.
			ctx.writeAndFlush(msg);
		}

		/**
		 * Prints the stack trace of the failure and closes the connection.
		 *
		 * @param ctx   context of this handler in the pipeline
		 * @param cause the exception that was raised
		 */
		@Override
		public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
			cause.printStackTrace();
			ctx.close();
		}
	}

测试

% telnet localhost 6061
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
hello world
hello world
hello
hello
world
world

时间服务器

这个例子中,当连接建立后,我们会忽略用户输入的内容,并且尽快发送现在的时间,然后关闭连接。

具体的handler代码如下(需要额外 import io.netty.channel.ChannelFutureListener):

	/**
	 * Inbound handler that sends the current time as a 32-bit integer as soon as the connection
	 * becomes active, then closes the connection.
	 */
	public static class TimeServerHandler extends ChannelInboundHandlerAdapter {

		/**
		 * Writes the current time in seconds, shifted by 2208988800 (the number of seconds from
		 * 1900-01-01 to 1970-01-01), and closes the connection once the write has completed.
		 *
		 * @param ctx context of this handler in the pipeline
		 */
		@Override
		public void channelActive(final ChannelHandlerContext ctx) {
			final ByteBuf time = ctx.alloc().buffer(4);
			long secondsSince1900 = System.currentTimeMillis() / 1000L + 2208988800L;
			// Only the low 32 bits go on the wire.
			time.writeInt((int) secondsSince1900);

			final ChannelFuture f = ctx.writeAndFlush(time);
			ChannelFutureListener closeWhenWritten = new ChannelFutureListener() {
				@Override
				public void operationComplete(ChannelFuture future) {
					assert f == future;
					ctx.close();
				}
			};
			// Close from the listener rather than right after writeAndFlush().
			f.addListener(closeWhenWritten);
		}

		/**
		 * Prints the stack trace of the failure and closes the connection.
		 *
		 * @param ctx   context of this handler in the pipeline
		 * @param cause the exception that was raised
		 */
		@Override
		public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
			cause.printStackTrace();
			ctx.close();
		}
	}

测试:

  1. 安装 rdate
brew install rdate
  2. 将server的端口改为37(在类 Unix 系统上绑定 1024 以下的端口通常需要 root 权限)

  3. 执行命令

% rdate -p localhost
rdate: [localhost]	Thu Jan 23 14:30:09 2020

客户端

时间客户端


import java.util.Date;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class MainNettyClient {

	/**
	 * Inbound handler that reads one 32-bit time value, prints it as a date and closes the
	 * connection.
	 */
	private static class TimeClientHandler extends ChannelInboundHandlerAdapter {

		/**
		 * Decodes the unsigned 32-bit value, converts it to epoch milliseconds and prints it.
		 *
		 * @param ctx context of this handler in the pipeline
		 * @param msg the inbound message; cast to {@link ByteBuf} unconditionally
		 */
		@Override
		public void channelRead(ChannelHandlerContext ctx, Object msg) {
			ByteBuf m = (ByteBuf) msg;
			try {
				long wireSeconds = m.readUnsignedInt();
				// 2208988800 = seconds from 1900-01-01 to 1970-01-01.
				Date received = new Date((wireSeconds - 2208988800L) * 1000L);
				System.out.println(received);
				ctx.close();
			} finally {
				m.release();
			}
		}

		/**
		 * Prints the stack trace of the failure and closes the connection.
		 *
		 * @param ctx   context of this handler in the pipeline
		 * @param cause the exception that was raised
		 */
		@Override
		public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
			cause.printStackTrace();
			ctx.close();
		}
	}

	/**
	 * Connects to localhost:37 and blocks until the connection is closed.
	 *
	 * @param args ignored
	 * @throws InterruptedException if the thread is interrupted while waiting on connect or close
	 */
	public static void main(String[] args) throws InterruptedException {
		final String host = "localhost";
		final int port = 37;
		EventLoopGroup workerGroup = new NioEventLoopGroup();

		try {
			Bootstrap b = new Bootstrap()
					.group(workerGroup)
					.channel(NioSocketChannel.class)
					.option(ChannelOption.SO_KEEPALIVE, true)
					.handler(new ChannelInitializer<SocketChannel>() {
						@Override
						public void initChannel(SocketChannel ch) throws Exception {
							ch.pipeline().addLast(new TimeClientHandler());
						}
					});

			// Block until the connection attempt has finished.
			ChannelFuture connected = b.connect(host, port).sync();

			// Block until the connection is closed.
			connected.channel().closeFuture().sync();
		} finally {
			workerGroup.shutdownGracefully();
		}
	}
}

测试

Thu Jan 23 14:41:09 CST 2020

带buffer的时间客户端

TCP 是面向字节流的协议:它保证接收端收到的字节与发送端发送的字节在整体上内容和顺序一致,但不保证每次读取到的数据块与发送端每次写入的数据块一一对应(数据可能被拆分或合并)。所以我们需要在客户端做一个合包的逻辑:收到的字节数不足 4 时先缓存起来,累计达到 4 个字节后再解析并显示。具体示例如下:

	/**
	 * Inbound handler that accumulates received bytes in an internal buffer and prints the time
	 * once at least four bytes are available.
	 */
	private static class TimeClientHandler extends ChannelInboundHandlerAdapter {
		/** Cumulative buffer; allocated in handlerAdded and released in handlerRemoved. */
		private ByteBuf buf;

		/**
		 * Allocates the cumulative buffer with an initial capacity of four bytes.
		 *
		 * @param ctx context of this handler in the pipeline
		 */
		@Override
		public void handlerAdded(ChannelHandlerContext ctx) {
			buf = ctx.alloc().buffer(4);
		}

		/**
		 * Releases the cumulative buffer and clears the reference to it.
		 *
		 * @param ctx context of this handler in the pipeline
		 */
		@Override
		public void handlerRemoved(ChannelHandlerContext ctx) {
			buf.release();
			buf = null;
		}

		/**
		 * Appends the received bytes to the cumulative buffer; once four bytes are readable,
		 * decodes them as an unsigned 32-bit value, prints the date and closes the connection.
		 *
		 * @param ctx context of this handler in the pipeline
		 * @param msg the inbound message; cast to {@link ByteBuf} unconditionally
		 */
		@Override
		public void channelRead(ChannelHandlerContext ctx, Object msg) {
			ByteBuf m = (ByteBuf) msg;
			try {
				buf.writeBytes(m);
			} finally {
				// Release in finally so the inbound buffer is not leaked if the copy throws.
				m.release();
			}

			if (buf.readableBytes() >= 4) {
				// 2208988800 = seconds from 1900-01-01 to 1970-01-01.
				long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
				System.out.println(new Date(currentTimeMillis));
				ctx.close();
			}
		}

		/**
		 * Prints the stack trace of the failure and closes the connection.
		 *
		 * @param ctx   context of this handler in the pipeline
		 * @param cause the exception that was raised
		 */
		@Override
		public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
			cause.printStackTrace();
			ctx.close();
		}
	}

测试

Thu Jan 23 14:51:27 CST 2020

两个Handler的时间客户端

在上面的例子中,一段逻辑负责处理数据分片(合包),另一段逻辑负责处理业务。这里可以利用pipeline支持多个Handler的特点,把它们拆分成两个Handler,各自只处理自己的逻辑,使代码更清晰可读。具体如下:


import java.util.Date;
import java.util.List;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder;

public class MainNettyClient {

	/**
	 * Decoder that emits one four-byte buffer each time at least four bytes are readable.
	 */
	public static class TimeDecoder extends ByteToMessageDecoder {

		/**
		 * Moves four bytes from the input into a new buffer and adds it to the output list;
		 * does nothing while fewer than four bytes are readable.
		 *
		 * @param ctx context of this handler in the pipeline
		 * @param in  the input buffer passed in by the decoder base class
		 * @param out list that receives the decoded messages
		 */
		@Override
		protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
			if (in.readableBytes() >= 4) {
				out.add(in.readBytes(4));
			}
		}
	}

	/**
	 * Inbound handler that reads one 32-bit time value, prints it as a date and closes the
	 * connection.
	 */
	private static class TimeClientHandler extends ChannelInboundHandlerAdapter {

		/**
		 * Decodes the unsigned 32-bit value, converts it to epoch milliseconds and prints it.
		 *
		 * @param ctx context of this handler in the pipeline
		 * @param msg the inbound message; cast to {@link ByteBuf} unconditionally
		 */
		@Override
		public void channelRead(ChannelHandlerContext ctx, Object msg) {
			ByteBuf m = (ByteBuf) msg;
			try {
				long wireSeconds = m.readUnsignedInt();
				// 2208988800 = seconds from 1900-01-01 to 1970-01-01.
				Date received = new Date((wireSeconds - 2208988800L) * 1000L);
				System.out.println(received);
				ctx.close();
			} finally {
				m.release();
			}
		}

		/**
		 * Prints the stack trace of the failure and closes the connection.
		 *
		 * @param ctx   context of this handler in the pipeline
		 * @param cause the exception that was raised
		 */
		@Override
		public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
			cause.printStackTrace();
			ctx.close();
		}
	}

	/**
	 * Connects to localhost:37 and blocks until the connection is closed.
	 *
	 * @param args ignored
	 * @throws InterruptedException if the thread is interrupted while waiting on connect or close
	 */
	public static void main(String[] args) throws InterruptedException {
		final String host = "localhost";
		final int port = 37;
		EventLoopGroup workerGroup = new NioEventLoopGroup();

		try {
			Bootstrap b = new Bootstrap()
					.group(workerGroup)
					.channel(NioSocketChannel.class)
					.option(ChannelOption.SO_KEEPALIVE, true)
					.handler(new ChannelInitializer<SocketChannel>() {
						@Override
						public void initChannel(SocketChannel ch) throws Exception {
							// Decoder first, so the business handler receives its output.
							ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
						}
					});

			// Block until the connection attempt has finished.
			ChannelFuture connected = b.connect(host, port).sync();

			// Block until the connection is closed.
			connected.channel().closeFuture().sync();
		} finally {
			workerGroup.shutdownGracefully();
		}
	}
}

测试

Thu Jan 23 15:00:42 CST 2020

客户端使用 POJO Java对象 而不是 ByteBuf

前面的 ByteBuf 处理起来不友好,这里将返回的对象改为 POJO。


import java.util.Date;
import java.util.List;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder;

public class MainNettyClient {

	/**
	 * Immutable holder for a time value expressed in seconds and shifted by 2208988800 (the
	 * number of seconds from 1900-01-01 to 1970-01-01).
	 */
	public static class UnixTime {

		private final long value;

		/** Creates an instance holding the current time. */
		public UnixTime() {
			this(System.currentTimeMillis() / 1000L + 2208988800L);
		}

		/**
		 * Creates an instance holding the given value.
		 *
		 * @param value the time value to store, taken as is
		 */
		public UnixTime(long value) {
			this.value = value;
		}

		/**
		 * @return the stored time value
		 */
		public long value() {
			return value;
		}

		/**
		 * @return the stored value formatted via {@link Date#toString()}
		 */
		@Override
		public String toString() {
			long epochMillis = (value() - 2208988800L) * 1000L;
			Date asDate = new Date(epochMillis);
			return asDate.toString();
		}
	}

	/**
	 * Decoder that turns every four readable bytes into one {@link UnixTime}.
	 */
	public static class TimeDecoder extends ByteToMessageDecoder {

		/**
		 * Reads an unsigned 32-bit value and adds it to the output list wrapped in a
		 * {@link UnixTime}; does nothing while fewer than four bytes are readable.
		 *
		 * @param ctx context of this handler in the pipeline
		 * @param in  the input buffer passed in by the decoder base class
		 * @param out list that receives the decoded messages
		 */
		@Override
		protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
			if (in.readableBytes() >= 4) {
				out.add(new UnixTime(in.readUnsignedInt()));
			}
		}
	}

	/**
	 * Inbound handler that prints the received {@link UnixTime} and closes the connection.
	 */
	private static class TimeClientHandler extends ChannelInboundHandlerAdapter {

		/**
		 * Prints the message and closes the connection.
		 *
		 * @param ctx context of this handler in the pipeline
		 * @param msg the inbound message; cast to {@link UnixTime} unconditionally
		 */
		@Override
		public void channelRead(ChannelHandlerContext ctx, Object msg) {
			UnixTime received = (UnixTime) msg;
			System.out.println(received);
			ctx.close();
		}

		/**
		 * Prints the stack trace of the failure and closes the connection.
		 *
		 * @param ctx   context of this handler in the pipeline
		 * @param cause the exception that was raised
		 */
		@Override
		public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
			cause.printStackTrace();
			ctx.close();
		}
	}

	/**
	 * Connects to localhost:37 and blocks until the connection is closed.
	 *
	 * @param args ignored
	 * @throws InterruptedException if the thread is interrupted while waiting on connect or close
	 */
	public static void main(String[] args) throws InterruptedException {
		final String host = "localhost";
		final int port = 37;
		EventLoopGroup workerGroup = new NioEventLoopGroup();

		try {
			Bootstrap b = new Bootstrap()
					.group(workerGroup)
					.channel(NioSocketChannel.class)
					.option(ChannelOption.SO_KEEPALIVE, true)
					.handler(new ChannelInitializer<SocketChannel>() {
						@Override
						public void initChannel(SocketChannel ch) throws Exception {
							// Decoder first, so the business handler receives UnixTime objects.
							ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
						}
					});

			// Block until the connection attempt has finished.
			ChannelFuture connected = b.connect(host, port).sync();

			// Block until the connection is closed.
			connected.channel().closeFuture().sync();
		} finally {
			workerGroup.shutdownGracefully();
		}
	}
}

测试

Thu Jan 23 15:06:33 CST 2020

服务端也使用 POJO Java对象 而不是 ByteBuf

import com.alibaba.nls.netty.MainNettyClient.UnixTime;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * from https://netty.io/wiki/user-guide-for-4.x.html#wiki-h3-5
 * 
 */
public class MainNettyServer {

	/**
	 * Inbound handler that sends a {@link UnixTime} as soon as the connection becomes active
	 * and closes the connection afterwards.
	 */
	public static class TimeServerHandler extends ChannelInboundHandlerAdapter {

		/**
		 * Writes and flushes a new {@link UnixTime} and registers
		 * {@link ChannelFutureListener#CLOSE} on the resulting future.
		 *
		 * @param ctx context of this handler in the pipeline
		 */
		@Override
		public void channelActive(final ChannelHandlerContext ctx) {
			ctx.writeAndFlush(new UnixTime()).addListener(ChannelFutureListener.CLOSE);
		}

		/**
		 * Prints the stack trace of the failure and closes the connection.
		 *
		 * @param ctx   context of this handler in the pipeline
		 * @param cause the exception that was raised
		 */
		@Override
		public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
			cause.printStackTrace();
			ctx.close();
		}
	}

	/**
	 * Outbound handler that converts a {@link UnixTime} into a four-byte buffer.
	 */
	public static class TimeEncoder extends ChannelOutboundHandlerAdapter {

		/**
		 * Encodes the message as a 32-bit integer and passes the buffer on with the original
		 * promise.
		 *
		 * @param ctx     context of this handler in the pipeline
		 * @param msg     the outbound message; cast to {@link UnixTime} unconditionally
		 * @param promise promise handed on together with the encoded buffer
		 */
		@Override
		public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
			UnixTime time = (UnixTime) msg;
			ByteBuf encoded = ctx.alloc().buffer(4);
			// Only the low 32 bits go on the wire.
			int wireValue = (int) time.value();
			encoded.writeInt(wireValue);
			ctx.write(encoded, promise);
		}
	}

	/**
	 * Starts the time server on port 37 and blocks until the server channel is closed.
	 *
	 * @param args ignored
	 * @throws InterruptedException if the thread is interrupted while waiting on bind or close
	 */
	public static void main(String[] args) throws InterruptedException {
		final int port = 37;

		EventLoopGroup bossGroup = new NioEventLoopGroup();
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		try {
			ServerBootstrap bootstrap = new ServerBootstrap();
			bootstrap.group(bossGroup, workerGroup);
			bootstrap.channel(NioServerSocketChannel.class);
			bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
				@Override
				public void initChannel(SocketChannel ch) throws Exception {
					// Encoder sits ahead of the server handler so the handler's writes pass
					// through it.
					ch.pipeline().addLast(new TimeEncoder(), new TimeServerHandler());
				}
			});
			bootstrap.option(ChannelOption.SO_BACKLOG, 128);
			bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);

			// Block until the listening socket is bound.
			ChannelFuture bound = bootstrap.bind(port).sync();

			// Block until the server channel is closed.
			bound.channel().closeFuture().sync();
		} finally {
			workerGroup.shutdownGracefully();
			bossGroup.shutdownGracefully();
		}
	}
}

测试

% rdate -p localhost
rdate: [localhost]	Thu Jan 23 15:10:51 2020

其它

服务器端简化的TimeEncoder

TimeEncoder也可以使用MessageToByteEncoder(io.netty.handler.codec.MessageToByteEncoder)来简化,如下:

	/**
	 * Encoder that writes a {@link UnixTime} into the output buffer as a 32-bit integer.
	 */
	public static class TimeEncoder extends MessageToByteEncoder<UnixTime> {

		/**
		 * Writes the low 32 bits of the message value to the output buffer.
		 *
		 * @param ctx context of this handler in the pipeline
		 * @param msg the message to encode
		 * @param out buffer that receives the encoded bytes
		 */
		@Override
		protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
			int wireValue = (int) msg.value();
			out.writeInt(wireValue);
		}
	}

客户端简化的TimeDecoder

TimeDecoder可以使用ReplayingDecoder(io.netty.handler.codec.ReplayingDecoder)来简化,如下:

	/**
	 * Decoder that reads an unsigned 32-bit value and emits it as a {@link UnixTime}.
	 */
	public static class TimeDecoder extends ReplayingDecoder<Void> {

		/**
		 * Reads one unsigned 32-bit value from the input and adds the resulting
		 * {@link UnixTime} to the output list. There is no explicit length check here.
		 *
		 * @param ctx context of this handler in the pipeline
		 * @param in  the input buffer passed in by the decoder base class
		 * @param out list that receives the decoded messages
		 */
		@Override
		protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
			long wireSeconds = in.readUnsignedInt();
			out.add(new UnixTime(wireSeconds));
		}
	}

参考