KQueueEventLoop | EpollEventLoop may incorrectly update registration when FD is reused.

Motivation:

The current KQueueEventLoop implementation does not process concurrent domain socket channel registration/unregistration in the order they actual
happen since unregistration are delated by an event loop task scheduling. When a domain socket is closed, it's file descriptor might be reused
quickly and therefore trigger a new channel registration using the same descriptor.

Consequently the KQueueEventLoop#add(AbstractKQueueChannel) method will overwrite the current inactive channels having the same descriptor
and the delayed KQueueEventLoop#remove(AbstractKQueueChannel) will remove the active channel that replaced the inactive one.

As active channels are registered, events for this file descriptor won't be processed anymore and the channels will never be closed.

The same problem can also happen in EpollEventLoop. Beside this we also may never remove the AbstractEpollChannel from the internal map
when it is unregistered which will prevent it from be GC'ed

Modifications:

- Change logic of native KQueue and Epoll implementations to ensure we correctly handle the case of FD reuse
- Only try to update kevent / epoll if the Channel is still open (as otherwise it will be handled by kqueue / epoll itself)
- Correctly remove AbstractEpollChannel from internal map in all cases
- Make implementation of closeAll() consistent for Epoll and KQueueEventLoop

Result:

KQueue and Epoll native transports correctly handle FD reuse

Co-authored-by: Norman Maurer <norman_maurer@apple.com>
This commit is contained in:
Julien Viet 2019-05-01 11:24:33 +02:00 committed by Norman Maurer
parent af98b62150
commit e348bd9217
6 changed files with 315 additions and 16 deletions

View File

