Make it possible to use a wrapped EventLoop with a Channel (#8677)

Motiviation:

Because of how we implemented the registration / deregistration of an EventLoop it was not possible to wrap an EventLoop implementation and use it with a Channel.

Modification:

- Introduce EventLoop.Unsafe which is responsible for the actual registration.
- Move validation of EventLoop / Channel combo to the EventLoop
- Add unit test that verifies that wrapping works

Result:

Be able to wrap an EventLoop and so add some extra functionality.
This commit is contained in:
Norman Maurer 2019-01-17 09:17:51 +01:00 committed by GitHub
parent 90e343d378
commit 1fe931b6e2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
61 changed files with 726 additions and 534 deletions

View File

@ -17,7 +17,7 @@ package io.netty.handler.codec.http2;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.local.LocalEventLoopGroup;
import io.netty.handler.codec.http2.Http2Connection.Endpoint;
import io.netty.handler.codec.http2.Http2Stream.State;
import io.netty.util.concurrent.Future;
@ -61,7 +61,7 @@ public class DefaultHttp2ConnectionTest {
private DefaultHttp2Connection server;
private DefaultHttp2Connection client;
private static DefaultEventLoopGroup group;
private static LocalEventLoopGroup group;
@Mock
private Http2Connection.Listener clientListener;
@ -71,7 +71,7 @@ public class DefaultHttp2ConnectionTest {
@BeforeClass
public static void beforeClass() {
group = new DefaultEventLoopGroup(2);
group = new LocalEventLoopGroup(2);
}
@AfterClass

View File

@ -29,7 +29,7 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.local.LocalEventLoopGroup;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalServerChannel;
@ -1136,7 +1136,7 @@ public class Http2ConnectionRoundtripTest {
final AtomicReference<Http2ConnectionHandler> serverHandlerRef = new AtomicReference<Http2ConnectionHandler>();
final CountDownLatch serverInitLatch = new CountDownLatch(1);
sb.group(new DefaultEventLoopGroup());
sb.group(new LocalEventLoopGroup());
sb.channel(LocalServerChannel.class);
sb.childHandler(new ChannelInitializer<Channel>() {
@Override
@ -1156,7 +1156,7 @@ public class Http2ConnectionRoundtripTest {
}
});
cb.group(new DefaultEventLoopGroup());
cb.group(new LocalEventLoopGroup());
cb.channel(LocalChannel.class);
cb.handler(new ChannelInitializer<Channel>() {
@Override

View File

@ -27,10 +27,10 @@ import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.DefaultEventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalEventLoopGroup;
import io.netty.channel.local.LocalServerChannel;
import org.junit.After;
import org.junit.AfterClass;
@ -61,7 +61,7 @@ public class Http2MultiplexCodecBuilderTest {
@BeforeClass
public static void init() {
group = new DefaultEventLoop();
group = new LocalEventLoopGroup(1);
}
@Before

View File

@ -25,7 +25,7 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.local.LocalEventLoopGroup;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalServerChannel;
@ -508,7 +508,7 @@ public class HttpToHttp2ConnectionHandlerTest {
sb = new ServerBootstrap();
cb = new Bootstrap();
sb.group(new DefaultEventLoopGroup());
sb.group(new LocalEventLoopGroup());
sb.channel(LocalServerChannel.class);
sb.childHandler(new ChannelInitializer<Channel>() {
@Override
@ -525,7 +525,7 @@ public class HttpToHttp2ConnectionHandlerTest {
}
});
cb.group(new DefaultEventLoopGroup());
cb.group(new LocalEventLoopGroup());
cb.channel(LocalChannel.class);
cb.handler(new ChannelInitializer<Channel>() {
@Override

View File

@ -26,7 +26,7 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.local.LocalEventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
@ -667,7 +667,7 @@ public class InboundHttp2ToHttpAdapterTest {
sb = new ServerBootstrap();
cb = new Bootstrap();
sb.group(new DefaultEventLoopGroup());
sb.group(new LocalEventLoopGroup());
sb.channel(LocalServerChannel.class);
sb.childHandler(new ChannelInitializer<Channel>() {
@Override
@ -695,7 +695,7 @@ public class InboundHttp2ToHttpAdapterTest {
}
});
cb.group(new DefaultEventLoopGroup());
cb.group(new LocalEventLoopGroup());
cb.channel(LocalChannel.class);
cb.handler(new ChannelInitializer<Channel>() {
@Override

View File

@ -20,7 +20,7 @@ import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.local.LocalEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
@ -40,7 +40,7 @@ public final class LocalEcho {
// Address to bind on / connect to.
final LocalAddress addr = new LocalAddress(PORT);
EventLoopGroup serverGroup = new DefaultEventLoopGroup();
EventLoopGroup serverGroup = new LocalEventLoopGroup();
EventLoopGroup clientGroup = new NioEventLoopGroup(); // NIO event loops are also OK
try {
// Note that we can use any event loop to ensure certain local channels

View File

@ -24,7 +24,7 @@ import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.local.LocalEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.local.LocalAddress;
@ -75,7 +75,7 @@ public class CipherSuiteCanaryTest {
@BeforeClass
public static void init() throws Exception {
GROUP = new DefaultEventLoopGroup();
GROUP = new LocalEventLoopGroup();
CERT = new SelfSignedCertificate();
}

View File

@ -22,7 +22,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.local.LocalEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
@ -79,7 +79,7 @@ final class SniClientJava8TestUtil {
final String sniHost = "sni.netty.io";
SelfSignedCertificate cert = new SelfSignedCertificate();
LocalAddress address = new LocalAddress("test");
EventLoopGroup group = new DefaultEventLoopGroup(1);
EventLoopGroup group = new LocalEventLoopGroup(1);
SslContext sslServerContext = null;
SslContext sslClientContext = null;

View File

@ -20,7 +20,7 @@ import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.local.LocalEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
@ -29,7 +29,6 @@ import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import io.netty.util.Mapping;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.PlatformDependent;
import org.junit.Assert;
@ -96,7 +95,7 @@ public class SniClientTest {
private static void testSniClient(SslProvider sslServerProvider, SslProvider sslClientProvider) throws Exception {
String sniHostName = "sni.netty.io";
LocalAddress address = new LocalAddress("test");
EventLoopGroup group = new DefaultEventLoopGroup(1);
EventLoopGroup group = new LocalEventLoopGroup(1);
SelfSignedCertificate cert = new SelfSignedCertificate();
SslContext sslServerContext = null;
SslContext sslClientContext = null;

View File

@ -27,7 +27,7 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.local.LocalEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.channel.local.LocalAddress;
@ -418,7 +418,7 @@ public class SniHandlerTest {
case OPENSSL_REFCNT:
final String sniHost = "sni.netty.io";
LocalAddress address = new LocalAddress("testReplaceHandler-" + Math.random());
EventLoopGroup group = new DefaultEventLoopGroup(1);
EventLoopGroup group = new LocalEventLoopGroup(1);
Channel sc = null;
Channel cc = null;
SslContext sslContext = null;

View File

@ -32,7 +32,7 @@ import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelId;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.local.LocalEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.embedded.EmbeddedChannel;
@ -451,7 +451,7 @@ public class SslHandlerTest {
final BlockingQueue<Object> events = new LinkedBlockingQueue<Object>();
Channel serverChannel = null;
Channel clientChannel = null;
EventLoopGroup group = new DefaultEventLoopGroup();
EventLoopGroup group = new LocalEventLoopGroup();
try {
ServerBootstrap sb = new ServerBootstrap();
sb.group(group)
@ -614,7 +614,7 @@ public class SslHandlerTest {
.trustManager(new SelfSignedCertificate().cert())
.build();
EventLoopGroup group = new NioEventLoopGroup(1);
EventLoopGroup group = new LocalEventLoopGroup(1);
Channel sc = null;
Channel cc = null;
try {

View File

@ -26,7 +26,7 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.local.LocalEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
@ -344,7 +344,7 @@ public class OcspTest {
.build();
try {
EventLoopGroup group = new DefaultEventLoopGroup();
EventLoopGroup group = new LocalEventLoopGroup();
try {
LocalAddress address = new LocalAddress("handshake-" + Math.random());
Channel server = newServer(group, address, serverSslContext, response, serverHandler);

View File

@ -15,8 +15,8 @@
*/
package io.netty.microbench.concurrent;
import io.netty.channel.DefaultEventLoop;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.local.LocalEventLoopGroup;
import io.netty.microbench.util.AbstractMicrobenchmark;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.ScheduledFuture;
@ -36,7 +36,7 @@ import java.util.concurrent.TimeUnit;
@State(Scope.Benchmark)
public class ScheduledFutureTaskBenchmark extends AbstractMicrobenchmark {
static final EventLoop executor = new DefaultEventLoop();
static final EventLoopGroup executor = new LocalEventLoopGroup(1);
@State(Scope.Thread)
public static class FuturesHolder {

View File

@ -15,7 +15,7 @@
*/
package io.netty.resolver.dns;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.local.LocalEventLoopGroup;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
@ -25,7 +25,6 @@ import org.junit.Test;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
@ -39,7 +38,7 @@ public class DefaultAuthoritativeDnsServerCacheTest {
InetAddress.getByAddress("ns1", new byte[] { 10, 0, 0, 1 }), 53);
InetSocketAddress resolved2 = new InetSocketAddress(
InetAddress.getByAddress("ns2", new byte[] { 10, 0, 0, 2 }), 53);
EventLoopGroup group = new DefaultEventLoopGroup(1);
EventLoopGroup group = new LocalEventLoopGroup(1);
try {
EventLoop loop = group.next();
@ -91,7 +90,7 @@ public class DefaultAuthoritativeDnsServerCacheTest {
InetAddress.getByAddress("ns1", new byte[] { 10, 0, 0, 1 }), 53);
InetSocketAddress resolved2 = new InetSocketAddress(
InetAddress.getByAddress("ns2", new byte[] { 10, 0, 0, 2 }), 53);
EventLoopGroup group = new DefaultEventLoopGroup(1);
EventLoopGroup group = new LocalEventLoopGroup(1);
try {
EventLoop loop = group.next();
@ -115,7 +114,7 @@ public class DefaultAuthoritativeDnsServerCacheTest {
InetAddress.getByAddress("ns2", new byte[] { 10, 0, 0, 2 }), 53);
InetSocketAddress resolved2 = new InetSocketAddress(
InetAddress.getByAddress("ns1", new byte[] { 10, 0, 0, 1 }), 53);
EventLoopGroup group = new DefaultEventLoopGroup(1);
EventLoopGroup group = new LocalEventLoopGroup(1);
try {
EventLoop loop = group.next();
@ -153,7 +152,7 @@ public class DefaultAuthoritativeDnsServerCacheTest {
InetSocketAddress unresolved = InetSocketAddress.createUnresolved("ns1", 53);
InetSocketAddress resolved = new InetSocketAddress(
InetAddress.getByAddress("ns2", new byte[] { 10, 0, 0, 2 }), 53);
EventLoopGroup group = new DefaultEventLoopGroup(1);
EventLoopGroup group = new LocalEventLoopGroup(1);
try {
EventLoop loop = group.next();

View File

@ -15,7 +15,7 @@
*/
package io.netty.resolver.dns;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.local.LocalEventLoopGroup;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
@ -40,7 +40,7 @@ public class DefaultDnsCacheTest {
public void testExpire() throws Throwable {
InetAddress addr1 = InetAddress.getByAddress(new byte[] { 10, 0, 0, 1 });
InetAddress addr2 = InetAddress.getByAddress(new byte[] { 10, 0, 0, 2 });
EventLoopGroup group = new DefaultEventLoopGroup(1);
EventLoopGroup group = new LocalEventLoopGroup(1);
try {
EventLoop loop = group.next();
@ -103,7 +103,7 @@ public class DefaultDnsCacheTest {
public void testAddMultipleAddressesForSameHostname() throws Exception {
InetAddress addr1 = InetAddress.getByAddress(new byte[] { 10, 0, 0, 1 });
InetAddress addr2 = InetAddress.getByAddress(new byte[] { 10, 0, 0, 2 });
EventLoopGroup group = new DefaultEventLoopGroup(1);
EventLoopGroup group = new LocalEventLoopGroup(1);
try {
EventLoop loop = group.next();
@ -123,7 +123,7 @@ public class DefaultDnsCacheTest {
@Test
public void testAddSameAddressForSameHostname() throws Exception {
InetAddress addr1 = InetAddress.getByAddress(new byte[] { 10, 0, 0, 1 });
EventLoopGroup group = new DefaultEventLoopGroup(1);
EventLoopGroup group = new LocalEventLoopGroup(1);
try {
EventLoop loop = group.next();
@ -148,7 +148,7 @@ public class DefaultDnsCacheTest {
public void testCacheFailed() throws Exception {
InetAddress addr1 = InetAddress.getByAddress(new byte[] { 10, 0, 0, 1 });
InetAddress addr2 = InetAddress.getByAddress(new byte[] { 10, 0, 0, 2 });
EventLoopGroup group = new DefaultEventLoopGroup(1);
EventLoopGroup group = new LocalEventLoopGroup(1);
try {
EventLoop loop = group.next();
@ -177,7 +177,7 @@ public class DefaultDnsCacheTest {
public void testDotHandling() throws Exception {
InetAddress addr1 = InetAddress.getByAddress(new byte[] { 10, 0, 0, 1 });
InetAddress addr2 = InetAddress.getByAddress(new byte[] { 10, 0, 0, 2 });
EventLoopGroup group = new DefaultEventLoopGroup(1);
EventLoopGroup group = new LocalEventLoopGroup(1);
try {
EventLoop loop = group.next();

View File

@ -15,7 +15,7 @@
*/
package io.netty.resolver.dns;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.local.LocalEventLoopGroup;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import org.junit.Test;
@ -29,7 +29,7 @@ public class DefaultDnsCnameCacheTest {
@Test
public void testExpire() throws Throwable {
EventLoopGroup group = new DefaultEventLoopGroup(1);
EventLoopGroup group = new LocalEventLoopGroup(1);
try {
EventLoop loop = group.next();
@ -63,7 +63,7 @@ public class DefaultDnsCnameCacheTest {
}
private static void testExpireWithTTL0(int days) {
EventLoopGroup group = new DefaultEventLoopGroup(1);
EventLoopGroup group = new LocalEventLoopGroup(1);
try {
EventLoop loop = group.next();
@ -77,7 +77,7 @@ public class DefaultDnsCnameCacheTest {
@Test
public void testMultipleCnamesForSameHostname() throws Exception {
EventLoopGroup group = new DefaultEventLoopGroup(1);
EventLoopGroup group = new LocalEventLoopGroup(1);
try {
EventLoop loop = group.next();
@ -93,7 +93,7 @@ public class DefaultDnsCnameCacheTest {
@Test
public void testAddSameCnameForSameHostname() throws Exception {
EventLoopGroup group = new DefaultEventLoopGroup(1);
EventLoopGroup group = new LocalEventLoopGroup(1);
try {
EventLoop loop = group.next();
@ -109,7 +109,7 @@ public class DefaultDnsCnameCacheTest {
@Test
public void testClear() throws Exception {
EventLoopGroup group = new DefaultEventLoopGroup(1);
EventLoopGroup group = new LocalEventLoopGroup(1);
try {
EventLoop loop = group.next();

View File

@ -68,6 +68,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
private ChannelPromise connectPromise;
private ScheduledFuture<?> connectTimeoutFuture;
private SocketAddress requestedRemoteAddress;
private EpollRegistration registration;
private volatile SocketAddress local;
private volatile SocketAddress remote;
@ -126,6 +127,11 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
}
}
protected EpollRegistration registration() {
assert registration != null;
return registration;
}
boolean isFlagSet(int flag) {
return (flags & flag) != 0;
}
@ -199,19 +205,23 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
doClose();
}
@Override
protected boolean isCompatible(EventLoop loop) {
return loop instanceof EpollEventLoop;
}
@Override
public boolean isOpen() {
return socket.isOpen();
}
@Override
protected void doDeregister() throws Exception {
((EpollEventLoop) eventLoop()).remove(this);
void register0(EpollRegistration registration) throws Exception {
// Just in case the previous EventLoop was shutdown abruptly, or an event is still pending on the old EventLoop
// make sure the epollInReadyRunnablePending variable is reset so we will be able to execute the Runnable on the
// new EventLoop.
epollInReadyRunnablePending = false;
this.registration = registration;
}
void deregister0() throws Exception {
if (registration != null) {
registration.remove();
}
}
@Override
@ -268,20 +278,11 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
}
private void modifyEvents() throws IOException {
if (isOpen() && isRegistered()) {
((EpollEventLoop) eventLoop()).modify(this);
if (isOpen() && isRegistered() && registration != null) {
registration.update();
}
}
@Override
protected void doRegister() throws Exception {
// Just in case the previous EventLoop was shutdown abruptly, or an event is still pending on the old EventLoop
// make sure the epollInReadyRunnablePending variable is reset so we will be able to execute the Runnable on the
// new EventLoop.
epollInReadyRunnablePending = false;
((EpollEventLoop) eventLoop()).add(this);
}
@Override
protected abstract AbstractEpollUnsafe newUnsafe();

View File

@ -58,11 +58,6 @@ public abstract class AbstractEpollServerChannel extends AbstractEpollChannel im
return METADATA;
}
@Override
protected boolean isCompatible(EventLoop loop) {
return loop instanceof EpollEventLoop;
}
@Override
protected InetSocketAddress remoteAddress0() {
return null;

View File

@ -513,7 +513,7 @@ public abstract class AbstractEpollStreamChannel extends AbstractEpollChannel im
*/
private int doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
final long maxBytesPerGatheringWrite = config().getMaxBytesPerGatheringWrite();
IovArray array = ((EpollEventLoop) eventLoop()).cleanIovArray();
IovArray array = registration().cleanIovArray();
array.maxBytes(maxBytesPerGatheringWrite);
in.forEachFlushedMessage(array);

View File

@ -271,7 +271,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
try {
// Check if sendmmsg(...) is supported which is only the case for GLIBC 2.14+
if (Native.IS_SUPPORTING_SENDMMSG && in.size() > 1) {
NativeDatagramPacketArray array = ((EpollEventLoop) eventLoop()).cleanDatagramPacketArray();
NativeDatagramPacketArray array = registration().cleanDatagramPacketArray();
in.forEachFlushedMessage(array);
int cnt = array.count();
@ -349,7 +349,7 @@ public final class EpollDatagramChannel extends AbstractEpollChannel implements
remoteAddress.getAddress(), remoteAddress.getPort());
}
} else if (data.nioBufferCount() > 1) {
IovArray array = ((EpollEventLoop) eventLoop()).cleanIovArray();
IovArray array = registration().cleanIovArray();
array.add(data);
int cnt = array.count();
assert cnt != 0;

View File

@ -15,6 +15,7 @@
*/
package io.netty.channel.epoll;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SelectStrategy;
@ -28,6 +29,7 @@ import io.netty.util.collection.IntObjectMap;
import io.netty.util.concurrent.RejectedExecutionHandler;
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -79,6 +81,49 @@ class EpollEventLoop extends SingleThreadEventLoop {
// See http://man7.org/linux/man-pages/man2/timerfd_create.2.html.
private static final long MAX_SCHEDULED_TIMERFD_NS = 999999999;
private static AbstractEpollChannel cast(Channel channel) {
if (channel instanceof AbstractEpollChannel) {
return (AbstractEpollChannel) channel;
}
throw new IllegalArgumentException("Channel of type " + StringUtil.simpleClassName(channel) + " not supported");
}
private final Unsafe unsafe = new Unsafe() {
@Override
public void register(Channel channel) throws Exception {
assert inEventLoop();
final AbstractEpollChannel epollChannel = cast(channel);
epollChannel.register0(new EpollRegistration() {
@Override
public void update() throws IOException {
EpollEventLoop.this.modify(epollChannel);
}
@Override
public void remove() throws IOException {
EpollEventLoop.this.remove(epollChannel);
}
@Override
public IovArray cleanIovArray() {
return EpollEventLoop.this.cleanIovArray();
}
@Override
public NativeDatagramPacketArray cleanDatagramPacketArray() {
return EpollEventLoop.this.cleanDatagramPacketArray();
}
});
add(epollChannel);
}
@Override
public void deregister(Channel channel) throws Exception {
assert inEventLoop();
cast(channel).deregister0();
}
};
EpollEventLoop(EventLoopGroup parent, Executor executor, int maxEvents,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
@ -160,6 +205,11 @@ class EpollEventLoop extends SingleThreadEventLoop {
return datagramPacketArray;
}
@Override
public Unsafe unsafe() {
return unsafe;
}
@Override
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && WAKEN_UP_UPDATER.compareAndSet(this, 0, 1)) {

View File

@ -0,0 +1,46 @@
/*
* Copyright 2018 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;
import io.netty.channel.unix.IovArray;
import java.io.IOException;
/**
* Registration with an {@link EpollEventLoop}.
*/
interface EpollRegistration {
/**
* Update the registration as some flags did change.
*/
void update() throws IOException;
/**
* Remove the registration. No more IO will be handled for it.
*/
void remove() throws IOException;
/**
* Returns an {@link IovArray} that can be used for {@code writev}.
*/
IovArray cleanIovArray();
/**
* Returns a {@link NativeDatagramPacketArray} that can used for {@code sendmmsg}.
*/
NativeDatagramPacketArray cleanDatagramPacketArray();
}

View File

@ -61,11 +61,6 @@ public final class EpollServerSocketChannel extends AbstractEpollServerChannel i
config = new EpollServerSocketChannelConfig(this);
}
@Override
protected boolean isCompatible(EventLoop loop) {
return loop instanceof EpollEventLoop;
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
super.doBind(localAddress);

View File

@ -122,7 +122,7 @@ public final class EpollSocketChannel extends AbstractEpollStreamChannel impleme
// because we try to read or write until the actual close happens which may be later due
// SO_LINGER handling.
// See https://github.com/netty/netty/issues/4449
((EpollEventLoop) eventLoop()).remove(EpollSocketChannel.this);
doDeregister();
return GlobalEventExecutor.INSTANCE;
}
} catch (Throwable ignore) {

View File

@ -63,12 +63,14 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan
private ChannelPromise connectPromise;
private ScheduledFuture<?> connectTimeoutFuture;
private SocketAddress requestedRemoteAddress;
private KQueueRegistration registration;
final BsdSocket socket;
private boolean readFilterEnabled;
private boolean writeFilterEnabled;
boolean readReadyRunnablePending;
boolean inputClosedSeenErrorOnRead;
protected volatile boolean active;
private volatile SocketAddress local;
private volatile SocketAddress remote;
@ -103,6 +105,11 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan
}
}
protected KQueueRegistration registration() {
assert registration != null;
return registration;
}
@Override
public final FileDescriptor fd() {
return socket;
@ -160,26 +167,11 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan
doClose();
}
@Override
protected boolean isCompatible(EventLoop loop) {
return loop instanceof KQueueEventLoop;
}
@Override
public boolean isOpen() {
return socket.isOpen();
}
@Override
protected void doDeregister() throws Exception {
// Make sure we unregister our filters from kqueue!
readFilter(false);
writeFilter(false);
evSet0(Native.EVFILT_SOCK, Native.EV_DELETE, 0);
((KQueueEventLoop) eventLoop()).remove(this);
}
@Override
protected final void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
@ -198,23 +190,31 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan
}
}
@Override
protected void doRegister() throws Exception {
void register0(KQueueRegistration registration) {
this.registration = registration;
// Just in case the previous EventLoop was shutdown abruptly, or an event is still pending on the old EventLoop
// make sure the readReadyRunnablePending variable is reset so we will be able to execute the Runnable on the
// new EventLoop.
readReadyRunnablePending = false;
((KQueueEventLoop) eventLoop()).add(this);
// Add the write event first so we get notified of connection refused on the client side!
if (writeFilterEnabled) {
evSet0(Native.EVFILT_WRITE, Native.EV_ADD_CLEAR_ENABLE);
evSet0(registration, Native.EVFILT_WRITE, Native.EV_ADD_CLEAR_ENABLE);
}
if (readFilterEnabled) {
evSet0(Native.EVFILT_READ, Native.EV_ADD_CLEAR_ENABLE);
evSet0(registration, Native.EVFILT_READ, Native.EV_ADD_CLEAR_ENABLE);
}
evSet0(registration, Native.EVFILT_SOCK, Native.EV_ADD, Native.NOTE_RDHUP);
}
void deregister0() throws IOException {
// Make sure we unregister our filters from kqueue!
readFilter(false);
writeFilter(false);
if (registration != null) {
evSet0(registration, Native.EVFILT_SOCK, Native.EV_DELETE, 0);
registration = null;
}
evSet0(Native.EVFILT_SOCK, Native.EV_ADD, Native.NOTE_RDHUP);
}
@Override
@ -360,16 +360,16 @@ abstract class AbstractKQueueChannel extends AbstractChannel implements UnixChan
private void evSet(short filter, short flags) {
if (isOpen() && isRegistered()) {
evSet0(filter, flags);
evSet0(registration, filter, flags);
}
}
private void evSet0(short filter, short flags) {
evSet0(filter, flags, 0);
private void evSet0(KQueueRegistration registration, short filter, short flags) {
evSet0(registration, filter, flags, 0);
}
private void evSet0(short filter, short flags, int fflags) {
((KQueueEventLoop) eventLoop()).evSet(this, filter, flags, fflags);
private void evSet0(KQueueRegistration registration, short filter, short flags, int fflags) {
registration.evSet(filter, flags, fflags);
}
abstract class AbstractKQueueUnsafe extends AbstractUnsafe {

View File

@ -53,11 +53,6 @@ public abstract class AbstractKQueueServerChannel extends AbstractKQueueChannel
return METADATA;
}
@Override
protected boolean isCompatible(EventLoop loop) {
return loop instanceof KQueueEventLoop;
}
@Override
protected InetSocketAddress remoteAddress0() {
return null;

View File

@ -344,7 +344,7 @@ public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel
*/
private int doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
final long maxBytesPerGatheringWrite = config().getMaxBytesPerGatheringWrite();
IovArray array = ((KQueueEventLoop) eventLoop()).cleanArray();
IovArray array = registration().cleanArray();
array.maxBytes(maxBytesPerGatheringWrite);
in.forEachFlushedMessage(array);

View File

@ -321,7 +321,7 @@ public final class KQueueDatagramChannel extends AbstractKQueueChannel implement
remoteAddress.getAddress(), remoteAddress.getPort());
}
} else if (data.nioBufferCount() > 1) {
IovArray array = ((KQueueEventLoop) eventLoop()).cleanArray();
IovArray array = registration().cleanArray();
array.add(data);
int cnt = array.count();
assert cnt != 0;

View File

@ -15,6 +15,7 @@
*/
package io.netty.channel.kqueue;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SelectStrategy;
@ -28,6 +29,7 @@ import io.netty.util.collection.IntObjectMap;
import io.netty.util.concurrent.RejectedExecutionHandler;
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -70,6 +72,43 @@ final class KQueueEventLoop extends SingleThreadEventLoop {
private volatile int wakenUp;
private volatile int ioRatio = 50;
private static AbstractKQueueChannel cast(Channel channel) {
if (channel instanceof AbstractKQueueChannel) {
return (AbstractKQueueChannel) channel;
}
throw new IllegalArgumentException("Channel of type " + StringUtil.simpleClassName(channel) + " not supported");
}
private final Unsafe unsafe = new Unsafe() {
@Override
public void register(Channel channel) {
assert inEventLoop();
final AbstractKQueueChannel kQueueChannel = cast(channel);
final int id = kQueueChannel.fd().intValue();
channels.put(id, kQueueChannel);
kQueueChannel.register0(new KQueueRegistration() {
@Override
public void evSet(short filter, short flags, int fflags) {
KQueueEventLoop.this.evSet(kQueueChannel, filter, flags, fflags);
}
@Override
public IovArray cleanArray() {
return KQueueEventLoop.this.cleanArray();
}
});
}
@Override
public void deregister(Channel channel) throws Exception {
assert inEventLoop();
AbstractKQueueChannel kQueueChannel = cast(channel);
channels.remove(kQueueChannel.fd().intValue());
kQueueChannel.deregister0();
}
};
KQueueEventLoop(EventLoopGroup parent, Executor executor, int maxEvents,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
@ -90,24 +129,19 @@ final class KQueueEventLoop extends SingleThreadEventLoop {
}
}
void add(AbstractKQueueChannel ch) {
assert inEventLoop();
channels.put(ch.fd().intValue(), ch);
@Override
public Unsafe unsafe() {
return unsafe;
}
void evSet(AbstractKQueueChannel ch, short filter, short flags, int fflags) {
private void evSet(AbstractKQueueChannel ch, short filter, short flags, int fflags) {
changeList.evSet(ch, filter, flags, fflags);
}
void remove(AbstractKQueueChannel ch) {
assert inEventLoop();
channels.remove(ch.fd().intValue());
}
/**
* Return a cleared {@link IovArray} that can be used for writes in this {@link EventLoop}.
*/
IovArray cleanArray() {
private IovArray cleanArray() {
iovArray.clear();
return iovArray;
}
@ -167,7 +201,7 @@ final class KQueueEventLoop extends SingleThreadEventLoop {
// This may happen if the channel has already been closed, and it will be removed from kqueue anyways.
// We also handle EV_ERROR above to skip this even early if it is a result of a referencing a closed and
// thus removed from kqueue FD.
logger.warn("events[{}]=[{}, {}] had no channel!", i, eventList.fd(i), filter);
logger.warn("events[{}]=[{}, {}] had no channel!", i, fd, filter);
continue;
}

View File

@ -0,0 +1,35 @@
/*
* Copyright 2018 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.kqueue;
import io.netty.channel.unix.IovArray;
/**
* Registration with an {@link KQueueEventLoop}.
*/
interface KQueueRegistration {
/**
* Update the event-set for the registration.
*/
void evSet(short filter, short flags, int fflags);
/**
* Returns an {@link IovArray} that can be used for {@code writev}.
*/
IovArray cleanArray();
}

View File

@ -52,11 +52,6 @@ public final class KQueueServerSocketChannel extends AbstractKQueueServerChannel
config = new KQueueServerSocketChannelConfig(this);
}
@Override
protected boolean isCompatible(EventLoop loop) {
return loop instanceof KQueueEventLoop;
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
super.doBind(localAddress);

View File

@ -80,7 +80,7 @@ public final class KQueueSocketChannel extends AbstractKQueueStreamChannel imple
// because we try to read or write until the actual close happens which may be later due
// SO_LINGER handling.
// See https://github.com/netty/netty/issues/4449
((KQueueEventLoop) eventLoop()).remove(KQueueSocketChannel.this);
doDeregister();
return GlobalEventExecutor.INSTANCE;
}
} catch (Throwable ignore) {

View File

@ -38,11 +38,6 @@ final class FailedChannel extends AbstractChannel {
return new FailedChannelUnsafe();
}
@Override
protected boolean isCompatible(EventLoop loop) {
return true;
}
@Override
protected SocketAddress localAddress0() {
return null;

View File

@ -102,10 +102,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
private EventLoop validateEventLoop(EventLoop eventLoop) {
if (!isCompatible(ObjectUtil.checkNotNull(eventLoop, "eventLoop"))) {
throw new IllegalArgumentException("incompatible event loop type: " + eventLoop.getClass().getName());
}
return eventLoop;
return ObjectUtil.checkNotNull(eventLoop, "eventLoop");
}
@Override
@ -1035,11 +1032,6 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
}
}
/**
* Return {@code true} if the given {@link EventLoop} is compatible with this instance.
*/
protected abstract boolean isCompatible(EventLoop loop);
/**
* Returns the {@link SocketAddress} which is bound locally.
*/
@ -1056,7 +1048,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
* Sub-classes may override this method
*/
protected void doRegister() throws Exception {
// NOOP
eventLoop().unsafe().register(this);
}
/**
@ -1089,7 +1081,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
* Sub-classes may override this method
*/
protected void doDeregister() throws Exception {
// NOOP
eventLoop().unsafe().deregister(this);
}
/**

View File

@ -1,63 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
public class DefaultEventLoop extends SingleThreadEventLoop {
public DefaultEventLoop() {
this((EventLoopGroup) null);
}
public DefaultEventLoop(ThreadFactory threadFactory) {
this(null, threadFactory);
}
public DefaultEventLoop(Executor executor) {
this(null, executor);
}
public DefaultEventLoop(EventLoopGroup parent) {
this(parent, new DefaultThreadFactory(DefaultEventLoop.class));
}
public DefaultEventLoop(EventLoopGroup parent, ThreadFactory threadFactory) {
super(parent, threadFactory, true);
}
public DefaultEventLoop(EventLoopGroup parent, Executor executor) {
super(parent, executor, true);
}
@Override
protected void run() {
for (;;) {
Runnable task = takeTask();
if (task != null) {
task.run();
updateLastExecutionTime();
}
if (confirmShutdown()) {
break;
}
}
}
}

View File

@ -1,66 +0,0 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
/**
* {@link MultithreadEventLoopGroup} which must be used for the local transport.
*/
public class DefaultEventLoopGroup extends MultithreadEventLoopGroup {
/**
* Create a new instance with the default number of threads.
*/
public DefaultEventLoopGroup() {
this(0);
}
/**
* Create a new instance
*
* @param nThreads the number of threads to use
*/
public DefaultEventLoopGroup(int nThreads) {
this(nThreads, (ThreadFactory) null);
}
/**
* Create a new instance
*
* @param nThreads the number of threads to use
* @param threadFactory the {@link ThreadFactory} or {@code null} to use the default
*/
public DefaultEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
super(nThreads, threadFactory);
}
/**
* Create a new instance
*
* @param nThreads the number of threads to use
* @param executor the Executor to use, or {@code null} if the default should be used.
*/
public DefaultEventLoopGroup(int nThreads, Executor executor) {
super(nThreads, executor);
}
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new DefaultEventLoop(this, executor);
}
}

View File

@ -27,4 +27,25 @@ import io.netty.util.concurrent.OrderedEventExecutor;
public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
@Override
EventLoopGroup parent();
/**
* Returns an <em>internal-use-only</em> object that provides unsafe operations.
*/
Unsafe unsafe();
/**
* <em>Unsafe</em> operations that should <em>never</em> be called from user-code. These methods
* are only provided to implement the actual transport, and must be invoked from the {@link EventLoop} itself.
*/
interface Unsafe {
/**
* Register the {@link Channel} to the {@link EventLoop}.
*/
void register(Channel channel) throws Exception;
/**
* Deregister the {@link Channel} from the {@link EventLoop}.
*/
void deregister(Channel channel) throws Exception;
}
}

View File

@ -18,7 +18,6 @@ package io.netty.channel;
import io.netty.util.concurrent.RejectedExecutionHandler;
import io.netty.util.concurrent.RejectedExecutionHandlers;
import io.netty.util.concurrent.SingleThreadEventExecutor;
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.SystemPropertyUtil;
import java.util.concurrent.Executor;

View File

@ -673,11 +673,6 @@ public class EmbeddedChannel extends AbstractChannel {
}
}
@Override
protected boolean isCompatible(EventLoop loop) {
return loop instanceof EmbeddedEventLoop;
}
@Override
protected SocketAddress localAddress0() {
return isActive()? LOCAL_ADDRESS : null;
@ -688,8 +683,7 @@ public class EmbeddedChannel extends AbstractChannel {
return isActive()? REMOTE_ADDRESS : null;
}
@Override
protected void doRegister() throws Exception {
void setActive() {
state = State.ACTIVE;
}

View File

@ -15,10 +15,12 @@
*/
package io.netty.channel.embedded;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.AbstractScheduledEventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.internal.StringUtil;
import java.util.ArrayDeque;
import java.util.Queue;
@ -28,6 +30,31 @@ final class EmbeddedEventLoop extends AbstractScheduledEventExecutor implements
private final Queue<Runnable> tasks = new ArrayDeque<Runnable>(2);
private static EmbeddedChannel cast(Channel channel) {
if (channel instanceof EmbeddedChannel) {
return (EmbeddedChannel) channel;
}
throw new IllegalArgumentException("Channel of type " + StringUtil.simpleClassName(channel) + " not supported");
}
private final Unsafe unsafe = new Unsafe() {
@Override
public void register(Channel channel) {
assert inEventLoop();
cast(channel).setActive();
}
@Override
public void deregister(Channel channel) {
assert inEventLoop();
}
};
@Override
public Unsafe unsafe() {
return unsafe;
}
@Override
public EventLoopGroup parent() {
return (EventLoopGroup) super.parent();

View File

@ -26,10 +26,8 @@ import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.EventLoop;
import io.netty.channel.PreferHeapByteBufAllocator;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.SingleThreadEventLoop;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.SingleThreadEventExecutor;
import io.netty.util.internal.InternalThreadLocalMap;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.ThrowableUtil;
@ -144,11 +142,6 @@ public class LocalChannel extends AbstractChannel {
return new LocalUnsafe();
}
@Override
protected boolean isCompatible(EventLoop loop) {
return loop instanceof SingleThreadEventLoop;
}
@Override
protected SocketAddress localAddress0() {
return localAddress;
@ -159,43 +152,6 @@ public class LocalChannel extends AbstractChannel {
return remoteAddress;
}
@Override
protected void doRegister() throws Exception {
// Check if both peer and parent are non-null because this channel was created by a LocalServerChannel.
// This is needed as a peer may not be null also if a LocalChannel was connected before and
// deregistered / registered later again.
//
// See https://github.com/netty/netty/issues/2400
if (peer != null && parent() != null) {
// Store the peer in a local variable as it may be set to null if doClose() is called.
// See https://github.com/netty/netty/issues/2144
final LocalChannel peer = this.peer;
state = State.CONNECTED;
peer.remoteAddress = parent() == null ? null : parent().localAddress();
peer.state = State.CONNECTED;
// Always call peer.eventLoop().execute() even if peer.eventLoop().inEventLoop() is true.
// This ensures that if both channels are on the same event loop, the peer's channelActive
// event is triggered *after* this channel's channelRegistered event, so that this channel's
// pipeline is fully initialized by ChannelInitializer before any channelRead events.
peer.eventLoop().execute(new Runnable() {
@Override
public void run() {
ChannelPromise promise = peer.connectPromise;
// Only trigger fireChannelActive() if the promise was not null and was not completed yet.
// connectPromise may be set to null if doClose() was called in the meantime.
if (promise != null && promise.trySuccess()) {
peer.pipeline().fireChannelActive();
peer.readIfIsAutoRead();
}
}
});
}
((SingleThreadEventExecutor) eventLoop()).addShutdownHook(shutdownHook);
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
this.localAddress =
@ -294,12 +250,6 @@ public class LocalChannel extends AbstractChannel {
}
}
@Override
protected void doDeregister() throws Exception {
// Just remove the shutdownHook as this Channel may be closed later or registered to another EventLoop
((SingleThreadEventExecutor) eventLoop()).removeShutdownHook(shutdownHook);
}
private void readInbound() {
RecvByteBufAllocator.Handle handle = unsafe().recvBufAllocHandle();
handle.reset(config());
@ -456,7 +406,7 @@ public class LocalChannel extends AbstractChannel {
}
}
private class LocalUnsafe extends AbstractUnsafe {
private class LocalUnsafe extends AbstractUnsafe implements LocalChannelUnsafe {
@Override
public void connect(final SocketAddress remoteAddress,
@ -506,5 +456,48 @@ public class LocalChannel extends AbstractChannel {
LocalServerChannel serverChannel = (LocalServerChannel) boundChannel;
peer = serverChannel.serve(LocalChannel.this);
}
@Override
public void register0(LocalEventLoop eventLoop) {
// Check if both peer and parent are non-null because this channel was created by a LocalServerChannel.
// This is needed as a peer may not be null also if a LocalChannel was connected before and
// deregistered / registered later again.
//
// See https://github.com/netty/netty/issues/2400
if (peer != null && parent() != null) {
// Store the peer in a local variable as it may be set to null if doClose() is called.
// See https://github.com/netty/netty/issues/2144
final LocalChannel peer = LocalChannel.this.peer;
state = State.CONNECTED;
peer.remoteAddress = parent() == null ? null : parent().localAddress();
peer.state = State.CONNECTED;
// Always call peer.eventLoop().execute() even if peer.eventLoop().inEventLoop() is true.
// This ensures that if both channels are on the same event loop, the peer's channelActive
// event is triggered *after* this channel's channelRegistered event, so that this channel's
// pipeline is fully initialized by ChannelInitializer before any channelRead events.
peer.eventLoop().execute(new Runnable() {
@Override
public void run() {
ChannelPromise promise = peer.connectPromise;
// Only trigger fireChannelActive() if the promise was not null and was not completed yet.
// connectPromise may be set to null if doClose() was called in the meantime.
if (promise != null && promise.trySuccess()) {
peer.pipeline().fireChannelActive();
peer.readIfIsAutoRead();
}
}
});
}
eventLoop.addShutdownHook(shutdownHook);
}
@Override
public void deregister0(LocalEventLoop eventLoop) {
// Just remove the shutdownHook as this Channel may be closed later or registered to another EventLoop
eventLoop.removeShutdownHook(shutdownHook);
}
}
}

View File

@ -0,0 +1,23 @@
/*
* Copyright 2018 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.local;
import io.netty.channel.Channel;
interface LocalChannelUnsafe extends Channel.Unsafe {
void register0(LocalEventLoop eventLoop);
void deregister0(LocalEventLoop eventLoop);
}

View File

@ -0,0 +1,72 @@
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.local;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SingleThreadEventLoop;
import io.netty.util.internal.StringUtil;
import java.util.concurrent.Executor;
class LocalEventLoop extends SingleThreadEventLoop {
private static LocalChannelUnsafe cast(Channel channel) {
Channel.Unsafe unsafe = channel.unsafe();
if (unsafe instanceof LocalChannelUnsafe) {
return (LocalChannelUnsafe) unsafe;
}
throw new IllegalArgumentException("Channel of type " + StringUtil.simpleClassName(channel) + " not supported");
}
private final Unsafe unsafe = new Unsafe() {
@Override
public void register(Channel channel) {
assert inEventLoop();
cast(channel).register0(LocalEventLoop.this);
}
@Override
public void deregister(Channel channel) {
assert inEventLoop();
cast(channel).deregister0(LocalEventLoop.this);
}
};
LocalEventLoop(EventLoopGroup parent, Executor executor) {
super(parent, executor, true);
}
@Override
public Unsafe unsafe() {
return unsafe;
}
@Override
protected void run() {
for (;;) {
Runnable task = takeTask();
if (task != null) {
task.run();
updateLastExecutionTime();
}
if (confirmShutdown()) {
break;
}
}
}
}

View File

@ -15,20 +15,23 @@
*/
package io.netty.channel.local;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoop;
import io.netty.channel.MultithreadEventLoopGroup;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
/**
* @deprecated Use {@link DefaultEventLoopGroup} instead.
* {@link MultithreadEventLoopGroup} which must be used for the local transport.
*/
@Deprecated
public class LocalEventLoopGroup extends DefaultEventLoopGroup {
public class LocalEventLoopGroup extends MultithreadEventLoopGroup {
/**
* Create a new instance with the default number of threads.
*/
public LocalEventLoopGroup() { }
public LocalEventLoopGroup() {
this(0);
}
/**
* Create a new instance
@ -36,7 +39,7 @@ public class LocalEventLoopGroup extends DefaultEventLoopGroup {
* @param nThreads the number of threads to use
*/
public LocalEventLoopGroup(int nThreads) {
super(nThreads);
this(nThreads, (ThreadFactory) null);
}
/**
@ -48,4 +51,20 @@ public class LocalEventLoopGroup extends DefaultEventLoopGroup {
public LocalEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
super(nThreads, threadFactory);
}
/**
* Create a new instance
*
* @param nThreads the number of threads to use
* @param executor the Executor to use, or {@code null} if the default should be used.
*/
public LocalEventLoopGroup(int nThreads, Executor executor) {
super(nThreads, executor);
}
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
assert args == null || args.length == 0;
return new LocalEventLoop(this, executor);
}
}

View File

@ -18,14 +18,13 @@ package io.netty.channel.local;
import io.netty.channel.AbstractServerChannel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelConfig;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.PreferHeapByteBufAllocator;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.ServerChannel;
import io.netty.channel.SingleThreadEventLoop;
import io.netty.util.concurrent.SingleThreadEventExecutor;
import java.net.SocketAddress;
import java.util.ArrayDeque;
@ -79,21 +78,11 @@ public class LocalServerChannel extends AbstractServerChannel {
return state == 1;
}
@Override
protected boolean isCompatible(EventLoop loop) {
return loop instanceof SingleThreadEventLoop;
}
@Override
protected SocketAddress localAddress0() {
return localAddress;
}
@Override
protected void doRegister() throws Exception {
((SingleThreadEventExecutor) eventLoop()).addShutdownHook(shutdownHook);
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
this.localAddress = LocalChannelRegistry.register(this, this.localAddress, localAddress);
@ -112,11 +101,6 @@ public class LocalServerChannel extends AbstractServerChannel {
}
}
@Override
protected void doDeregister() throws Exception {
((SingleThreadEventExecutor) eventLoop()).removeShutdownHook(shutdownHook);
}
@Override
protected void doBeginRead() throws Exception {
if (acceptInProgress) {
@ -179,4 +163,26 @@ public class LocalServerChannel extends AbstractServerChannel {
readInbound();
}
}
@Override
protected AbstractUnsafe newUnsafe() {
return new DefaultServerUnsafe();
}
private final class DefaultServerUnsafe extends AbstractUnsafe implements LocalChannelUnsafe {
@Override
public void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
safeSetFailure(promise, new UnsupportedOperationException());
}
@Override
public void register0(LocalEventLoop eventLoop) {
eventLoop.addShutdownHook(shutdownHook);
}
@Override
public void deregister0(LocalEventLoop eventLoop) {
eventLoop.removeShutdownHook(shutdownHook);
}
}
}

View File

@ -35,7 +35,6 @@ import io.netty.util.internal.logging.InternalLoggerFactory;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ConnectionPendingException;
import java.nio.channels.SelectableChannel;
@ -115,11 +114,6 @@ public abstract class AbstractNioChannel extends AbstractChannel {
return ch;
}
@Override
public NioEventLoop eventLoop() {
return (NioEventLoop) super.eventLoop();
}
/**
* Return the current {@link SelectionKey}
*/
@ -375,36 +369,14 @@ public abstract class AbstractNioChannel extends AbstractChannel {
}
}
@Override
protected boolean isCompatible(EventLoop loop) {
return loop instanceof NioEventLoop;
}
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
eventLoop().unsafe().register(this);
}
@Override
protected void doDeregister() throws Exception {
eventLoop().cancel(selectionKey());
eventLoop().unsafe().deregister(this);
}
@Override

View File

@ -17,7 +17,6 @@ package io.netty.channel.nio;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopException;
import io.netty.channel.SelectStrategy;
import io.netty.channel.SingleThreadEventLoop;
@ -25,6 +24,7 @@ import io.netty.util.IntSupplier;
import io.netty.util.concurrent.RejectedExecutionHandler;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.ReflectionUtil;
import io.netty.util.internal.StringUtil;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
@ -116,6 +116,46 @@ public final class NioEventLoop extends SingleThreadEventLoop {
private final SelectorProvider provider;
private static AbstractNioChannel cast(Channel channel) {
if (channel instanceof AbstractNioChannel) {
return (AbstractNioChannel) channel;
}
throw new IllegalArgumentException("Channel of type " + StringUtil.simpleClassName(channel) + " not supported");
}
private final Unsafe unsafe = new Unsafe() {
@Override
public void register(Channel channel) throws Exception {
assert inEventLoop();
AbstractNioChannel nioChannel = cast(channel);
boolean selected = false;
for (;;) {
try {
nioChannel.selectionKey = nioChannel.javaChannel().register(unwrappedSelector(), 0, nioChannel);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
@Override
public void deregister(Channel channel) {
assert inEventLoop();
AbstractNioChannel nioChannel = cast(channel);
cancel(nioChannel.selectionKey());
}
};
/**
* Boolean that controls determines if a blocked Selector.select should
* break out of its selection process. In our case we use a timeout for
@ -263,6 +303,11 @@ public final class NioEventLoop extends SingleThreadEventLoop {
return provider;
}
@Override
public Unsafe unsafe() {
return unsafe;
}
@Override
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
// This event loop never calls takeTask()

View File

@ -26,7 +26,7 @@ import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.local.LocalEventLoopGroup;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
@ -59,8 +59,8 @@ import static org.junit.Assert.*;
public class BootstrapTest {
private static final EventLoopGroup groupA = new DefaultEventLoopGroup(1);
private static final EventLoopGroup groupB = new DefaultEventLoopGroup(1);
private static final EventLoopGroup groupA = new LocalEventLoopGroup(1);
private static final EventLoopGroup groupB = new LocalEventLoopGroup(1);
private static final ChannelInboundHandler dummyHandler = new DummyHandler();
@AfterClass
@ -145,7 +145,7 @@ public class BootstrapTest {
@Test
public void testLateRegisterSuccess() throws Exception {
DefaultEventLoopGroup group = new DefaultEventLoopGroup(1);
LocalEventLoopGroup group = new LocalEventLoopGroup(1);
try {
LateRegisterHandler registerHandler = new LateRegisterHandler();
ServerBootstrap bootstrap = new ServerBootstrap();
@ -175,7 +175,7 @@ public class BootstrapTest {
@Test
public void testLateRegisterSuccessBindFailed() throws Exception {
EventLoopGroup group = new DefaultEventLoopGroup(1);
EventLoopGroup group = new LocalEventLoopGroup(1);
LateRegisterHandler registerHandler = new LateRegisterHandler();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
@ -226,7 +226,7 @@ public class BootstrapTest {
@Test(expected = ConnectException.class, timeout = 10000)
public void testLateRegistrationConnect() throws Exception {
EventLoopGroup group = new DefaultEventLoopGroup(1);
EventLoopGroup group = new LocalEventLoopGroup(1);
LateRegisterHandler registerHandler = new LateRegisterHandler();
try {
final Bootstrap bootstrapA = new Bootstrap();

View File

@ -21,11 +21,10 @@ import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.local.LocalEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalEventLoopGroup;
import io.netty.channel.local.LocalServerChannel;
import org.junit.Test;
@ -97,7 +96,7 @@ public class ServerBootstrapTest {
}
};
EventLoopGroup group = new DefaultEventLoopGroup(1);
EventLoopGroup group = new LocalEventLoopGroup(1);
Channel sch = null;
Channel cch = null;
try {

View File

@ -31,6 +31,7 @@ public class AbstractChannelTest {
EventLoop eventLoop = mock(EventLoop.class);
// This allows us to have a single-threaded test
when(eventLoop.inEventLoop()).thenReturn(true);
when(eventLoop.unsafe()).thenReturn(mock(EventLoop.Unsafe.class));
TestChannel channel = new TestChannel(eventLoop);
ChannelInboundHandler handler = mock(ChannelInboundHandler.class);
@ -48,6 +49,7 @@ public class AbstractChannelTest {
final EventLoop eventLoop = mock(EventLoop.class);
// This allows us to have a single-threaded test
when(eventLoop.inEventLoop()).thenReturn(true);
when(eventLoop.unsafe()).thenReturn(mock(EventLoop.Unsafe.class));
doAnswer(new Answer() {
@Override
@ -126,11 +128,6 @@ public class AbstractChannelTest {
return new TestUnsafe();
}
@Override
protected boolean isCompatible(EventLoop loop) {
return true;
}
@Override
protected SocketAddress localAddress0() {
return null;

View File

@ -20,6 +20,7 @@ import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.local.LocalEventLoopGroup;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalServerChannel;
@ -34,7 +35,7 @@ class BaseChannelTest {
}
ServerBootstrap getLocalServerBootstrap() {
EventLoopGroup serverGroup = new DefaultEventLoopGroup();
EventLoopGroup serverGroup = new LocalEventLoopGroup();
ServerBootstrap sb = new ServerBootstrap();
sb.group(serverGroup);
sb.channel(LocalServerChannel.class);
@ -48,7 +49,7 @@ class BaseChannelTest {
}
Bootstrap getLocalClientBootstrap() {
EventLoopGroup clientGroup = new DefaultEventLoopGroup();
EventLoopGroup clientGroup = new LocalEventLoopGroup();
Bootstrap cb = new Bootstrap();
cb.channel(LocalChannel.class);
cb.group(clientGroup);

View File

@ -18,9 +18,11 @@ package io.netty.channel;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.channel.local.LocalEventLoopGroup;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalServerChannel;
import io.netty.util.concurrent.AbstractEventExecutor;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import org.junit.After;
@ -54,7 +56,7 @@ public class ChannelInitializerTest {
@Before
public void setUp() {
group = new DefaultEventLoopGroup(1);
group = new LocalEventLoopGroup(1);
server = new ServerBootstrap()
.group(group)
.channel(LocalServerChannel.class)
@ -264,7 +266,7 @@ public class ChannelInitializerTest {
final AtomicReference<Throwable> errorRef = new AtomicReference<Throwable>();
LocalAddress addr = new LocalAddress("test");
final EventExecutor executor = new DefaultEventLoop() {
final EventExecutor executor = new AbstractEventExecutor() {
private final ScheduledExecutorService execService = Executors.newSingleThreadScheduledExecutor();
@Override

View File

@ -18,6 +18,7 @@ package io.netty.channel;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.channel.local.LocalEventLoopGroup;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.RejectedExecutionHandlers;
@ -137,7 +138,7 @@ public class ChannelOutboundBufferTest {
private final ChannelConfig config = new DefaultChannelConfig(this);
TestChannel() {
super(null, new DefaultEventLoop());
super(null, new LocalEventLoopGroup(1).next());
}
@Override
@ -145,11 +146,6 @@ public class ChannelOutboundBufferTest {
return new TestUnsafe();
}
@Override
protected boolean isCompatible(EventLoop loop) {
return true;
}
@Override
protected SocketAddress localAddress0() {
throw new UnsupportedOperationException();

View File

@ -19,25 +19,23 @@ import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import io.netty.channel.local.LocalEventLoopGroup;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import io.netty.bootstrap.Bootstrap;
public class DefaultChannelPipelineTailTest {
private static EventLoopGroup GROUP;
@BeforeClass
public static void init() {
GROUP = new DefaultEventLoopGroup(1);
GROUP = new LocalEventLoopGroup(1);
}
@AfterClass
@ -56,19 +54,12 @@ public class DefaultChannelPipelineTailTest {
}
};
Bootstrap bootstrap = new Bootstrap()
.channelFactory(new MyChannelFactory(myChannel))
.group(loop)
.handler(new ChannelInboundHandlerAdapter())
.remoteAddress(new InetSocketAddress(0));
Channel channel = bootstrap.connect()
.sync().channel();
myChannel.pipeline().fireChannelActive();
try {
assertTrue(latch.await(1L, TimeUnit.SECONDS));
} finally {
channel.close();
myChannel.close();
}
}
@ -83,16 +74,8 @@ public class DefaultChannelPipelineTailTest {
}
};
Bootstrap bootstrap = new Bootstrap()
.channelFactory(new MyChannelFactory(myChannel))
.group(loop)
.handler(new ChannelInboundHandlerAdapter())
.remoteAddress(new InetSocketAddress(0));
Channel channel = bootstrap.connect()
.sync().channel();
channel.close().syncUninterruptibly();
myChannel.pipeline().fireChannelInactive();
myChannel.close().syncUninterruptibly();
assertTrue(latch.await(1L, TimeUnit.SECONDS));
}
@ -110,22 +93,13 @@ public class DefaultChannelPipelineTailTest {
}
};
Bootstrap bootstrap = new Bootstrap()
.channelFactory(new MyChannelFactory(myChannel))
.group(loop)
.handler(new ChannelInboundHandlerAdapter())
.remoteAddress(new InetSocketAddress(0));
Channel channel = bootstrap.connect()
.sync().channel();
try {
IOException ex = new IOException("testOnUnhandledInboundException");
channel.pipeline().fireExceptionCaught(ex);
myChannel.pipeline().fireExceptionCaught(ex);
assertTrue(latch.await(1L, TimeUnit.SECONDS));
assertSame(ex, causeRef.get());
} finally {
channel.close();
myChannel.close();
}
}
@ -140,20 +114,11 @@ public class DefaultChannelPipelineTailTest {
}
};
Bootstrap bootstrap = new Bootstrap()
.channelFactory(new MyChannelFactory(myChannel))
.group(loop)
.handler(new ChannelInboundHandlerAdapter())
.remoteAddress(new InetSocketAddress(0));
Channel channel = bootstrap.connect()
.sync().channel();
try {
channel.pipeline().fireChannelRead("testOnUnhandledInboundMessage");
myChannel.pipeline().fireChannelRead("testOnUnhandledInboundMessage");
assertTrue(latch.await(1L, TimeUnit.SECONDS));
} finally {
channel.close();
myChannel.close();
}
}
@ -168,20 +133,11 @@ public class DefaultChannelPipelineTailTest {
}
};
Bootstrap bootstrap = new Bootstrap()
.channelFactory(new MyChannelFactory(myChannel))
.group(loop)
.handler(new ChannelInboundHandlerAdapter())
.remoteAddress(new InetSocketAddress(0));
Channel channel = bootstrap.connect()
.sync().channel();
try {
channel.pipeline().fireChannelReadComplete();
myChannel.pipeline().fireChannelReadComplete();
assertTrue(latch.await(1L, TimeUnit.SECONDS));
} finally {
channel.close();
myChannel.close();
}
}
@ -196,20 +152,11 @@ public class DefaultChannelPipelineTailTest {
}
};
Bootstrap bootstrap = new Bootstrap()
.channelFactory(new MyChannelFactory(myChannel))
.group(loop)
.handler(new ChannelInboundHandlerAdapter())
.remoteAddress(new InetSocketAddress(0));
Channel channel = bootstrap.connect()
.sync().channel();
try {
channel.pipeline().fireUserEventTriggered("testOnUnhandledInboundUserEventTriggered");
myChannel.pipeline().fireUserEventTriggered("testOnUnhandledInboundUserEventTriggered");
assertTrue(latch.await(1L, TimeUnit.SECONDS));
} finally {
channel.close();
myChannel.close();
}
}
@ -224,33 +171,11 @@ public class DefaultChannelPipelineTailTest {
}
};
Bootstrap bootstrap = new Bootstrap()
.channelFactory(new MyChannelFactory(myChannel))
.group(loop)
.handler(new ChannelInboundHandlerAdapter())
.remoteAddress(new InetSocketAddress(0));
Channel channel = bootstrap.connect()
.sync().channel();
try {
channel.pipeline().fireChannelWritabilityChanged();
myChannel.pipeline().fireChannelWritabilityChanged();
assertTrue(latch.await(1L, TimeUnit.SECONDS));
} finally {
channel.close();
}
}
private static class MyChannelFactory implements ChannelFactory<MyChannel> {
private final MyChannel channel;
public MyChannelFactory(MyChannel channel) {
this.channel = channel;
}
@Override
public MyChannel newChannel(EventLoop eventLoop) {
return channel;
myChannel.close();
}
}
@ -296,11 +221,6 @@ public class DefaultChannelPipelineTailTest {
return new MyUnsafe();
}
@Override
protected boolean isCompatible(EventLoop loop) {
return true;
}
@Override
protected SocketAddress localAddress0() {
return null;

View File

@ -22,6 +22,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.channel.local.LocalEventLoopGroup;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalServerChannel;
@ -69,7 +70,7 @@ import static org.junit.Assert.fail;
public class DefaultChannelPipelineTest {
private static final EventLoopGroup group = new DefaultEventLoopGroup(1);
private static final EventLoopGroup group = new LocalEventLoopGroup(1);
private Channel self;
private Channel peer;
@ -938,7 +939,7 @@ public class DefaultChannelPipelineTest {
@Test(timeout = 3000)
public void testAddBefore() throws Throwable {
EventLoopGroup defaultGroup = new DefaultEventLoopGroup(2);
EventLoopGroup defaultGroup = new LocalEventLoopGroup(2);
try {
EventLoop eventLoop1 = defaultGroup.next();
EventLoop eventLoop2 = defaultGroup.next();
@ -1034,7 +1035,7 @@ public class DefaultChannelPipelineTest {
@Test(timeout = 3000)
public void testUnorderedEventExecutor() throws Throwable {
EventExecutorGroup eventExecutors = new UnorderedThreadPoolEventExecutor(2);
EventLoopGroup defaultGroup = new DefaultEventLoopGroup(1);
EventLoopGroup defaultGroup = new LocalEventLoopGroup(1);
try {
EventLoop eventLoop1 = defaultGroup.next();
ChannelPipeline pipeline1 = new LocalChannel(eventLoop1).pipeline();
@ -1103,7 +1104,7 @@ public class DefaultChannelPipelineTest {
@Test(timeout = 3000)
public void testVoidPromiseNotify() throws Throwable {
EventLoopGroup defaultGroup = new DefaultEventLoopGroup(1);
EventLoopGroup defaultGroup = new LocalEventLoopGroup(1);
EventLoop eventLoop1 = defaultGroup.next();
ChannelPipeline pipeline1 = new LocalChannel(eventLoop1).pipeline();
@ -1133,7 +1134,7 @@ public class DefaultChannelPipelineTest {
// Test for https://github.com/netty/netty/issues/8676.
@Test
public void testHandlerRemovedOnlyCalledWhenHandlerAddedCalled() throws Exception {
EventLoopGroup group = new DefaultEventLoopGroup(1);
EventLoopGroup group = new LocalEventLoopGroup(1);
try {
final AtomicReference<Error> errorRef = new AtomicReference<Error>();

View File

@ -484,6 +484,11 @@ public class SingleThreadEventLoopTest {
protected void cleanup() {
cleanedUp.incrementAndGet();
}
@Override
public Unsafe unsafe() {
return null;
}
}
private static class SingleThreadEventLoopB extends SingleThreadEventLoop {
@ -513,5 +518,20 @@ public class SingleThreadEventLoopTest {
protected void wakeup(boolean inEventLoop) {
interruptThread();
}
@Override
public Unsafe unsafe() {
return new Unsafe() {
@Override
public void register(Channel channel) {
// NOOP
}
@Override
public void deregister(Channel channel) {
// NOOP
}
};
}
}
}

View File

@ -28,7 +28,6 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
@ -71,9 +70,9 @@ public class LocalChannelTest {
@BeforeClass
public static void beforeClass() {
group1 = new DefaultEventLoopGroup(2);
group2 = new DefaultEventLoopGroup(2);
sharedGroup = new DefaultEventLoopGroup(1);
group1 = new LocalEventLoopGroup(2);
group2 = new LocalEventLoopGroup(2);
sharedGroup = new LocalEventLoopGroup(1);
}
@AfterClass
@ -228,11 +227,11 @@ public class LocalChannelTest {
@Test
public void localChannelRaceCondition() throws Exception {
final CountDownLatch closeLatch = new CountDownLatch(1);
final EventLoopGroup clientGroup = new DefaultEventLoopGroup(1) {
final EventLoopGroup clientGroup = new LocalEventLoopGroup(1) {
@Override
protected EventLoop newChild(Executor threadFactory, Object... args)
throws Exception {
return new SingleThreadEventLoop(this, threadFactory, true) {
return new LocalEventLoop(this, threadFactory) {
@Override
protected void run() {
for (;;) {

View File

@ -24,7 +24,6 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
@ -50,7 +49,7 @@ public class LocalTransportThreadModelTest {
@BeforeClass
public static void init() {
// Configure a test server
group = new DefaultEventLoopGroup();
group = new LocalEventLoopGroup();
ServerBootstrap sb = new ServerBootstrap();
sb.group(group)
.channel(LocalServerChannel.class)
@ -85,7 +84,7 @@ public class LocalTransportThreadModelTest {
@Test(timeout = 5000)
public void testStagedExecution() throws Throwable {
EventLoopGroup l = new DefaultEventLoopGroup(4, new DefaultThreadFactory("l"));
EventLoopGroup l = new LocalEventLoopGroup(4, new DefaultThreadFactory("l"));
EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e1"));
EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e2"));
ThreadNameAuditor h1 = new ThreadNameAuditor();
@ -228,7 +227,7 @@ public class LocalTransportThreadModelTest {
@Test(timeout = 30000)
@Ignore
public void testConcurrentMessageBufferAccess() throws Throwable {
EventLoopGroup l = new DefaultEventLoopGroup(4, new DefaultThreadFactory("l"));
EventLoopGroup l = new LocalEventLoopGroup(4, new DefaultThreadFactory("l"));
EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e1"));
EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e2"));
EventExecutorGroup e3 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e3"));

View File

@ -22,7 +22,6 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.util.ReferenceCountUtil;
import org.junit.Test;
@ -41,14 +40,14 @@ public class LocalTransportThreadModelTest2 {
ServerBootstrap serverBootstrap = new ServerBootstrap();
LocalHandler serverHandler = new LocalHandler("SERVER");
serverBootstrap
.group(new DefaultEventLoopGroup(), new DefaultEventLoopGroup())
.group(new LocalEventLoopGroup(), new LocalEventLoopGroup())
.channel(LocalServerChannel.class)
.childHandler(serverHandler);
Bootstrap clientBootstrap = new Bootstrap();
LocalHandler clientHandler = new LocalHandler("CLIENT");
clientBootstrap
.group(new DefaultEventLoopGroup())
.group(new LocalEventLoopGroup())
.channel(LocalChannel.class)
.remoteAddress(new LocalAddress(LOCAL_CHANNEL)).handler(clientHandler);

View File

@ -23,7 +23,6 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
@ -63,7 +62,7 @@ public class LocalTransportThreadModelTest3 {
@BeforeClass
public static void init() {
// Configure a test server
group = new DefaultEventLoopGroup();
group = new LocalEventLoopGroup();
ServerBootstrap sb = new ServerBootstrap();
sb.group(group)
.channel(LocalServerChannel.class)
@ -117,7 +116,7 @@ public class LocalTransportThreadModelTest3 {
}
private static void testConcurrentAddRemove(boolean inbound) throws Exception {
EventLoopGroup l = new DefaultEventLoopGroup(4, new DefaultThreadFactory("l"));
EventLoopGroup l = new LocalEventLoopGroup(4, new DefaultThreadFactory("l"));
EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e1"));
EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e2"));
EventExecutorGroup e3 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e3"));

View File

@ -16,15 +16,19 @@
package io.netty.channel.socket.nio;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.AbstractNioChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.AbstractEventExecutor;
import io.netty.util.concurrent.Future;
import org.junit.Test;
import java.io.IOException;
import java.net.SocketOption;
import java.net.StandardSocketOptions;
import java.nio.channels.NetworkChannel;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.*;
@ -84,4 +88,87 @@ public abstract class AbstractNioChannelTest<T extends AbstractNioChannel> {
eventLoopGroup.shutdownGracefully();
}
}
@Test
public void testWrapping() {
final NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
final EventLoop eventLoop = eventLoopGroup.next();
class WrappedEventLoop extends AbstractEventExecutor implements EventLoop {
private final EventLoop eventLoop;
WrappedEventLoop(EventLoop eventLoop) {
super(eventLoop.parent());
this.eventLoop = eventLoop;
}
@Test
public EventLoopGroup parent() {
return eventLoop.parent();
}
@Test
public EventLoop next() {
return this;
}
@Override
public Unsafe unsafe() {
return eventLoop.unsafe();
}
@Override
public void shutdown() {
eventLoop.shutdown();
}
@Override
public boolean inEventLoop(Thread thread) {
return eventLoop.inEventLoop(thread);
}
@Override
public boolean isShuttingDown() {
return eventLoop.isShuttingDown();
}
@Override
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
return eventLoop.shutdownGracefully(quietPeriod, timeout, unit);
}
@Override
public Future<?> terminationFuture() {
return eventLoop.terminationFuture();
}
@Override
public boolean isShutdown() {
return eventLoop.isShutdown();
}
@Override
public boolean isTerminated() {
return eventLoop.isTerminated();
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return eventLoop.awaitTermination(timeout, unit);
}
@Override
public void execute(Runnable command) {
eventLoop.execute(command);
}
}
EventLoop wrapped = new WrappedEventLoop(eventLoop);
T channel = newNioChannel(wrapped);
channel.register().syncUninterruptibly();
assertSame(wrapped, channel.eventLoop());
channel.close().syncUninterruptibly();
eventLoopGroup.shutdownGracefully();
}
}