Netty中的水平触发和边缘触发
背景
水平触发和边缘触发
[1]
在非阻塞IO中,通过Selector选出准备好的fd进行操作。有两种模式,一是水平触发(Level Trigger),二是边缘触发(Edge Trigger)。
水平触发模式:只要某个fd还有数据没读完,那么下次轮询还会被选出。
边缘触发模式:只有fd状态发生改变后,该fd才会被再次选出。
工程区别:边缘触发模式下的一次轮询必须处理完本次轮询出的fd的所有数据,否则该fd将不会在下次轮询中被选出。
Netty实现
在Netty中,NioChannel体系是水平触发,EpollChannel体系是边缘触发。
水平触发
在 AbstractNioMessageChannel.read()
中:
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
[1]
1.使用allocHandle维护一个大小合适的缓冲区
2.循环调用doReadMessage(readBuf),该方法内部是调用java nio中的serverSocketChannel.accept() 获得SocketChannel,并包装成netty中的NioSocketChannel,然后放入readBuf。
3.pipeline.fireChannelRead(readBuf.get(i)),让每一个NioSocketChannel经过NioServerSocketChannel的handler链进行处理。
4.finally{...}中,调用removeReadOp(),移除该Channel的SelectionKey中的OP_ACCEPT值
水平触发主要体现在4,Netty为了使每次轮询负载均衡,不至于一次轮询要的readBuf内存过大,所以限制了readBuf大小,导致每次轮询所能够处理的数据有限,这就可能使一次轮询不回读完fd中的数据。在finally{...}中移除了OP_ACCEPT是因为他工作在LT触发模式下,即使移除了,只要fd中还有数据,下次轮询仍然会把该fd选出进行处理。
边缘触发
在 AbstractEpollServerChannel.epollInReady()
中:
@Override
void epollInReady() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
if (shouldBreakEpollInReady(config)) {
clearEpollIn0();
return;
}
final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));
final ChannelPipeline pipeline = pipeline();
allocHandle.reset(config);
allocHandle.attemptedBytesRead(1);
epollInBefore();
Throwable exception = null;
try {
try {
do {
// lastBytesRead represents the fd. We use lastBytesRead because it must be set so that the
// EpollRecvByteAllocatorHandle knows if it should try to read again or not when autoRead is
// enabled.
allocHandle.lastBytesRead(socket.accept(acceptedAddress));
if (allocHandle.lastBytesRead() == -1) {
// this means everything was handled for now
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
pipeline.fireChannelRead(newChildChannel(allocHandle.lastBytesRead(), acceptedAddress, 1,
acceptedAddress[0]));
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (exception != null) {
pipeline.fireExceptionCaught(exception);
}
} finally {
epollInFinally(config);
}
}
}
epoll同样采用了allocHandle来使每次轮询负载均衡,不同的是finally{...}移除SeletionKey中该事件的处理。
其中 epollInFinally(config)
为:
final void epollInFinally(ChannelConfig config) {
maybeMoreDataToRead = allocHandle.isEdgeTriggered() && allocHandle.maybeMoreDataToRead();
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
clearEpollIn();
} else if (readPending && maybeMoreDataToRead) {
// trigger a read again as there may be something left to read and because of epoll ET we
// will not get notified again until we read everything from the socket
//
// It is possible the last fireChannelRead call could cause the user to call read() again, or if
// autoRead is true the call to channelReadComplete would also call read, but maybeMoreDataToRead is set
// to false before every read operation to prevent re-entry into epollInReady() we will not read from
// the underlying OS again unless the user happens to call read again.
executeEpollInReadyRunnable(config);
}
}
注释中作出了说明:
如果fd中仍有未读完的数据,必须调用executeEpollReadRunnable(config),自己触发EPOLLIN event,否则会因为没有读完数据导致socket不可用。