@ -0,0 +1,180 @@
/*
* Copyright 2019 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.testsuite.transport.socket;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise;
import org.junit.Test;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
public abstract class AbstractSocketReuseFdTest extends AbstractSocketTest {
@Override
protected abstract SocketAddress newSocketAddress();
@Override
protected abstract List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories();
@Test(timeout = 60000)
public void testReuseFd() throws Throwable {
run();
}
public void testReuseFd(ServerBootstrap sb, Bootstrap cb) throws Throwable {
sb.childOption(ChannelOption.AUTO_READ, true);
cb.option(ChannelOption.AUTO_READ, true);
// Use a number which will typically not exceed /proc/sys/net/core/somaxconn (which is 128 on linux by default
// often).
int numChannels = 100;
final AtomicReference<Throwable> globalException = new AtomicReference<Throwable>();
final AtomicInteger serverRemaining = new AtomicInteger(numChannels);
final AtomicInteger clientRemaining = new AtomicInteger(numChannels);
final Promise<Void> serverDonePromise = ImmediateEventExecutor.INSTANCE.newPromise();
final Promise<Void> clientDonePromise = ImmediateEventExecutor.INSTANCE.newPromise();
sb.childHandler(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel sch) {
ReuseFdHandler sh = new ReuseFdHandler(
false,
globalException,
serverRemaining,
serverDonePromise);
sch.pipeline().addLast("handler", sh);
}
});
cb.handler(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel sch) {
ReuseFdHandler ch = new ReuseFdHandler(
true,
globalException,
clientRemaining,
clientDonePromise);
sch.pipeline().addLast("handler", ch);
}
});
ChannelFutureListener listener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
if (!future.isSuccess()) {
clientDonePromise.tryFailure(future.cause());
}
}
};
Channel sc = sb.bind().sync().channel();
for (int i = 0; i < numChannels; i++) {
cb.connect(sc.localAddress()).addListener(listener);
}
clientDonePromise.sync();
serverDonePromise.sync();
sc.close().sync();
if (globalException.get() != null && !(globalException.get() instanceof IOException)) {
throw globalException.get();
}
}
static class ReuseFdHandler extends ChannelInboundHandlerAdapter {
private static final String EXPECTED_PAYLOAD = "payload";
private final Promise<Void> donePromise;
private final AtomicInteger remaining;
private final boolean client;
volatile Channel channel;
final AtomicReference<Throwable> globalException;
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
final StringBuilder received = new StringBuilder();
ReuseFdHandler(
boolean client,
AtomicReference<Throwable> globalException,
AtomicInteger remaining,
Promise<Void> donePromise) {
this.client = client;
this.globalException = globalException;
this.remaining = remaining;
this.donePromise = donePromise;
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
channel = ctx.channel();
if (client) {
ctx.writeAndFlush(Unpooled.copiedBuffer(EXPECTED_PAYLOAD, CharsetUtil.US_ASCII));
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
received.append(buf.toString(CharsetUtil.US_ASCII));
buf.release();
if (received.toString().equals(EXPECTED_PAYLOAD)) {
if (client) {
ctx.close();
} else {
ctx.writeAndFlush(Unpooled.copiedBuffer(EXPECTED_PAYLOAD, CharsetUtil.US_ASCII));
}
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
if (exception.compareAndSet(null, cause)) {
donePromise.tryFailure(new IllegalStateException("exceptionCaught: " + ctx.channel(), cause));
ctx.close();
}
globalException.compareAndSet(null, cause);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
if (remaining.decrementAndGet() == 0) {
if (received.toString().equals(EXPECTED_PAYLOAD)) {
donePromise.setSuccess(null);
} else {
donePromise.tryFailure(new Exception("Unexpected payload:" + received));
}
}
}
}
}

View File

@ -175,7 +175,11 @@ class EpollEventLoop extends SingleThreadEventLoop {
assert inEventLoop();
int fd = ch.socket.intValue();
Native.epollCtlAdd(epollFd.intValue(), fd, ch.flags);
channels.put(fd, ch);
AbstractEpollChannel old = channels.put(fd, ch);
// We either expect to have no Channel in the map with the same FD or that the FD of the old Channel is already
// closed.
assert old == null || !old.isOpen();
}
/**
@ -191,14 +195,19 @@ class EpollEventLoop extends SingleThreadEventLoop {
*/
void remove(AbstractEpollChannel ch) throws IOException {
assert inEventLoop();
int fd = ch.socket.intValue();
if (ch.isOpen()) {
int fd = ch.socket.intValue();
if (channels.remove(fd) != null) {
// Remove the epoll. This is only needed if it's still open as otherwise it will be automatically
// removed once the file-descriptor is closed.
Native.epollCtlDel(epollFd.intValue(), ch.fd().intValue());
}
AbstractEpollChannel old = channels.remove(fd);
if (old != null && old != ch) {
// The Channel mapping was already replaced due FD reuse, put back the stored Channel.
channels.put(fd, old);
// If we found another Channel in the map that is mapped to the same FD the given Channel MUST be closed.
assert !ch.isOpen();
} else if (ch.isOpen()) {
// Remove the epoll. This is only needed if it's still open as otherwise it will be automatically
// removed once the file-descriptor is closed.
Native.epollCtlDel(epollFd.intValue(), fd);
}
}
@ -380,11 +389,12 @@ class EpollEventLoop extends SingleThreadEventLoop {
} catch (IOException ignore) {
// ignore on close
}
// Using the intermediate collection to prevent ConcurrentModificationException.
// In the `close()` method, the channel is deleted from `channels` map.
AbstractEpollChannel[] localChannels = channels.values().toArray(new AbstractEpollChannel[0]);
for (AbstractEpollChannel ch : localChannels) {
for (AbstractEpollChannel ch: localChannels) {
ch.unsafe().close(ch.unsafe().voidPromise());
}
}

View File

@ -0,0 +1,36 @@
/*
* Copyright 2019 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.AbstractSocketReuseFdTest;
import java.net.SocketAddress;
import java.util.List;
public class EpollDomainSocketReuseFdTest extends AbstractSocketReuseFdTest {
@Override
protected SocketAddress newSocketAddress() {
return EpollSocketTestPermutation.newSocketAddress();
}
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return EpollSocketTestPermutation.INSTANCE.domainSocket();
}
}

View File

@ -144,12 +144,19 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan
@Override
protected void doDeregister() throws Exception {
((KQueueEventLoop) eventLoop()).remove(this);
// As unregisteredFilters() may have not been called because isOpen() returned false we just set both filters
// to false to ensure a consistent state in all cases.
readFilterEnabled = false;
writeFilterEnabled = false;
}
void unregisterFilters() throws Exception {
// Make sure we unregister our filters from kqueue!
readFilter(false);
writeFilter(false);
evSet0(Native.EVFILT_SOCK, Native.EV_DELETE, 0);
((KQueueEventLoop) eventLoop()).remove(this);
}
@Override
@ -335,7 +342,7 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan
}
private void evSet(short filter, short flags) {
if (isOpen() && isRegistered()) {
if (isRegistered()) {
evSet0(filter, flags);
}
}
@ -345,7 +352,10 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan
}
private void evSet0(short filter, short flags, int fflags) {
((KQueueEventLoop) eventLoop()).evSet(this, filter, flags, fflags);
// Only try to add to changeList if the FD is still open, if not we already closed it in the meantime.
if (isOpen()) {
((KQueueEventLoop) eventLoop()).evSet(this, filter, flags, fflags);
}
}
abstract class AbstractKQueueUnsafe extends AbstractUnsafe {

View File

@ -92,16 +92,35 @@ final class KQueueEventLoop extends SingleThreadEventLoop {
void add(AbstractKQueueChannel ch) {
assert inEventLoop();
channels.put(ch.fd().intValue(), ch);
AbstractKQueueChannel old = channels.put(ch.fd().intValue(), ch);
// We either expect to have no Channel in the map with the same FD or that the FD of the old Channel is already
// closed.
assert old == null || !old.isOpen();
}
void evSet(AbstractKQueueChannel ch, short filter, short flags, int fflags) {
assert inEventLoop();
changeList.evSet(ch, filter, flags, fflags);
}
void remove(AbstractKQueueChannel ch) {
void remove(AbstractKQueueChannel ch) throws Exception {
assert inEventLoop();
channels.remove(ch.fd().intValue());
int fd = ch.fd().intValue();
AbstractKQueueChannel old = channels.remove(fd);
if (old != null && old != ch) {
// The Channel mapping was already replaced due FD reuse, put back the stored Channel.
channels.put(fd, old);
// If we found another Channel in the map that is mapped to the same FD the given Channel MUST be closed.
assert !ch.isOpen();
} else if (ch.isOpen()) {
// Remove the filters. This is only needed if it's still open as otherwise it will be automatically
// removed once the file-descriptor is closed.
//
// See also https://www.freebsd.org/cgi/man.cgi?query=kqueue&sektion=2
ch.unregisterFilters();
}
}
/**
@ -335,6 +354,14 @@ final class KQueueEventLoop extends SingleThreadEventLoop {
} catch (IOException e) {
// ignore on close
}
// Using the intermediate collection to prevent ConcurrentModificationException.
// In the `close()` method, the channel is deleted from `channels` map.
AbstractKQueueChannel[] localChannels = channels.values().toArray(new AbstractKQueueChannel[0]);
for (AbstractKQueueChannel ch: localChannels) {
ch.unsafe().close(ch.unsafe().voidPromise());
}
}
private static void handleLoopException(Throwable t) {

View File

@ -0,0 +1,36 @@
/*
* Copyright 2019 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.kqueue;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.testsuite.transport.TestsuitePermutation;
import io.netty.testsuite.transport.socket.AbstractSocketReuseFdTest;
import java.net.SocketAddress;
import java.util.List;
public class KQueueDomainSocketReuseFdTest extends AbstractSocketReuseFdTest {
@Override
protected SocketAddress newSocketAddress() {
return KQueueSocketTestPermutation.newSocketAddress();
}
@Override
protected List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories() {
return KQueueSocketTestPermutation.INSTANCE.domainSocket();
}
}