websocket server初体验
背景
目前主流的实时语音产品大多基于 WebSocket 协议 [1],因此我们自建的语音服务端也采用 WebSocket 协议。
本文将使用Netty,搭建一个最小的websocket服务端。
初体验
一个最小的 WebSocket 服务端,除了一项 Maven 依赖之外,只需要下面 3 个类文件。
依赖
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.74.Final</version>
        </dependency>
NettyWebSocketServer
// Boss group: accepts incoming connections.
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // Worker group: handles the I/O handed over by the boss group.
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        // Build and start the Netty server.
        try {
            // Read the port once so the bind call and the log messages always agree.
            final int webSocketPort = appConfiguration.getWebSocketPort();
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup) // boss/worker thread groups
                    .channel(NioServerSocketChannel.class) // server channel type
                    .childHandler(new NettyWebSocketServerInitializer()); // per-connection pipeline setup
            // Start binding. The listener is attached BEFORE sync(): sync() rethrows the
            // failure cause, so a listener attached after it would never see a failed bind.
            ChannelFuture channelFuture = serverBootstrap.bind(webSocketPort);
            channelFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    // Use the future handed to the callback rather than the captured outer one.
                    if (future.isSuccess()) {
                        log.info("listen web socket port {} succeed. ", webSocketPort);
                    } else {
                        log.error("listen web socket port {} failed. ", webSocketPort, future.cause());
                    }
                }
            });
            // Wait for the bind to finish; throws if the bind failed.
            channelFuture.sync();
            // Block until the server channel is closed.
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("web socket server thread was interrupted", e);
            // Restore the interrupt flag for any code that inspects it during shutdown.
            Thread.currentThread().interrupt();
            // NOTE(review): System.exit does not return, so the finally block below is
            // skipped on this path; kept as-is to preserve the original process behavior.
            System.exit(0);
        } finally {
            // Shut down the boss group.
            bossGroup.shutdownGracefully();
            // Shut down the worker group.
            workerGroup.shutdownGracefully();
        }
NettyWebSocketServerInitializer
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
/**
 * Installs the handler chain on every newly accepted {@link SocketChannel}:
 * HTTP codec, chunked writer, HTTP aggregator, WebSocket protocol handler and
 * finally the application handler {@link NettyWebSocketHandler}.
 */
public class NettyWebSocketServerInitializer extends ChannelInitializer<SocketChannel> {
    /**
     * Value passed to {@link HttpObjectAggregator}.
     * NOTE(review): 1024 * 62 looks like it may have been meant as 1024 * 64 — confirm.
     */
    private static final int MAX_HTTP_CONTENT_LENGTH = 1024 * 62;

    /** Path passed to {@link WebSocketServerProtocolHandler}. */
    private static final String WEBSOCKET_PATH = "/ws";

    /**
     * Appends the handlers to the channel's pipeline in a fixed order.
     *
     * @param socketChannel the channel being initialized
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        socketChannel.pipeline()
                // The next three handlers provide HTTP support.
                // HTTP codec.
                .addLast(new HttpServerCodec())
                // Support for writing large data streams.
                .addLast(new ChunkedWriteHandler())
                // HTTP aggregator.
                .addLast(new HttpObjectAggregator(MAX_HTTP_CONTENT_LENGTH))
                // WebSocket support, bound to the route above.
                .addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH))
                // Custom application handler.
                .addLast(new NettyWebSocketHandler());
    }
}
NettyWebSocketHandler
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import lombok.extern.slf4j.Slf4j;
/**
 * Demo WebSocket handler: logs each channel lifecycle callback and answers every
 * text frame with the fixed text "echo". Frames of any other type are only logged.
 */
@Slf4j
public class NettyWebSocketHandler extends SimpleChannelInboundHandler<WebSocketFrame> {

    /** Logs the registration callback, then delegates to the superclass. */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        log.info("on channel registered");
        super.channelRegistered(ctx);
    }

    /** Logs the unregistration callback, then delegates to the superclass. */
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        log.info("on channel unregistered");
        super.channelUnregistered(ctx);
    }

    /** Logs the activation callback, then delegates to the superclass. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("on channel active");
        super.channelActive(ctx);
    }

    /** Logs the deactivation callback, then delegates to the superclass. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("on channel inactive");
        super.channelInactive(ctx);
    }

    /**
     * Logs every inbound message before handing it to the superclass implementation.
     *
     * @param ctx the handler context
     * @param msg the inbound message as received by this handler
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("on channel read, msg = {}", msg);
        super.channelRead(ctx, msg);
    }

    /**
     * Handles one WebSocket frame: logs the text of a text frame and writes back a
     * text frame containing "echo"; any other frame type is logged and ignored.
     *
     * @param channelHandlerContext the handler context whose channel receives the reply
     * @param webSocketFrame        the inbound frame
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, WebSocketFrame webSocketFrame) throws Exception {
        log.info("on channel read0");
        // Guard clause: anything that is not a text frame is only reported.
        if (!(webSocketFrame instanceof TextWebSocketFrame)) {
            log.info("This is a non text message and will not be processed");
            return;
        }
        // Receive the message.
        TextWebSocketFrame incoming = (TextWebSocketFrame) webSocketFrame;
        log.info("The message I received is {}", incoming.text());
        // Reply through the channel.
        channelHandlerContext.channel().writeAndFlush(new TextWebSocketFrame("echo"));
    }
}
这样就可以运行起来了,剩下的都是业务逻辑了。
原理介绍
来自[2]
