Java 8 migration: replace anonymous types with lambda (#8751)
Motivation: We can use lambdas instead of anonymous inner class to improve readablity Modification: Replace anonymous inner class with lambda Result: Cleaner code that uses Java8 features
This commit is contained in:
parent
7b6336f1fd
commit
d08ecccd9a
@ -244,17 +244,14 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C, F>, C
|
||||
} else {
|
||||
// Registration future is almost always fulfilled already, but just in case it's not.
|
||||
final ChannelPromise promise = channel.newPromise();
|
||||
regFuture.addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
Throwable cause = future.cause();
|
||||
if (cause != null) {
|
||||
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
|
||||
// IllegalStateException once we try to access the EventLoop of the Channel.
|
||||
promise.setFailure(cause);
|
||||
} else {
|
||||
doBind0(regFuture, channel, localAddress, promise);
|
||||
}
|
||||
regFuture.addListener((ChannelFutureListener) future -> {
|
||||
Throwable cause = future.cause();
|
||||
if (cause != null) {
|
||||
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
|
||||
// IllegalStateException once we try to access the EventLoop of the Channel.
|
||||
promise.setFailure(cause);
|
||||
} else {
|
||||
doBind0(regFuture, channel, localAddress, promise);
|
||||
}
|
||||
});
|
||||
return promise;
|
||||
@ -271,22 +268,14 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C, F>, C
|
||||
}
|
||||
|
||||
final ChannelPromise promise = channel.newPromise();
|
||||
loop.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
init(channel).addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
if (future.isSuccess()) {
|
||||
channel.register(promise);
|
||||
} else {
|
||||
channel.unsafe().closeForcibly();
|
||||
promise.setFailure(future.cause());
|
||||
}
|
||||
}
|
||||
});
|
||||
loop.execute(() -> init(channel).addListener((ChannelFutureListener) future -> {
|
||||
if (future.isSuccess()) {
|
||||
channel.register(promise);
|
||||
} else {
|
||||
channel.unsafe().closeForcibly();
|
||||
promise.setFailure(future.cause());
|
||||
}
|
||||
});
|
||||
}));
|
||||
|
||||
return promise;
|
||||
}
|
||||
@ -301,14 +290,11 @@ public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C, F>, C
|
||||
|
||||
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
|
||||
// the pipeline in its channelRegistered() implementation.
|
||||
channel.eventLoop().execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
if (regFuture.isSuccess()) {
|
||||
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
|
||||
} else {
|
||||
promise.setFailure(regFuture.cause());
|
||||
}
|
||||
channel.eventLoop().execute(() -> {
|
||||
if (regFuture.isSuccess()) {
|
||||
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
|
||||
} else {
|
||||
promise.setFailure(regFuture.cause());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -206,19 +206,16 @@ public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel, ChannelFact
|
||||
} else {
|
||||
// Registration future is almost always fulfilled already, but just in case it's not.
|
||||
final ChannelPromise promise = channel.newPromise();
|
||||
regFuture.addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
// Directly obtain the cause and do a null check so we only need one volatile read in case of a
|
||||
// failure.
|
||||
Throwable cause = future.cause();
|
||||
if (cause != null) {
|
||||
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
|
||||
// IllegalStateException once we try to access the EventLoop of the Channel.
|
||||
promise.setFailure(cause);
|
||||
} else {
|
||||
doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
|
||||
}
|
||||
regFuture.addListener((ChannelFutureListener) future -> {
|
||||
// Directly obtain the cause and do a null check so we only need one volatile read in case of a
|
||||
// failure.
|
||||
Throwable cause = future.cause();
|
||||
if (cause != null) {
|
||||
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
|
||||
// IllegalStateException once we try to access the EventLoop of the Channel.
|
||||
promise.setFailure(cause);
|
||||
} else {
|
||||
doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
|
||||
}
|
||||
});
|
||||
return promise;
|
||||
@ -254,15 +251,12 @@ public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel, ChannelFact
|
||||
}
|
||||
|
||||
// Wait until the name resolution is finished.
|
||||
resolveFuture.addListener(new FutureListener<SocketAddress>() {
|
||||
@Override
|
||||
public void operationComplete(Future<SocketAddress> future) throws Exception {
|
||||
if (future.cause() != null) {
|
||||
channel.close();
|
||||
promise.setFailure(future.cause());
|
||||
} else {
|
||||
doConnect(future.getNow(), localAddress, promise);
|
||||
}
|
||||
resolveFuture.addListener((FutureListener<SocketAddress>) future -> {
|
||||
if (future.cause() != null) {
|
||||
channel.close();
|
||||
promise.setFailure(future.cause());
|
||||
} else {
|
||||
doConnect(future.getNow(), localAddress, promise);
|
||||
}
|
||||
});
|
||||
} catch (Throwable cause) {
|
||||
@ -277,16 +271,13 @@ public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel, ChannelFact
|
||||
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
|
||||
// the pipeline in its channelRegistered() implementation.
|
||||
final Channel channel = connectPromise.channel();
|
||||
channel.eventLoop().execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
if (localAddress == null) {
|
||||
channel.connect(remoteAddress, connectPromise);
|
||||
} else {
|
||||
channel.connect(remoteAddress, localAddress, connectPromise);
|
||||
}
|
||||
connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
|
||||
channel.eventLoop().execute(() -> {
|
||||
if (localAddress == null) {
|
||||
channel.connect(remoteAddress, connectPromise);
|
||||
} else {
|
||||
channel.connect(remoteAddress, localAddress, connectPromise);
|
||||
}
|
||||
connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -214,13 +214,10 @@ public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerCh
|
||||
pipeline.addLast(handler);
|
||||
}
|
||||
|
||||
ch.eventLoop().execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
pipeline.addLast(new ServerBootstrapAcceptor(
|
||||
ch, currentChildHandler, currentChildOptions, currentChildAttrs));
|
||||
promise.setSuccess();
|
||||
}
|
||||
ch.eventLoop().execute(() -> {
|
||||
pipeline.addLast(new ServerBootstrapAcceptor(
|
||||
ch, currentChildHandler, currentChildOptions, currentChildAttrs));
|
||||
promise.setSuccess();
|
||||
});
|
||||
}
|
||||
});
|
||||
@ -277,12 +274,7 @@ public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerCh
|
||||
// not be able to load the class because of the file limit it already reached.
|
||||
//
|
||||
// See https://github.com/netty/netty/issues/1328
|
||||
enableAutoReadTask = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
channel.config().setAutoRead(true);
|
||||
}
|
||||
};
|
||||
enableAutoReadTask = () -> channel.config().setAutoRead(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -295,12 +287,7 @@ public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerCh
|
||||
initChild(child);
|
||||
} else {
|
||||
try {
|
||||
childEventLoop.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
initChild(child);
|
||||
}
|
||||
});
|
||||
childEventLoop.execute(() -> initChild(child));
|
||||
} catch (Throwable cause) {
|
||||
forceClose(child, cause);
|
||||
}
|
||||
@ -319,12 +306,9 @@ public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerCh
|
||||
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
|
||||
}
|
||||
|
||||
child.register().addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
if (!future.isSuccess()) {
|
||||
forceClose(child, future.cause());
|
||||
}
|
||||
child.register().addListener((ChannelFutureListener) future -> {
|
||||
if (!future.isSuccess()) {
|
||||
forceClose(child, future.cause());
|
||||
}
|
||||
});
|
||||
} catch (Throwable t) {
|
||||
|
@ -550,12 +550,9 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||
}
|
||||
|
||||
if (!wasActive && isActive()) {
|
||||
invokeLater(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
pipeline.fireChannelActive();
|
||||
readIfIsAutoRead();
|
||||
}
|
||||
invokeLater(() -> {
|
||||
pipeline.fireChannelActive();
|
||||
readIfIsAutoRead();
|
||||
});
|
||||
}
|
||||
|
||||
@ -580,12 +577,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||
}
|
||||
|
||||
if (wasActive && !isActive()) {
|
||||
invokeLater(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
pipeline.fireChannelInactive();
|
||||
}
|
||||
});
|
||||
invokeLater(pipeline::fireChannelInactive);
|
||||
}
|
||||
|
||||
safeSetSuccess(promise);
|
||||
@ -631,24 +623,17 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||
new ChannelOutputShutdownException("Channel output shutdown", cause);
|
||||
Executor closeExecutor = prepareToClose();
|
||||
if (closeExecutor != null) {
|
||||
closeExecutor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
// Execute the shutdown.
|
||||
doShutdownOutput();
|
||||
promise.setSuccess();
|
||||
} catch (Throwable err) {
|
||||
promise.setFailure(err);
|
||||
} finally {
|
||||
// Dispatch to the EventLoop
|
||||
eventLoop().execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);
|
||||
}
|
||||
});
|
||||
}
|
||||
closeExecutor.execute(() -> {
|
||||
try {
|
||||
// Execute the shutdown.
|
||||
doShutdownOutput();
|
||||
promise.setSuccess();
|
||||
} catch (Throwable err) {
|
||||
promise.setFailure(err);
|
||||
} finally {
|
||||
// Dispatch to the EventLoop
|
||||
eventLoop().execute(() ->
|
||||
closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause));
|
||||
}
|
||||
});
|
||||
} else {
|
||||
@ -683,12 +668,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||
safeSetSuccess(promise);
|
||||
} else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise.
|
||||
// This means close() was called before so we just register a listener and return
|
||||
closeFuture.addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
promise.setSuccess();
|
||||
}
|
||||
});
|
||||
closeFuture.addListener((ChannelFutureListener) future -> promise.setSuccess());
|
||||
}
|
||||
return;
|
||||
}
|
||||
@ -700,26 +680,20 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||
this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
|
||||
Executor closeExecutor = prepareToClose();
|
||||
if (closeExecutor != null) {
|
||||
closeExecutor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
// Execute the close.
|
||||
doClose0(promise);
|
||||
} finally {
|
||||
// Call invokeLater so closeAndDeregister is executed in the EventLoop again!
|
||||
invokeLater(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
if (outboundBuffer != null) {
|
||||
// Fail all the queued messages
|
||||
outboundBuffer.failFlushed(cause, notify);
|
||||
outboundBuffer.close(closeCause);
|
||||
}
|
||||
fireChannelInactiveAndDeregister(wasActive);
|
||||
}
|
||||
});
|
||||
}
|
||||
closeExecutor.execute(() -> {
|
||||
try {
|
||||
// Execute the close.
|
||||
doClose0(promise);
|
||||
} finally {
|
||||
// Call invokeLater so closeAndDeregister is executed in the EventLoop again!
|
||||
invokeLater(() -> {
|
||||
if (outboundBuffer != null) {
|
||||
// Fail all the queued messages
|
||||
outboundBuffer.failFlushed(cause, notify);
|
||||
outboundBuffer.close(closeCause);
|
||||
}
|
||||
fireChannelInactiveAndDeregister(wasActive);
|
||||
});
|
||||
}
|
||||
});
|
||||
} else {
|
||||
@ -734,12 +708,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||
}
|
||||
}
|
||||
if (inFlush0) {
|
||||
invokeLater(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
fireChannelInactiveAndDeregister(wasActive);
|
||||
}
|
||||
});
|
||||
invokeLater(() -> fireChannelInactiveAndDeregister(wasActive));
|
||||
} else {
|
||||
fireChannelInactiveAndDeregister(wasActive);
|
||||
}
|
||||
@ -796,27 +765,24 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||
//
|
||||
// See:
|
||||
// https://github.com/netty/netty/issues/4435
|
||||
invokeLater(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
doDeregister();
|
||||
} catch (Throwable t) {
|
||||
logger.warn("Unexpected exception occurred while deregistering a channel.", t);
|
||||
} finally {
|
||||
if (fireChannelInactive) {
|
||||
pipeline.fireChannelInactive();
|
||||
}
|
||||
// Some transports like local and AIO does not allow the deregistration of
|
||||
// an open channel. Their doDeregister() calls close(). Consequently,
|
||||
// close() calls deregister() again - no need to fire channelUnregistered, so check
|
||||
// if it was registered.
|
||||
if (registered) {
|
||||
registered = false;
|
||||
pipeline.fireChannelUnregistered();
|
||||
}
|
||||
safeSetSuccess(promise);
|
||||
invokeLater(() -> {
|
||||
try {
|
||||
doDeregister();
|
||||
} catch (Throwable t) {
|
||||
logger.warn("Unexpected exception occurred while deregistering a channel.", t);
|
||||
} finally {
|
||||
if (fireChannelInactive) {
|
||||
pipeline.fireChannelInactive();
|
||||
}
|
||||
// Some transports like local and AIO does not allow the deregistration of
|
||||
// an open channel. Their doDeregister() calls close(). Consequently,
|
||||
// close() calls deregister() again - no need to fire channelUnregistered, so check
|
||||
// if it was registered.
|
||||
if (registered) {
|
||||
registered = false;
|
||||
pipeline.fireChannelUnregistered();
|
||||
}
|
||||
safeSetSuccess(promise);
|
||||
}
|
||||
});
|
||||
}
|
||||
@ -832,12 +798,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha
|
||||
try {
|
||||
doBeginRead();
|
||||
} catch (final Exception e) {
|
||||
invokeLater(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
pipeline.fireExceptionCaught(e);
|
||||
}
|
||||
});
|
||||
invokeLater(() -> pipeline.fireExceptionCaught(e));
|
||||
close(voidPromise());
|
||||
}
|
||||
}
|
||||
|
@ -232,12 +232,8 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
|
||||
private static boolean isSkippable(
|
||||
final Class<?> handlerType, final String methodName, final Class<?>... paramTypes) throws Exception {
|
||||
|
||||
return AccessController.doPrivileged(new PrivilegedExceptionAction<Boolean>() {
|
||||
@Override
|
||||
public Boolean run() throws Exception {
|
||||
return handlerType.getMethod(methodName, paramTypes).isAnnotationPresent(ChannelHandler.Skip.class);
|
||||
}
|
||||
});
|
||||
return AccessController.doPrivileged((PrivilegedExceptionAction<Boolean>) () ->
|
||||
handlerType.getMethod(methodName, paramTypes).isAnnotationPresent(ChannelHandler.Skip.class));
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -280,12 +276,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
|
||||
if (executor.inEventLoop()) {
|
||||
next.invokeChannelRegistered();
|
||||
} else {
|
||||
executor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
next.invokeChannelRegistered();
|
||||
}
|
||||
});
|
||||
executor.execute(next::invokeChannelRegistered);
|
||||
}
|
||||
}
|
||||
|
||||
@ -312,12 +303,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
|
||||
if (executor.inEventLoop()) {
|
||||
next.invokeChannelUnregistered();
|
||||
} else {
|
||||
executor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
next.invokeChannelUnregistered();
|
||||
}
|
||||
});
|
||||
executor.execute(next::invokeChannelUnregistered);
|
||||
}
|
||||
}
|
||||
|
||||
@ -344,12 +330,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
|
||||
if (executor.inEventLoop()) {
|
||||
next.invokeChannelActive();
|
||||
} else {
|
||||
executor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
next.invokeChannelActive();
|
||||
}
|
||||
});
|
||||
executor.execute(next::invokeChannelActive);
|
||||
}
|
||||
}
|
||||
|
||||
@ -376,12 +357,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
|
||||
if (executor.inEventLoop()) {
|
||||
next.invokeChannelInactive();
|
||||
} else {
|
||||
executor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
next.invokeChannelInactive();
|
||||
}
|
||||
});
|
||||
executor.execute(next::invokeChannelInactive);
|
||||
}
|
||||
}
|
||||
|
||||
@ -410,12 +386,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
|
||||
next.invokeExceptionCaught(cause);
|
||||
} else {
|
||||
try {
|
||||
executor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
next.invokeExceptionCaught(cause);
|
||||
}
|
||||
});
|
||||
executor.execute(() -> next.invokeExceptionCaught(cause));
|
||||
} catch (Throwable t) {
|
||||
if (logger.isWarnEnabled()) {
|
||||
logger.warn("Failed to submit an exceptionCaught() event.", t);
|
||||
@ -460,12 +431,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
|
||||
if (executor.inEventLoop()) {
|
||||
next.invokeUserEventTriggered(event);
|
||||
} else {
|
||||
executor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
next.invokeUserEventTriggered(event);
|
||||
}
|
||||
});
|
||||
executor.execute(() -> next.invokeUserEventTriggered(event));
|
||||
}
|
||||
}
|
||||
|
||||
@ -493,12 +459,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
|
||||
if (executor.inEventLoop()) {
|
||||
next.invokeChannelRead(m);
|
||||
} else {
|
||||
executor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
next.invokeChannelRead(m);
|
||||
}
|
||||
});
|
||||
executor.execute(() -> next.invokeChannelRead(m));
|
||||
}
|
||||
}
|
||||
|
||||
@ -527,12 +488,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
|
||||
} else {
|
||||
Runnable task = next.invokeChannelReadCompleteTask;
|
||||
if (task == null) {
|
||||
next.invokeChannelReadCompleteTask = task = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
next.invokeChannelReadComplete();
|
||||
}
|
||||
};
|
||||
next.invokeChannelReadCompleteTask = task = next::invokeChannelReadComplete;
|
||||
}
|
||||
executor.execute(task);
|
||||
}
|
||||
@ -563,12 +519,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
|
||||
} else {
|
||||
Runnable task = next.invokeChannelWritableStateChangedTask;
|
||||
if (task == null) {
|
||||
next.invokeChannelWritableStateChangedTask = task = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
next.invokeChannelWritabilityChanged();
|
||||
}
|
||||
};
|
||||
next.invokeChannelWritableStateChangedTask = task = next::invokeChannelWritabilityChanged;
|
||||
}
|
||||
executor.execute(task);
|
||||
}
|
||||
@ -636,12 +587,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
|
||||
if (executor.inEventLoop()) {
|
||||
next.invokeBind(localAddress, promise);
|
||||
} else {
|
||||
safeExecute(executor, new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
next.invokeBind(localAddress, promise);
|
||||
}
|
||||
}, promise, null);
|
||||
safeExecute(executor, () -> next.invokeBind(localAddress, promise), promise, null);
|
||||
}
|
||||
return promise;
|
||||
}
|
||||
@ -680,12 +626,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
|
||||
if (executor.inEventLoop()) {
|
||||
next.invokeConnect(remoteAddress, localAddress, promise);
|
||||
} else {
|
||||
safeExecute(executor, new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
next.invokeConnect(remoteAddress, localAddress, promise);
|
||||
}
|
||||
}, promise, null);
|
||||
safeExecute(executor, () -> next.invokeConnect(remoteAddress, localAddress, promise), promise, null);
|
||||
}
|
||||
return promise;
|
||||
}
|
||||
@ -720,14 +661,11 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
|
||||
next.invokeDisconnect(promise);
|
||||
}
|
||||
} else {
|
||||
safeExecute(executor, new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
if (!channel().metadata().hasDisconnect()) {
|
||||
next.invokeClose(promise);
|
||||
} else {
|
||||
next.invokeDisconnect(promise);
|
||||
}
|
||||
safeExecute(executor, () -> {
|
||||
if (!channel().metadata().hasDisconnect()) {
|
||||
next.invokeClose(promise);
|
||||
} else {
|
||||
next.invokeDisconnect(promise);
|
||||
}
|
||||
}, promise, null);
|
||||
}
|
||||
@ -758,12 +696,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
|
||||
if (executor.inEventLoop()) {
|
||||
next.invokeClose(promise);
|
||||
} else {
|
||||
safeExecute(executor, new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
next.invokeClose(promise);
|
||||
}
|
||||
}, promise, null);
|
||||
safeExecute(executor, () -> next.invokeClose(promise), promise, null);
|
||||
}
|
||||
|
||||
return promise;
|
||||
@ -793,12 +726,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
|
||||
if (executor.inEventLoop()) {
|
||||
next.invokeRegister(promise);
|
||||
} else {
|
||||
safeExecute(executor, new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
next.invokeRegister(promise);
|
||||
}
|
||||
}, promise, null);
|
||||
safeExecute(executor, () -> next.invokeRegister(promise), promise, null);
|
||||
}
|
||||
|
||||
return promise;
|
||||
@ -828,12 +756,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
|
||||
if (executor.inEventLoop()) {
|
||||
next.invokeDeregister(promise);
|
||||
} else {
|
||||
safeExecute(executor, new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
next.invokeDeregister(promise);
|
||||
}
|
||||
}, promise, null);
|
||||
safeExecute(executor, () -> next.invokeDeregister(promise), promise, null);
|
||||
}
|
||||
|
||||
return promise;
|
||||
@ -860,12 +783,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
|
||||
} else {
|
||||
Runnable task = next.invokeReadTask;
|
||||
if (task == null) {
|
||||
next.invokeReadTask = task = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
next.invokeRead();
|
||||
}
|
||||
};
|
||||
next.invokeReadTask = task = next::invokeRead;
|
||||
}
|
||||
executor.execute(task);
|
||||
}
|
||||
@ -922,12 +840,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
|
||||
} else {
|
||||
Runnable task = next.invokeFlushTask;
|
||||
if (task == null) {
|
||||
next.invokeFlushTask = task = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
next.invokeFlush();
|
||||
}
|
||||
};
|
||||
next.invokeFlushTask = task = next::invokeFlush;
|
||||
}
|
||||
safeExecute(executor, task, channel().voidPromise(), null);
|
||||
}
|
||||
|
@ -38,23 +38,15 @@ public interface ChannelFutureListener extends GenericFutureListener<ChannelFutu
|
||||
* A {@link ChannelFutureListener} that closes the {@link Channel} which is
|
||||
* associated with the specified {@link ChannelFuture}.
|
||||
*/
|
||||
ChannelFutureListener CLOSE = new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) {
|
||||
future.channel().close();
|
||||
}
|
||||
};
|
||||
ChannelFutureListener CLOSE = future -> future.channel().close();
|
||||
|
||||
/**
|
||||
* A {@link ChannelFutureListener} that closes the {@link Channel} when the
|
||||
* operation ended up with a failure or cancellation rather than a success.
|
||||
*/
|
||||
ChannelFutureListener CLOSE_ON_FAILURE = new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) {
|
||||
if (!future.isSuccess()) {
|
||||
future.channel().close();
|
||||
}
|
||||
ChannelFutureListener CLOSE_ON_FAILURE = future -> {
|
||||
if (!future.isSuccess()) {
|
||||
future.channel().close();
|
||||
}
|
||||
};
|
||||
|
||||
@ -62,12 +54,9 @@ public interface ChannelFutureListener extends GenericFutureListener<ChannelFutu
|
||||
* A {@link ChannelFutureListener} that forwards the {@link Throwable} of the {@link ChannelFuture} into the
|
||||
* {@link ChannelPipeline}. This mimics the old behavior of Netty 3.
|
||||
*/
|
||||
ChannelFutureListener FIRE_EXCEPTION_ON_FAILURE = new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) {
|
||||
if (!future.isSuccess()) {
|
||||
future.channel().pipeline().fireExceptionCaught(future.cause());
|
||||
}
|
||||
ChannelFutureListener FIRE_EXCEPTION_ON_FAILURE = future -> {
|
||||
if (!future.isSuccess()) {
|
||||
future.channel().pipeline().fireExceptionCaught(future.cause());
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -602,12 +602,7 @@ public final class ChannelOutboundBuffer {
|
||||
if (invokeLater) {
|
||||
Runnable task = fireChannelWritabilityChangedTask;
|
||||
if (task == null) {
|
||||
fireChannelWritabilityChangedTask = task = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
pipeline.fireChannelWritabilityChanged();
|
||||
}
|
||||
};
|
||||
fireChannelWritabilityChangedTask = task = pipeline::fireChannelWritabilityChanged;
|
||||
}
|
||||
channel.eventLoop().execute(task);
|
||||
} else {
|
||||
@ -654,12 +649,7 @@ public final class ChannelOutboundBuffer {
|
||||
|
||||
void close(final Throwable cause, final boolean allowChannelOpen) {
|
||||
if (inFail) {
|
||||
channel.eventLoop().execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
close(cause, allowChannelOpen);
|
||||
}
|
||||
});
|
||||
channel.eventLoop().execute(() -> close(cause, allowChannelOpen));
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -614,12 +614,7 @@ public class CombinedChannelDuplexHandler<I extends ChannelInboundHandler, O ext
|
||||
if (executor.inEventLoop()) {
|
||||
remove0();
|
||||
} else {
|
||||
executor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
remove0();
|
||||
}
|
||||
});
|
||||
executor.execute(this::remove0);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -123,12 +123,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
EventExecutor ctxExecutor = newCtx.executor();
|
||||
if (!ctxExecutor.inEventLoop()) {
|
||||
newCtx.setAddPending();
|
||||
ctxExecutor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
callHandlerAdded0(newCtx);
|
||||
}
|
||||
});
|
||||
ctxExecutor.execute(() -> callHandlerAdded0(newCtx));
|
||||
return this;
|
||||
}
|
||||
}
|
||||
@ -162,12 +157,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
EventExecutor ctxExecutor = newCtx.executor();
|
||||
if (!ctxExecutor.inEventLoop()) {
|
||||
newCtx.setAddPending();
|
||||
ctxExecutor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
callHandlerAdded0(newCtx);
|
||||
}
|
||||
});
|
||||
ctxExecutor.execute(() -> callHandlerAdded0(newCtx));
|
||||
return this;
|
||||
}
|
||||
}
|
||||
@ -205,12 +195,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
EventExecutor ctxExecutor = newCtx.executor();
|
||||
if (!ctxExecutor.inEventLoop()) {
|
||||
newCtx.setAddPending();
|
||||
ctxExecutor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
callHandlerAdded0(newCtx);
|
||||
}
|
||||
});
|
||||
ctxExecutor.execute(() -> callHandlerAdded0(newCtx));
|
||||
return this;
|
||||
}
|
||||
}
|
||||
@ -256,12 +241,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
EventExecutor ctxExecutor = newCtx.executor();
|
||||
if (!ctxExecutor.inEventLoop()) {
|
||||
newCtx.setAddPending();
|
||||
ctxExecutor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
callHandlerAdded0(newCtx);
|
||||
}
|
||||
});
|
||||
ctxExecutor.execute(() -> callHandlerAdded0(newCtx));
|
||||
return this;
|
||||
}
|
||||
}
|
||||
@ -407,12 +387,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
|
||||
EventExecutor executor = ctx.executor();
|
||||
if (!executor.inEventLoop()) {
|
||||
executor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
callHandlerRemoved0(ctx);
|
||||
}
|
||||
});
|
||||
executor.execute(() -> callHandlerRemoved0(ctx));
|
||||
return ctx;
|
||||
}
|
||||
}
|
||||
@ -483,15 +458,12 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
|
||||
EventExecutor executor = ctx.executor();
|
||||
if (!executor.inEventLoop()) {
|
||||
executor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
// Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked)
|
||||
// because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and
|
||||
// those event handlers must be called after handlerAdded().
|
||||
callHandlerAdded0(newCtx);
|
||||
callHandlerRemoved0(ctx);
|
||||
}
|
||||
executor.execute(() -> {
|
||||
// Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked)
|
||||
// because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and
|
||||
// those event handlers must be called after handlerAdded().
|
||||
callHandlerAdded0(newCtx);
|
||||
callHandlerRemoved0(ctx);
|
||||
});
|
||||
return ctx.handler();
|
||||
}
|
||||
@ -776,12 +748,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
final EventExecutor executor = ctx.executor();
|
||||
if (!inEventLoop && !executor.inEventLoop(currentThread)) {
|
||||
final AbstractChannelHandlerContext finalCtx = ctx;
|
||||
executor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
destroyUp(finalCtx, true);
|
||||
}
|
||||
});
|
||||
executor.execute(() -> destroyUp(finalCtx, true));
|
||||
break;
|
||||
}
|
||||
|
||||
@ -806,12 +773,7 @@ public class DefaultChannelPipeline implements ChannelPipeline {
|
||||
callHandlerRemoved0(ctx);
|
||||
} else {
|
||||
final AbstractChannelHandlerContext finalCtx = ctx;
|
||||
executor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
destroyDown(Thread.currentThread(), finalCtx, true);
|
||||
}
|
||||
});
|
||||
executor.execute(() -> destroyDown(Thread.currentThread(), finalCtx, true));
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -40,13 +40,10 @@ public final class VoidChannelPromise extends AbstractFuture<Void> implements Ch
|
||||
}
|
||||
this.channel = channel;
|
||||
if (fireException) {
|
||||
fireExceptionListener = new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
Throwable cause = future.cause();
|
||||
if (cause != null) {
|
||||
fireException0(cause);
|
||||
}
|
||||
fireExceptionListener = future -> {
|
||||
Throwable cause = future.cause();
|
||||
if (cause != null) {
|
||||
fireException0(cause);
|
||||
}
|
||||
};
|
||||
} else {
|
||||
|
@ -59,12 +59,7 @@ public class EmbeddedChannel extends AbstractChannel {
|
||||
private static final ChannelMetadata METADATA_NO_DISCONNECT = new ChannelMetadata(false);
|
||||
private static final ChannelMetadata METADATA_DISCONNECT = new ChannelMetadata(true);
|
||||
|
||||
private final ChannelFutureListener recordExceptionListener = new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
recordException(future);
|
||||
}
|
||||
};
|
||||
private final ChannelFutureListener recordExceptionListener = this::recordException;
|
||||
|
||||
private final ChannelMetadata metadata;
|
||||
private final ChannelConfig config;
|
||||
|
@ -23,12 +23,7 @@ import io.netty.channel.ServerChannel;
|
||||
*/
|
||||
public final class ChannelMatchers {
|
||||
|
||||
private static final ChannelMatcher ALL_MATCHER = new ChannelMatcher() {
|
||||
@Override
|
||||
public boolean matches(Channel channel) {
|
||||
return true;
|
||||
}
|
||||
};
|
||||
private static final ChannelMatcher ALL_MATCHER = channel -> true;
|
||||
|
||||
private static final ChannelMatcher SERVER_CHANNEL_MATCHER = isInstanceOf(ServerChannel.class);
|
||||
private static final ChannelMatcher NON_SERVER_CHANNEL_MATCHER = isNotInstanceOf(ServerChannel.class);
|
||||
|
@ -46,12 +46,7 @@ public class DefaultChannelGroup extends AbstractSet<Channel> implements Channel
|
||||
private final EventExecutor executor;
|
||||
private final ConcurrentMap<ChannelId, Channel> serverChannels = new ConcurrentHashMap<>();
|
||||
private final ConcurrentMap<ChannelId, Channel> nonServerChannels = new ConcurrentHashMap<>();
|
||||
private final ChannelFutureListener remover = new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
remove(future.channel());
|
||||
}
|
||||
};
|
||||
private final ChannelFutureListener remover = future -> remove(future.channel());
|
||||
private final VoidChannelGroupFuture voidFuture = new VoidChannelGroupFuture(this);
|
||||
private final boolean stayClosed;
|
||||
private volatile boolean closed;
|
||||
|
@ -63,13 +63,10 @@ public class LocalChannel extends AbstractChannel {
|
||||
private final ChannelConfig config = new DefaultChannelConfig(this);
|
||||
// To further optimize this we could write our own SPSC queue.
|
||||
final Queue<Object> inboundBuffer = PlatformDependent.newSpscQueue();
|
||||
private final Runnable readTask = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
// Ensure the inboundBuffer is not empty as readInbound() will always call fireChannelReadComplete()
|
||||
if (!inboundBuffer.isEmpty()) {
|
||||
readInbound();
|
||||
}
|
||||
private final Runnable readTask = () -> {
|
||||
// Ensure the inboundBuffer is not empty as readInbound() will always call fireChannelReadComplete()
|
||||
if (!inboundBuffer.isEmpty()) {
|
||||
readInbound();
|
||||
}
|
||||
};
|
||||
|
||||
@ -197,12 +194,7 @@ public class LocalChannel extends AbstractChannel {
|
||||
EventLoop peerEventLoop = peer.eventLoop();
|
||||
final boolean peerIsActive = peer.isActive();
|
||||
try {
|
||||
peerEventLoop.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
peer.tryClose(peerIsActive);
|
||||
}
|
||||
});
|
||||
peerEventLoop.execute(() -> peer.tryClose(peerIsActive));
|
||||
} catch (Throwable cause) {
|
||||
logger.warn("Releasing Inbound Queues for channels {}-{} because exception occurred!",
|
||||
this, peer, cause);
|
||||
@ -350,12 +342,7 @@ public class LocalChannel extends AbstractChannel {
|
||||
private void runFinishPeerReadTask(final LocalChannel peer) {
|
||||
// If the peer is writing, we must wait until after reads are completed for that peer before we can read. So
|
||||
// we keep track of the task, and coordinate later that our read can't happen until the peer is done.
|
||||
final Runnable finishPeerReadTask = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
finishPeerRead0(peer);
|
||||
}
|
||||
};
|
||||
final Runnable finishPeerReadTask = () -> finishPeerRead0(peer);
|
||||
try {
|
||||
if (peer.writeInProgress) {
|
||||
peer.finishReadFuture = peer.eventLoop().submit(finishPeerReadTask);
|
||||
@ -470,17 +457,14 @@ public class LocalChannel extends AbstractChannel {
|
||||
// This ensures that if both channels are on the same event loop, the peer's channelActive
|
||||
// event is triggered *after* this channel's channelRegistered event, so that this channel's
|
||||
// pipeline is fully initialized by ChannelInitializer before any channelRead events.
|
||||
peer.eventLoop().execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
ChannelPromise promise = peer.connectPromise;
|
||||
peer.eventLoop().execute(() -> {
|
||||
ChannelPromise promise = peer.connectPromise;
|
||||
|
||||
// Only trigger fireChannelActive() if the promise was not null and was not completed yet.
|
||||
// connectPromise may be set to null if doClose() was called in the meantime.
|
||||
if (promise != null && promise.trySuccess()) {
|
||||
peer.pipeline().fireChannelActive();
|
||||
peer.readIfIsAutoRead();
|
||||
}
|
||||
// Only trigger fireChannelActive() if the promise was not null and was not completed yet.
|
||||
// connectPromise may be set to null if doClose() was called in the meantime.
|
||||
if (promise != null && promise.trySuccess()) {
|
||||
peer.pipeline().fireChannelActive();
|
||||
peer.readIfIsAutoRead();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -38,12 +38,7 @@ public final class LocalHandler implements IoHandler {
|
||||
* Returns a new {@link IoHandlerFactory} that creates {@link LocalHandler} instances.
|
||||
*/
|
||||
public static IoHandlerFactory newFactory() {
|
||||
return new IoHandlerFactory() {
|
||||
@Override
|
||||
public IoHandler newHandler() {
|
||||
return new LocalHandler();
|
||||
}
|
||||
};
|
||||
return LocalHandler::new;
|
||||
}
|
||||
|
||||
private static LocalChannelUnsafe cast(Channel channel) {
|
||||
|
@ -115,12 +115,7 @@ public class LocalServerChannel extends AbstractServerChannel {
|
||||
if (eventLoop().inEventLoop()) {
|
||||
serve0(child);
|
||||
} else {
|
||||
eventLoop().execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
serve0(child);
|
||||
}
|
||||
});
|
||||
eventLoop().execute(() -> serve0(child));
|
||||
}
|
||||
return child;
|
||||
}
|
||||
|
@ -47,13 +47,10 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel {
|
||||
" (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
|
||||
StringUtil.simpleClassName(FileRegion.class) + ')';
|
||||
|
||||
private final Runnable flushTask = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
// Calling flush0 directly to ensure we not try to flush messages that were added via write(...) in the
|
||||
// meantime.
|
||||
((AbstractNioUnsafe) unsafe()).flush0();
|
||||
}
|
||||
private final Runnable flushTask = () -> {
|
||||
// Calling flush0 directly to ensure we not try to flush messages that were added via write(...) in the
|
||||
// meantime.
|
||||
((AbstractNioUnsafe) unsafe()).flush0();
|
||||
};
|
||||
private boolean inputClosedSeenErrorOnRead;
|
||||
|
||||
|
@ -57,12 +57,7 @@ public abstract class AbstractNioChannel extends AbstractChannel {
|
||||
protected final int readInterestOp;
|
||||
volatile SelectionKey selectionKey;
|
||||
boolean readPending;
|
||||
private final Runnable clearReadPendingRunnable = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
clearReadPending0();
|
||||
}
|
||||
};
|
||||
private final Runnable clearReadPendingRunnable = this::clearReadPending0;
|
||||
|
||||
/**
|
||||
* The future of the current connection attempt. If not null, subsequent
|
||||
@ -142,12 +137,7 @@ public abstract class AbstractNioChannel extends AbstractChannel {
|
||||
if (eventLoop.inEventLoop()) {
|
||||
setReadPending0(readPending);
|
||||
} else {
|
||||
eventLoop.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
setReadPending0(readPending);
|
||||
}
|
||||
});
|
||||
eventLoop.execute(() -> setReadPending0(readPending));
|
||||
}
|
||||
} else {
|
||||
// Best effort if we are not registered yet clear readPending.
|
||||
@ -255,29 +245,23 @@ public abstract class AbstractNioChannel extends AbstractChannel {
|
||||
// Schedule connect timeout.
|
||||
int connectTimeoutMillis = config().getConnectTimeoutMillis();
|
||||
if (connectTimeoutMillis > 0) {
|
||||
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
|
||||
ConnectTimeoutException cause =
|
||||
new ConnectTimeoutException("connection timed out: " + remoteAddress);
|
||||
if (connectPromise != null && connectPromise.tryFailure(cause)) {
|
||||
close(voidPromise());
|
||||
}
|
||||
connectTimeoutFuture = eventLoop().schedule(() -> {
|
||||
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
|
||||
ConnectTimeoutException cause =
|
||||
new ConnectTimeoutException("connection timed out: " + remoteAddress);
|
||||
if (connectPromise != null && connectPromise.tryFailure(cause)) {
|
||||
close(voidPromise());
|
||||
}
|
||||
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
promise.addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
if (future.isCancelled()) {
|
||||
if (connectTimeoutFuture != null) {
|
||||
connectTimeoutFuture.cancel(false);
|
||||
}
|
||||
connectPromise = null;
|
||||
close(voidPromise());
|
||||
promise.addListener((ChannelFutureListener) future -> {
|
||||
if (future.isCancelled()) {
|
||||
if (connectTimeoutFuture != null) {
|
||||
connectTimeoutFuture.cancel(false);
|
||||
}
|
||||
connectPromise = null;
|
||||
close(voidPromise());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -67,12 +67,7 @@ public final class NioHandler implements IoHandler {
|
||||
private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;
|
||||
private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;
|
||||
|
||||
private final IntSupplier selectNowSupplier = new IntSupplier() {
|
||||
@Override
|
||||
public int get() throws Exception {
|
||||
return selectNow();
|
||||
}
|
||||
};
|
||||
private final IntSupplier selectNowSupplier = this::selectNow;
|
||||
|
||||
// Workaround for JDK NIO bug.
|
||||
//
|
||||
@ -84,12 +79,9 @@ public final class NioHandler implements IoHandler {
|
||||
final String bugLevel = SystemPropertyUtil.get(key);
|
||||
if (bugLevel == null) {
|
||||
try {
|
||||
AccessController.doPrivileged(new PrivilegedAction<Void>() {
|
||||
@Override
|
||||
public Void run() {
|
||||
System.setProperty(key, "");
|
||||
return null;
|
||||
}
|
||||
AccessController.doPrivileged((PrivilegedAction<Void>) () -> {
|
||||
System.setProperty(key, "");
|
||||
return null;
|
||||
});
|
||||
} catch (final SecurityException e) {
|
||||
logger.debug("Unable to get/set System Property: " + key, e);
|
||||
@ -147,12 +139,7 @@ public final class NioHandler implements IoHandler {
|
||||
* Returns a new {@link IoHandlerFactory} that creates {@link NioHandler} instances.
|
||||
*/
|
||||
public static IoHandlerFactory newFactory() {
|
||||
return new IoHandlerFactory() {
|
||||
@Override
|
||||
public IoHandler newHandler() {
|
||||
return new NioHandler();
|
||||
}
|
||||
};
|
||||
return NioHandler::new;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -162,12 +149,7 @@ public final class NioHandler implements IoHandler {
|
||||
final SelectStrategyFactory selectStrategyFactory) {
|
||||
ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
|
||||
ObjectUtil.checkNotNull(selectStrategyFactory, "selectStrategyFactory");
|
||||
return new IoHandlerFactory() {
|
||||
@Override
|
||||
public IoHandler newHandler() {
|
||||
return new NioHandler(selectorProvider, selectStrategyFactory.newSelectStrategy());
|
||||
}
|
||||
};
|
||||
return () -> new NioHandler(selectorProvider, selectStrategyFactory.newSelectStrategy());
|
||||
}
|
||||
|
||||
private static final class SelectorTuple {
|
||||
@ -197,17 +179,14 @@ public final class NioHandler implements IoHandler {
|
||||
return new SelectorTuple(unwrappedSelector);
|
||||
}
|
||||
|
||||
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
|
||||
@Override
|
||||
public Object run() {
|
||||
try {
|
||||
return Class.forName(
|
||||
"sun.nio.ch.SelectorImpl",
|
||||
false,
|
||||
PlatformDependent.getSystemClassLoader());
|
||||
} catch (Throwable cause) {
|
||||
return cause;
|
||||
}
|
||||
Object maybeSelectorImplClass = AccessController.doPrivileged((PrivilegedAction<Object>) () -> {
|
||||
try {
|
||||
return Class.forName(
|
||||
"sun.nio.ch.SelectorImpl",
|
||||
false,
|
||||
PlatformDependent.getSystemClassLoader());
|
||||
} catch (Throwable cause) {
|
||||
return cause;
|
||||
}
|
||||
});
|
||||
|
||||
@ -224,45 +203,42 @@ public final class NioHandler implements IoHandler {
|
||||
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
|
||||
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
|
||||
|
||||
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
|
||||
@Override
|
||||
public Object run() {
|
||||
try {
|
||||
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
|
||||
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
|
||||
Object maybeException = AccessController.doPrivileged((PrivilegedAction<Object>) () -> {
|
||||
try {
|
||||
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
|
||||
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
|
||||
|
||||
if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
|
||||
// Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
|
||||
// This allows us to also do this in Java9+ without any extra flags.
|
||||
long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
|
||||
long publicSelectedKeysFieldOffset =
|
||||
PlatformDependent.objectFieldOffset(publicSelectedKeysField);
|
||||
if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
|
||||
// Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
|
||||
// This allows us to also do this in Java9+ without any extra flags.
|
||||
long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
|
||||
long publicSelectedKeysFieldOffset =
|
||||
PlatformDependent.objectFieldOffset(publicSelectedKeysField);
|
||||
|
||||
if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
|
||||
PlatformDependent.putObject(
|
||||
unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
|
||||
PlatformDependent.putObject(
|
||||
unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
|
||||
return null;
|
||||
}
|
||||
// We could not retrieve the offset, lets try reflection as last-resort.
|
||||
if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
|
||||
PlatformDependent.putObject(
|
||||
unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
|
||||
PlatformDependent.putObject(
|
||||
unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
|
||||
return null;
|
||||
}
|
||||
|
||||
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
|
||||
if (cause != null) {
|
||||
return cause;
|
||||
}
|
||||
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
|
||||
if (cause != null) {
|
||||
return cause;
|
||||
}
|
||||
|
||||
selectedKeysField.set(unwrappedSelector, selectedKeySet);
|
||||
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
|
||||
return null;
|
||||
} catch (NoSuchFieldException | IllegalAccessException e) {
|
||||
return e;
|
||||
// We could not retrieve the offset, lets try reflection as last-resort.
|
||||
}
|
||||
|
||||
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
|
||||
if (cause != null) {
|
||||
return cause;
|
||||
}
|
||||
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
|
||||
if (cause != null) {
|
||||
return cause;
|
||||
}
|
||||
|
||||
selectedKeysField.set(unwrappedSelector, selectedKeySet);
|
||||
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
|
||||
return null;
|
||||
} catch (NoSuchFieldException | IllegalAccessException e) {
|
||||
return e;
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -170,12 +170,7 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
|
||||
if (loop.inEventLoop()) {
|
||||
((AbstractUnsafe) unsafe()).shutdownOutput(promise);
|
||||
} else {
|
||||
loop.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
((AbstractUnsafe) unsafe()).shutdownOutput(promise);
|
||||
}
|
||||
});
|
||||
loop.execute(() -> ((AbstractUnsafe) unsafe()).shutdownOutput(promise));
|
||||
}
|
||||
return promise;
|
||||
}
|
||||
@ -196,12 +191,7 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
|
||||
if (loop.inEventLoop()) {
|
||||
shutdownInput0(promise);
|
||||
} else {
|
||||
loop.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
shutdownInput0(promise);
|
||||
}
|
||||
});
|
||||
loop.execute(() -> shutdownInput0(promise));
|
||||
}
|
||||
return promise;
|
||||
}
|
||||
@ -217,12 +207,8 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
|
||||
if (shutdownOutputFuture.isDone()) {
|
||||
shutdownOutputDone(shutdownOutputFuture, promise);
|
||||
} else {
|
||||
shutdownOutputFuture.addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
|
||||
shutdownOutputDone(shutdownOutputFuture, promise);
|
||||
}
|
||||
});
|
||||
shutdownOutputFuture.addListener((ChannelFutureListener) shutdownOutputFuture1 ->
|
||||
shutdownOutputDone(shutdownOutputFuture1, promise));
|
||||
}
|
||||
return promise;
|
||||
}
|
||||
@ -232,12 +218,8 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty
|
||||
if (shutdownInputFuture.isDone()) {
|
||||
shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
|
||||
} else {
|
||||
shutdownInputFuture.addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
|
||||
shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
|
||||
}
|
||||
});
|
||||
shutdownInputFuture.addListener((ChannelFutureListener) shutdownInputFuture1 ->
|
||||
shutdownDone(shutdownOutputFuture, shutdownInputFuture1, promise));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -16,8 +16,6 @@
|
||||
|
||||
package io.netty.bootstrap;
|
||||
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelFactory;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
import io.netty.channel.ChannelHandler.Sharable;
|
||||
@ -26,18 +24,15 @@ import io.netty.channel.ChannelInboundHandler;
|
||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||
import io.netty.channel.ChannelOutboundHandlerAdapter;
|
||||
import io.netty.channel.ChannelPromise;
|
||||
import io.netty.channel.MultithreadEventLoopGroup;
|
||||
import io.netty.channel.EventLoop;
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.channel.ServerChannel;
|
||||
import io.netty.channel.ServerChannelFactory;
|
||||
import io.netty.channel.MultithreadEventLoopGroup;
|
||||
import io.netty.channel.local.LocalAddress;
|
||||
import io.netty.channel.local.LocalChannel;
|
||||
import io.netty.channel.local.LocalHandler;
|
||||
import io.netty.channel.local.LocalServerChannel;
|
||||
import io.netty.resolver.AbstractAddressResolver;
|
||||
import io.netty.resolver.AddressResolver;
|
||||
import io.netty.resolver.AddressResolverGroup;
|
||||
import io.netty.resolver.AbstractAddressResolver;
|
||||
import io.netty.util.concurrent.EventExecutor;
|
||||
import io.netty.util.concurrent.Future;
|
||||
import io.netty.util.concurrent.Promise;
|
||||
@ -55,8 +50,14 @@ import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
|
||||
import static org.hamcrest.Matchers.*;
|
||||
import static org.junit.Assert.*;
|
||||
import static org.hamcrest.Matchers.sameInstance;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
public class BootstrapTest {
|
||||
|
||||
@ -88,18 +89,12 @@ public class BootstrapTest {
|
||||
|
||||
// Try to bind from each other.
|
||||
for (int i = 0; i < 1024; i ++) {
|
||||
bindFutures.add(groupA.next().submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
bootstrapB.bind(LocalAddress.ANY);
|
||||
}
|
||||
bindFutures.add(groupA.next().submit(() -> {
|
||||
bootstrapB.bind(LocalAddress.ANY);
|
||||
}));
|
||||
|
||||
bindFutures.add(groupB.next().submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
bootstrapA.bind(LocalAddress.ANY);
|
||||
}
|
||||
bindFutures.add(groupB.next().submit(() -> {
|
||||
bootstrapA.bind(LocalAddress.ANY);
|
||||
}));
|
||||
}
|
||||
|
||||
@ -124,18 +119,12 @@ public class BootstrapTest {
|
||||
|
||||
// Try to connect from each other.
|
||||
for (int i = 0; i < 1024; i ++) {
|
||||
bindFutures.add(groupA.next().submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
bootstrapB.connect(LocalAddress.ANY);
|
||||
}
|
||||
bindFutures.add(groupA.next().submit(() -> {
|
||||
bootstrapB.connect(LocalAddress.ANY);
|
||||
}));
|
||||
|
||||
bindFutures.add(groupB.next().submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
bootstrapA.connect(LocalAddress.ANY);
|
||||
}
|
||||
bindFutures.add(groupB.next().submit(() -> {
|
||||
bootstrapA.connect(LocalAddress.ANY);
|
||||
}));
|
||||
}
|
||||
|
||||
@ -159,12 +148,9 @@ public class BootstrapTest {
|
||||
assertFalse(future.isDone());
|
||||
registerHandler.registerPromise().setSuccess();
|
||||
final BlockingQueue<Boolean> queue = new LinkedBlockingQueue<>();
|
||||
future.addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
queue.add(future.channel().eventLoop().inEventLoop(Thread.currentThread()));
|
||||
queue.add(future.isSuccess());
|
||||
}
|
||||
future.addListener((ChannelFutureListener) future1 -> {
|
||||
queue.add(future1.channel().eventLoop().inEventLoop(Thread.currentThread()));
|
||||
queue.add(future1.isSuccess());
|
||||
});
|
||||
assertTrue(queue.take());
|
||||
assertTrue(queue.take());
|
||||
@ -181,10 +167,8 @@ public class BootstrapTest {
|
||||
try {
|
||||
ServerBootstrap bootstrap = new ServerBootstrap();
|
||||
bootstrap.group(group);
|
||||
bootstrap.channelFactory(new ServerChannelFactory<ServerChannel>() {
|
||||
@Override
|
||||
public ServerChannel newChannel(EventLoop eventLoop, EventLoopGroup childEventLoopGroup) {
|
||||
return new LocalServerChannel(eventLoop, childEventLoopGroup) {
|
||||
bootstrap.channelFactory((eventLoop, childEventLoopGroup) ->
|
||||
new LocalServerChannel(eventLoop, childEventLoopGroup) {
|
||||
@Override
|
||||
public ChannelFuture bind(SocketAddress localAddress) {
|
||||
// Close the Channel to emulate what NIO and others impl do on bind failure
|
||||
@ -200,9 +184,7 @@ public class BootstrapTest {
|
||||
close();
|
||||
return promise.setFailure(new SocketException());
|
||||
}
|
||||
};
|
||||
}
|
||||
});
|
||||
});
|
||||
bootstrap.childHandler(new DummyHandler());
|
||||
bootstrap.handler(registerHandler);
|
||||
bootstrap.localAddress(new LocalAddress("1"));
|
||||
@ -210,12 +192,9 @@ public class BootstrapTest {
|
||||
assertFalse(future.isDone());
|
||||
registerHandler.registerPromise().setSuccess();
|
||||
final BlockingQueue<Boolean> queue = new LinkedBlockingQueue<>();
|
||||
future.addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
queue.add(future.channel().eventLoop().inEventLoop(Thread.currentThread()));
|
||||
queue.add(future.isSuccess());
|
||||
}
|
||||
future.addListener((ChannelFutureListener) future1 -> {
|
||||
queue.add(future1.channel().eventLoop().inEventLoop(Thread.currentThread()));
|
||||
queue.add(future1.isSuccess());
|
||||
});
|
||||
assertTrue(queue.take());
|
||||
assertFalse(queue.take());
|
||||
@ -291,12 +270,9 @@ public class BootstrapTest {
|
||||
final Bootstrap bootstrap = new Bootstrap()
|
||||
.handler(dummyHandler)
|
||||
.group(groupA)
|
||||
.channelFactory(new ChannelFactory<Channel>() {
|
||||
@Override
|
||||
public Channel newChannel(EventLoop eventLoop) {
|
||||
throw exception;
|
||||
}
|
||||
});
|
||||
.channelFactory(eventLoop -> {
|
||||
throw exception;
|
||||
});
|
||||
|
||||
ChannelFuture connectFuture = bootstrap.connect(LocalAddress.ANY);
|
||||
|
||||
@ -316,12 +292,9 @@ public class BootstrapTest {
|
||||
registerPromise = promise;
|
||||
latch.countDown();
|
||||
ChannelPromise newPromise = ctx.newPromise();
|
||||
newPromise.addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
if (!future.isSuccess()) {
|
||||
registerPromise.tryFailure(future.cause());
|
||||
}
|
||||
newPromise.addListener((ChannelFutureListener) future -> {
|
||||
if (!future.isSuccess()) {
|
||||
registerPromise.tryFailure(future.cause());
|
||||
}
|
||||
});
|
||||
super.register(ctx, newPromise);
|
||||
@ -356,14 +329,11 @@ public class BootstrapTest {
|
||||
@Override
|
||||
protected void doResolve(
|
||||
final SocketAddress unresolvedAddress, final Promise<SocketAddress> promise) {
|
||||
executor().execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
if (success) {
|
||||
promise.setSuccess(unresolvedAddress);
|
||||
} else {
|
||||
promise.setFailure(new UnknownHostException(unresolvedAddress.toString()));
|
||||
}
|
||||
executor().execute(() -> {
|
||||
if (success) {
|
||||
promise.setSuccess(unresolvedAddress);
|
||||
} else {
|
||||
promise.setFailure(new UnknownHostException(unresolvedAddress.toString()));
|
||||
}
|
||||
});
|
||||
}
|
||||
@ -372,14 +342,11 @@ public class BootstrapTest {
|
||||
protected void doResolveAll(
|
||||
final SocketAddress unresolvedAddress, final Promise<List<SocketAddress>> promise)
|
||||
throws Exception {
|
||||
executor().execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
if (success) {
|
||||
promise.setSuccess(Collections.singletonList(unresolvedAddress));
|
||||
} else {
|
||||
promise.setFailure(new UnknownHostException(unresolvedAddress.toString()));
|
||||
}
|
||||
executor().execute(() -> {
|
||||
if (success) {
|
||||
promise.setSuccess(Collections.singletonList(unresolvedAddress));
|
||||
} else {
|
||||
promise.setFailure(new UnknownHostException(unresolvedAddress.toString()));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -51,12 +51,9 @@ public class AbstractChannelTest {
|
||||
when(eventLoop.inEventLoop()).thenReturn(true);
|
||||
when(eventLoop.unsafe()).thenReturn(mock(EventLoop.Unsafe.class));
|
||||
|
||||
doAnswer(new Answer() {
|
||||
@Override
|
||||
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
|
||||
((Runnable) invocationOnMock.getArgument(0)).run();
|
||||
return null;
|
||||
}
|
||||
doAnswer(invocationOnMock -> {
|
||||
((Runnable) invocationOnMock.getArgument(0)).run();
|
||||
return null;
|
||||
}).when(eventLoop).execute(any(Runnable.class));
|
||||
|
||||
final TestChannel channel = new TestChannel(eventLoop);
|
||||
|
@ -146,11 +146,8 @@ public class ChannelInitializerTest {
|
||||
try {
|
||||
// Execute some task on the EventLoop and wait until its done to be sure all handlers are added to the
|
||||
// pipeline.
|
||||
channel.eventLoop().submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
// NOOP
|
||||
}
|
||||
channel.eventLoop().submit(() -> {
|
||||
// NOOP
|
||||
}).syncUninterruptibly();
|
||||
Iterator<Map.Entry<String, ChannelHandler>> handlers = channel.pipeline().iterator();
|
||||
assertSame(handler1, handlers.next().getValue());
|
||||
@ -186,11 +183,8 @@ public class ChannelInitializerTest {
|
||||
try {
|
||||
// Execute some task on the EventLoop and wait until its done to be sure all handlers are added to the
|
||||
// pipeline.
|
||||
channel.eventLoop().submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
// NOOP
|
||||
}
|
||||
channel.eventLoop().submit(() -> {
|
||||
// NOOP
|
||||
}).syncUninterruptibly();
|
||||
assertEquals(1, initChannelCalled.get());
|
||||
assertEquals(2, registeredCalled.get());
|
||||
|
@ -424,25 +424,19 @@ public class ChannelOutboundBufferTest {
|
||||
|
||||
final CountDownLatch executeLatch = new CountDownLatch(1);
|
||||
final CountDownLatch runLatch = new CountDownLatch(1);
|
||||
executor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
runLatch.countDown();
|
||||
executeLatch.await();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
executor.execute(() -> {
|
||||
try {
|
||||
runLatch.countDown();
|
||||
executeLatch.await();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
});
|
||||
|
||||
runLatch.await();
|
||||
|
||||
executor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
// Will not be executed but ensure the pending count is 1.
|
||||
}
|
||||
executor.execute(() -> {
|
||||
// Will not be executed but ensure the pending count is 1.
|
||||
});
|
||||
|
||||
assertEquals(1, executor.pendingTasks());
|
||||
|
@ -53,12 +53,9 @@ public class CoalescingBufferQueueTest {
|
||||
channel = new EmbeddedChannel();
|
||||
writeQueue = new CoalescingBufferQueue(channel, 16, true);
|
||||
catPromise = newPromise();
|
||||
mouseListener = new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
mouseDone = true;
|
||||
mouseSuccess = future.isSuccess();
|
||||
}
|
||||
mouseListener = future -> {
|
||||
mouseDone = true;
|
||||
mouseSuccess = future.isSuccess();
|
||||
};
|
||||
emptyPromise = newPromise();
|
||||
voidPromise = channel.voidPromise();
|
||||
|
@ -368,17 +368,14 @@ public class DefaultChannelPipelineTest {
|
||||
|
||||
// Add handler.
|
||||
p.addFirst(handler.name, handler);
|
||||
self.eventLoop().execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
// Validate handler life-cycle methods called.
|
||||
handler.validate(true, false);
|
||||
self.eventLoop().execute(() -> {
|
||||
// Validate handler life-cycle methods called.
|
||||
handler.validate(true, false);
|
||||
|
||||
// Store handler into the list.
|
||||
handlers.add(handler);
|
||||
// Store handler into the list.
|
||||
handlers.add(handler);
|
||||
|
||||
addLatch.countDown();
|
||||
}
|
||||
addLatch.countDown();
|
||||
});
|
||||
}
|
||||
addLatch.await();
|
||||
@ -391,13 +388,10 @@ public class DefaultChannelPipelineTest {
|
||||
for (final LifeCycleAwareTestHandler handler : handlers) {
|
||||
assertSame(handler, p.remove(handler.name));
|
||||
|
||||
self.eventLoop().execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
// Validate handler life-cycle methods called.
|
||||
handler.validate(true, true);
|
||||
removeLatch.countDown();
|
||||
}
|
||||
self.eventLoop().execute(() -> {
|
||||
// Validate handler life-cycle methods called.
|
||||
handler.validate(true, true);
|
||||
removeLatch.countDown();
|
||||
});
|
||||
}
|
||||
removeLatch.await();
|
||||
@ -410,17 +404,14 @@ public class DefaultChannelPipelineTest {
|
||||
|
||||
setUp(handler1, handler2);
|
||||
|
||||
self.eventLoop().submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
ChannelPipeline p = self.pipeline();
|
||||
handler1.inboundBuffer.add(8);
|
||||
assertEquals(8, handler1.inboundBuffer.peek());
|
||||
assertTrue(handler2.inboundBuffer.isEmpty());
|
||||
p.remove(handler1);
|
||||
assertEquals(1, handler2.inboundBuffer.size());
|
||||
assertEquals(8, handler2.inboundBuffer.peek());
|
||||
}
|
||||
self.eventLoop().submit(() -> {
|
||||
ChannelPipeline p = self.pipeline();
|
||||
handler1.inboundBuffer.add(8);
|
||||
assertEquals(8, handler1.inboundBuffer.peek());
|
||||
assertTrue(handler2.inboundBuffer.isEmpty());
|
||||
p.remove(handler1);
|
||||
assertEquals(1, handler2.inboundBuffer.size());
|
||||
assertEquals(8, handler2.inboundBuffer.peek());
|
||||
}).sync();
|
||||
}
|
||||
|
||||
@ -431,17 +422,14 @@ public class DefaultChannelPipelineTest {
|
||||
|
||||
setUp(handler1, handler2);
|
||||
|
||||
self.eventLoop().submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
ChannelPipeline p = self.pipeline();
|
||||
handler2.outboundBuffer.add(8);
|
||||
assertEquals(8, handler2.outboundBuffer.peek());
|
||||
assertTrue(handler1.outboundBuffer.isEmpty());
|
||||
p.remove(handler2);
|
||||
assertEquals(1, handler1.outboundBuffer.size());
|
||||
assertEquals(8, handler1.outboundBuffer.peek());
|
||||
}
|
||||
self.eventLoop().submit(() -> {
|
||||
ChannelPipeline p = self.pipeline();
|
||||
handler2.outboundBuffer.add(8);
|
||||
assertEquals(8, handler2.outboundBuffer.peek());
|
||||
assertTrue(handler1.outboundBuffer.isEmpty());
|
||||
p.remove(handler2);
|
||||
assertEquals(1, handler1.outboundBuffer.size());
|
||||
assertEquals(8, handler1.outboundBuffer.peek());
|
||||
}).sync();
|
||||
}
|
||||
|
||||
@ -452,16 +440,13 @@ public class DefaultChannelPipelineTest {
|
||||
|
||||
setUp(handler1);
|
||||
|
||||
self.eventLoop().submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
ChannelPipeline p = self.pipeline();
|
||||
handler1.outboundBuffer.add(8);
|
||||
assertEquals(8, handler1.outboundBuffer.peek());
|
||||
assertTrue(handler2.outboundBuffer.isEmpty());
|
||||
p.replace(handler1, "handler2", handler2);
|
||||
assertEquals(8, handler2.outboundBuffer.peek());
|
||||
}
|
||||
self.eventLoop().submit(() -> {
|
||||
ChannelPipeline p = self.pipeline();
|
||||
handler1.outboundBuffer.add(8);
|
||||
assertEquals(8, handler1.outboundBuffer.peek());
|
||||
assertTrue(handler2.outboundBuffer.isEmpty());
|
||||
p.replace(handler1, "handler2", handler2);
|
||||
assertEquals(8, handler2.outboundBuffer.peek());
|
||||
}).sync();
|
||||
}
|
||||
|
||||
@ -472,22 +457,19 @@ public class DefaultChannelPipelineTest {
|
||||
|
||||
setUp(handler1);
|
||||
|
||||
self.eventLoop().submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
ChannelPipeline p = self.pipeline();
|
||||
handler1.inboundBuffer.add(8);
|
||||
handler1.outboundBuffer.add(8);
|
||||
self.eventLoop().submit(() -> {
|
||||
ChannelPipeline p = self.pipeline();
|
||||
handler1.inboundBuffer.add(8);
|
||||
handler1.outboundBuffer.add(8);
|
||||
|
||||
assertEquals(8, handler1.inboundBuffer.peek());
|
||||
assertEquals(8, handler1.outboundBuffer.peek());
|
||||
assertTrue(handler2.inboundBuffer.isEmpty());
|
||||
assertTrue(handler2.outboundBuffer.isEmpty());
|
||||
assertEquals(8, handler1.inboundBuffer.peek());
|
||||
assertEquals(8, handler1.outboundBuffer.peek());
|
||||
assertTrue(handler2.inboundBuffer.isEmpty());
|
||||
assertTrue(handler2.outboundBuffer.isEmpty());
|
||||
|
||||
p.replace(handler1, "handler2", handler2);
|
||||
assertEquals(8, handler2.outboundBuffer.peek());
|
||||
assertEquals(8, handler2.inboundBuffer.peek());
|
||||
}
|
||||
p.replace(handler1, "handler2", handler2);
|
||||
assertEquals(8, handler2.outboundBuffer.peek());
|
||||
assertEquals(8, handler2.inboundBuffer.peek());
|
||||
}).sync();
|
||||
}
|
||||
|
||||
@ -499,23 +481,20 @@ public class DefaultChannelPipelineTest {
|
||||
|
||||
setUp(handler1, handler2, handler3);
|
||||
|
||||
self.eventLoop().submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
ChannelPipeline p = self.pipeline();
|
||||
handler2.inboundBuffer.add(8);
|
||||
handler2.outboundBuffer.add(8);
|
||||
self.eventLoop().submit(() -> {
|
||||
ChannelPipeline p = self.pipeline();
|
||||
handler2.inboundBuffer.add(8);
|
||||
handler2.outboundBuffer.add(8);
|
||||
|
||||
assertEquals(8, handler2.inboundBuffer.peek());
|
||||
assertEquals(8, handler2.outboundBuffer.peek());
|
||||
assertEquals(8, handler2.inboundBuffer.peek());
|
||||
assertEquals(8, handler2.outboundBuffer.peek());
|
||||
|
||||
assertEquals(0, handler1.outboundBuffer.size());
|
||||
assertEquals(0, handler3.inboundBuffer.size());
|
||||
assertEquals(0, handler1.outboundBuffer.size());
|
||||
assertEquals(0, handler3.inboundBuffer.size());
|
||||
|
||||
p.remove(handler2);
|
||||
assertEquals(8, handler3.inboundBuffer.peek());
|
||||
assertEquals(8, handler1.outboundBuffer.peek());
|
||||
}
|
||||
p.remove(handler2);
|
||||
assertEquals(8, handler3.inboundBuffer.peek());
|
||||
assertEquals(8, handler1.outboundBuffer.peek());
|
||||
}).sync();
|
||||
}
|
||||
|
||||
@ -870,12 +849,7 @@ public class DefaultChannelPipelineTest {
|
||||
@Override
|
||||
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
|
||||
// Execute this later so we are sure the exception is handled first.
|
||||
ctx.executor().execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
ctx.executor().execute(latch::countDown);
|
||||
throw exceptionRemoved;
|
||||
}
|
||||
});
|
||||
@ -986,34 +960,31 @@ public class DefaultChannelPipelineTest {
|
||||
try {
|
||||
final Object event = new Object();
|
||||
final Promise<Object> promise = ImmediateEventExecutor.INSTANCE.newPromise();
|
||||
pipeline1.channel().register().addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
ChannelPipeline pipeline = future.channel().pipeline();
|
||||
final AtomicBoolean handlerAddedCalled = new AtomicBoolean();
|
||||
pipeline.addLast(new ChannelInboundHandlerAdapter() {
|
||||
@Override
|
||||
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
|
||||
handlerAddedCalled.set(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
|
||||
promise.setSuccess(event);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||
promise.setFailure(cause);
|
||||
}
|
||||
});
|
||||
if (!handlerAddedCalled.get()) {
|
||||
promise.setFailure(new AssertionError("handlerAdded(...) should have been called"));
|
||||
return;
|
||||
pipeline1.channel().register().addListener((ChannelFutureListener) future -> {
|
||||
ChannelPipeline pipeline = future.channel().pipeline();
|
||||
final AtomicBoolean handlerAddedCalled = new AtomicBoolean();
|
||||
pipeline.addLast(new ChannelInboundHandlerAdapter() {
|
||||
@Override
|
||||
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
|
||||
handlerAddedCalled.set(true);
|
||||
}
|
||||
// This event must be captured by the added handler.
|
||||
pipeline.fireUserEventTriggered(event);
|
||||
|
||||
@Override
|
||||
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
|
||||
promise.setSuccess(event);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||
promise.setFailure(cause);
|
||||
}
|
||||
});
|
||||
if (!handlerAddedCalled.get()) {
|
||||
promise.setFailure(new AssertionError("handlerAdded(...) should have been called"));
|
||||
return;
|
||||
}
|
||||
// This event must be captured by the added handler.
|
||||
pipeline.fireUserEventTriggered(event);
|
||||
});
|
||||
assertSame(event, promise.syncUninterruptibly().getNow());
|
||||
} finally {
|
||||
@ -1184,11 +1155,8 @@ public class DefaultChannelPipelineTest {
|
||||
pipeline.channel().closeFuture().syncUninterruptibly();
|
||||
|
||||
// Schedule something on the EventLoop to ensure all other scheduled tasks had a chance to complete.
|
||||
pipeline.channel().eventLoop().submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
// NOOP
|
||||
}
|
||||
pipeline.channel().eventLoop().submit(() -> {
|
||||
// NOOP
|
||||
}).syncUninterruptibly();
|
||||
Error error = errorRef.get();
|
||||
if (error != null) {
|
||||
@ -1646,33 +1614,30 @@ public class DefaultChannelPipelineTest {
|
||||
final Object writeObject = new Object();
|
||||
final CountDownLatch doneLatch = new CountDownLatch(1);
|
||||
|
||||
Runnable r = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
pipeline.addLast(new ChannelInboundHandlerAdapter() {
|
||||
@Override
|
||||
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
|
||||
if (evt == userEvent) {
|
||||
ctx.write(writeObject);
|
||||
}
|
||||
ctx.fireUserEventTriggered(evt);
|
||||
}
|
||||
});
|
||||
pipeline.addFirst(new ChannelDuplexHandler() {
|
||||
@Override
|
||||
public void handlerAdded(ChannelHandlerContext ctx) {
|
||||
ctx.fireUserEventTriggered(userEvent);
|
||||
Runnable r = () -> {
|
||||
pipeline.addLast(new ChannelInboundHandlerAdapter() {
|
||||
@Override
|
||||
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
|
||||
if (evt == userEvent) {
|
||||
ctx.write(writeObject);
|
||||
}
|
||||
ctx.fireUserEventTriggered(evt);
|
||||
}
|
||||
});
|
||||
pipeline.addFirst(new ChannelDuplexHandler() {
|
||||
@Override
|
||||
public void handlerAdded(ChannelHandlerContext ctx) {
|
||||
ctx.fireUserEventTriggered(userEvent);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
|
||||
if (msg == writeObject) {
|
||||
doneLatch.countDown();
|
||||
}
|
||||
ctx.write(msg, promise);
|
||||
@Override
|
||||
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
|
||||
if (msg == writeObject) {
|
||||
doneLatch.countDown();
|
||||
}
|
||||
});
|
||||
}
|
||||
ctx.write(msg, promise);
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
if (executeInEventLoop) {
|
||||
|
@ -40,12 +40,7 @@ public class PendingWriteQueueTest {
|
||||
assertFalse("Should not be writable anymore", ctx.channel().isWritable());
|
||||
|
||||
ChannelFuture future = queue.removeAndWrite();
|
||||
future.addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
assertQueueEmpty(queue);
|
||||
}
|
||||
});
|
||||
future.addListener((ChannelFutureListener) future1 -> assertQueueEmpty(queue));
|
||||
super.flush(ctx);
|
||||
}
|
||||
}, 1);
|
||||
@ -59,12 +54,7 @@ public class PendingWriteQueueTest {
|
||||
assertFalse("Should not be writable anymore", ctx.channel().isWritable());
|
||||
|
||||
ChannelFuture future = queue.removeAndWriteAll();
|
||||
future.addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
assertQueueEmpty(queue);
|
||||
}
|
||||
});
|
||||
future.addListener((ChannelFutureListener) future1 -> assertQueueEmpty(queue));
|
||||
super.flush(ctx);
|
||||
}
|
||||
}, 3);
|
||||
@ -209,12 +199,7 @@ public class PendingWriteQueueTest {
|
||||
final PendingWriteQueue queue = new PendingWriteQueue(channel.pipeline().firstContext());
|
||||
|
||||
ChannelPromise promise = channel.newPromise();
|
||||
promise.addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
queue.removeAndFailAll(new IllegalStateException());
|
||||
}
|
||||
});
|
||||
promise.addListener((ChannelFutureListener) future -> queue.removeAndFailAll(new IllegalStateException()));
|
||||
queue.add(1L, promise);
|
||||
|
||||
ChannelPromise promise2 = channel.newPromise();
|
||||
@ -241,12 +226,7 @@ public class PendingWriteQueueTest {
|
||||
|
||||
ChannelPromise promise = channel.newPromise();
|
||||
final ChannelPromise promise3 = channel.newPromise();
|
||||
promise.addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) {
|
||||
queue.add(3L, promise3);
|
||||
}
|
||||
});
|
||||
promise.addListener((ChannelFutureListener) future -> queue.add(3L, promise3));
|
||||
queue.add(1L, promise);
|
||||
ChannelPromise promise2 = channel.newPromise();
|
||||
queue.add(2L, promise2);
|
||||
@ -296,28 +276,15 @@ public class PendingWriteQueueTest {
|
||||
|
||||
ChannelPromise promise = channel.newPromise();
|
||||
final ChannelPromise promise3 = channel.newPromise();
|
||||
promise3.addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
failOrder.add(3);
|
||||
}
|
||||
});
|
||||
promise.addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
failOrder.add(1);
|
||||
queue.add(3L, promise3);
|
||||
}
|
||||
promise3.addListener((ChannelFutureListener) future -> failOrder.add(3));
|
||||
promise.addListener((ChannelFutureListener) future -> {
|
||||
failOrder.add(1);
|
||||
queue.add(3L, promise3);
|
||||
});
|
||||
queue.add(1L, promise);
|
||||
|
||||
ChannelPromise promise2 = channel.newPromise();
|
||||
promise2.addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
failOrder.add(2);
|
||||
}
|
||||
});
|
||||
promise2.addListener((ChannelFutureListener) future -> failOrder.add(2));
|
||||
queue.add(2L, promise2);
|
||||
queue.removeAndFailAll(new Exception());
|
||||
assertTrue(promise.isDone());
|
||||
@ -338,12 +305,7 @@ public class PendingWriteQueueTest {
|
||||
final PendingWriteQueue queue = new PendingWriteQueue(channel.pipeline().firstContext());
|
||||
|
||||
ChannelPromise promise = channel.newPromise();
|
||||
promise.addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
queue.removeAndWriteAll();
|
||||
}
|
||||
});
|
||||
promise.addListener((ChannelFutureListener) future -> queue.removeAndWriteAll());
|
||||
queue.add(1L, promise);
|
||||
|
||||
ChannelPromise promise2 = channel.newPromise();
|
||||
|
@ -226,12 +226,7 @@ public class ReentrantChannelTest extends BaseChannelTest {
|
||||
|
||||
@Override
|
||||
public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
|
||||
promise.addListener(new GenericFutureListener<Future<? super Void>>() {
|
||||
@Override
|
||||
public void operationComplete(Future<? super Void> future) throws Exception {
|
||||
ctx.channel().close();
|
||||
}
|
||||
});
|
||||
promise.addListener(future -> ctx.channel().close());
|
||||
super.write(ctx, msg, promise);
|
||||
ctx.channel().flush();
|
||||
}
|
||||
|
@ -46,10 +46,7 @@ import static org.junit.Assert.*;
|
||||
|
||||
public class SingleThreadEventLoopTest {
|
||||
|
||||
private static final Runnable NOOP = new Runnable() {
|
||||
@Override
|
||||
public void run() { }
|
||||
};
|
||||
private static final Runnable NOOP = () -> { };
|
||||
|
||||
private SingleThreadEventLoopA loopA;
|
||||
private SingleThreadEventLoopB loopB;
|
||||
@ -98,12 +95,7 @@ public class SingleThreadEventLoopTest {
|
||||
@SuppressWarnings("deprecation")
|
||||
public void shutdownAfterStart() throws Exception {
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
loopA.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
loopA.execute(latch::countDown);
|
||||
|
||||
// Wait for the event loop thread to start.
|
||||
latch.await();
|
||||
@ -142,12 +134,7 @@ public class SingleThreadEventLoopTest {
|
||||
private static void testScheduleTask(EventLoop loopA) throws InterruptedException, ExecutionException {
|
||||
long startTime = System.nanoTime();
|
||||
final AtomicLong endTime = new AtomicLong();
|
||||
loopA.schedule(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
endTime.set(System.nanoTime());
|
||||
}
|
||||
}, 500, TimeUnit.MILLISECONDS).get();
|
||||
loopA.schedule(() -> endTime.set(System.nanoTime()), 500, TimeUnit.MILLISECONDS).get();
|
||||
assertThat(endTime.get() - startTime,
|
||||
is(greaterThanOrEqualTo(TimeUnit.MILLISECONDS.toNanos(500))));
|
||||
}
|
||||
@ -166,17 +153,14 @@ public class SingleThreadEventLoopTest {
|
||||
final Queue<Long> timestamps = new LinkedBlockingQueue<>();
|
||||
final int expectedTimeStamps = 5;
|
||||
final CountDownLatch allTimeStampsLatch = new CountDownLatch(expectedTimeStamps);
|
||||
ScheduledFuture<?> f = loopA.scheduleAtFixedRate(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
timestamps.add(System.nanoTime());
|
||||
try {
|
||||
Thread.sleep(50);
|
||||
} catch (InterruptedException e) {
|
||||
// Ignore
|
||||
}
|
||||
allTimeStampsLatch.countDown();
|
||||
ScheduledFuture<?> f = loopA.scheduleAtFixedRate(() -> {
|
||||
timestamps.add(System.nanoTime());
|
||||
try {
|
||||
Thread.sleep(50);
|
||||
} catch (InterruptedException e) {
|
||||
// Ignore
|
||||
}
|
||||
allTimeStampsLatch.countDown();
|
||||
}, 100, 100, TimeUnit.MILLISECONDS);
|
||||
allTimeStampsLatch.await();
|
||||
assertTrue(f.cancel(true));
|
||||
@ -214,20 +198,17 @@ public class SingleThreadEventLoopTest {
|
||||
final Queue<Long> timestamps = new LinkedBlockingQueue<>();
|
||||
final int expectedTimeStamps = 5;
|
||||
final CountDownLatch allTimeStampsLatch = new CountDownLatch(expectedTimeStamps);
|
||||
ScheduledFuture<?> f = loopA.scheduleAtFixedRate(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
boolean empty = timestamps.isEmpty();
|
||||
timestamps.add(System.nanoTime());
|
||||
if (empty) {
|
||||
try {
|
||||
Thread.sleep(401);
|
||||
} catch (InterruptedException e) {
|
||||
// Ignore
|
||||
}
|
||||
ScheduledFuture<?> f = loopA.scheduleAtFixedRate(() -> {
|
||||
boolean empty = timestamps.isEmpty();
|
||||
timestamps.add(System.nanoTime());
|
||||
if (empty) {
|
||||
try {
|
||||
Thread.sleep(401);
|
||||
} catch (InterruptedException e) {
|
||||
// Ignore
|
||||
}
|
||||
allTimeStampsLatch.countDown();
|
||||
}
|
||||
allTimeStampsLatch.countDown();
|
||||
}, 100, 100, TimeUnit.MILLISECONDS);
|
||||
allTimeStampsLatch.await();
|
||||
assertTrue(f.cancel(true));
|
||||
@ -268,17 +249,14 @@ public class SingleThreadEventLoopTest {
|
||||
final Queue<Long> timestamps = new LinkedBlockingQueue<>();
|
||||
final int expectedTimeStamps = 3;
|
||||
final CountDownLatch allTimeStampsLatch = new CountDownLatch(expectedTimeStamps);
|
||||
ScheduledFuture<?> f = loopA.scheduleWithFixedDelay(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
timestamps.add(System.nanoTime());
|
||||
try {
|
||||
Thread.sleep(51);
|
||||
} catch (InterruptedException e) {
|
||||
// Ignore
|
||||
}
|
||||
allTimeStampsLatch.countDown();
|
||||
ScheduledFuture<?> f = loopA.scheduleWithFixedDelay(() -> {
|
||||
timestamps.add(System.nanoTime());
|
||||
try {
|
||||
Thread.sleep(51);
|
||||
} catch (InterruptedException e) {
|
||||
// Ignore
|
||||
}
|
||||
allTimeStampsLatch.countDown();
|
||||
}, 100, 100, TimeUnit.MILLISECONDS);
|
||||
allTimeStampsLatch.await();
|
||||
assertTrue(f.cancel(true));
|
||||
@ -305,16 +283,13 @@ public class SingleThreadEventLoopTest {
|
||||
final int NUM_TASKS = 3;
|
||||
final AtomicInteger ranTasks = new AtomicInteger();
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final Runnable task = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
ranTasks.incrementAndGet();
|
||||
while (latch.getCount() > 0) {
|
||||
try {
|
||||
latch.await();
|
||||
} catch (InterruptedException e) {
|
||||
// Ignored
|
||||
}
|
||||
final Runnable task = () -> {
|
||||
ranTasks.incrementAndGet();
|
||||
while (latch.getCount() > 0) {
|
||||
try {
|
||||
latch.await();
|
||||
} catch (InterruptedException e) {
|
||||
// Ignored
|
||||
}
|
||||
}
|
||||
};
|
||||
@ -380,12 +355,7 @@ public class SingleThreadEventLoopTest {
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
Channel ch = new LocalChannel(loopA);
|
||||
ChannelPromise promise = ch.newPromise();
|
||||
promise.addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
promise.addListener((ChannelFutureListener) future -> latch.countDown());
|
||||
|
||||
// Disable logging temporarily.
|
||||
Logger root = (Logger) LoggerFactory.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME);
|
||||
|
@ -22,6 +22,7 @@ import static org.junit.Assert.assertSame;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import io.netty.channel.ChannelOutboundInvoker;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.Queue;
|
||||
@ -79,12 +80,7 @@ public class EmbeddedChannelTest {
|
||||
@Test(timeout = 2000)
|
||||
public void promiseDoesNotInfiniteLoop() throws InterruptedException {
|
||||
EmbeddedChannel channel = new EmbeddedChannel();
|
||||
channel.closeFuture().addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
future.channel().close();
|
||||
}
|
||||
});
|
||||
channel.closeFuture().addListener((ChannelFutureListener) future -> future.channel().close());
|
||||
|
||||
channel.close().syncUninterruptibly();
|
||||
}
|
||||
@ -121,18 +117,8 @@ public class EmbeddedChannelTest {
|
||||
public void testScheduling() throws Exception {
|
||||
EmbeddedChannel ch = new EmbeddedChannel(new ChannelInboundHandlerAdapter());
|
||||
final CountDownLatch latch = new CountDownLatch(2);
|
||||
ScheduledFuture future = ch.eventLoop().schedule(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
latch.countDown();
|
||||
}
|
||||
}, 1, TimeUnit.SECONDS);
|
||||
future.addListener(new FutureListener() {
|
||||
@Override
|
||||
public void operationComplete(Future future) throws Exception {
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
ScheduledFuture future = ch.eventLoop().schedule(latch::countDown, 1, TimeUnit.SECONDS);
|
||||
future.addListener((FutureListener) future1 -> latch.countDown());
|
||||
long next = ch.runScheduledPendingTasks();
|
||||
assertTrue(next > 0);
|
||||
// Sleep for the nanoseconds but also give extra 50ms as the clock my not be very precise and so fail the test
|
||||
@ -145,10 +131,7 @@ public class EmbeddedChannelTest {
|
||||
@Test
|
||||
public void testScheduledCancelled() throws Exception {
|
||||
EmbeddedChannel ch = new EmbeddedChannel(new ChannelInboundHandlerAdapter());
|
||||
ScheduledFuture<?> future = ch.eventLoop().schedule(new Runnable() {
|
||||
@Override
|
||||
public void run() { }
|
||||
}, 1, TimeUnit.DAYS);
|
||||
ScheduledFuture<?> future = ch.eventLoop().schedule(() -> { }, 1, TimeUnit.DAYS);
|
||||
ch.finish();
|
||||
assertTrue(future.isCancelled());
|
||||
}
|
||||
@ -200,35 +183,15 @@ public class EmbeddedChannelTest {
|
||||
// See https://github.com/netty/netty/issues/4316.
|
||||
@Test(timeout = 2000)
|
||||
public void testFireChannelInactiveAndUnregisteredOnClose() throws InterruptedException {
|
||||
testFireChannelInactiveAndUnregistered(new Action() {
|
||||
@Override
|
||||
public ChannelFuture doRun(Channel channel) {
|
||||
return channel.close();
|
||||
}
|
||||
});
|
||||
testFireChannelInactiveAndUnregistered(new Action() {
|
||||
@Override
|
||||
public ChannelFuture doRun(Channel channel) {
|
||||
return channel.close(channel.newPromise());
|
||||
}
|
||||
});
|
||||
testFireChannelInactiveAndUnregistered(ChannelOutboundInvoker::close);
|
||||
testFireChannelInactiveAndUnregistered(channel -> channel.close(channel.newPromise()));
|
||||
}
|
||||
|
||||
@Test(timeout = 2000)
|
||||
public void testFireChannelInactiveAndUnregisteredOnDisconnect() throws InterruptedException {
|
||||
testFireChannelInactiveAndUnregistered(new Action() {
|
||||
@Override
|
||||
public ChannelFuture doRun(Channel channel) {
|
||||
return channel.disconnect();
|
||||
}
|
||||
});
|
||||
testFireChannelInactiveAndUnregistered(ChannelOutboundInvoker::disconnect);
|
||||
|
||||
testFireChannelInactiveAndUnregistered(new Action() {
|
||||
@Override
|
||||
public ChannelFuture doRun(Channel channel) {
|
||||
return channel.disconnect(channel.newPromise());
|
||||
}
|
||||
});
|
||||
testFireChannelInactiveAndUnregistered(channel -> channel.disconnect(channel.newPromise()));
|
||||
}
|
||||
|
||||
private static void testFireChannelInactiveAndUnregistered(Action action) throws InterruptedException {
|
||||
@ -237,13 +200,8 @@ public class EmbeddedChannelTest {
|
||||
@Override
|
||||
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
||||
latch.countDown();
|
||||
ctx.executor().execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
// Should be executed.
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
// Should be executed.
|
||||
ctx.executor().execute(latch::countDown);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -368,12 +326,7 @@ public class EmbeddedChannelTest {
|
||||
@Override
|
||||
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise)
|
||||
throws Exception {
|
||||
ctx.executor().execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
ctx.write(msg, promise);
|
||||
}
|
||||
});
|
||||
ctx.executor().execute(() -> ctx.write(msg, promise));
|
||||
}
|
||||
});
|
||||
Object msg = new Object();
|
||||
@ -391,11 +344,8 @@ public class EmbeddedChannelTest {
|
||||
@Override
|
||||
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise)
|
||||
throws Exception {
|
||||
ctx.executor().schedule(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
ctx.writeAndFlush(msg, promise);
|
||||
}
|
||||
ctx.executor().schedule(() -> {
|
||||
ctx.writeAndFlush(msg, promise);
|
||||
}, delay, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
});
|
||||
|
@ -117,13 +117,10 @@ public class LocalChannelTest {
|
||||
// Connect to the server
|
||||
cc = cb.connect(sc.localAddress()).sync().channel();
|
||||
final Channel ccCpy = cc;
|
||||
cc.eventLoop().execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
// Send a message event up the pipeline.
|
||||
ccCpy.pipeline().fireChannelRead("Hello, World");
|
||||
latch.countDown();
|
||||
}
|
||||
cc.eventLoop().execute(() -> {
|
||||
// Send a message event up the pipeline.
|
||||
ccCpy.pipeline().fireChannelRead("Hello, World");
|
||||
latch.countDown();
|
||||
});
|
||||
assertTrue(latch.await(5, SECONDS));
|
||||
|
||||
@ -370,18 +367,10 @@ public class LocalChannelTest {
|
||||
|
||||
final Channel ccCpy = cc;
|
||||
// Make sure a write operation is executed in the eventloop
|
||||
cc.pipeline().lastContext().executor().execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
ChannelPromise promise = ccCpy.newPromise();
|
||||
promise.addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
ccCpy.pipeline().lastContext().close();
|
||||
}
|
||||
});
|
||||
ccCpy.writeAndFlush(data.retainedDuplicate(), promise);
|
||||
}
|
||||
cc.pipeline().lastContext().executor().execute(() -> {
|
||||
ChannelPromise promise = ccCpy.newPromise();
|
||||
promise.addListener((ChannelFutureListener) future -> ccCpy.pipeline().lastContext().close());
|
||||
ccCpy.writeAndFlush(data.retainedDuplicate(), promise);
|
||||
});
|
||||
|
||||
assertTrue(messageLatch.await(5, SECONDS));
|
||||
@ -501,18 +490,11 @@ public class LocalChannelTest {
|
||||
|
||||
final Channel ccCpy = cc;
|
||||
// Make sure a write operation is executed in the eventloop
|
||||
cc.pipeline().lastContext().executor().execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
ChannelPromise promise = ccCpy.newPromise();
|
||||
promise.addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
ccCpy.writeAndFlush(data2.retainedDuplicate(), ccCpy.newPromise());
|
||||
}
|
||||
});
|
||||
ccCpy.writeAndFlush(data.retainedDuplicate(), promise);
|
||||
}
|
||||
cc.pipeline().lastContext().executor().execute(() -> {
|
||||
ChannelPromise promise = ccCpy.newPromise();
|
||||
promise.addListener((ChannelFutureListener) future ->
|
||||
ccCpy.writeAndFlush(data2.retainedDuplicate(), ccCpy.newPromise()));
|
||||
ccCpy.writeAndFlush(data.retainedDuplicate(), promise);
|
||||
});
|
||||
|
||||
assertTrue(messageLatch.await(5, SECONDS));
|
||||
@ -583,19 +565,13 @@ public class LocalChannelTest {
|
||||
|
||||
final Channel ccCpy = cc;
|
||||
// Make sure a write operation is executed in the eventloop
|
||||
cc.pipeline().lastContext().executor().execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
ChannelPromise promise = ccCpy.newPromise();
|
||||
promise.addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
Channel serverChannelCpy = serverChannelRef.get();
|
||||
serverChannelCpy.writeAndFlush(data2.retainedDuplicate(), serverChannelCpy.newPromise());
|
||||
}
|
||||
});
|
||||
ccCpy.writeAndFlush(data.retainedDuplicate(), promise);
|
||||
}
|
||||
cc.pipeline().lastContext().executor().execute(() -> {
|
||||
ChannelPromise promise = ccCpy.newPromise();
|
||||
promise.addListener((ChannelFutureListener) future -> {
|
||||
Channel serverChannelCpy = serverChannelRef.get();
|
||||
serverChannelCpy.writeAndFlush(data2.retainedDuplicate(), serverChannelCpy.newPromise());
|
||||
});
|
||||
ccCpy.writeAndFlush(data.retainedDuplicate(), promise);
|
||||
});
|
||||
|
||||
assertTrue(messageLatch.await(5, SECONDS));
|
||||
@ -665,20 +641,14 @@ public class LocalChannelTest {
|
||||
|
||||
final Channel ccCpy = cc;
|
||||
// Make sure a write operation is executed in the eventloop
|
||||
cc.pipeline().lastContext().executor().execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
ChannelPromise promise = ccCpy.newPromise();
|
||||
promise.addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
Channel serverChannelCpy = serverChannelRef.get();
|
||||
serverChannelCpy.writeAndFlush(
|
||||
data2.retainedDuplicate(), serverChannelCpy.newPromise());
|
||||
}
|
||||
});
|
||||
ccCpy.writeAndFlush(data.retainedDuplicate(), promise);
|
||||
}
|
||||
cc.pipeline().lastContext().executor().execute(() -> {
|
||||
ChannelPromise promise = ccCpy.newPromise();
|
||||
promise.addListener((ChannelFutureListener) future -> {
|
||||
Channel serverChannelCpy = serverChannelRef.get();
|
||||
serverChannelCpy.writeAndFlush(
|
||||
data2.retainedDuplicate(), serverChannelCpy.newPromise());
|
||||
});
|
||||
ccCpy.writeAndFlush(data.retainedDuplicate(), promise);
|
||||
});
|
||||
|
||||
assertTrue(messageLatch.await(5, SECONDS));
|
||||
@ -747,47 +717,34 @@ public class LocalChannelTest {
|
||||
ccCpy.closeFuture().addListener(clientChannelCloseLatch);
|
||||
|
||||
// Make sure a write operation is executed in the eventloop
|
||||
cc.pipeline().lastContext().executor().execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
cc.pipeline().lastContext().executor().execute(() ->
|
||||
ccCpy.writeAndFlush(data.retainedDuplicate(), ccCpy.newPromise())
|
||||
.addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
serverChannelCpy.eventLoop().execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
// The point of this test is to write while the peer is closed, so we should
|
||||
// ensure the peer is actually closed before we write.
|
||||
int waitCount = 0;
|
||||
while (ccCpy.isOpen()) {
|
||||
try {
|
||||
Thread.sleep(50);
|
||||
} catch (InterruptedException ignored) {
|
||||
// ignored
|
||||
}
|
||||
if (++waitCount > 5) {
|
||||
fail();
|
||||
}
|
||||
}
|
||||
serverChannelCpy.writeAndFlush(data2.retainedDuplicate(),
|
||||
serverChannelCpy.newPromise())
|
||||
.addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
if (!future.isSuccess() &&
|
||||
future.cause() instanceof ClosedChannelException) {
|
||||
writeFailLatch.countDown();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
ccCpy.close();
|
||||
.addListener((ChannelFutureListener) future -> {
|
||||
serverChannelCpy.eventLoop().execute(() -> {
|
||||
// The point of this test is to write while the peer is closed, so we should
|
||||
// ensure the peer is actually closed before we write.
|
||||
int waitCount = 0;
|
||||
while (ccCpy.isOpen()) {
|
||||
try {
|
||||
Thread.sleep(50);
|
||||
} catch (InterruptedException ignored) {
|
||||
// ignored
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
if (++waitCount > 5) {
|
||||
fail();
|
||||
}
|
||||
}
|
||||
serverChannelCpy.writeAndFlush(data2.retainedDuplicate(),
|
||||
serverChannelCpy.newPromise())
|
||||
.addListener((ChannelFutureListener) future1 -> {
|
||||
if (!future1.isSuccess() &&
|
||||
future1.cause() instanceof ClosedChannelException) {
|
||||
writeFailLatch.countDown();
|
||||
}
|
||||
});
|
||||
});
|
||||
ccCpy.close();
|
||||
}));
|
||||
|
||||
assertTrue(serverMessageLatch.await(5, SECONDS));
|
||||
assertTrue(writeFailLatch.await(5, SECONDS));
|
||||
@ -952,12 +909,9 @@ public class LocalChannelTest {
|
||||
}
|
||||
|
||||
private static void writeAndFlushReadOnSuccess(final ChannelHandlerContext ctx, Object msg) {
|
||||
ctx.writeAndFlush(msg).addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) {
|
||||
if (future.isSuccess()) {
|
||||
ctx.read();
|
||||
}
|
||||
ctx.writeAndFlush(msg).addListener((ChannelFutureListener) future -> {
|
||||
if (future.isSuccess()) {
|
||||
ctx.read();
|
||||
}
|
||||
});
|
||||
}
|
||||
@ -1182,12 +1136,9 @@ public class LocalChannelTest {
|
||||
if (!autoRead) {
|
||||
// The read will be scheduled 100ms in the future to ensure we not receive any
|
||||
// channelRead calls in the meantime.
|
||||
ctx.executor().schedule(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
read = 0;
|
||||
ctx.read();
|
||||
}
|
||||
ctx.executor().schedule(() -> {
|
||||
read = 0;
|
||||
ctx.read();
|
||||
}, 100, TimeUnit.MILLISECONDS);
|
||||
} else {
|
||||
read = 0;
|
||||
|
@ -266,12 +266,9 @@ public class LocalTransportThreadModelTest {
|
||||
final int end = i + ELEMS_PER_ROUNDS;
|
||||
i = end;
|
||||
|
||||
ch.eventLoop().execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
for (int j = start; j < end; j ++) {
|
||||
ch.pipeline().fireChannelRead(Integer.valueOf(j));
|
||||
}
|
||||
ch.eventLoop().execute(() -> {
|
||||
for (int j = start; j < end; j ++) {
|
||||
ch.pipeline().fireChannelRead(Integer.valueOf(j));
|
||||
}
|
||||
});
|
||||
}
|
||||
@ -304,14 +301,11 @@ public class LocalTransportThreadModelTest {
|
||||
final int end = i + ELEMS_PER_ROUNDS;
|
||||
i = end;
|
||||
|
||||
ch.pipeline().context(h6).executor().execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
for (int j = start; j < end; j ++) {
|
||||
ch.write(Integer.valueOf(j));
|
||||
}
|
||||
ch.flush();
|
||||
ch.pipeline().context(h6).executor().execute(() -> {
|
||||
for (int j = start; j < end; j ++) {
|
||||
ch.write(Integer.valueOf(j));
|
||||
}
|
||||
ch.flush();
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -83,12 +83,7 @@ public class LocalTransportThreadModelTest2 {
|
||||
return;
|
||||
}
|
||||
|
||||
localChannel.eventLoop().execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
close(localChannel, localRegistrationHandler);
|
||||
}
|
||||
});
|
||||
localChannel.eventLoop().execute(() -> close(localChannel, localRegistrationHandler));
|
||||
|
||||
// Wait until the connection is closed or the connection attempt fails.
|
||||
localChannel.closeFuture().awaitUninterruptibly();
|
||||
|
@ -152,25 +152,22 @@ public class LocalTransportThreadModelTest3 {
|
||||
|
||||
Throwable cause = new Throwable();
|
||||
|
||||
Thread pipelineModifier = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
Random random = new Random();
|
||||
Thread pipelineModifier = new Thread(() -> {
|
||||
Random random = new Random();
|
||||
|
||||
while (true) {
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException e) {
|
||||
return;
|
||||
}
|
||||
if (!ch.isRegistered()) {
|
||||
continue;
|
||||
}
|
||||
//EventForwardHandler forwardHandler = forwarders[random.nextInt(forwarders.length)];
|
||||
ChannelHandler handler = ch.pipeline().removeFirst();
|
||||
ch.pipeline().addBefore(groups[random.nextInt(groups.length)].next(), "recorder",
|
||||
UUID.randomUUID().toString(), handler);
|
||||
while (true) {
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException e) {
|
||||
return;
|
||||
}
|
||||
if (!ch.isRegistered()) {
|
||||
continue;
|
||||
}
|
||||
//EventForwardHandler forwardHandler = forwarders[random.nextInt(forwarders.length)];
|
||||
ChannelHandler handler = ch.pipeline().removeFirst();
|
||||
ch.pipeline().addBefore(groups[random.nextInt(groups.length)].next(), "recorder",
|
||||
UUID.randomUUID().toString(), handler);
|
||||
}
|
||||
});
|
||||
pipelineModifier.setDaemon(true);
|
||||
|
@ -23,7 +23,6 @@ import io.netty.channel.MultithreadEventLoopGroup;
|
||||
import io.netty.channel.SelectStrategy;
|
||||
import io.netty.channel.SelectStrategyFactory;
|
||||
import io.netty.channel.SingleThreadEventLoop;
|
||||
|
||||
import io.netty.channel.socket.ServerSocketChannel;
|
||||
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
||||
import io.netty.util.IntSupplier;
|
||||
@ -38,7 +37,6 @@ import java.nio.channels.SelectionKey;
|
||||
import java.nio.channels.Selector;
|
||||
import java.nio.channels.SocketChannel;
|
||||
import java.nio.channels.spi.SelectorProvider;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
@ -66,35 +64,15 @@ public class NioEventLoopTest extends AbstractEventLoopTest {
|
||||
Channel channel = new NioServerSocketChannel(loop, loop);
|
||||
channel.register().syncUninterruptibly();
|
||||
|
||||
Selector selector = loop.submit(new Callable<Selector>() {
|
||||
@Override
|
||||
public Selector call() throws Exception {
|
||||
return nioHandler.unwrappedSelector();
|
||||
}
|
||||
}).syncUninterruptibly().getNow();
|
||||
Selector selector = loop.submit(nioHandler::unwrappedSelector).syncUninterruptibly().getNow();
|
||||
|
||||
assertSame(selector, loop.submit(new Callable<Selector>() {
|
||||
@Override
|
||||
public Selector call() throws Exception {
|
||||
return nioHandler.unwrappedSelector();
|
||||
}
|
||||
}).syncUninterruptibly().getNow());
|
||||
assertSame(selector, loop.submit(nioHandler::unwrappedSelector).syncUninterruptibly().getNow());
|
||||
assertTrue(selector.isOpen());
|
||||
|
||||
// Submit to the EventLoop so we are sure its really executed in a non-async manner.
|
||||
loop.submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
nioHandler.rebuildSelector();
|
||||
}
|
||||
}).syncUninterruptibly();
|
||||
loop.submit(nioHandler::rebuildSelector).syncUninterruptibly();
|
||||
|
||||
Selector newSelector = loop.submit(new Callable<Selector>() {
|
||||
@Override
|
||||
public Selector call() throws Exception {
|
||||
return nioHandler.unwrappedSelector();
|
||||
}
|
||||
}).syncUninterruptibly().getNow();
|
||||
Selector newSelector = loop.submit(nioHandler::unwrappedSelector).syncUninterruptibly().getNow();
|
||||
assertTrue(newSelector.isOpen());
|
||||
assertNotSame(selector, newSelector);
|
||||
assertFalse(selector.isOpen());
|
||||
@ -110,11 +88,8 @@ public class NioEventLoopTest extends AbstractEventLoopTest {
|
||||
EventLoopGroup group = new MultithreadEventLoopGroup(1, NioHandler.newFactory());
|
||||
|
||||
final EventLoop el = group.next();
|
||||
Future<?> future = el.schedule(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
// NOOP
|
||||
}
|
||||
Future<?> future = el.schedule(() -> {
|
||||
// NOOP
|
||||
}, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
|
||||
|
||||
assertFalse(future.awaitUninterruptibly(1000));
|
||||
@ -127,48 +102,25 @@ public class NioEventLoopTest extends AbstractEventLoopTest {
|
||||
final NioHandler nioHandler = (NioHandler) NioHandler.newFactory().newHandler();
|
||||
EventLoop loop = new SingleThreadEventLoop(new DefaultThreadFactory("ioPool"), nioHandler);
|
||||
try {
|
||||
Selector selector = loop.submit(new Callable<Selector>() {
|
||||
@Override
|
||||
public Selector call() throws Exception {
|
||||
return nioHandler.unwrappedSelector();
|
||||
}
|
||||
}).syncUninterruptibly().getNow();
|
||||
Selector selector = loop.submit(nioHandler::unwrappedSelector).syncUninterruptibly().getNow();
|
||||
assertTrue(selector.isOpen());
|
||||
|
||||
loop.submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
// Interrupt the thread which should not end-up in a busy spin and
|
||||
// so the selector should not have been rebuild.
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
loop.submit(() -> {
|
||||
// Interrupt the thread which should not end-up in a busy spin and
|
||||
// so the selector should not have been rebuild.
|
||||
Thread.currentThread().interrupt();
|
||||
}).syncUninterruptibly();
|
||||
|
||||
assertTrue(selector.isOpen());
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(2);
|
||||
loop.submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
latch.countDown();
|
||||
}
|
||||
}).syncUninterruptibly();
|
||||
loop.submit(latch::countDown).syncUninterruptibly();
|
||||
|
||||
loop.schedule(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
latch.countDown();
|
||||
}
|
||||
}, 2, TimeUnit.SECONDS).syncUninterruptibly();
|
||||
loop.schedule(latch::countDown, 2, TimeUnit.SECONDS).syncUninterruptibly();
|
||||
|
||||
latch.await();
|
||||
|
||||
assertSame(selector, loop.submit(new Callable<Selector>() {
|
||||
@Override
|
||||
public Selector call() throws Exception {
|
||||
return nioHandler.unwrappedSelector();
|
||||
}
|
||||
}).syncUninterruptibly().getNow());
|
||||
assertSame(selector, loop.submit(nioHandler::unwrappedSelector).syncUninterruptibly().getNow());
|
||||
assertTrue(selector.isOpen());
|
||||
} finally {
|
||||
loop.shutdownGracefully();
|
||||
@ -190,9 +142,7 @@ public class NioEventLoopTest extends AbstractEventLoopTest {
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
loop.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
loop.execute(() ->
|
||||
nioHandler.register(selectableChannel, SelectionKey.OP_CONNECT, new NioTask<SocketChannel>() {
|
||||
@Override
|
||||
public void channelReady(SocketChannel ch, SelectionKey key) {
|
||||
@ -202,9 +152,7 @@ public class NioEventLoopTest extends AbstractEventLoopTest {
|
||||
@Override
|
||||
public void channelUnregistered(SocketChannel ch, Throwable cause) {
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}));
|
||||
|
||||
latch.await();
|
||||
|
||||
@ -219,27 +167,21 @@ public class NioEventLoopTest extends AbstractEventLoopTest {
|
||||
@Test
|
||||
public void testTaskRemovalOnShutdownThrowsNoUnsupportedOperationException() throws Exception {
|
||||
final AtomicReference<Throwable> error = new AtomicReference<>();
|
||||
final Runnable task = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
// NOOP
|
||||
}
|
||||
final Runnable task = () -> {
|
||||
// NOOP
|
||||
};
|
||||
// Just run often enough to trigger it normally.
|
||||
for (int i = 0; i < 1000; i++) {
|
||||
EventLoopGroup group = new MultithreadEventLoopGroup(1, NioHandler.newFactory());
|
||||
final EventLoop loop = group.next();
|
||||
|
||||
Thread t = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
for (;;) {
|
||||
loop.execute(task);
|
||||
}
|
||||
} catch (Throwable cause) {
|
||||
error.set(cause);
|
||||
Thread t = new Thread(() -> {
|
||||
try {
|
||||
for (;;) {
|
||||
loop.execute(task);
|
||||
}
|
||||
} catch (Throwable cause) {
|
||||
error.set(cause);
|
||||
}
|
||||
});
|
||||
t.start();
|
||||
@ -255,24 +197,19 @@ public class NioEventLoopTest extends AbstractEventLoopTest {
|
||||
public void testRebuildSelectorOnIOException() throws InterruptedException {
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final CountDownLatch strategyLatch = new CountDownLatch(1);
|
||||
SelectStrategyFactory selectStrategyFactory = new SelectStrategyFactory() {
|
||||
SelectStrategyFactory selectStrategyFactory = () -> new SelectStrategy() {
|
||||
|
||||
private boolean thrown;
|
||||
|
||||
@Override
|
||||
public SelectStrategy newSelectStrategy() {
|
||||
return new SelectStrategy() {
|
||||
|
||||
private boolean thrown;
|
||||
|
||||
@Override
|
||||
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
|
||||
strategyLatch.await();
|
||||
if (!thrown) {
|
||||
thrown = true;
|
||||
throw new IOException();
|
||||
}
|
||||
latch.countDown();
|
||||
return -1;
|
||||
}
|
||||
};
|
||||
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
|
||||
strategyLatch.await();
|
||||
if (!thrown) {
|
||||
thrown = true;
|
||||
throw new IOException();
|
||||
}
|
||||
latch.countDown();
|
||||
return -1;
|
||||
}
|
||||
};
|
||||
|
||||
@ -289,12 +226,7 @@ public class NioEventLoopTest extends AbstractEventLoopTest {
|
||||
|
||||
latch.await();
|
||||
|
||||
Selector newSelector = loop.submit(new Callable<Selector>() {
|
||||
@Override
|
||||
public Selector call() throws Exception {
|
||||
return nioHandler.unwrappedSelector();
|
||||
}
|
||||
}).syncUninterruptibly().getNow();
|
||||
Selector newSelector = loop.submit(nioHandler::unwrappedSelector).syncUninterruptibly().getNow();
|
||||
assertTrue(newSelector.isOpen());
|
||||
assertNotSame(selector, newSelector);
|
||||
assertFalse(selector.isOpen());
|
||||
|
@ -132,12 +132,9 @@ public class NioSocketChannelTest extends AbstractNioChannelTest<NioSocketChanne
|
||||
// Trigger a gathering write by writing two buffers.
|
||||
ctx.write(Unpooled.wrappedBuffer(new byte[] { 'a' }));
|
||||
ChannelFuture f = ctx.write(Unpooled.wrappedBuffer(new byte[] { 'b' }));
|
||||
f.addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture future) throws Exception {
|
||||
// This message must be flushed
|
||||
ctx.writeAndFlush(Unpooled.wrappedBuffer(new byte[]{'c'}));
|
||||
}
|
||||
f.addListener((ChannelFutureListener) future -> {
|
||||
// This message must be flushed
|
||||
ctx.writeAndFlush(Unpooled.wrappedBuffer(new byte[]{'c'}));
|
||||
});
|
||||
ctx.flush();
|
||||
}
|
||||
@ -197,12 +194,9 @@ public class NioSocketChannelTest extends AbstractNioChannelTest<NioSocketChanne
|
||||
// As soon as the channel becomes active re-register it to another
|
||||
// EventLoop. After this is done we should still receive the data that
|
||||
// was written to the channel.
|
||||
ctx.deregister().addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture cf) {
|
||||
Channel channel = cf.channel();
|
||||
channel.register();
|
||||
}
|
||||
ctx.deregister().addListener((ChannelFutureListener) cf -> {
|
||||
Channel channel = cf.channel();
|
||||
channel.register();
|
||||
});
|
||||
}
|
||||
});
|
||||
|
Loading…
Reference in New Issue
Block a user