websocket server初体验

  |   0 评论   |   0 浏览

背景

现在的实时语音产品大多采用 WebSocket 协议 [1]。因此,我们自建的语音服务端也采用 WebSocket 协议。

本文将使用Netty,搭建一个最小的websocket服务端。

初体验

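最小的 WebSocket 服务端,除了一个 Maven 依赖之外,只需要 3 个 Java 类:NettyWebSocketServer、NettyWebSocketServerInitializer 和 NettyWebSocketHandler。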

依赖

        <!-- Netty dependency (artifact netty-all, version 4.1.74.Final); the Java snippets in this article import their io.netty.* classes from it. -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.74.Final</version>
        </dependency>

NettyWebSocketServer

//Boss group: accepts incoming connections
        EventLoopGroup bossGroup = new NioEventLoopGroup();

        //Worker group: handles the I/O of the channels handed over by the boss group
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        //Create and run the Netty server
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)//boss and worker groups
                    .channel(NioServerSocketChannel.class)//server channel implementation
                    .childHandler(new NettyWebSocketServerInitializer());//pipeline setup for every accepted channel

            //Read the port once so the bind call and the log lines always report the same value
            final int port = appConfiguration.getWebSocketPort();

            //Start binding. The listener is registered BEFORE waiting on the future:
            //sync() rethrows the failure cause, so a listener added only after a
            //successful sync() could never reach its failure branch.
            ChannelFuture channelFuture = serverBootstrap.bind(port);

            channelFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    //Inspect the future passed to the callback, not the captured local
                    if (future.isSuccess()) {
                        log.info("listen web socket port {} succeed. ", port);
                    } else {
                        //Trailing throwable argument makes the logger print the bind failure cause
                        log.error("listen web socket port {} failed. ", port, future.cause());
                    }
                }
            });

            //Wait for the bind to complete; a failed bind is rethrown from here
            channelFuture.sync();

            //Block until the server channel is closed
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            //Use the logger instead of printStackTrace() so the stack trace reaches the configured log output
            log.error("web socket server thread was interrupted, exiting. ", e);
            //Being interrupted is an abnormal stop: exit with a non-zero status so that
            //supervisors and scripts do not mistake it for a clean shutdown.
            //Note: System.exit does not return, so the finally block below is not run on this path.
            System.exit(1);
        } finally {
            //Shut down the boss group
            bossGroup.shutdownGracefully();

            //Shut down the worker group
            workerGroup.shutdownGracefully();
        }

NettyWebSocketServerInitializer

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;

/**
 * Builds the handler pipeline for each new {@link SocketChannel}: HTTP codec,
 * chunked-write support, HTTP message aggregation, the WebSocket protocol
 * handler for {@code /ws}, and finally the application's {@link NettyWebSocketHandler}.
 */
public class NettyWebSocketServerInitializer extends ChannelInitializer<SocketChannel> {

    /** Path passed to {@link WebSocketServerProtocolHandler} as the WebSocket endpoint. */
    private static final String WEBSOCKET_PATH = "/ws";

    /**
     * Maximum content length handed to {@link HttpObjectAggregator}: 62 * 1024 = 63488 bytes.
     * NOTE(review): 62 KiB is kept exactly as in the original; confirm 64 KiB was not intended.
     */
    private static final int MAX_HTTP_CONTENT_LENGTH = 62 * 1024;

    /**
     * Appends the handlers to the channel's pipeline in the order they must run.
     *
     * @param socketChannel the channel being initialized
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        socketChannel.pipeline()
                // HTTP support, part 1: request decoding / response encoding
                .addLast(new HttpServerCodec())
                // HTTP support, part 2: writing large data streams in chunks
                .addLast(new ChunkedWriteHandler())
                // HTTP support, part 3: aggregate HTTP message parts into full messages
                .addLast(new HttpObjectAggregator(MAX_HTTP_CONTENT_LENGTH))
                // WebSocket support, bound to the configured path
                .addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH))
                // Application-level handler for WebSocket frames
                .addLast(new NettyWebSocketHandler());
    }
}

NettyWebSocketHandler

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import lombok.extern.slf4j.Slf4j;

/**
 * Inbound handler for WebSocket frames.
 *
 * <p>Every channel lifecycle callback is logged and then delegated to the superclass.
 * Text frames are logged and answered with a fixed {@code "echo"} text frame;
 * any other frame type is only logged.
 */
@Slf4j
public class NettyWebSocketHandler extends SimpleChannelInboundHandler<WebSocketFrame> {

    /** Logs the registration event, then delegates to the superclass. */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        log.info("on channel registered");
        super.channelRegistered(ctx);
    }

    /** Logs the activation event, then delegates to the superclass. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("on channel active");
        super.channelActive(ctx);
    }

    /** Logs every inbound message, then delegates to the superclass. */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("on channel read, msg = {}", msg);
        super.channelRead(ctx, msg);
    }

    /**
     * Handles one WebSocket frame: replies {@code "echo"} to text frames and
     * ignores (but logs) every other frame type.
     *
     * @param channelHandlerContext context of this handler; its channel receives the reply
     * @param webSocketFrame        the received frame
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, WebSocketFrame webSocketFrame) throws Exception {
        log.info("on channel read0");

        // Guard clause: only text frames are processed
        if (!(webSocketFrame instanceof TextWebSocketFrame)) {
            log.info("This is a non text message and will not be processed");
            return;
        }

        // Incoming message
        TextWebSocketFrame incoming = (TextWebSocketFrame) webSocketFrame;
        log.info("The message I received is {}", incoming.text());

        // Reply with a fixed text frame, written through the channel
        channelHandlerContext.channel().writeAndFlush(new TextWebSocketFrame("echo"));
    }

    /** Logs the deactivation event, then delegates to the superclass. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("on channel inactive");
        super.channelInactive(ctx);
    }

    /** Logs the unregistration event, then delegates to the superclass. */
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        log.info("on channel unregistered");
        super.channelUnregistered(ctx);
    }
}

这样服务端就可以运行起来了:客户端连接 /ws 路径并发送文本消息后,会收到固定的文本回复 echo。剩下的都是业务逻辑了。

原理介绍

来自[2]

参考

  1. 阿里云智能语音交互Java SDK
  2. 吃透Netty源码系列四十七之WebSocketServerProtocolHandler详解