diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDetectPeerCloseWithoutReadTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDetectPeerCloseWithoutReadTest.java new file mode 100644 index 0000000000..82a8685671 --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDetectPeerCloseWithoutReadTest.java @@ -0,0 +1,38 @@ +/* + * Copyright 2017 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.channel.Channel; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.ServerChannel; +import io.netty.channel.unix.tests.DetectPeerCloseWithoutReadTest; + +public class EpollDetectPeerCloseWithoutReadTest extends DetectPeerCloseWithoutReadTest { + @Override + protected EventLoopGroup newGroup() { + return new EpollEventLoopGroup(2); + } + + @Override + protected Class serverChannel() { + return EpollServerSocketChannel.class; + } + + @Override + protected Class clientChannel() { + return EpollSocketChannel.class; + } +} diff --git a/transport-native-kqueue/src/main/c/netty_kqueue_native.c b/transport-native-kqueue/src/main/c/netty_kqueue_native.c index b9ff0f6f67..b1a65a9333 100644 --- a/transport-native-kqueue/src/main/c/netty_kqueue_native.c +++ b/transport-native-kqueue/src/main/c/netty_kqueue_native.c @@ -35,6 +35,35 @@ #include "netty_unix_socket.h" #include "netty_unix_util.h" +// Currently only macOS supports EVFILT_SOCK, and it is currently only available in internal APIs. +// To make compiling easier we redefine the values here if they are not present. +#ifdef __APPLE__ +#ifndef EVFILT_SOCK +#define EVFILT_SOCK -13 +#endif /* EVFILT_SOCK */ +#ifndef NOTE_CONNRESET +#define NOTE_CONNRESET 0x00000001 +#endif /* NOTE_CONNRESET */ +#ifndef NOTE_READCLOSED +#define NOTE_READCLOSED 0x00000002 +#endif /* NOTE_READCLOSED */ +#ifndef NOTE_DISCONNECTED +#define NOTE_DISCONNECTED 0x00001000 +#endif /* NOTE_DISCONNECTED */ +#else +#ifndef EVFILT_SOCK +#define EVFILT_SOCK 0 // Disabled +#endif /* EVFILT_SOCK */ +#ifndef NOTE_CONNRESET +#define NOTE_CONNRESET 0 +#endif /* NOTE_CONNRESET */ +#ifndef NOTE_READCLOSED +#define NOTE_READCLOSED 0 +#endif /* NOTE_READCLOSED */ +#ifndef NOTE_DISCONNECTED +#define NOTE_DISCONNECTED 0 +#endif /* NOTE_DISCONNECTED */ +#endif /* __APPLE__ */ clockid_t waitClockId = 0; // initialized by netty_unix_util_initialize_wait_clock @@ -162,6 +191,10 @@ static jshort netty_kqueue_native_evfiltUser(JNIEnv* env, jclass clazz) { return EVFILT_USER; } +static jshort netty_kqueue_native_evfiltSock(JNIEnv* env, jclass clazz) { + return EVFILT_SOCK; +} + static jshort netty_kqueue_native_evAdd(JNIEnv* env, jclass clazz) { return EV_ADD; } @@ -190,18 +223,34 @@ static jshort netty_kqueue_native_evError(JNIEnv* env, jclass clazz) { return EV_ERROR; } +static jshort netty_kqueue_native_noteConnReset(JNIEnv* env, jclass clazz) { + return NOTE_CONNRESET; +} + +static jshort netty_kqueue_native_noteReadClosed(JNIEnv* env, jclass clazz) { + return NOTE_READCLOSED; +} + +static jshort netty_kqueue_native_noteDisconnected(JNIEnv* env, jclass clazz) { + return NOTE_DISCONNECTED; +} + // JNI Method Registration Table Begin static const JNINativeMethod statically_referenced_fixed_method_table[] = { { "evfiltRead", "()S", (void *) netty_kqueue_native_evfiltRead }, { "evfiltWrite", "()S", (void *) netty_kqueue_native_evfiltWrite }, { "evfiltUser", "()S", (void *) netty_kqueue_native_evfiltUser }, + { "evfiltSock", "()S", (void *) netty_kqueue_native_evfiltSock }, { "evAdd", "()S", (void *) netty_kqueue_native_evAdd }, { "evEnable", "()S", (void *) netty_kqueue_native_evEnable }, { "evDisable", "()S", (void *) netty_kqueue_native_evDisable }, { "evDelete", "()S", (void *) netty_kqueue_native_evDelete }, { "evClear", "()S", (void *) netty_kqueue_native_evClear }, { "evEOF", "()S", (void *) netty_kqueue_native_evEOF }, - { "evError", "()S", (void *) netty_kqueue_native_evError } + { "evError", "()S", (void *) netty_kqueue_native_evError }, + { "noteReadClosed", "()S", (void *) netty_kqueue_native_noteReadClosed }, + { "noteConnReset", "()S", (void *) netty_kqueue_native_noteConnReset }, + { "noteDisconnected", "()S", (void *) netty_kqueue_native_noteDisconnected } }; static const jint statically_referenced_fixed_method_table_size = sizeof(statically_referenced_fixed_method_table) / sizeof(statically_referenced_fixed_method_table[0]); static const JNINativeMethod fixed_method_table[] = { diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java index 0e490b9235..94a9fc637e 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java @@ -166,6 +166,7 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan // Make sure we unregister our filters from kqueue! readFilter(false); writeFilter(false); + evSet0(Native.EVFILT_SOCK, Native.EV_DELETE, 0); ((KQueueEventLoop) eventLoop()).remove(this); @@ -204,6 +205,7 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan if (readFilterEnabled) { evSet0(Native.EVFILT_READ, Native.EV_ADD_CLEAR_ENABLE); } + evSet0(Native.EVFILT_SOCK, Native.EV_ADD, Native.NOTE_RDHUP); } @Override @@ -382,7 +384,11 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan } private void evSet0(short filter, short flags) { - ((KQueueEventLoop) eventLoop()).evSet(this, filter, flags, 0); + evSet0(filter, flags, 0); + } + + private void evSet0(short filter, short flags, int fflags) { + ((KQueueEventLoop) eventLoop()).evSet(this, filter, flags, fflags); } abstract class AbstractKQueueUnsafe extends AbstractUnsafe { diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java index ccb91173ee..9fbd0b26aa 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java @@ -196,6 +196,8 @@ final class KQueueEventLoop extends SingleThreadEventLoop { } else if (filter == Native.EVFILT_READ) { // Check READ before EOF to ensure all data is read before shutting down the input. unsafe.readReady(eventList.data(i)); + } else if (filter == Native.EVFILT_SOCK && (eventList.fflags(i) & Native.NOTE_RDHUP) != 0) { + unsafe.readEOF(); } // Check if EV_EOF was set, this will notify us for connection-reset in which case diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueStaticallyReferencedJniMethods.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueStaticallyReferencedJniMethods.java index 0e8ed698c3..71c3955703 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueStaticallyReferencedJniMethods.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueStaticallyReferencedJniMethods.java @@ -37,7 +37,13 @@ final class KQueueStaticallyReferencedJniMethods { static native short evEOF(); static native short evError(); + // data/hint fflags for EVFILT_SOCK, shared with userspace. + static native short noteReadClosed(); + static native short noteConnReset(); + static native short noteDisconnected(); + static native short evfiltRead(); static native short evfiltWrite(); static native short evfiltUser(); + static native short evfiltSock(); } diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/Native.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/Native.java index f64eeb45a0..d1c2bff5d7 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/Native.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/Native.java @@ -31,8 +31,12 @@ import static io.netty.channel.kqueue.KQueueStaticallyReferencedJniMethods.evEOF import static io.netty.channel.kqueue.KQueueStaticallyReferencedJniMethods.evEnable; import static io.netty.channel.kqueue.KQueueStaticallyReferencedJniMethods.evError; import static io.netty.channel.kqueue.KQueueStaticallyReferencedJniMethods.evfiltRead; +import static io.netty.channel.kqueue.KQueueStaticallyReferencedJniMethods.evfiltSock; import static io.netty.channel.kqueue.KQueueStaticallyReferencedJniMethods.evfiltUser; import static io.netty.channel.kqueue.KQueueStaticallyReferencedJniMethods.evfiltWrite; +import static io.netty.channel.kqueue.KQueueStaticallyReferencedJniMethods.noteConnReset; +import static io.netty.channel.kqueue.KQueueStaticallyReferencedJniMethods.noteDisconnected; +import static io.netty.channel.kqueue.KQueueStaticallyReferencedJniMethods.noteReadClosed; import static io.netty.channel.unix.Errors.newIOException; /** @@ -59,6 +63,12 @@ final class Native { static final short EV_ERROR = evError(); static final short EV_EOF = evEOF(); + static final int NOTE_READCLOSED = noteReadClosed(); + static final int NOTE_CONNRESET = noteConnReset(); + static final int NOTE_DISCONNECTED = noteDisconnected(); + + static final int NOTE_RDHUP = NOTE_READCLOSED | NOTE_CONNRESET | NOTE_DISCONNECTED; + // Commonly used combinations of EV defines static final short EV_ADD_CLEAR_ENABLE = (short) (EV_ADD | EV_CLEAR | EV_ENABLE); static final short EV_DELETE_DISABLE = (short) (EV_DELETE | EV_DISABLE); @@ -66,6 +76,7 @@ final class Native { static final short EVFILT_READ = evfiltRead(); static final short EVFILT_WRITE = evfiltWrite(); static final short EVFILT_USER = evfiltUser(); + static final short EVFILT_SOCK = evfiltSock(); static FileDescriptor newKQueue() { return new FileDescriptor(kqueueCreate()); diff --git a/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueDetectPeerCloseWithoutReadTest.java b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueDetectPeerCloseWithoutReadTest.java new file mode 100644 index 0000000000..4f139a0561 --- /dev/null +++ b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueDetectPeerCloseWithoutReadTest.java @@ -0,0 +1,38 @@ +/* + * Copyright 2017 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.kqueue; + +import io.netty.channel.Channel; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.ServerChannel; +import io.netty.channel.unix.tests.DetectPeerCloseWithoutReadTest; + +public class KQueueDetectPeerCloseWithoutReadTest extends DetectPeerCloseWithoutReadTest { + @Override + protected EventLoopGroup newGroup() { + return new KQueueEventLoopGroup(2); + } + + @Override + protected Class serverChannel() { + return KQueueServerSocketChannel.class; + } + + @Override + protected Class clientChannel() { + return KQueueSocketChannel.class; + } +} diff --git a/transport-native-unix-common-tests/src/main/java/io/netty/channel/unix/tests/DetectPeerCloseWithoutReadTest.java b/transport-native-unix-common-tests/src/main/java/io/netty/channel/unix/tests/DetectPeerCloseWithoutReadTest.java new file mode 100644 index 0000000000..ef5482d11b --- /dev/null +++ b/transport-native-unix-common-tests/src/main/java/io/netty/channel/unix/tests/DetectPeerCloseWithoutReadTest.java @@ -0,0 +1,175 @@ +/* + * Copyright 2017 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.unix.tests; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.ServerChannel; +import io.netty.channel.SimpleChannelInboundHandler; +import org.junit.Test; + +import java.net.InetSocketAddress; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; + +public abstract class DetectPeerCloseWithoutReadTest { + protected abstract EventLoopGroup newGroup(); + protected abstract Class serverChannel(); + protected abstract Class clientChannel(); + + @Test(timeout = 10000) + public void clientCloseWithoutServerReadIsDetected() throws InterruptedException { + EventLoopGroup serverGroup = null; + EventLoopGroup clientGroup = null; + Channel serverChannel = null; + try { + final CountDownLatch latch = new CountDownLatch(1); + final AtomicInteger bytesRead = new AtomicInteger(); + final int expectedBytes = 100; + serverGroup = newGroup(); + clientGroup = newGroup(); + ServerBootstrap sb = new ServerBootstrap(); + sb.group(serverGroup); + sb.channel(serverChannel()); + sb.childOption(ChannelOption.AUTO_READ, false); + sb.childHandler(new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) throws Exception { + ch.pipeline().addLast(new TestHandler(bytesRead, latch)); + } + }); + + serverChannel = sb.bind(new InetSocketAddress(0)).syncUninterruptibly().channel(); + + Bootstrap cb = new Bootstrap(); + cb.group(serverGroup); + cb.channel(clientChannel()); + cb.handler(new ChannelInboundHandlerAdapter()); + Channel clientChannel = cb.connect(serverChannel.localAddress()).syncUninterruptibly().channel(); + ByteBuf buf = clientChannel.alloc().buffer(expectedBytes); + buf.writerIndex(buf.writerIndex() + expectedBytes); + clientChannel.writeAndFlush(buf).addListener(ChannelFutureListener.CLOSE); + + latch.await(); + assertEquals(expectedBytes, bytesRead.get()); + } finally { + if (serverChannel != null) { + serverChannel.close().syncUninterruptibly(); + } + if (serverGroup != null) { + serverGroup.shutdownGracefully(); + } + if (clientGroup != null) { + clientGroup.shutdownGracefully(); + } + } + } + + @Test(timeout = 10000) + public void serverCloseWithoutClientReadIsDetected() throws InterruptedException { + EventLoopGroup serverGroup = null; + EventLoopGroup clientGroup = null; + Channel serverChannel = null; + Channel clientChannel = null; + try { + final CountDownLatch latch = new CountDownLatch(1); + final AtomicInteger bytesRead = new AtomicInteger(); + final int expectedBytes = 100; + serverGroup = newGroup(); + clientGroup = newGroup(); + ServerBootstrap sb = new ServerBootstrap(); + sb.group(serverGroup); + sb.channel(serverChannel()); + sb.childHandler(new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) throws Exception { + ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + ByteBuf buf = ctx.alloc().buffer(expectedBytes); + buf.writerIndex(buf.writerIndex() + expectedBytes); + ctx.writeAndFlush(buf).addListener(ChannelFutureListener.CLOSE); + ctx.fireChannelActive(); + } + }); + } + }); + + serverChannel = sb.bind(new InetSocketAddress(0)).syncUninterruptibly().channel(); + + Bootstrap cb = new Bootstrap(); + cb.group(serverGroup); + cb.channel(clientChannel()); + cb.option(ChannelOption.AUTO_READ, false); + cb.handler(new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) throws Exception { + ch.pipeline().addLast(new TestHandler(bytesRead, latch)); + } + }); + clientChannel = cb.connect(serverChannel.localAddress()).syncUninterruptibly().channel(); + + latch.await(); + assertEquals(expectedBytes, bytesRead.get()); + } finally { + if (serverChannel != null) { + serverChannel.close().syncUninterruptibly(); + } + if (clientChannel != null) { + clientChannel.close().syncUninterruptibly(); + } + if (serverGroup != null) { + serverGroup.shutdownGracefully(); + } + if (clientGroup != null) { + clientGroup.shutdownGracefully(); + } + } + } + + private static final class TestHandler extends SimpleChannelInboundHandler { + private final AtomicInteger bytesRead; + private final CountDownLatch latch; + + TestHandler(AtomicInteger bytesRead, CountDownLatch latch) { + this.bytesRead = bytesRead; + this.latch = latch; + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { + bytesRead.addAndGet(msg.readableBytes()); + // Because autoread is off, we call read to consume all data until we detect the close. + ctx.read(); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + latch.countDown(); + ctx.fireChannelInactive(); + } + } +}