diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java index eabee894ee..abcf989f68 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueEventLoop.java @@ -101,7 +101,13 @@ final class KQueueEventLoop extends SingleThreadEventLoop { void remove(AbstractKQueueChannel ch) { assert inEventLoop(); - channels.remove(ch.fd().intValue()); + int fd = ch.fd().intValue(); + // Remove only if that's the same channel + // Due to file descriptor reuse another channel might have overwritten + // the removed channel in the map + if (channels.get(fd) == ch) { + channels.remove(fd); + } } /** diff --git a/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueDomainSocketReuseFdTest.java b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueDomainSocketReuseFdTest.java new file mode 100644 index 0000000000..5d99c07c3a --- /dev/null +++ b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueDomainSocketReuseFdTest.java @@ -0,0 +1,172 @@ +/* + * Copyright 2016 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.channel.kqueue; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.testsuite.transport.TestsuitePermutation; +import io.netty.testsuite.transport.socket.SocketStringEchoTest; +import io.netty.util.CharsetUtil; +import io.netty.util.concurrent.ImmediateEventExecutor; +import io.netty.util.concurrent.Promise; +import org.junit.Test; + +import java.io.IOException; +import java.net.SocketAddress; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +public class KQueueDomainSocketReuseFdTest extends SocketStringEchoTest { + @Override + protected SocketAddress newSocketAddress() { + return KQueueSocketTestPermutation.newSocketAddress(); + } + + @Override + protected List> newFactories() { + return KQueueSocketTestPermutation.INSTANCE.domainSocket(); + } + + @Test(timeout = 60000) + public void testReuseFd() throws Throwable { + run(); + } + + public void testReuseFd(ServerBootstrap sb, Bootstrap cb) throws Throwable { + sb.childOption(ChannelOption.AUTO_READ, true); + cb.option(ChannelOption.AUTO_READ, true); + int numChannels = 1000; + final AtomicReference globalException = new AtomicReference(); + final AtomicInteger serverRemaining = new AtomicInteger(numChannels); + final AtomicInteger clientRemaining = new AtomicInteger(numChannels); + final Promise serverDonePromise = ImmediateEventExecutor.INSTANCE.newPromise(); + final Promise clientDonePromise = ImmediateEventExecutor.INSTANCE.newPromise(); + + sb.childHandler(new ChannelInitializer() { + @Override + public void initChannel(Channel sch) { + ReuseFdHandler sh = new ReuseFdHandler( + false, + globalException, + serverRemaining, + serverDonePromise); + sch.pipeline().addLast("handler", sh); + } + }); + + cb.handler(new ChannelInitializer() { + @Override + public void initChannel(Channel sch) { + ReuseFdHandler ch = new ReuseFdHandler( + true, + globalException, + clientRemaining, + clientDonePromise); + sch.pipeline().addLast("handler", ch); + } + }); + + Channel sc = sb.bind().sync().channel(); + for (int i = 0; i < numChannels; i++) { + cb.connect(sc.localAddress()); + } + + clientDonePromise.sync(); + serverDonePromise.sync(); + sc.close().sync(); + + if (globalException.get() != null && !(globalException.get() instanceof IOException)) { + throw globalException.get(); + } + } + + static class ReuseFdHandler extends ChannelInboundHandlerAdapter { + private final Promise donePromise; + private final AtomicInteger remaining; + private final boolean client; + volatile Channel channel; + volatile boolean complete; + final AtomicReference globalException; + final AtomicReference exception = new AtomicReference(); + final StringBuilder received = new StringBuilder(); + + ReuseFdHandler( + boolean client, + AtomicReference globalException, + AtomicInteger remaining, + Promise donePromise) { + this.client = client; + this.globalException = globalException; + this.remaining = remaining; + this.donePromise = donePromise; + } + + @Override + public void channelActive(ChannelHandlerContext ctx) { + channel = ctx.channel(); + if (client) { + ctx.writeAndFlush(Unpooled.copiedBuffer("payload", CharsetUtil.US_ASCII)); + } + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + if (msg instanceof ByteBuf) { + ByteBuf buf = (ByteBuf) msg; + received.append(buf.toString(CharsetUtil.US_ASCII)); + buf.release(); + } + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) { + complete = true; + if (client) { + ctx.close(); + } else { + ctx.writeAndFlush(Unpooled.copiedBuffer("payload".getBytes())); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + if (exception.compareAndSet(null, cause)) { + donePromise.tryFailure(new IllegalStateException("exceptionCaught: " + ctx.channel(), cause)); + ctx.close(); + } + globalException.compareAndSet(null, cause); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) { + if (remaining.decrementAndGet() == 0) { + if (received.toString().equals("payload")) { + donePromise.setSuccess(null); + } else { + donePromise.tryFailure(new Exception("Unexpected payload:" + received)); + } + } + } + } +}