websocket server初体验
背景
现在的实时语音产品,都使用了WebSocket协议 [1]。因此,我们自建的语音服务端,也采用WebSocket协议。
本文将使用Netty,搭建一个最小的WebSocket服务端。
初体验
最小的websocket服务端,需要3个文件。
依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.74.Final</version>
</dependency>
NettyWebSocketServer
// Boots the WebSocket server: binds to appConfiguration.getWebSocketPort(),
// logs the bind result, then blocks until the server channel is closed.
// Both event loop groups are always shut down on the way out.

// Boss group: accepts incoming connections.
EventLoopGroup bossGroup = new NioEventLoopGroup();
// Worker group: handles the I/O operations handed over by the boss group.
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    serverBootstrap.group(bossGroup, workerGroup) // boss + worker groups
            .channel(NioServerSocketChannel.class) // server channel type
            .childHandler(new NettyWebSocketServerInitializer()); // pipeline setup for each accepted channel
    // Start binding, but register the listener BEFORE blocking on the result:
    // sync() rethrows the failure cause, so a listener added after sync()
    // could never reach its failure branch.
    ChannelFuture channelFuture = serverBootstrap.bind(appConfiguration.getWebSocketPort());
    channelFuture.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            // Inspect the future passed to the callback rather than the captured variable.
            if (future.isSuccess()) {
                log.info("listen web socket port {} succeed. ", appConfiguration.getWebSocketPort());
            } else {
                // Trailing Throwable argument is logged as the exception by SLF4J.
                log.error("listen web socket port {} failed. ", appConfiguration.getWebSocketPort(), future.cause());
            }
        }
    });
    // Wait for the bind to finish; a bind failure is rethrown here.
    channelFuture.sync();
    // Block until the server channel is closed.
    channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
    // Log through the logger instead of printStackTrace(), and restore the
    // interrupt flag so callers can observe it. System.exit(0) was removed:
    // it reported success and terminated the JVM before the finally block
    // could shut the event loop groups down.
    log.error("web socket server was interrupted", e);
    Thread.currentThread().interrupt();
} finally {
    // Shut down the boss group.
    bossGroup.shutdownGracefully();
    // Shut down the worker group.
    workerGroup.shutdownGracefully();
}
NettyWebSocketServerInitializer
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
/**
 * Channel initializer that installs the HTTP and WebSocket handlers on the
 * pipeline of a {@link SocketChannel}, ending with {@link NettyWebSocketHandler}.
 */
public class NettyWebSocketServerInitializer extends ChannelInitializer<SocketChannel> {

    /**
     * Maximum content length handed to {@link HttpObjectAggregator}.
     * NOTE(review): 1024 * 62 may have been intended as 64 KiB - confirm.
     */
    private static final int MAX_CONTENT_LENGTH = 1024 * 62;

    /** Path on which WebSocket connections are served. */
    private static final String WEBSOCKET_PATH = "/ws";

    /**
     * Appends the handlers to the channel's pipeline in this order: HTTP codec,
     * chunked-write support, HTTP aggregator, WebSocket protocol handler, and
     * finally the application handler.
     *
     * @param socketChannel the channel whose pipeline is being populated
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        socketChannel.pipeline()
                // The first three handlers provide HTTP support.
                // HTTP codec
                .addLast(new HttpServerCodec())
                // support for writing large data streams
                .addLast(new ChunkedWriteHandler())
                // HTTP aggregator
                .addLast(new HttpObjectAggregator(MAX_CONTENT_LENGTH))
                // WebSocket support, bound to the route
                .addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH))
                // custom application handler
                .addLast(new NettyWebSocketHandler());
    }
}
NettyWebSocketHandler
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import lombok.extern.slf4j.Slf4j;
/**
 * Inbound handler for {@link WebSocketFrame}s. It logs every channel lifecycle
 * callback and answers each text frame with a fixed {@code "echo"} text frame;
 * any other frame type is only logged.
 */
@Slf4j
public class NettyWebSocketHandler extends SimpleChannelInboundHandler<WebSocketFrame> {

    /** Logs the registration event, then delegates to the superclass. */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        log.info("on channel registered");
        super.channelRegistered(ctx);
    }

    /** Logs the activation event, then delegates to the superclass. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("on channel active");
        super.channelActive(ctx);
    }

    /** Logs the raw inbound message, then delegates to the superclass. */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("on channel read, msg = {}", msg);
        super.channelRead(ctx, msg);
    }

    /**
     * Handles one WebSocket frame. A text frame has its payload logged and is
     * answered with a new text frame carrying the literal {@code "echo"},
     * written through the channel; any other frame is logged and skipped.
     *
     * @param channelHandlerContext context of this handler
     * @param webSocketFrame        the received frame
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, WebSocketFrame webSocketFrame) throws Exception {
        log.info("on channel read0");

        // Guard clause: only text frames are processed.
        if (!(webSocketFrame instanceof TextWebSocketFrame)) {
            log.info("This is a non text message and will not be processed");
            return;
        }

        // Receive the message.
        TextWebSocketFrame incoming = (TextWebSocketFrame) webSocketFrame;
        String payload = incoming.text();
        log.info("The message I received is {}", payload);

        // Reply with the fixed text.
        channelHandlerContext.channel().writeAndFlush(new TextWebSocketFrame("echo"));
    }

    /** Logs the deactivation event, then delegates to the superclass. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("on channel inactive");
        super.channelInactive(ctx);
    }

    /** Logs the unregistration event, then delegates to the superclass. */
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        log.info("on channel unregistered");
        super.channelUnregistered(ctx);
    }
}
这样就可以运行起来了,剩下的都是业务逻辑了。
原理介绍
来自[2]