Netty初体验-HTTP服务端和客户端
背景
前文介绍了使用Netty实现的TCP Server和Client。本文介绍对应的HTTP Server和Client。
初体验
依赖
<!-- netty all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.45.Final</version>
</dependency>
服务端
import org.apache.http.Consts;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.HttpServerExpectContinueHandler;
import io.netty.handler.codec.http.HttpUtil;
public class MainNettyHttpServer {
public static class HttpServerInboundHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof HttpRequest) {
HttpRequest request = (HttpRequest) msg;
boolean keepAlive = HttpUtil.isKeepAlive(request);
String responseStr = "hello world. ";
FullHttpResponse response = new DefaultFullHttpResponse(request.protocolVersion(),
HttpResponseStatus.OK, Unpooled.wrappedBuffer(responseStr.getBytes(Consts.UTF_8)));
response.headers().set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.TEXT_PLAIN)
.setInt(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
if (keepAlive) {
if (!request.protocolVersion().isKeepAliveDefault()) {
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
}
} else {
// Tell the client we're going to close the connection.
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
}
ChannelFuture f = ctx.write(response);
if (!keepAlive) {
f.addListener(ChannelFutureListener.CLOSE);
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
public static void main(String[] args) throws InterruptedException {
int port = 6061;
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new HttpServerCodec());
ch.pipeline().addLast(new HttpServerExpectContinueHandler());
ch.pipeline().addLast(new HttpServerInboundHandler());
}
}).option(ChannelOption.SO_BACKLOG, 1024).childOption(ChannelOption.SO_KEEPALIVE, true);
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(port).sync();
// Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to gracefully
// shut down your server.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
测试
% curl -s -vv "localhost:6061"
* Trying ::1...
* TCP_NODELAY set
* Connected to localhost (::1) port 6061 (#0)
> GET / HTTP/1.1
> Host: localhost:6061
> User-Agent: curl/7.64.1
> Accept: */*
>
< HTTP/1.1 200 OK
< content-type: text/plain
< content-length: 13
<
* Connection #0 to host localhost left intact
hello world. * Closing connection 0
客户端
import org.apache.http.Consts;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpContentDecompressor;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpVersion;
public class MainNettyHttpClient {
private static class HttpClientInboundHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
// Write request
FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
// 构建http请求
request.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;charset=UTF-8");
request.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
request.headers().set(HttpHeaderNames.CONTENT_LENGTH, request.content().readableBytes());
// 发送http请求
ctx.writeAndFlush(request);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof FullHttpResponse) {
FullHttpResponse response = (FullHttpResponse) msg;
ByteBuf content = response.content();
System.out.println("response: " + content.toString(Consts.UTF_8));
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
super.channelReadComplete(ctx);
ctx.close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
public static void main(String[] args) throws InterruptedException {
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
// 包含编码器和解码器
ch.pipeline().addLast(new HttpClientCodec());
// 聚合
ch.pipeline().addLast(new HttpObjectAggregator(1024 * 10 * 1024));
// 解压
ch.pipeline().addLast(new HttpContentDecompressor());
ch.pipeline().addLast(new HttpClientInboundHandler());
}
});
String host = "localhost";
int port = 6061;
// Start the client.
ChannelFuture f = b.connect(host, port).sync();
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}
测试
response: hello world.