diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/AbstractSocketReuseFdTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/AbstractSocketReuseFdTest.java new file mode 100644 index 0000000000..6e8ae87eef --- /dev/null +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/AbstractSocketReuseFdTest.java @@ -0,0 +1,180 @@ +/* + * Copyright 2019 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.testsuite.transport.socket; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.util.CharsetUtil; +import io.netty.util.concurrent.ImmediateEventExecutor; +import io.netty.util.concurrent.Promise; +import org.junit.Test; + +import java.io.IOException; +import java.net.SocketAddress; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +public abstract class AbstractSocketReuseFdTest extends AbstractSocketTest { + @Override + protected abstract SocketAddress newSocketAddress(); + + @Override + protected abstract List> newFactories(); + + @Test(timeout = 60000) + public void testReuseFd() throws Throwable { + run(); + } + + public void testReuseFd(ServerBootstrap sb, Bootstrap cb) throws Throwable { + sb.childOption(ChannelOption.AUTO_READ, true); + cb.option(ChannelOption.AUTO_READ, true); + + // Use a number which will typically not exceed /proc/sys/net/core/somaxconn (which is 128 on linux by default + // often). + int numChannels = 100; + final AtomicReference globalException = new AtomicReference(); + final AtomicInteger serverRemaining = new AtomicInteger(numChannels); + final AtomicInteger clientRemaining = new AtomicInteger(numChannels); + final Promise serverDonePromise = ImmediateEventExecutor.INSTANCE.newPromise(); + final Promise clientDonePromise = ImmediateEventExecutor.INSTANCE.newPromise(); + + sb.childHandler(new ChannelInitializer() { + @Override + public void initChannel(Channel sch) { + ReuseFdHandler sh = new ReuseFdHandler( + false, + globalException, + serverRemaining, + serverDonePromise); + sch.pipeline().addLast("handler", sh); + } + }); + + cb.handler(new ChannelInitializer() { + @Override + public void initChannel(Channel sch) { + ReuseFdHandler ch = new ReuseFdHandler( + true, + globalException, + clientRemaining, + clientDonePromise); + sch.pipeline().addLast("handler", ch); + } + }); + + ChannelFutureListener listener = new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) { + if (!future.isSuccess()) { + clientDonePromise.tryFailure(future.cause()); + } + } + }; + + Channel sc = sb.bind().sync().channel(); + for (int i = 0; i < numChannels; i++) { + cb.connect(sc.localAddress()).addListener(listener); + } + + clientDonePromise.sync(); + serverDonePromise.sync(); + sc.close().sync(); + + if (globalException.get() != null && !(globalException.get() instanceof IOException)) { + throw globalException.get(); + } + } + + static class ReuseFdHandler extends ChannelInboundHandlerAdapter { + private static final String EXPECTED_PAYLOAD = "payload"; + + private final Promise donePromise; + private final AtomicInteger remaining; + private final boolean client; + volatile Channel channel; + final AtomicReference globalException; + final AtomicReference exception = new AtomicReference(); + final StringBuilder received = new StringBuilder(); + + ReuseFdHandler( + boolean client, + AtomicReference globalException, + AtomicInteger remaining, + Promise donePromise) { + this.client = client; + this.globalException = globalException; + this.remaining = remaining; + this.donePromise = donePromise; + } + + @Override + public void channelActive(ChannelHandlerContext ctx) { + channel = ctx.channel(); + if (client) { + ctx.writeAndFlush(Unpooled.copiedBuffer(EXPECTED_PAYLOAD, CharsetUtil.US_ASCII)); + } + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + if (msg instanceof ByteBuf) { + ByteBuf buf = (ByteBuf) msg; + received.append(buf.toString(CharsetUtil.US_ASCII)); + buf.release(); + + if (received.toString().equals(EXPECTED_PAYLOAD)) { + if (client) { + ctx.close(); + } else { + ctx.writeAndFlush(Unpooled.copiedBuffer(EXPECTED_PAYLOAD, CharsetUtil.US_ASCII)); + } + } + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + if (exception.compareAndSet(null, cause)) { + donePromise.tryFailure(new IllegalStateException("exceptionCaught: " + ctx.channel(), cause)); + ctx.close(); + } + globalException.compareAndSet(null, cause); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) { + if (remaining.decrementAndGet() == 0) { + if (received.toString().equals(EXPECTED_PAYLOAD)) { + donePromise.setSuccess(null); + } else { + donePromise.tryFailure(new Exception("Unexpected payload:" + received)); + } + } + } + } +} diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java index e7bfad61af..eadfc519db 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java @@ -175,7 +175,11 @@ class EpollEventLoop extends SingleThreadEventLoop { assert inEventLoop(); int fd = ch.socket.intValue(); Native.epollCtlAdd(epollFd.intValue(), fd, ch.flags); - channels.put(fd, ch); + AbstractEpollChannel old = channels.put(fd, ch); + + // We either expect to have no Channel in the map with the same FD or that the FD of the old Channel is already + // closed. + assert old == null || !old.isOpen(); } /** @@ -191,14 +195,19 @@ class EpollEventLoop extends SingleThreadEventLoop { */ void remove(AbstractEpollChannel ch) throws IOException { assert inEventLoop(); + int fd = ch.socket.intValue(); - if (ch.isOpen()) { - int fd = ch.socket.intValue(); - if (channels.remove(fd) != null) { - // Remove the epoll. This is only needed if it's still open as otherwise it will be automatically - // removed once the file-descriptor is closed. - Native.epollCtlDel(epollFd.intValue(), ch.fd().intValue()); - } + AbstractEpollChannel old = channels.remove(fd); + if (old != null && old != ch) { + // The Channel mapping was already replaced due FD reuse, put back the stored Channel. + channels.put(fd, old); + + // If we found another Channel in the map that is mapped to the same FD the given Channel MUST be closed. + assert !ch.isOpen(); + } else if (ch.isOpen()) { + // Remove the epoll. This is only needed if it's still open as otherwise it will be automatically + // removed once the file-descriptor is closed. + Native.epollCtlDel(epollFd.intValue(), fd); } } @@ -380,11 +389,12 @@ class EpollEventLoop extends SingleThreadEventLoop { } catch (IOException ignore) { // ignore on close } + // Using the intermediate collection to prevent ConcurrentModificationException. // In the `close()` method, the channel is deleted from `channels` map. AbstractEpollChannel[] localChannels = channels.values().toArray(new AbstractEpollChannel[0]); - for (AbstractEpollChannel ch : localChannels) { + for (AbstractEpollChannel ch: localChannels) { ch.unsafe().close(ch.unsafe().voidPromise()); } } diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketReuseFdTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketReuseFdTest.java new file mode 100644 index 0000000000..487ea64a19 --- /dev/null +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollDomainSocketReuseFdTest.java @@ -0,0 +1,36 @@ +/* + * Copyright 2019 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.epoll; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.AbstractSocketReuseFdTest; + +import java.net.SocketAddress; +import java.util.List; + +public class EpollDomainSocketReuseFdTest extends AbstractSocketReuseFdTest { + @Override + protected SocketAddress newSocketAddress() { + return EpollSocketTestPermutation.newSocketAddress(); + } + + @Override + protected List> newFactories() { + return EpollSocketTestPermutation.INSTANCE.domainSocket(); + } +} diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java index d7546527d4..c38a338e77 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java @@ -144,12 +144,19 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan @Override protected void doDeregister() throws Exception { + ((KQueueEventLoop) eventLoop()).remove(this); + + // As unregisteredFilters() may have not been called because isOpen() returned false we just set both filters + // to false to ensure a consistent state in all cases. + readFilterEnabled = false; + writeFilterEnabled = false; + } + + void unregisterFilters() throws Exception { // Make sure we unregister our filters from kqueue! readFilter(false); writeFilter(false); evSet0(Native.EVFILT_SOCK, Native.EV_DELETE, 0); - - ((KQueueEventLoop) eventLoop()).remove(this); } @Override @@ -335,7 +342,7 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan } private void evSet(short filter, short flags) { - if (isOpen() && isRegistered()) { + if (isRegistered()) { evSet0(filter, flags); } } @@ -345,7 +352,10 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan } private void evSet0(short filter, short flags, int fflags) { - ((KQueueEventLoop) eventLoop()).evSet(this, filter, flags, fflags); + // Only try to add to changeList if the FD is still open, if not we already closed it in the meantime. + if (isOpen()) { + ((KQueueEventLoop) eventLoop()).evSet(this, filter, flags, fflags); + } } abstract class AbstractKQueueUnsafe extends AbstractUnsafe { diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java index eabee894ee..a968118a67 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java @@ -92,16 +92,35 @@ final class KQueueEventLoop extends SingleThreadEventLoop { void add(AbstractKQueueChannel ch) { assert inEventLoop(); - channels.put(ch.fd().intValue(), ch); + AbstractKQueueChannel old = channels.put(ch.fd().intValue(), ch); + // We either expect to have no Channel in the map with the same FD or that the FD of the old Channel is already + // closed. + assert old == null || !old.isOpen(); } void evSet(AbstractKQueueChannel ch, short filter, short flags, int fflags) { + assert inEventLoop(); changeList.evSet(ch, filter, flags, fflags); } - void remove(AbstractKQueueChannel ch) { + void remove(AbstractKQueueChannel ch) throws Exception { assert inEventLoop(); - channels.remove(ch.fd().intValue()); + int fd = ch.fd().intValue(); + + AbstractKQueueChannel old = channels.remove(fd); + if (old != null && old != ch) { + // The Channel mapping was already replaced due FD reuse, put back the stored Channel. + channels.put(fd, old); + + // If we found another Channel in the map that is mapped to the same FD the given Channel MUST be closed. + assert !ch.isOpen(); + } else if (ch.isOpen()) { + // Remove the filters. This is only needed if it's still open as otherwise it will be automatically + // removed once the file-descriptor is closed. + // + // See also https://www.freebsd.org/cgi/man.cgi?query=kqueue&sektion=2 + ch.unregisterFilters(); + } } /** @@ -335,6 +354,14 @@ final class KQueueEventLoop extends SingleThreadEventLoop { } catch (IOException e) { // ignore on close } + + // Using the intermediate collection to prevent ConcurrentModificationException. + // In the `close()` method, the channel is deleted from `channels` map. + AbstractKQueueChannel[] localChannels = channels.values().toArray(new AbstractKQueueChannel[0]); + + for (AbstractKQueueChannel ch: localChannels) { + ch.unsafe().close(ch.unsafe().voidPromise()); + } } private static void handleLoopException(Throwable t) { diff --git a/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueDomainSocketReuseFdTest.java b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueDomainSocketReuseFdTest.java new file mode 100644 index 0000000000..2e239c21c7 --- /dev/null +++ b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueDomainSocketReuseFdTest.java @@ -0,0 +1,36 @@ +/* + * Copyright 2019 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.kqueue; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.AbstractSocketReuseFdTest; + +import java.net.SocketAddress; +import java.util.List; + +public class KQueueDomainSocketReuseFdTest extends AbstractSocketReuseFdTest { + @Override + protected SocketAddress newSocketAddress() { + return KQueueSocketTestPermutation.newSocketAddress(); + } + + @Override + protected List> newFactories() { + return KQueueSocketTestPermutation.INSTANCE.domainSocket(); + } +}