KQueue detect peer close without EVFILT_READ

Motivation:
The EPOLL transport uses EPOLLRDHUP to detect when the peer closes the write side of the socket. Currently KQueue is not able to mimic this behavior and the only way to detect if the peer has closed is to read. It may not always be appropriate to read for backpressure and other reasons at the application level.

Modifications:
- Support EVFILT_SOCK filter which provides notification when the peer closes the socket

Result:
KQueue transport has more consistent behavior with Epoll transport for detecting peer closure.
This commit is contained in:
Scott Mitchell 2017-08-16 23:44:57 -07:00
parent ba27456653
commit 1d7c3fb7ee
8 changed files with 327 additions and 2 deletions

View File

@ -0,0 +1,38 @@
/*
* Copyright 2017 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.unix.tests.DetectPeerCloseWithoutReadTest;
public class EpollDetectPeerCloseWithoutReadTest extends DetectPeerCloseWithoutReadTest {
@Override
protected EventLoopGroup newGroup() {
return new EpollEventLoopGroup(2);
}
@Override
protected Class<? extends ServerChannel> serverChannel() {
return EpollServerSocketChannel.class;
}
@Override
protected Class<? extends Channel> clientChannel() {
return EpollSocketChannel.class;
}
}

View File

@ -35,6 +35,35 @@
#include "netty_unix_socket.h"
#include "netty_unix_util.h"
// Currently only macOS supports EVFILT_SOCK, and it is currently only available in internal APIs.
// To make compiling easier we redefine the values here if they are not present.
#ifdef __APPLE__
#ifndef EVFILT_SOCK
#define EVFILT_SOCK -13
#endif /* EVFILT_SOCK */
#ifndef NOTE_CONNRESET
#define NOTE_CONNRESET 0x00000001
#endif /* NOTE_CONNRESET */
#ifndef NOTE_READCLOSED
#define NOTE_READCLOSED 0x00000002
#endif /* NOTE_READCLOSED */
#ifndef NOTE_DISCONNECTED
#define NOTE_DISCONNECTED 0x00001000
#endif /* NOTE_DISCONNECTED */
#else
#ifndef EVFILT_SOCK
#define EVFILT_SOCK 0 // Disabled
#endif /* EVFILT_SOCK */
#ifndef NOTE_CONNRESET
#define NOTE_CONNRESET 0
#endif /* NOTE_CONNRESET */
#ifndef NOTE_READCLOSED
#define NOTE_READCLOSED 0
#endif /* NOTE_READCLOSED */
#ifndef NOTE_DISCONNECTED
#define NOTE_DISCONNECTED 0
#endif /* NOTE_DISCONNECTED */
#endif /* __APPLE__ */
clockid_t waitClockId = 0; // initialized by netty_unix_util_initialize_wait_clock
@ -162,6 +191,10 @@ static jshort netty_kqueue_native_evfiltUser(JNIEnv* env, jclass clazz) {
return EVFILT_USER;
}
static jshort netty_kqueue_native_evfiltSock(JNIEnv* env, jclass clazz) {
return EVFILT_SOCK;
}
static jshort netty_kqueue_native_evAdd(JNIEnv* env, jclass clazz) {
return EV_ADD;
}
@ -190,18 +223,34 @@ static jshort netty_kqueue_native_evError(JNIEnv* env, jclass clazz) {
return EV_ERROR;
}
static jshort netty_kqueue_native_noteConnReset(JNIEnv* env, jclass clazz) {
return NOTE_CONNRESET;
}
static jshort netty_kqueue_native_noteReadClosed(JNIEnv* env, jclass clazz) {
return NOTE_READCLOSED;
}
static jshort netty_kqueue_native_noteDisconnected(JNIEnv* env, jclass clazz) {
return NOTE_DISCONNECTED;
}
// JNI Method Registration Table Begin
static const JNINativeMethod statically_referenced_fixed_method_table[] = {
{ "evfiltRead", "()S", (void *) netty_kqueue_native_evfiltRead },
{ "evfiltWrite", "()S", (void *) netty_kqueue_native_evfiltWrite },
{ "evfiltUser", "()S", (void *) netty_kqueue_native_evfiltUser },
{ "evfiltSock", "()S", (void *) netty_kqueue_native_evfiltSock },
{ "evAdd", "()S", (void *) netty_kqueue_native_evAdd },
{ "evEnable", "()S", (void *) netty_kqueue_native_evEnable },
{ "evDisable", "()S", (void *) netty_kqueue_native_evDisable },
{ "evDelete", "()S", (void *) netty_kqueue_native_evDelete },
{ "evClear", "()S", (void *) netty_kqueue_native_evClear },
{ "evEOF", "()S", (void *) netty_kqueue_native_evEOF },
{ "evError", "()S", (void *) netty_kqueue_native_evError }
{ "evError", "()S", (void *) netty_kqueue_native_evError },
{ "noteReadClosed", "()S", (void *) netty_kqueue_native_noteReadClosed },
{ "noteConnReset", "()S", (void *) netty_kqueue_native_noteConnReset },
{ "noteDisconnected", "()S", (void *) netty_kqueue_native_noteDisconnected }
};
static const jint statically_referenced_fixed_method_table_size = sizeof(statically_referenced_fixed_method_table) / sizeof(statically_referenced_fixed_method_table[0]);
static const JNINativeMethod fixed_method_table[] = {

View File

@ -166,6 +166,7 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan
// Make sure we unregister our filters from kqueue!
readFilter(false);
writeFilter(false);
evSet0(Native.EVFILT_SOCK, Native.EV_DELETE, 0);
((KQueueEventLoop) eventLoop()).remove(this);
@ -204,6 +205,7 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan
if (readFilterEnabled) {
evSet0(Native.EVFILT_READ, Native.EV_ADD_CLEAR_ENABLE);
}
evSet0(Native.EVFILT_SOCK, Native.EV_ADD, Native.NOTE_RDHUP);
}
@Override
@ -382,7 +384,11 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan
}
private void evSet0(short filter, short flags) {
((KQueueEventLoop) eventLoop()).evSet(this, filter, flags, 0);
evSet0(filter, flags, 0);
}
private void evSet0(short filter, short flags, int fflags) {
((KQueueEventLoop) eventLoop()).evSet(this, filter, flags, fflags);
}
abstract class AbstractKQueueUnsafe extends AbstractUnsafe {

View File

@ -196,6 +196,8 @@ final class KQueueEventLoop extends SingleThreadEventLoop {
} else if (filter == Native.EVFILT_READ) {
// Check READ before EOF to ensure all data is read before shutting down the input.
unsafe.readReady(eventList.data(i));
} else if (filter == Native.EVFILT_SOCK && (eventList.fflags(i) & Native.NOTE_RDHUP) != 0) {
unsafe.readEOF();
}
// Check if EV_EOF was set, this will notify us for connection-reset in which case

View File

@ -37,7 +37,13 @@ final class KQueueStaticallyReferencedJniMethods {
static native short evEOF();
static native short evError();
// data/hint fflags for EVFILT_SOCK, shared with userspace.
static native short noteReadClosed();
static native short noteConnReset();
static native short noteDisconnected();
static native short evfiltRead();
static native short evfiltWrite();
static native short evfiltUser();
static native short evfiltSock();
}

View File

@ -31,8 +31,12 @@ import static io.netty.channel.kqueue.KQueueStaticallyReferencedJniMethods.evEOF
import static io.netty.channel.kqueue.KQueueStaticallyReferencedJniMethods.evEnable;
import static io.netty.channel.kqueue.KQueueStaticallyReferencedJniMethods.evError;
import static io.netty.channel.kqueue.KQueueStaticallyReferencedJniMethods.evfiltRead;
import static io.netty.channel.kqueue.KQueueStaticallyReferencedJniMethods.evfiltSock;
import static io.netty.channel.kqueue.KQueueStaticallyReferencedJniMethods.evfiltUser;
import static io.netty.channel.kqueue.KQueueStaticallyReferencedJniMethods.evfiltWrite;
import static io.netty.channel.kqueue.KQueueStaticallyReferencedJniMethods.noteConnReset;
import static io.netty.channel.kqueue.KQueueStaticallyReferencedJniMethods.noteDisconnected;
import static io.netty.channel.kqueue.KQueueStaticallyReferencedJniMethods.noteReadClosed;
import static io.netty.channel.unix.Errors.newIOException;
/**
@ -59,6 +63,12 @@ final class Native {
static final short EV_ERROR = evError();
static final short EV_EOF = evEOF();
static final int NOTE_READCLOSED = noteReadClosed();
static final int NOTE_CONNRESET = noteConnReset();
static final int NOTE_DISCONNECTED = noteDisconnected();
static final int NOTE_RDHUP = NOTE_READCLOSED | NOTE_CONNRESET | NOTE_DISCONNECTED;
// Commonly used combinations of EV defines
static final short EV_ADD_CLEAR_ENABLE = (short) (EV_ADD | EV_CLEAR | EV_ENABLE);
static final short EV_DELETE_DISABLE = (short) (EV_DELETE | EV_DISABLE);
@ -66,6 +76,7 @@ final class Native {
static final short EVFILT_READ = evfiltRead();
static final short EVFILT_WRITE = evfiltWrite();
static final short EVFILT_USER = evfiltUser();
static final short EVFILT_SOCK = evfiltSock();
static FileDescriptor newKQueue() {
return new FileDescriptor(kqueueCreate());

View File

@ -0,0 +1,38 @@
/*
* Copyright 2017 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.kqueue;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.unix.tests.DetectPeerCloseWithoutReadTest;
public class KQueueDetectPeerCloseWithoutReadTest extends DetectPeerCloseWithoutReadTest {
@Override
protected EventLoopGroup newGroup() {
return new KQueueEventLoopGroup(2);
}
@Override
protected Class<? extends ServerChannel> serverChannel() {
return KQueueServerSocketChannel.class;
}
@Override
protected Class<? extends Channel> clientChannel() {
return KQueueSocketChannel.class;
}
}

View File

@ -0,0 +1,175 @@
/*
* Copyright 2017 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.unix.tests;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.SimpleChannelInboundHandler;
import org.junit.Test;
import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
public abstract class DetectPeerCloseWithoutReadTest {
protected abstract EventLoopGroup newGroup();
protected abstract Class<? extends ServerChannel> serverChannel();
protected abstract Class<? extends Channel> clientChannel();
@Test(timeout = 10000)
public void clientCloseWithoutServerReadIsDetected() throws InterruptedException {
EventLoopGroup serverGroup = null;
EventLoopGroup clientGroup = null;
Channel serverChannel = null;
try {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicInteger bytesRead = new AtomicInteger();
final int expectedBytes = 100;
serverGroup = newGroup();
clientGroup = newGroup();
ServerBootstrap sb = new ServerBootstrap();
sb.group(serverGroup);
sb.channel(serverChannel());
sb.childOption(ChannelOption.AUTO_READ, false);
sb.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new TestHandler(bytesRead, latch));
}
});
serverChannel = sb.bind(new InetSocketAddress(0)).syncUninterruptibly().channel();
Bootstrap cb = new Bootstrap();
cb.group(serverGroup);
cb.channel(clientChannel());
cb.handler(new ChannelInboundHandlerAdapter());
Channel clientChannel = cb.connect(serverChannel.localAddress()).syncUninterruptibly().channel();
ByteBuf buf = clientChannel.alloc().buffer(expectedBytes);
buf.writerIndex(buf.writerIndex() + expectedBytes);
clientChannel.writeAndFlush(buf).addListener(ChannelFutureListener.CLOSE);
latch.await();
assertEquals(expectedBytes, bytesRead.get());
} finally {
if (serverChannel != null) {
serverChannel.close().syncUninterruptibly();
}
if (serverGroup != null) {
serverGroup.shutdownGracefully();
}
if (clientGroup != null) {
clientGroup.shutdownGracefully();
}
}
}
@Test(timeout = 10000)
public void serverCloseWithoutClientReadIsDetected() throws InterruptedException {
EventLoopGroup serverGroup = null;
EventLoopGroup clientGroup = null;
Channel serverChannel = null;
Channel clientChannel = null;
try {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicInteger bytesRead = new AtomicInteger();
final int expectedBytes = 100;
serverGroup = newGroup();
clientGroup = newGroup();
ServerBootstrap sb = new ServerBootstrap();
sb.group(serverGroup);
sb.channel(serverChannel());
sb.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ByteBuf buf = ctx.alloc().buffer(expectedBytes);
buf.writerIndex(buf.writerIndex() + expectedBytes);
ctx.writeAndFlush(buf).addListener(ChannelFutureListener.CLOSE);
ctx.fireChannelActive();
}
});
}
});
serverChannel = sb.bind(new InetSocketAddress(0)).syncUninterruptibly().channel();
Bootstrap cb = new Bootstrap();
cb.group(serverGroup);
cb.channel(clientChannel());
cb.option(ChannelOption.AUTO_READ, false);
cb.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new TestHandler(bytesRead, latch));
}
});
clientChannel = cb.connect(serverChannel.localAddress()).syncUninterruptibly().channel();
latch.await();
assertEquals(expectedBytes, bytesRead.get());
} finally {
if (serverChannel != null) {
serverChannel.close().syncUninterruptibly();
}
if (clientChannel != null) {
clientChannel.close().syncUninterruptibly();
}
if (serverGroup != null) {
serverGroup.shutdownGracefully();
}
if (clientGroup != null) {
clientGroup.shutdownGracefully();
}
}
}
private static final class TestHandler extends SimpleChannelInboundHandler<ByteBuf> {
private final AtomicInteger bytesRead;
private final CountDownLatch latch;
TestHandler(AtomicInteger bytesRead, CountDownLatch latch) {
this.bytesRead = bytesRead;
this.latch = latch;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
bytesRead.addAndGet(msg.readableBytes());
// Because autoread is off, we call read to consume all data until we detect the close.
ctx.read();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
latch.countDown();
ctx.fireChannelInactive();
}
}
}