diff --git a/transport/src/main/java/io/netty/bootstrap/AbstractBootstrap.java b/transport/src/main/java/io/netty/bootstrap/AbstractBootstrap.java index 3ddfd15c8a..d56968e016 100644 --- a/transport/src/main/java/io/netty/bootstrap/AbstractBootstrap.java +++ b/transport/src/main/java/io/netty/bootstrap/AbstractBootstrap.java @@ -244,17 +244,14 @@ public abstract class AbstractBootstrap, C } else { // Registration future is almost always fulfilled already, but just in case it's not. final ChannelPromise promise = channel.newPromise(); - regFuture.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - Throwable cause = future.cause(); - if (cause != null) { - // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an - // IllegalStateException once we try to access the EventLoop of the Channel. - promise.setFailure(cause); - } else { - doBind0(regFuture, channel, localAddress, promise); - } + regFuture.addListener((ChannelFutureListener) future -> { + Throwable cause = future.cause(); + if (cause != null) { + // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an + // IllegalStateException once we try to access the EventLoop of the Channel. + promise.setFailure(cause); + } else { + doBind0(regFuture, channel, localAddress, promise); } }); return promise; @@ -271,22 +268,14 @@ public abstract class AbstractBootstrap, C } final ChannelPromise promise = channel.newPromise(); - loop.execute(new Runnable() { - @Override - public void run() { - init(channel).addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (future.isSuccess()) { - channel.register(promise); - } else { - channel.unsafe().closeForcibly(); - promise.setFailure(future.cause()); - } - } - }); + loop.execute(() -> init(channel).addListener((ChannelFutureListener) future -> { + if (future.isSuccess()) { + channel.register(promise); + } else { + channel.unsafe().closeForcibly(); + promise.setFailure(future.cause()); } - }); + })); return promise; } @@ -301,14 +290,11 @@ public abstract class AbstractBootstrap, C // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. - channel.eventLoop().execute(new Runnable() { - @Override - public void run() { - if (regFuture.isSuccess()) { - channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); - } else { - promise.setFailure(regFuture.cause()); - } + channel.eventLoop().execute(() -> { + if (regFuture.isSuccess()) { + channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); + } else { + promise.setFailure(regFuture.cause()); } }); } diff --git a/transport/src/main/java/io/netty/bootstrap/Bootstrap.java b/transport/src/main/java/io/netty/bootstrap/Bootstrap.java index 05301ed499..11e1584706 100644 --- a/transport/src/main/java/io/netty/bootstrap/Bootstrap.java +++ b/transport/src/main/java/io/netty/bootstrap/Bootstrap.java @@ -206,19 +206,16 @@ public class Bootstrap extends AbstractBootstrap { + // Directly obtain the cause and do a null check so we only need one volatile read in case of a + // failure. + Throwable cause = future.cause(); + if (cause != null) { + // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an + // IllegalStateException once we try to access the EventLoop of the Channel. + promise.setFailure(cause); + } else { + doResolveAndConnect0(channel, remoteAddress, localAddress, promise); } }); return promise; @@ -254,15 +251,12 @@ public class Bootstrap extends AbstractBootstrap() { - @Override - public void operationComplete(Future future) throws Exception { - if (future.cause() != null) { - channel.close(); - promise.setFailure(future.cause()); - } else { - doConnect(future.getNow(), localAddress, promise); - } + resolveFuture.addListener((FutureListener) future -> { + if (future.cause() != null) { + channel.close(); + promise.setFailure(future.cause()); + } else { + doConnect(future.getNow(), localAddress, promise); } }); } catch (Throwable cause) { @@ -277,16 +271,13 @@ public class Bootstrap extends AbstractBootstrap { + if (localAddress == null) { + channel.connect(remoteAddress, connectPromise); + } else { + channel.connect(remoteAddress, localAddress, connectPromise); } + connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE); }); } diff --git a/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java b/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java index 4f423f3da5..9c9c3ebfd9 100644 --- a/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java +++ b/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java @@ -214,13 +214,10 @@ public class ServerBootstrap extends AbstractBootstrap { + pipeline.addLast(new ServerBootstrapAcceptor( + ch, currentChildHandler, currentChildOptions, currentChildAttrs)); + promise.setSuccess(); }); } }); @@ -277,12 +274,7 @@ public class ServerBootstrap extends AbstractBootstrap channel.config().setAutoRead(true); } @Override @@ -295,12 +287,7 @@ public class ServerBootstrap extends AbstractBootstrap initChild(child)); } catch (Throwable cause) { forceClose(child, cause); } @@ -319,12 +306,9 @@ public class ServerBootstrap extends AbstractBootstrap) e.getKey()).set(e.getValue()); } - child.register().addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - forceClose(child, future.cause()); - } + child.register().addListener((ChannelFutureListener) future -> { + if (!future.isSuccess()) { + forceClose(child, future.cause()); } }); } catch (Throwable t) { diff --git a/transport/src/main/java/io/netty/channel/AbstractChannel.java b/transport/src/main/java/io/netty/channel/AbstractChannel.java index fa8a312320..cd339533c0 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannel.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannel.java @@ -550,12 +550,9 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } if (!wasActive && isActive()) { - invokeLater(new Runnable() { - @Override - public void run() { - pipeline.fireChannelActive(); - readIfIsAutoRead(); - } + invokeLater(() -> { + pipeline.fireChannelActive(); + readIfIsAutoRead(); }); } @@ -580,12 +577,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } if (wasActive && !isActive()) { - invokeLater(new Runnable() { - @Override - public void run() { - pipeline.fireChannelInactive(); - } - }); + invokeLater(pipeline::fireChannelInactive); } safeSetSuccess(promise); @@ -631,24 +623,17 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha new ChannelOutputShutdownException("Channel output shutdown", cause); Executor closeExecutor = prepareToClose(); if (closeExecutor != null) { - closeExecutor.execute(new Runnable() { - @Override - public void run() { - try { - // Execute the shutdown. - doShutdownOutput(); - promise.setSuccess(); - } catch (Throwable err) { - promise.setFailure(err); - } finally { - // Dispatch to the EventLoop - eventLoop().execute(new Runnable() { - @Override - public void run() { - closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause); - } - }); - } + closeExecutor.execute(() -> { + try { + // Execute the shutdown. + doShutdownOutput(); + promise.setSuccess(); + } catch (Throwable err) { + promise.setFailure(err); + } finally { + // Dispatch to the EventLoop + eventLoop().execute(() -> + closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause)); } }); } else { @@ -683,12 +668,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha safeSetSuccess(promise); } else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise. // This means close() was called before so we just register a listener and return - closeFuture.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - promise.setSuccess(); - } - }); + closeFuture.addListener((ChannelFutureListener) future -> promise.setSuccess()); } return; } @@ -700,26 +680,20 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer. Executor closeExecutor = prepareToClose(); if (closeExecutor != null) { - closeExecutor.execute(new Runnable() { - @Override - public void run() { - try { - // Execute the close. - doClose0(promise); - } finally { - // Call invokeLater so closeAndDeregister is executed in the EventLoop again! - invokeLater(new Runnable() { - @Override - public void run() { - if (outboundBuffer != null) { - // Fail all the queued messages - outboundBuffer.failFlushed(cause, notify); - outboundBuffer.close(closeCause); - } - fireChannelInactiveAndDeregister(wasActive); - } - }); - } + closeExecutor.execute(() -> { + try { + // Execute the close. + doClose0(promise); + } finally { + // Call invokeLater so closeAndDeregister is executed in the EventLoop again! + invokeLater(() -> { + if (outboundBuffer != null) { + // Fail all the queued messages + outboundBuffer.failFlushed(cause, notify); + outboundBuffer.close(closeCause); + } + fireChannelInactiveAndDeregister(wasActive); + }); } }); } else { @@ -734,12 +708,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha } } if (inFlush0) { - invokeLater(new Runnable() { - @Override - public void run() { - fireChannelInactiveAndDeregister(wasActive); - } - }); + invokeLater(() -> fireChannelInactiveAndDeregister(wasActive)); } else { fireChannelInactiveAndDeregister(wasActive); } @@ -796,27 +765,24 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha // // See: // https://github.com/netty/netty/issues/4435 - invokeLater(new Runnable() { - @Override - public void run() { - try { - doDeregister(); - } catch (Throwable t) { - logger.warn("Unexpected exception occurred while deregistering a channel.", t); - } finally { - if (fireChannelInactive) { - pipeline.fireChannelInactive(); - } - // Some transports like local and AIO does not allow the deregistration of - // an open channel. Their doDeregister() calls close(). Consequently, - // close() calls deregister() again - no need to fire channelUnregistered, so check - // if it was registered. - if (registered) { - registered = false; - pipeline.fireChannelUnregistered(); - } - safeSetSuccess(promise); + invokeLater(() -> { + try { + doDeregister(); + } catch (Throwable t) { + logger.warn("Unexpected exception occurred while deregistering a channel.", t); + } finally { + if (fireChannelInactive) { + pipeline.fireChannelInactive(); } + // Some transports like local and AIO does not allow the deregistration of + // an open channel. Their doDeregister() calls close(). Consequently, + // close() calls deregister() again - no need to fire channelUnregistered, so check + // if it was registered. + if (registered) { + registered = false; + pipeline.fireChannelUnregistered(); + } + safeSetSuccess(promise); } }); } @@ -832,12 +798,7 @@ public abstract class AbstractChannel extends DefaultAttributeMap implements Cha try { doBeginRead(); } catch (final Exception e) { - invokeLater(new Runnable() { - @Override - public void run() { - pipeline.fireExceptionCaught(e); - } - }); + invokeLater(() -> pipeline.fireExceptionCaught(e)); close(voidPromise()); } } diff --git a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java index db0614372d..a3bf957652 100644 --- a/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java @@ -232,12 +232,8 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap private static boolean isSkippable( final Class handlerType, final String methodName, final Class... paramTypes) throws Exception { - return AccessController.doPrivileged(new PrivilegedExceptionAction() { - @Override - public Boolean run() throws Exception { - return handlerType.getMethod(methodName, paramTypes).isAnnotationPresent(ChannelHandler.Skip.class); - } - }); + return AccessController.doPrivileged((PrivilegedExceptionAction) () -> + handlerType.getMethod(methodName, paramTypes).isAnnotationPresent(ChannelHandler.Skip.class)); } @Override @@ -280,12 +276,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap if (executor.inEventLoop()) { next.invokeChannelRegistered(); } else { - executor.execute(new Runnable() { - @Override - public void run() { - next.invokeChannelRegistered(); - } - }); + executor.execute(next::invokeChannelRegistered); } } @@ -312,12 +303,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap if (executor.inEventLoop()) { next.invokeChannelUnregistered(); } else { - executor.execute(new Runnable() { - @Override - public void run() { - next.invokeChannelUnregistered(); - } - }); + executor.execute(next::invokeChannelUnregistered); } } @@ -344,12 +330,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap if (executor.inEventLoop()) { next.invokeChannelActive(); } else { - executor.execute(new Runnable() { - @Override - public void run() { - next.invokeChannelActive(); - } - }); + executor.execute(next::invokeChannelActive); } } @@ -376,12 +357,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap if (executor.inEventLoop()) { next.invokeChannelInactive(); } else { - executor.execute(new Runnable() { - @Override - public void run() { - next.invokeChannelInactive(); - } - }); + executor.execute(next::invokeChannelInactive); } } @@ -410,12 +386,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap next.invokeExceptionCaught(cause); } else { try { - executor.execute(new Runnable() { - @Override - public void run() { - next.invokeExceptionCaught(cause); - } - }); + executor.execute(() -> next.invokeExceptionCaught(cause)); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("Failed to submit an exceptionCaught() event.", t); @@ -460,12 +431,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap if (executor.inEventLoop()) { next.invokeUserEventTriggered(event); } else { - executor.execute(new Runnable() { - @Override - public void run() { - next.invokeUserEventTriggered(event); - } - }); + executor.execute(() -> next.invokeUserEventTriggered(event)); } } @@ -493,12 +459,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { - executor.execute(new Runnable() { - @Override - public void run() { - next.invokeChannelRead(m); - } - }); + executor.execute(() -> next.invokeChannelRead(m)); } } @@ -527,12 +488,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap } else { Runnable task = next.invokeChannelReadCompleteTask; if (task == null) { - next.invokeChannelReadCompleteTask = task = new Runnable() { - @Override - public void run() { - next.invokeChannelReadComplete(); - } - }; + next.invokeChannelReadCompleteTask = task = next::invokeChannelReadComplete; } executor.execute(task); } @@ -563,12 +519,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap } else { Runnable task = next.invokeChannelWritableStateChangedTask; if (task == null) { - next.invokeChannelWritableStateChangedTask = task = new Runnable() { - @Override - public void run() { - next.invokeChannelWritabilityChanged(); - } - }; + next.invokeChannelWritableStateChangedTask = task = next::invokeChannelWritabilityChanged; } executor.execute(task); } @@ -636,12 +587,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap if (executor.inEventLoop()) { next.invokeBind(localAddress, promise); } else { - safeExecute(executor, new Runnable() { - @Override - public void run() { - next.invokeBind(localAddress, promise); - } - }, promise, null); + safeExecute(executor, () -> next.invokeBind(localAddress, promise), promise, null); } return promise; } @@ -680,12 +626,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap if (executor.inEventLoop()) { next.invokeConnect(remoteAddress, localAddress, promise); } else { - safeExecute(executor, new Runnable() { - @Override - public void run() { - next.invokeConnect(remoteAddress, localAddress, promise); - } - }, promise, null); + safeExecute(executor, () -> next.invokeConnect(remoteAddress, localAddress, promise), promise, null); } return promise; } @@ -720,14 +661,11 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap next.invokeDisconnect(promise); } } else { - safeExecute(executor, new Runnable() { - @Override - public void run() { - if (!channel().metadata().hasDisconnect()) { - next.invokeClose(promise); - } else { - next.invokeDisconnect(promise); - } + safeExecute(executor, () -> { + if (!channel().metadata().hasDisconnect()) { + next.invokeClose(promise); + } else { + next.invokeDisconnect(promise); } }, promise, null); } @@ -758,12 +696,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap if (executor.inEventLoop()) { next.invokeClose(promise); } else { - safeExecute(executor, new Runnable() { - @Override - public void run() { - next.invokeClose(promise); - } - }, promise, null); + safeExecute(executor, () -> next.invokeClose(promise), promise, null); } return promise; @@ -793,12 +726,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap if (executor.inEventLoop()) { next.invokeRegister(promise); } else { - safeExecute(executor, new Runnable() { - @Override - public void run() { - next.invokeRegister(promise); - } - }, promise, null); + safeExecute(executor, () -> next.invokeRegister(promise), promise, null); } return promise; @@ -828,12 +756,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap if (executor.inEventLoop()) { next.invokeDeregister(promise); } else { - safeExecute(executor, new Runnable() { - @Override - public void run() { - next.invokeDeregister(promise); - } - }, promise, null); + safeExecute(executor, () -> next.invokeDeregister(promise), promise, null); } return promise; @@ -860,12 +783,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap } else { Runnable task = next.invokeReadTask; if (task == null) { - next.invokeReadTask = task = new Runnable() { - @Override - public void run() { - next.invokeRead(); - } - }; + next.invokeReadTask = task = next::invokeRead; } executor.execute(task); } @@ -922,12 +840,7 @@ abstract class AbstractChannelHandlerContext extends DefaultAttributeMap } else { Runnable task = next.invokeFlushTask; if (task == null) { - next.invokeFlushTask = task = new Runnable() { - @Override - public void run() { - next.invokeFlush(); - } - }; + next.invokeFlushTask = task = next::invokeFlush; } safeExecute(executor, task, channel().voidPromise(), null); } diff --git a/transport/src/main/java/io/netty/channel/ChannelFutureListener.java b/transport/src/main/java/io/netty/channel/ChannelFutureListener.java index 38e76c1426..ebfa5e32e2 100644 --- a/transport/src/main/java/io/netty/channel/ChannelFutureListener.java +++ b/transport/src/main/java/io/netty/channel/ChannelFutureListener.java @@ -38,23 +38,15 @@ public interface ChannelFutureListener extends GenericFutureListener future.channel().close(); /** * A {@link ChannelFutureListener} that closes the {@link Channel} when the * operation ended up with a failure or cancellation rather than a success. */ - ChannelFutureListener CLOSE_ON_FAILURE = new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) { - if (!future.isSuccess()) { - future.channel().close(); - } + ChannelFutureListener CLOSE_ON_FAILURE = future -> { + if (!future.isSuccess()) { + future.channel().close(); } }; @@ -62,12 +54,9 @@ public interface ChannelFutureListener extends GenericFutureListener { + if (!future.isSuccess()) { + future.channel().pipeline().fireExceptionCaught(future.cause()); } }; diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java index d3a934a829..d64be73e75 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundBuffer.java @@ -602,12 +602,7 @@ public final class ChannelOutboundBuffer { if (invokeLater) { Runnable task = fireChannelWritabilityChangedTask; if (task == null) { - fireChannelWritabilityChangedTask = task = new Runnable() { - @Override - public void run() { - pipeline.fireChannelWritabilityChanged(); - } - }; + fireChannelWritabilityChangedTask = task = pipeline::fireChannelWritabilityChanged; } channel.eventLoop().execute(task); } else { @@ -654,12 +649,7 @@ public final class ChannelOutboundBuffer { void close(final Throwable cause, final boolean allowChannelOpen) { if (inFail) { - channel.eventLoop().execute(new Runnable() { - @Override - public void run() { - close(cause, allowChannelOpen); - } - }); + channel.eventLoop().execute(() -> close(cause, allowChannelOpen)); return; } diff --git a/transport/src/main/java/io/netty/channel/CombinedChannelDuplexHandler.java b/transport/src/main/java/io/netty/channel/CombinedChannelDuplexHandler.java index 1f9bc4f179..c19e81b61a 100644 --- a/transport/src/main/java/io/netty/channel/CombinedChannelDuplexHandler.java +++ b/transport/src/main/java/io/netty/channel/CombinedChannelDuplexHandler.java @@ -614,12 +614,7 @@ public class CombinedChannelDuplexHandler callHandlerAdded0(newCtx)); return this; } } @@ -162,12 +157,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { EventExecutor ctxExecutor = newCtx.executor(); if (!ctxExecutor.inEventLoop()) { newCtx.setAddPending(); - ctxExecutor.execute(new Runnable() { - @Override - public void run() { - callHandlerAdded0(newCtx); - } - }); + ctxExecutor.execute(() -> callHandlerAdded0(newCtx)); return this; } } @@ -205,12 +195,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { EventExecutor ctxExecutor = newCtx.executor(); if (!ctxExecutor.inEventLoop()) { newCtx.setAddPending(); - ctxExecutor.execute(new Runnable() { - @Override - public void run() { - callHandlerAdded0(newCtx); - } - }); + ctxExecutor.execute(() -> callHandlerAdded0(newCtx)); return this; } } @@ -256,12 +241,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { EventExecutor ctxExecutor = newCtx.executor(); if (!ctxExecutor.inEventLoop()) { newCtx.setAddPending(); - ctxExecutor.execute(new Runnable() { - @Override - public void run() { - callHandlerAdded0(newCtx); - } - }); + ctxExecutor.execute(() -> callHandlerAdded0(newCtx)); return this; } } @@ -407,12 +387,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { EventExecutor executor = ctx.executor(); if (!executor.inEventLoop()) { - executor.execute(new Runnable() { - @Override - public void run() { - callHandlerRemoved0(ctx); - } - }); + executor.execute(() -> callHandlerRemoved0(ctx)); return ctx; } } @@ -483,15 +458,12 @@ public class DefaultChannelPipeline implements ChannelPipeline { EventExecutor executor = ctx.executor(); if (!executor.inEventLoop()) { - executor.execute(new Runnable() { - @Override - public void run() { - // Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked) - // because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and - // those event handlers must be called after handlerAdded(). - callHandlerAdded0(newCtx); - callHandlerRemoved0(ctx); - } + executor.execute(() -> { + // Invoke newHandler.handlerAdded() first (i.e. before oldHandler.handlerRemoved() is invoked) + // because callHandlerRemoved() will trigger channelRead() or flush() on newHandler and + // those event handlers must be called after handlerAdded(). + callHandlerAdded0(newCtx); + callHandlerRemoved0(ctx); }); return ctx.handler(); } @@ -776,12 +748,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { final EventExecutor executor = ctx.executor(); if (!inEventLoop && !executor.inEventLoop(currentThread)) { final AbstractChannelHandlerContext finalCtx = ctx; - executor.execute(new Runnable() { - @Override - public void run() { - destroyUp(finalCtx, true); - } - }); + executor.execute(() -> destroyUp(finalCtx, true)); break; } @@ -806,12 +773,7 @@ public class DefaultChannelPipeline implements ChannelPipeline { callHandlerRemoved0(ctx); } else { final AbstractChannelHandlerContext finalCtx = ctx; - executor.execute(new Runnable() { - @Override - public void run() { - destroyDown(Thread.currentThread(), finalCtx, true); - } - }); + executor.execute(() -> destroyDown(Thread.currentThread(), finalCtx, true)); break; } diff --git a/transport/src/main/java/io/netty/channel/VoidChannelPromise.java b/transport/src/main/java/io/netty/channel/VoidChannelPromise.java index c684843ebf..05b8f48b90 100644 --- a/transport/src/main/java/io/netty/channel/VoidChannelPromise.java +++ b/transport/src/main/java/io/netty/channel/VoidChannelPromise.java @@ -40,13 +40,10 @@ public final class VoidChannelPromise extends AbstractFuture implements Ch } this.channel = channel; if (fireException) { - fireExceptionListener = new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - Throwable cause = future.cause(); - if (cause != null) { - fireException0(cause); - } + fireExceptionListener = future -> { + Throwable cause = future.cause(); + if (cause != null) { + fireException0(cause); } }; } else { diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java index 0e0dfd59c7..dca9553d67 100644 --- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java @@ -59,12 +59,7 @@ public class EmbeddedChannel extends AbstractChannel { private static final ChannelMetadata METADATA_NO_DISCONNECT = new ChannelMetadata(false); private static final ChannelMetadata METADATA_DISCONNECT = new ChannelMetadata(true); - private final ChannelFutureListener recordExceptionListener = new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - recordException(future); - } - }; + private final ChannelFutureListener recordExceptionListener = this::recordException; private final ChannelMetadata metadata; private final ChannelConfig config; diff --git a/transport/src/main/java/io/netty/channel/group/ChannelMatchers.java b/transport/src/main/java/io/netty/channel/group/ChannelMatchers.java index 5f67f5f9d6..c78556107e 100644 --- a/transport/src/main/java/io/netty/channel/group/ChannelMatchers.java +++ b/transport/src/main/java/io/netty/channel/group/ChannelMatchers.java @@ -23,12 +23,7 @@ import io.netty.channel.ServerChannel; */ public final class ChannelMatchers { - private static final ChannelMatcher ALL_MATCHER = new ChannelMatcher() { - @Override - public boolean matches(Channel channel) { - return true; - } - }; + private static final ChannelMatcher ALL_MATCHER = channel -> true; private static final ChannelMatcher SERVER_CHANNEL_MATCHER = isInstanceOf(ServerChannel.class); private static final ChannelMatcher NON_SERVER_CHANNEL_MATCHER = isNotInstanceOf(ServerChannel.class); diff --git a/transport/src/main/java/io/netty/channel/group/DefaultChannelGroup.java b/transport/src/main/java/io/netty/channel/group/DefaultChannelGroup.java index 397adbc384..909742314e 100644 --- a/transport/src/main/java/io/netty/channel/group/DefaultChannelGroup.java +++ b/transport/src/main/java/io/netty/channel/group/DefaultChannelGroup.java @@ -46,12 +46,7 @@ public class DefaultChannelGroup extends AbstractSet implements Channel private final EventExecutor executor; private final ConcurrentMap serverChannels = new ConcurrentHashMap<>(); private final ConcurrentMap nonServerChannels = new ConcurrentHashMap<>(); - private final ChannelFutureListener remover = new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - remove(future.channel()); - } - }; + private final ChannelFutureListener remover = future -> remove(future.channel()); private final VoidChannelGroupFuture voidFuture = new VoidChannelGroupFuture(this); private final boolean stayClosed; private volatile boolean closed; diff --git a/transport/src/main/java/io/netty/channel/local/LocalChannel.java b/transport/src/main/java/io/netty/channel/local/LocalChannel.java index 6a810f5cae..50a0c2389a 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalChannel.java +++ b/transport/src/main/java/io/netty/channel/local/LocalChannel.java @@ -63,13 +63,10 @@ public class LocalChannel extends AbstractChannel { private final ChannelConfig config = new DefaultChannelConfig(this); // To further optimize this we could write our own SPSC queue. final Queue inboundBuffer = PlatformDependent.newSpscQueue(); - private final Runnable readTask = new Runnable() { - @Override - public void run() { - // Ensure the inboundBuffer is not empty as readInbound() will always call fireChannelReadComplete() - if (!inboundBuffer.isEmpty()) { - readInbound(); - } + private final Runnable readTask = () -> { + // Ensure the inboundBuffer is not empty as readInbound() will always call fireChannelReadComplete() + if (!inboundBuffer.isEmpty()) { + readInbound(); } }; @@ -197,12 +194,7 @@ public class LocalChannel extends AbstractChannel { EventLoop peerEventLoop = peer.eventLoop(); final boolean peerIsActive = peer.isActive(); try { - peerEventLoop.execute(new Runnable() { - @Override - public void run() { - peer.tryClose(peerIsActive); - } - }); + peerEventLoop.execute(() -> peer.tryClose(peerIsActive)); } catch (Throwable cause) { logger.warn("Releasing Inbound Queues for channels {}-{} because exception occurred!", this, peer, cause); @@ -350,12 +342,7 @@ public class LocalChannel extends AbstractChannel { private void runFinishPeerReadTask(final LocalChannel peer) { // If the peer is writing, we must wait until after reads are completed for that peer before we can read. So // we keep track of the task, and coordinate later that our read can't happen until the peer is done. - final Runnable finishPeerReadTask = new Runnable() { - @Override - public void run() { - finishPeerRead0(peer); - } - }; + final Runnable finishPeerReadTask = () -> finishPeerRead0(peer); try { if (peer.writeInProgress) { peer.finishReadFuture = peer.eventLoop().submit(finishPeerReadTask); @@ -470,17 +457,14 @@ public class LocalChannel extends AbstractChannel { // This ensures that if both channels are on the same event loop, the peer's channelActive // event is triggered *after* this channel's channelRegistered event, so that this channel's // pipeline is fully initialized by ChannelInitializer before any channelRead events. - peer.eventLoop().execute(new Runnable() { - @Override - public void run() { - ChannelPromise promise = peer.connectPromise; + peer.eventLoop().execute(() -> { + ChannelPromise promise = peer.connectPromise; - // Only trigger fireChannelActive() if the promise was not null and was not completed yet. - // connectPromise may be set to null if doClose() was called in the meantime. - if (promise != null && promise.trySuccess()) { - peer.pipeline().fireChannelActive(); - peer.readIfIsAutoRead(); - } + // Only trigger fireChannelActive() if the promise was not null and was not completed yet. + // connectPromise may be set to null if doClose() was called in the meantime. + if (promise != null && promise.trySuccess()) { + peer.pipeline().fireChannelActive(); + peer.readIfIsAutoRead(); } }); } diff --git a/transport/src/main/java/io/netty/channel/local/LocalHandler.java b/transport/src/main/java/io/netty/channel/local/LocalHandler.java index 26be1e7ca8..249eb5aae8 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalHandler.java +++ b/transport/src/main/java/io/netty/channel/local/LocalHandler.java @@ -38,12 +38,7 @@ public final class LocalHandler implements IoHandler { * Returns a new {@link IoHandlerFactory} that creates {@link LocalHandler} instances. */ public static IoHandlerFactory newFactory() { - return new IoHandlerFactory() { - @Override - public IoHandler newHandler() { - return new LocalHandler(); - } - }; + return LocalHandler::new; } private static LocalChannelUnsafe cast(Channel channel) { diff --git a/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java b/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java index ed265c6310..73a099fcf9 100644 --- a/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java +++ b/transport/src/main/java/io/netty/channel/local/LocalServerChannel.java @@ -115,12 +115,7 @@ public class LocalServerChannel extends AbstractServerChannel { if (eventLoop().inEventLoop()) { serve0(child); } else { - eventLoop().execute(new Runnable() { - @Override - public void run() { - serve0(child); - } - }); + eventLoop().execute(() -> serve0(child)); } return child; } diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java index 70b14cef4f..039686e3aa 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioByteChannel.java @@ -47,13 +47,10 @@ public abstract class AbstractNioByteChannel extends AbstractNioChannel { " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " + StringUtil.simpleClassName(FileRegion.class) + ')'; - private final Runnable flushTask = new Runnable() { - @Override - public void run() { - // Calling flush0 directly to ensure we not try to flush messages that were added via write(...) in the - // meantime. - ((AbstractNioUnsafe) unsafe()).flush0(); - } + private final Runnable flushTask = () -> { + // Calling flush0 directly to ensure we not try to flush messages that were added via write(...) in the + // meantime. + ((AbstractNioUnsafe) unsafe()).flush0(); }; private boolean inputClosedSeenErrorOnRead; diff --git a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java index 530f527468..1a11229bc3 100644 --- a/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java +++ b/transport/src/main/java/io/netty/channel/nio/AbstractNioChannel.java @@ -57,12 +57,7 @@ public abstract class AbstractNioChannel extends AbstractChannel { protected final int readInterestOp; volatile SelectionKey selectionKey; boolean readPending; - private final Runnable clearReadPendingRunnable = new Runnable() { - @Override - public void run() { - clearReadPending0(); - } - }; + private final Runnable clearReadPendingRunnable = this::clearReadPending0; /** * The future of the current connection attempt. If not null, subsequent @@ -142,12 +137,7 @@ public abstract class AbstractNioChannel extends AbstractChannel { if (eventLoop.inEventLoop()) { setReadPending0(readPending); } else { - eventLoop.execute(new Runnable() { - @Override - public void run() { - setReadPending0(readPending); - } - }); + eventLoop.execute(() -> setReadPending0(readPending)); } } else { // Best effort if we are not registered yet clear readPending. @@ -255,29 +245,23 @@ public abstract class AbstractNioChannel extends AbstractChannel { // Schedule connect timeout. int connectTimeoutMillis = config().getConnectTimeoutMillis(); if (connectTimeoutMillis > 0) { - connectTimeoutFuture = eventLoop().schedule(new Runnable() { - @Override - public void run() { - ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise; - ConnectTimeoutException cause = - new ConnectTimeoutException("connection timed out: " + remoteAddress); - if (connectPromise != null && connectPromise.tryFailure(cause)) { - close(voidPromise()); - } + connectTimeoutFuture = eventLoop().schedule(() -> { + ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise; + ConnectTimeoutException cause = + new ConnectTimeoutException("connection timed out: " + remoteAddress); + if (connectPromise != null && connectPromise.tryFailure(cause)) { + close(voidPromise()); } }, connectTimeoutMillis, TimeUnit.MILLISECONDS); } - promise.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (future.isCancelled()) { - if (connectTimeoutFuture != null) { - connectTimeoutFuture.cancel(false); - } - connectPromise = null; - close(voidPromise()); + promise.addListener((ChannelFutureListener) future -> { + if (future.isCancelled()) { + if (connectTimeoutFuture != null) { + connectTimeoutFuture.cancel(false); } + connectPromise = null; + close(voidPromise()); } }); } diff --git a/transport/src/main/java/io/netty/channel/nio/NioHandler.java b/transport/src/main/java/io/netty/channel/nio/NioHandler.java index 764c3e13a4..4b9175536e 100644 --- a/transport/src/main/java/io/netty/channel/nio/NioHandler.java +++ b/transport/src/main/java/io/netty/channel/nio/NioHandler.java @@ -67,12 +67,7 @@ public final class NioHandler implements IoHandler { private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3; private static final int SELECTOR_AUTO_REBUILD_THRESHOLD; - private final IntSupplier selectNowSupplier = new IntSupplier() { - @Override - public int get() throws Exception { - return selectNow(); - } - }; + private final IntSupplier selectNowSupplier = this::selectNow; // Workaround for JDK NIO bug. // @@ -84,12 +79,9 @@ public final class NioHandler implements IoHandler { final String bugLevel = SystemPropertyUtil.get(key); if (bugLevel == null) { try { - AccessController.doPrivileged(new PrivilegedAction() { - @Override - public Void run() { - System.setProperty(key, ""); - return null; - } + AccessController.doPrivileged((PrivilegedAction) () -> { + System.setProperty(key, ""); + return null; }); } catch (final SecurityException e) { logger.debug("Unable to get/set System Property: " + key, e); @@ -147,12 +139,7 @@ public final class NioHandler implements IoHandler { * Returns a new {@link IoHandlerFactory} that creates {@link NioHandler} instances. */ public static IoHandlerFactory newFactory() { - return new IoHandlerFactory() { - @Override - public IoHandler newHandler() { - return new NioHandler(); - } - }; + return NioHandler::new; } /** @@ -162,12 +149,7 @@ public final class NioHandler implements IoHandler { final SelectStrategyFactory selectStrategyFactory) { ObjectUtil.checkNotNull(selectorProvider, "selectorProvider"); ObjectUtil.checkNotNull(selectStrategyFactory, "selectStrategyFactory"); - return new IoHandlerFactory() { - @Override - public IoHandler newHandler() { - return new NioHandler(selectorProvider, selectStrategyFactory.newSelectStrategy()); - } - }; + return () -> new NioHandler(selectorProvider, selectStrategyFactory.newSelectStrategy()); } private static final class SelectorTuple { @@ -197,17 +179,14 @@ public final class NioHandler implements IoHandler { return new SelectorTuple(unwrappedSelector); } - Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction() { - @Override - public Object run() { - try { - return Class.forName( - "sun.nio.ch.SelectorImpl", - false, - PlatformDependent.getSystemClassLoader()); - } catch (Throwable cause) { - return cause; - } + Object maybeSelectorImplClass = AccessController.doPrivileged((PrivilegedAction) () -> { + try { + return Class.forName( + "sun.nio.ch.SelectorImpl", + false, + PlatformDependent.getSystemClassLoader()); + } catch (Throwable cause) { + return cause; } }); @@ -224,45 +203,42 @@ public final class NioHandler implements IoHandler { final Class selectorImplClass = (Class) maybeSelectorImplClass; final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet(); - Object maybeException = AccessController.doPrivileged(new PrivilegedAction() { - @Override - public Object run() { - try { - Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys"); - Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys"); + Object maybeException = AccessController.doPrivileged((PrivilegedAction) () -> { + try { + Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys"); + Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys"); - if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) { - // Let us try to use sun.misc.Unsafe to replace the SelectionKeySet. - // This allows us to also do this in Java9+ without any extra flags. - long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField); - long publicSelectedKeysFieldOffset = - PlatformDependent.objectFieldOffset(publicSelectedKeysField); + if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) { + // Let us try to use sun.misc.Unsafe to replace the SelectionKeySet. + // This allows us to also do this in Java9+ without any extra flags. + long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField); + long publicSelectedKeysFieldOffset = + PlatformDependent.objectFieldOffset(publicSelectedKeysField); - if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) { - PlatformDependent.putObject( - unwrappedSelector, selectedKeysFieldOffset, selectedKeySet); - PlatformDependent.putObject( - unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet); - return null; - } - // We could not retrieve the offset, lets try reflection as last-resort. + if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) { + PlatformDependent.putObject( + unwrappedSelector, selectedKeysFieldOffset, selectedKeySet); + PlatformDependent.putObject( + unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet); + return null; } - - Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true); - if (cause != null) { - return cause; - } - cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true); - if (cause != null) { - return cause; - } - - selectedKeysField.set(unwrappedSelector, selectedKeySet); - publicSelectedKeysField.set(unwrappedSelector, selectedKeySet); - return null; - } catch (NoSuchFieldException | IllegalAccessException e) { - return e; + // We could not retrieve the offset, lets try reflection as last-resort. } + + Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true); + if (cause != null) { + return cause; + } + cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true); + if (cause != null) { + return cause; + } + + selectedKeysField.set(unwrappedSelector, selectedKeySet); + publicSelectedKeysField.set(unwrappedSelector, selectedKeySet); + return null; + } catch (NoSuchFieldException | IllegalAccessException e) { + return e; } }); diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java index d2cd78716c..cad2618798 100644 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java @@ -170,12 +170,7 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty if (loop.inEventLoop()) { ((AbstractUnsafe) unsafe()).shutdownOutput(promise); } else { - loop.execute(new Runnable() { - @Override - public void run() { - ((AbstractUnsafe) unsafe()).shutdownOutput(promise); - } - }); + loop.execute(() -> ((AbstractUnsafe) unsafe()).shutdownOutput(promise)); } return promise; } @@ -196,12 +191,7 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty if (loop.inEventLoop()) { shutdownInput0(promise); } else { - loop.execute(new Runnable() { - @Override - public void run() { - shutdownInput0(promise); - } - }); + loop.execute(() -> shutdownInput0(promise)); } return promise; } @@ -217,12 +207,8 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty if (shutdownOutputFuture.isDone()) { shutdownOutputDone(shutdownOutputFuture, promise); } else { - shutdownOutputFuture.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception { - shutdownOutputDone(shutdownOutputFuture, promise); - } - }); + shutdownOutputFuture.addListener((ChannelFutureListener) shutdownOutputFuture1 -> + shutdownOutputDone(shutdownOutputFuture1, promise)); } return promise; } @@ -232,12 +218,8 @@ public class NioSocketChannel extends AbstractNioByteChannel implements io.netty if (shutdownInputFuture.isDone()) { shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise); } else { - shutdownInputFuture.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception { - shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise); - } - }); + shutdownInputFuture.addListener((ChannelFutureListener) shutdownInputFuture1 -> + shutdownDone(shutdownOutputFuture, shutdownInputFuture1, promise)); } } diff --git a/transport/src/test/java/io/netty/bootstrap/BootstrapTest.java b/transport/src/test/java/io/netty/bootstrap/BootstrapTest.java index c989d98018..eaa6c48d90 100644 --- a/transport/src/test/java/io/netty/bootstrap/BootstrapTest.java +++ b/transport/src/test/java/io/netty/bootstrap/BootstrapTest.java @@ -16,8 +16,6 @@ package io.netty.bootstrap; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFactory; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler.Sharable; @@ -26,18 +24,15 @@ import io.netty.channel.ChannelInboundHandler; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; -import io.netty.channel.MultithreadEventLoopGroup; -import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; -import io.netty.channel.ServerChannel; -import io.netty.channel.ServerChannelFactory; +import io.netty.channel.MultithreadEventLoopGroup; import io.netty.channel.local.LocalAddress; import io.netty.channel.local.LocalChannel; import io.netty.channel.local.LocalHandler; import io.netty.channel.local.LocalServerChannel; +import io.netty.resolver.AbstractAddressResolver; import io.netty.resolver.AddressResolver; import io.netty.resolver.AddressResolverGroup; -import io.netty.resolver.AbstractAddressResolver; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; @@ -55,8 +50,14 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; -import static org.hamcrest.Matchers.*; -import static org.junit.Assert.*; +import static org.hamcrest.Matchers.sameInstance; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; public class BootstrapTest { @@ -88,18 +89,12 @@ public class BootstrapTest { // Try to bind from each other. for (int i = 0; i < 1024; i ++) { - bindFutures.add(groupA.next().submit(new Runnable() { - @Override - public void run() { - bootstrapB.bind(LocalAddress.ANY); - } + bindFutures.add(groupA.next().submit(() -> { + bootstrapB.bind(LocalAddress.ANY); })); - bindFutures.add(groupB.next().submit(new Runnable() { - @Override - public void run() { - bootstrapA.bind(LocalAddress.ANY); - } + bindFutures.add(groupB.next().submit(() -> { + bootstrapA.bind(LocalAddress.ANY); })); } @@ -124,18 +119,12 @@ public class BootstrapTest { // Try to connect from each other. for (int i = 0; i < 1024; i ++) { - bindFutures.add(groupA.next().submit(new Runnable() { - @Override - public void run() { - bootstrapB.connect(LocalAddress.ANY); - } + bindFutures.add(groupA.next().submit(() -> { + bootstrapB.connect(LocalAddress.ANY); })); - bindFutures.add(groupB.next().submit(new Runnable() { - @Override - public void run() { - bootstrapA.connect(LocalAddress.ANY); - } + bindFutures.add(groupB.next().submit(() -> { + bootstrapA.connect(LocalAddress.ANY); })); } @@ -159,12 +148,9 @@ public class BootstrapTest { assertFalse(future.isDone()); registerHandler.registerPromise().setSuccess(); final BlockingQueue queue = new LinkedBlockingQueue<>(); - future.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - queue.add(future.channel().eventLoop().inEventLoop(Thread.currentThread())); - queue.add(future.isSuccess()); - } + future.addListener((ChannelFutureListener) future1 -> { + queue.add(future1.channel().eventLoop().inEventLoop(Thread.currentThread())); + queue.add(future1.isSuccess()); }); assertTrue(queue.take()); assertTrue(queue.take()); @@ -181,10 +167,8 @@ public class BootstrapTest { try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(group); - bootstrap.channelFactory(new ServerChannelFactory() { - @Override - public ServerChannel newChannel(EventLoop eventLoop, EventLoopGroup childEventLoopGroup) { - return new LocalServerChannel(eventLoop, childEventLoopGroup) { + bootstrap.channelFactory((eventLoop, childEventLoopGroup) -> + new LocalServerChannel(eventLoop, childEventLoopGroup) { @Override public ChannelFuture bind(SocketAddress localAddress) { // Close the Channel to emulate what NIO and others impl do on bind failure @@ -200,9 +184,7 @@ public class BootstrapTest { close(); return promise.setFailure(new SocketException()); } - }; - } - }); + }); bootstrap.childHandler(new DummyHandler()); bootstrap.handler(registerHandler); bootstrap.localAddress(new LocalAddress("1")); @@ -210,12 +192,9 @@ public class BootstrapTest { assertFalse(future.isDone()); registerHandler.registerPromise().setSuccess(); final BlockingQueue queue = new LinkedBlockingQueue<>(); - future.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - queue.add(future.channel().eventLoop().inEventLoop(Thread.currentThread())); - queue.add(future.isSuccess()); - } + future.addListener((ChannelFutureListener) future1 -> { + queue.add(future1.channel().eventLoop().inEventLoop(Thread.currentThread())); + queue.add(future1.isSuccess()); }); assertTrue(queue.take()); assertFalse(queue.take()); @@ -291,12 +270,9 @@ public class BootstrapTest { final Bootstrap bootstrap = new Bootstrap() .handler(dummyHandler) .group(groupA) - .channelFactory(new ChannelFactory() { - @Override - public Channel newChannel(EventLoop eventLoop) { - throw exception; - } - }); + .channelFactory(eventLoop -> { + throw exception; + }); ChannelFuture connectFuture = bootstrap.connect(LocalAddress.ANY); @@ -316,12 +292,9 @@ public class BootstrapTest { registerPromise = promise; latch.countDown(); ChannelPromise newPromise = ctx.newPromise(); - newPromise.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess()) { - registerPromise.tryFailure(future.cause()); - } + newPromise.addListener((ChannelFutureListener) future -> { + if (!future.isSuccess()) { + registerPromise.tryFailure(future.cause()); } }); super.register(ctx, newPromise); @@ -356,14 +329,11 @@ public class BootstrapTest { @Override protected void doResolve( final SocketAddress unresolvedAddress, final Promise promise) { - executor().execute(new Runnable() { - @Override - public void run() { - if (success) { - promise.setSuccess(unresolvedAddress); - } else { - promise.setFailure(new UnknownHostException(unresolvedAddress.toString())); - } + executor().execute(() -> { + if (success) { + promise.setSuccess(unresolvedAddress); + } else { + promise.setFailure(new UnknownHostException(unresolvedAddress.toString())); } }); } @@ -372,14 +342,11 @@ public class BootstrapTest { protected void doResolveAll( final SocketAddress unresolvedAddress, final Promise> promise) throws Exception { - executor().execute(new Runnable() { - @Override - public void run() { - if (success) { - promise.setSuccess(Collections.singletonList(unresolvedAddress)); - } else { - promise.setFailure(new UnknownHostException(unresolvedAddress.toString())); - } + executor().execute(() -> { + if (success) { + promise.setSuccess(Collections.singletonList(unresolvedAddress)); + } else { + promise.setFailure(new UnknownHostException(unresolvedAddress.toString())); } }); } diff --git a/transport/src/test/java/io/netty/channel/AbstractChannelTest.java b/transport/src/test/java/io/netty/channel/AbstractChannelTest.java index 397641d42a..36b4bc8816 100644 --- a/transport/src/test/java/io/netty/channel/AbstractChannelTest.java +++ b/transport/src/test/java/io/netty/channel/AbstractChannelTest.java @@ -51,12 +51,9 @@ public class AbstractChannelTest { when(eventLoop.inEventLoop()).thenReturn(true); when(eventLoop.unsafe()).thenReturn(mock(EventLoop.Unsafe.class)); - doAnswer(new Answer() { - @Override - public Object answer(InvocationOnMock invocationOnMock) throws Throwable { - ((Runnable) invocationOnMock.getArgument(0)).run(); - return null; - } + doAnswer(invocationOnMock -> { + ((Runnable) invocationOnMock.getArgument(0)).run(); + return null; }).when(eventLoop).execute(any(Runnable.class)); final TestChannel channel = new TestChannel(eventLoop); diff --git a/transport/src/test/java/io/netty/channel/ChannelInitializerTest.java b/transport/src/test/java/io/netty/channel/ChannelInitializerTest.java index c1aa5d6ef9..07bf65e0e4 100644 --- a/transport/src/test/java/io/netty/channel/ChannelInitializerTest.java +++ b/transport/src/test/java/io/netty/channel/ChannelInitializerTest.java @@ -146,11 +146,8 @@ public class ChannelInitializerTest { try { // Execute some task on the EventLoop and wait until its done to be sure all handlers are added to the // pipeline. - channel.eventLoop().submit(new Runnable() { - @Override - public void run() { - // NOOP - } + channel.eventLoop().submit(() -> { + // NOOP }).syncUninterruptibly(); Iterator> handlers = channel.pipeline().iterator(); assertSame(handler1, handlers.next().getValue()); @@ -186,11 +183,8 @@ public class ChannelInitializerTest { try { // Execute some task on the EventLoop and wait until its done to be sure all handlers are added to the // pipeline. - channel.eventLoop().submit(new Runnable() { - @Override - public void run() { - // NOOP - } + channel.eventLoop().submit(() -> { + // NOOP }).syncUninterruptibly(); assertEquals(1, initChannelCalled.get()); assertEquals(2, registeredCalled.get()); diff --git a/transport/src/test/java/io/netty/channel/ChannelOutboundBufferTest.java b/transport/src/test/java/io/netty/channel/ChannelOutboundBufferTest.java index f6bea40180..dfae3be5b2 100644 --- a/transport/src/test/java/io/netty/channel/ChannelOutboundBufferTest.java +++ b/transport/src/test/java/io/netty/channel/ChannelOutboundBufferTest.java @@ -424,25 +424,19 @@ public class ChannelOutboundBufferTest { final CountDownLatch executeLatch = new CountDownLatch(1); final CountDownLatch runLatch = new CountDownLatch(1); - executor.execute(new Runnable() { - @Override - public void run() { - try { - runLatch.countDown(); - executeLatch.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } + executor.execute(() -> { + try { + runLatch.countDown(); + executeLatch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } }); runLatch.await(); - executor.execute(new Runnable() { - @Override - public void run() { - // Will not be executed but ensure the pending count is 1. - } + executor.execute(() -> { + // Will not be executed but ensure the pending count is 1. }); assertEquals(1, executor.pendingTasks()); diff --git a/transport/src/test/java/io/netty/channel/CoalescingBufferQueueTest.java b/transport/src/test/java/io/netty/channel/CoalescingBufferQueueTest.java index 2a5749b3ab..f5c8b67ffa 100644 --- a/transport/src/test/java/io/netty/channel/CoalescingBufferQueueTest.java +++ b/transport/src/test/java/io/netty/channel/CoalescingBufferQueueTest.java @@ -53,12 +53,9 @@ public class CoalescingBufferQueueTest { channel = new EmbeddedChannel(); writeQueue = new CoalescingBufferQueue(channel, 16, true); catPromise = newPromise(); - mouseListener = new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - mouseDone = true; - mouseSuccess = future.isSuccess(); - } + mouseListener = future -> { + mouseDone = true; + mouseSuccess = future.isSuccess(); }; emptyPromise = newPromise(); voidPromise = channel.voidPromise(); diff --git a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java index cbf4794208..301a673bfa 100644 --- a/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java +++ b/transport/src/test/java/io/netty/channel/DefaultChannelPipelineTest.java @@ -368,17 +368,14 @@ public class DefaultChannelPipelineTest { // Add handler. p.addFirst(handler.name, handler); - self.eventLoop().execute(new Runnable() { - @Override - public void run() { - // Validate handler life-cycle methods called. - handler.validate(true, false); + self.eventLoop().execute(() -> { + // Validate handler life-cycle methods called. + handler.validate(true, false); - // Store handler into the list. - handlers.add(handler); + // Store handler into the list. + handlers.add(handler); - addLatch.countDown(); - } + addLatch.countDown(); }); } addLatch.await(); @@ -391,13 +388,10 @@ public class DefaultChannelPipelineTest { for (final LifeCycleAwareTestHandler handler : handlers) { assertSame(handler, p.remove(handler.name)); - self.eventLoop().execute(new Runnable() { - @Override - public void run() { - // Validate handler life-cycle methods called. - handler.validate(true, true); - removeLatch.countDown(); - } + self.eventLoop().execute(() -> { + // Validate handler life-cycle methods called. + handler.validate(true, true); + removeLatch.countDown(); }); } removeLatch.await(); @@ -410,17 +404,14 @@ public class DefaultChannelPipelineTest { setUp(handler1, handler2); - self.eventLoop().submit(new Runnable() { - @Override - public void run() { - ChannelPipeline p = self.pipeline(); - handler1.inboundBuffer.add(8); - assertEquals(8, handler1.inboundBuffer.peek()); - assertTrue(handler2.inboundBuffer.isEmpty()); - p.remove(handler1); - assertEquals(1, handler2.inboundBuffer.size()); - assertEquals(8, handler2.inboundBuffer.peek()); - } + self.eventLoop().submit(() -> { + ChannelPipeline p = self.pipeline(); + handler1.inboundBuffer.add(8); + assertEquals(8, handler1.inboundBuffer.peek()); + assertTrue(handler2.inboundBuffer.isEmpty()); + p.remove(handler1); + assertEquals(1, handler2.inboundBuffer.size()); + assertEquals(8, handler2.inboundBuffer.peek()); }).sync(); } @@ -431,17 +422,14 @@ public class DefaultChannelPipelineTest { setUp(handler1, handler2); - self.eventLoop().submit(new Runnable() { - @Override - public void run() { - ChannelPipeline p = self.pipeline(); - handler2.outboundBuffer.add(8); - assertEquals(8, handler2.outboundBuffer.peek()); - assertTrue(handler1.outboundBuffer.isEmpty()); - p.remove(handler2); - assertEquals(1, handler1.outboundBuffer.size()); - assertEquals(8, handler1.outboundBuffer.peek()); - } + self.eventLoop().submit(() -> { + ChannelPipeline p = self.pipeline(); + handler2.outboundBuffer.add(8); + assertEquals(8, handler2.outboundBuffer.peek()); + assertTrue(handler1.outboundBuffer.isEmpty()); + p.remove(handler2); + assertEquals(1, handler1.outboundBuffer.size()); + assertEquals(8, handler1.outboundBuffer.peek()); }).sync(); } @@ -452,16 +440,13 @@ public class DefaultChannelPipelineTest { setUp(handler1); - self.eventLoop().submit(new Runnable() { - @Override - public void run() { - ChannelPipeline p = self.pipeline(); - handler1.outboundBuffer.add(8); - assertEquals(8, handler1.outboundBuffer.peek()); - assertTrue(handler2.outboundBuffer.isEmpty()); - p.replace(handler1, "handler2", handler2); - assertEquals(8, handler2.outboundBuffer.peek()); - } + self.eventLoop().submit(() -> { + ChannelPipeline p = self.pipeline(); + handler1.outboundBuffer.add(8); + assertEquals(8, handler1.outboundBuffer.peek()); + assertTrue(handler2.outboundBuffer.isEmpty()); + p.replace(handler1, "handler2", handler2); + assertEquals(8, handler2.outboundBuffer.peek()); }).sync(); } @@ -472,22 +457,19 @@ public class DefaultChannelPipelineTest { setUp(handler1); - self.eventLoop().submit(new Runnable() { - @Override - public void run() { - ChannelPipeline p = self.pipeline(); - handler1.inboundBuffer.add(8); - handler1.outboundBuffer.add(8); + self.eventLoop().submit(() -> { + ChannelPipeline p = self.pipeline(); + handler1.inboundBuffer.add(8); + handler1.outboundBuffer.add(8); - assertEquals(8, handler1.inboundBuffer.peek()); - assertEquals(8, handler1.outboundBuffer.peek()); - assertTrue(handler2.inboundBuffer.isEmpty()); - assertTrue(handler2.outboundBuffer.isEmpty()); + assertEquals(8, handler1.inboundBuffer.peek()); + assertEquals(8, handler1.outboundBuffer.peek()); + assertTrue(handler2.inboundBuffer.isEmpty()); + assertTrue(handler2.outboundBuffer.isEmpty()); - p.replace(handler1, "handler2", handler2); - assertEquals(8, handler2.outboundBuffer.peek()); - assertEquals(8, handler2.inboundBuffer.peek()); - } + p.replace(handler1, "handler2", handler2); + assertEquals(8, handler2.outboundBuffer.peek()); + assertEquals(8, handler2.inboundBuffer.peek()); }).sync(); } @@ -499,23 +481,20 @@ public class DefaultChannelPipelineTest { setUp(handler1, handler2, handler3); - self.eventLoop().submit(new Runnable() { - @Override - public void run() { - ChannelPipeline p = self.pipeline(); - handler2.inboundBuffer.add(8); - handler2.outboundBuffer.add(8); + self.eventLoop().submit(() -> { + ChannelPipeline p = self.pipeline(); + handler2.inboundBuffer.add(8); + handler2.outboundBuffer.add(8); - assertEquals(8, handler2.inboundBuffer.peek()); - assertEquals(8, handler2.outboundBuffer.peek()); + assertEquals(8, handler2.inboundBuffer.peek()); + assertEquals(8, handler2.outboundBuffer.peek()); - assertEquals(0, handler1.outboundBuffer.size()); - assertEquals(0, handler3.inboundBuffer.size()); + assertEquals(0, handler1.outboundBuffer.size()); + assertEquals(0, handler3.inboundBuffer.size()); - p.remove(handler2); - assertEquals(8, handler3.inboundBuffer.peek()); - assertEquals(8, handler1.outboundBuffer.peek()); - } + p.remove(handler2); + assertEquals(8, handler3.inboundBuffer.peek()); + assertEquals(8, handler1.outboundBuffer.peek()); }).sync(); } @@ -870,12 +849,7 @@ public class DefaultChannelPipelineTest { @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { // Execute this later so we are sure the exception is handled first. - ctx.executor().execute(new Runnable() { - @Override - public void run() { - latch.countDown(); - } - }); + ctx.executor().execute(latch::countDown); throw exceptionRemoved; } }); @@ -986,34 +960,31 @@ public class DefaultChannelPipelineTest { try { final Object event = new Object(); final Promise promise = ImmediateEventExecutor.INSTANCE.newPromise(); - pipeline1.channel().register().addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - ChannelPipeline pipeline = future.channel().pipeline(); - final AtomicBoolean handlerAddedCalled = new AtomicBoolean(); - pipeline.addLast(new ChannelInboundHandlerAdapter() { - @Override - public void handlerAdded(ChannelHandlerContext ctx) throws Exception { - handlerAddedCalled.set(true); - } - - @Override - public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - promise.setSuccess(event); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - promise.setFailure(cause); - } - }); - if (!handlerAddedCalled.get()) { - promise.setFailure(new AssertionError("handlerAdded(...) should have been called")); - return; + pipeline1.channel().register().addListener((ChannelFutureListener) future -> { + ChannelPipeline pipeline = future.channel().pipeline(); + final AtomicBoolean handlerAddedCalled = new AtomicBoolean(); + pipeline.addLast(new ChannelInboundHandlerAdapter() { + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + handlerAddedCalled.set(true); } - // This event must be captured by the added handler. - pipeline.fireUserEventTriggered(event); + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + promise.setSuccess(event); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + promise.setFailure(cause); + } + }); + if (!handlerAddedCalled.get()) { + promise.setFailure(new AssertionError("handlerAdded(...) should have been called")); + return; } + // This event must be captured by the added handler. + pipeline.fireUserEventTriggered(event); }); assertSame(event, promise.syncUninterruptibly().getNow()); } finally { @@ -1184,11 +1155,8 @@ public class DefaultChannelPipelineTest { pipeline.channel().closeFuture().syncUninterruptibly(); // Schedule something on the EventLoop to ensure all other scheduled tasks had a chance to complete. - pipeline.channel().eventLoop().submit(new Runnable() { - @Override - public void run() { - // NOOP - } + pipeline.channel().eventLoop().submit(() -> { + // NOOP }).syncUninterruptibly(); Error error = errorRef.get(); if (error != null) { @@ -1646,33 +1614,30 @@ public class DefaultChannelPipelineTest { final Object writeObject = new Object(); final CountDownLatch doneLatch = new CountDownLatch(1); - Runnable r = new Runnable() { - @Override - public void run() { - pipeline.addLast(new ChannelInboundHandlerAdapter() { - @Override - public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { - if (evt == userEvent) { - ctx.write(writeObject); - } - ctx.fireUserEventTriggered(evt); - } - }); - pipeline.addFirst(new ChannelDuplexHandler() { - @Override - public void handlerAdded(ChannelHandlerContext ctx) { - ctx.fireUserEventTriggered(userEvent); + Runnable r = () -> { + pipeline.addLast(new ChannelInboundHandlerAdapter() { + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { + if (evt == userEvent) { + ctx.write(writeObject); } + ctx.fireUserEventTriggered(evt); + } + }); + pipeline.addFirst(new ChannelDuplexHandler() { + @Override + public void handlerAdded(ChannelHandlerContext ctx) { + ctx.fireUserEventTriggered(userEvent); + } - @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { - if (msg == writeObject) { - doneLatch.countDown(); - } - ctx.write(msg, promise); + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { + if (msg == writeObject) { + doneLatch.countDown(); } - }); - } + ctx.write(msg, promise); + } + }); }; if (executeInEventLoop) { diff --git a/transport/src/test/java/io/netty/channel/PendingWriteQueueTest.java b/transport/src/test/java/io/netty/channel/PendingWriteQueueTest.java index fc262e725c..218a1bc154 100644 --- a/transport/src/test/java/io/netty/channel/PendingWriteQueueTest.java +++ b/transport/src/test/java/io/netty/channel/PendingWriteQueueTest.java @@ -40,12 +40,7 @@ public class PendingWriteQueueTest { assertFalse("Should not be writable anymore", ctx.channel().isWritable()); ChannelFuture future = queue.removeAndWrite(); - future.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - assertQueueEmpty(queue); - } - }); + future.addListener((ChannelFutureListener) future1 -> assertQueueEmpty(queue)); super.flush(ctx); } }, 1); @@ -59,12 +54,7 @@ public class PendingWriteQueueTest { assertFalse("Should not be writable anymore", ctx.channel().isWritable()); ChannelFuture future = queue.removeAndWriteAll(); - future.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - assertQueueEmpty(queue); - } - }); + future.addListener((ChannelFutureListener) future1 -> assertQueueEmpty(queue)); super.flush(ctx); } }, 3); @@ -209,12 +199,7 @@ public class PendingWriteQueueTest { final PendingWriteQueue queue = new PendingWriteQueue(channel.pipeline().firstContext()); ChannelPromise promise = channel.newPromise(); - promise.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - queue.removeAndFailAll(new IllegalStateException()); - } - }); + promise.addListener((ChannelFutureListener) future -> queue.removeAndFailAll(new IllegalStateException())); queue.add(1L, promise); ChannelPromise promise2 = channel.newPromise(); @@ -241,12 +226,7 @@ public class PendingWriteQueueTest { ChannelPromise promise = channel.newPromise(); final ChannelPromise promise3 = channel.newPromise(); - promise.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) { - queue.add(3L, promise3); - } - }); + promise.addListener((ChannelFutureListener) future -> queue.add(3L, promise3)); queue.add(1L, promise); ChannelPromise promise2 = channel.newPromise(); queue.add(2L, promise2); @@ -296,28 +276,15 @@ public class PendingWriteQueueTest { ChannelPromise promise = channel.newPromise(); final ChannelPromise promise3 = channel.newPromise(); - promise3.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - failOrder.add(3); - } - }); - promise.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - failOrder.add(1); - queue.add(3L, promise3); - } + promise3.addListener((ChannelFutureListener) future -> failOrder.add(3)); + promise.addListener((ChannelFutureListener) future -> { + failOrder.add(1); + queue.add(3L, promise3); }); queue.add(1L, promise); ChannelPromise promise2 = channel.newPromise(); - promise2.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - failOrder.add(2); - } - }); + promise2.addListener((ChannelFutureListener) future -> failOrder.add(2)); queue.add(2L, promise2); queue.removeAndFailAll(new Exception()); assertTrue(promise.isDone()); @@ -338,12 +305,7 @@ public class PendingWriteQueueTest { final PendingWriteQueue queue = new PendingWriteQueue(channel.pipeline().firstContext()); ChannelPromise promise = channel.newPromise(); - promise.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - queue.removeAndWriteAll(); - } - }); + promise.addListener((ChannelFutureListener) future -> queue.removeAndWriteAll()); queue.add(1L, promise); ChannelPromise promise2 = channel.newPromise(); diff --git a/transport/src/test/java/io/netty/channel/ReentrantChannelTest.java b/transport/src/test/java/io/netty/channel/ReentrantChannelTest.java index d629964936..4bebdc8016 100644 --- a/transport/src/test/java/io/netty/channel/ReentrantChannelTest.java +++ b/transport/src/test/java/io/netty/channel/ReentrantChannelTest.java @@ -226,12 +226,7 @@ public class ReentrantChannelTest extends BaseChannelTest { @Override public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - promise.addListener(new GenericFutureListener>() { - @Override - public void operationComplete(Future future) throws Exception { - ctx.channel().close(); - } - }); + promise.addListener(future -> ctx.channel().close()); super.write(ctx, msg, promise); ctx.channel().flush(); } diff --git a/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java b/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java index 24bc023117..54a2775d35 100644 --- a/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java +++ b/transport/src/test/java/io/netty/channel/SingleThreadEventLoopTest.java @@ -46,10 +46,7 @@ import static org.junit.Assert.*; public class SingleThreadEventLoopTest { - private static final Runnable NOOP = new Runnable() { - @Override - public void run() { } - }; + private static final Runnable NOOP = () -> { }; private SingleThreadEventLoopA loopA; private SingleThreadEventLoopB loopB; @@ -98,12 +95,7 @@ public class SingleThreadEventLoopTest { @SuppressWarnings("deprecation") public void shutdownAfterStart() throws Exception { final CountDownLatch latch = new CountDownLatch(1); - loopA.execute(new Runnable() { - @Override - public void run() { - latch.countDown(); - } - }); + loopA.execute(latch::countDown); // Wait for the event loop thread to start. latch.await(); @@ -142,12 +134,7 @@ public class SingleThreadEventLoopTest { private static void testScheduleTask(EventLoop loopA) throws InterruptedException, ExecutionException { long startTime = System.nanoTime(); final AtomicLong endTime = new AtomicLong(); - loopA.schedule(new Runnable() { - @Override - public void run() { - endTime.set(System.nanoTime()); - } - }, 500, TimeUnit.MILLISECONDS).get(); + loopA.schedule(() -> endTime.set(System.nanoTime()), 500, TimeUnit.MILLISECONDS).get(); assertThat(endTime.get() - startTime, is(greaterThanOrEqualTo(TimeUnit.MILLISECONDS.toNanos(500)))); } @@ -166,17 +153,14 @@ public class SingleThreadEventLoopTest { final Queue timestamps = new LinkedBlockingQueue<>(); final int expectedTimeStamps = 5; final CountDownLatch allTimeStampsLatch = new CountDownLatch(expectedTimeStamps); - ScheduledFuture f = loopA.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - timestamps.add(System.nanoTime()); - try { - Thread.sleep(50); - } catch (InterruptedException e) { - // Ignore - } - allTimeStampsLatch.countDown(); + ScheduledFuture f = loopA.scheduleAtFixedRate(() -> { + timestamps.add(System.nanoTime()); + try { + Thread.sleep(50); + } catch (InterruptedException e) { + // Ignore } + allTimeStampsLatch.countDown(); }, 100, 100, TimeUnit.MILLISECONDS); allTimeStampsLatch.await(); assertTrue(f.cancel(true)); @@ -214,20 +198,17 @@ public class SingleThreadEventLoopTest { final Queue timestamps = new LinkedBlockingQueue<>(); final int expectedTimeStamps = 5; final CountDownLatch allTimeStampsLatch = new CountDownLatch(expectedTimeStamps); - ScheduledFuture f = loopA.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - boolean empty = timestamps.isEmpty(); - timestamps.add(System.nanoTime()); - if (empty) { - try { - Thread.sleep(401); - } catch (InterruptedException e) { - // Ignore - } + ScheduledFuture f = loopA.scheduleAtFixedRate(() -> { + boolean empty = timestamps.isEmpty(); + timestamps.add(System.nanoTime()); + if (empty) { + try { + Thread.sleep(401); + } catch (InterruptedException e) { + // Ignore } - allTimeStampsLatch.countDown(); } + allTimeStampsLatch.countDown(); }, 100, 100, TimeUnit.MILLISECONDS); allTimeStampsLatch.await(); assertTrue(f.cancel(true)); @@ -268,17 +249,14 @@ public class SingleThreadEventLoopTest { final Queue timestamps = new LinkedBlockingQueue<>(); final int expectedTimeStamps = 3; final CountDownLatch allTimeStampsLatch = new CountDownLatch(expectedTimeStamps); - ScheduledFuture f = loopA.scheduleWithFixedDelay(new Runnable() { - @Override - public void run() { - timestamps.add(System.nanoTime()); - try { - Thread.sleep(51); - } catch (InterruptedException e) { - // Ignore - } - allTimeStampsLatch.countDown(); + ScheduledFuture f = loopA.scheduleWithFixedDelay(() -> { + timestamps.add(System.nanoTime()); + try { + Thread.sleep(51); + } catch (InterruptedException e) { + // Ignore } + allTimeStampsLatch.countDown(); }, 100, 100, TimeUnit.MILLISECONDS); allTimeStampsLatch.await(); assertTrue(f.cancel(true)); @@ -305,16 +283,13 @@ public class SingleThreadEventLoopTest { final int NUM_TASKS = 3; final AtomicInteger ranTasks = new AtomicInteger(); final CountDownLatch latch = new CountDownLatch(1); - final Runnable task = new Runnable() { - @Override - public void run() { - ranTasks.incrementAndGet(); - while (latch.getCount() > 0) { - try { - latch.await(); - } catch (InterruptedException e) { - // Ignored - } + final Runnable task = () -> { + ranTasks.incrementAndGet(); + while (latch.getCount() > 0) { + try { + latch.await(); + } catch (InterruptedException e) { + // Ignored } } }; @@ -380,12 +355,7 @@ public class SingleThreadEventLoopTest { final CountDownLatch latch = new CountDownLatch(1); Channel ch = new LocalChannel(loopA); ChannelPromise promise = ch.newPromise(); - promise.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - latch.countDown(); - } - }); + promise.addListener((ChannelFutureListener) future -> latch.countDown()); // Disable logging temporarily. Logger root = (Logger) LoggerFactory.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME); diff --git a/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java b/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java index 9419ae6d24..1df6ef3cfe 100644 --- a/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java +++ b/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import io.netty.channel.ChannelOutboundInvoker; import java.nio.channels.ClosedChannelException; import java.util.ArrayDeque; import java.util.Queue; @@ -79,12 +80,7 @@ public class EmbeddedChannelTest { @Test(timeout = 2000) public void promiseDoesNotInfiniteLoop() throws InterruptedException { EmbeddedChannel channel = new EmbeddedChannel(); - channel.closeFuture().addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - future.channel().close(); - } - }); + channel.closeFuture().addListener((ChannelFutureListener) future -> future.channel().close()); channel.close().syncUninterruptibly(); } @@ -121,18 +117,8 @@ public class EmbeddedChannelTest { public void testScheduling() throws Exception { EmbeddedChannel ch = new EmbeddedChannel(new ChannelInboundHandlerAdapter()); final CountDownLatch latch = new CountDownLatch(2); - ScheduledFuture future = ch.eventLoop().schedule(new Runnable() { - @Override - public void run() { - latch.countDown(); - } - }, 1, TimeUnit.SECONDS); - future.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - latch.countDown(); - } - }); + ScheduledFuture future = ch.eventLoop().schedule(latch::countDown, 1, TimeUnit.SECONDS); + future.addListener((FutureListener) future1 -> latch.countDown()); long next = ch.runScheduledPendingTasks(); assertTrue(next > 0); // Sleep for the nanoseconds but also give extra 50ms as the clock my not be very precise and so fail the test @@ -145,10 +131,7 @@ public class EmbeddedChannelTest { @Test public void testScheduledCancelled() throws Exception { EmbeddedChannel ch = new EmbeddedChannel(new ChannelInboundHandlerAdapter()); - ScheduledFuture future = ch.eventLoop().schedule(new Runnable() { - @Override - public void run() { } - }, 1, TimeUnit.DAYS); + ScheduledFuture future = ch.eventLoop().schedule(() -> { }, 1, TimeUnit.DAYS); ch.finish(); assertTrue(future.isCancelled()); } @@ -200,35 +183,15 @@ public class EmbeddedChannelTest { // See https://github.com/netty/netty/issues/4316. @Test(timeout = 2000) public void testFireChannelInactiveAndUnregisteredOnClose() throws InterruptedException { - testFireChannelInactiveAndUnregistered(new Action() { - @Override - public ChannelFuture doRun(Channel channel) { - return channel.close(); - } - }); - testFireChannelInactiveAndUnregistered(new Action() { - @Override - public ChannelFuture doRun(Channel channel) { - return channel.close(channel.newPromise()); - } - }); + testFireChannelInactiveAndUnregistered(ChannelOutboundInvoker::close); + testFireChannelInactiveAndUnregistered(channel -> channel.close(channel.newPromise())); } @Test(timeout = 2000) public void testFireChannelInactiveAndUnregisteredOnDisconnect() throws InterruptedException { - testFireChannelInactiveAndUnregistered(new Action() { - @Override - public ChannelFuture doRun(Channel channel) { - return channel.disconnect(); - } - }); + testFireChannelInactiveAndUnregistered(ChannelOutboundInvoker::disconnect); - testFireChannelInactiveAndUnregistered(new Action() { - @Override - public ChannelFuture doRun(Channel channel) { - return channel.disconnect(channel.newPromise()); - } - }); + testFireChannelInactiveAndUnregistered(channel -> channel.disconnect(channel.newPromise())); } private static void testFireChannelInactiveAndUnregistered(Action action) throws InterruptedException { @@ -237,13 +200,8 @@ public class EmbeddedChannelTest { @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { latch.countDown(); - ctx.executor().execute(new Runnable() { - @Override - public void run() { - // Should be executed. - latch.countDown(); - } - }); + // Should be executed. + ctx.executor().execute(latch::countDown); } @Override @@ -368,12 +326,7 @@ public class EmbeddedChannelTest { @Override public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) throws Exception { - ctx.executor().execute(new Runnable() { - @Override - public void run() { - ctx.write(msg, promise); - } - }); + ctx.executor().execute(() -> ctx.write(msg, promise)); } }); Object msg = new Object(); @@ -391,11 +344,8 @@ public class EmbeddedChannelTest { @Override public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) throws Exception { - ctx.executor().schedule(new Runnable() { - @Override - public void run() { - ctx.writeAndFlush(msg, promise); - } + ctx.executor().schedule(() -> { + ctx.writeAndFlush(msg, promise); }, delay, TimeUnit.MILLISECONDS); } }); diff --git a/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java b/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java index 541c611cc9..aa0900fcaa 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java +++ b/transport/src/test/java/io/netty/channel/local/LocalChannelTest.java @@ -117,13 +117,10 @@ public class LocalChannelTest { // Connect to the server cc = cb.connect(sc.localAddress()).sync().channel(); final Channel ccCpy = cc; - cc.eventLoop().execute(new Runnable() { - @Override - public void run() { - // Send a message event up the pipeline. - ccCpy.pipeline().fireChannelRead("Hello, World"); - latch.countDown(); - } + cc.eventLoop().execute(() -> { + // Send a message event up the pipeline. + ccCpy.pipeline().fireChannelRead("Hello, World"); + latch.countDown(); }); assertTrue(latch.await(5, SECONDS)); @@ -370,18 +367,10 @@ public class LocalChannelTest { final Channel ccCpy = cc; // Make sure a write operation is executed in the eventloop - cc.pipeline().lastContext().executor().execute(new Runnable() { - @Override - public void run() { - ChannelPromise promise = ccCpy.newPromise(); - promise.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - ccCpy.pipeline().lastContext().close(); - } - }); - ccCpy.writeAndFlush(data.retainedDuplicate(), promise); - } + cc.pipeline().lastContext().executor().execute(() -> { + ChannelPromise promise = ccCpy.newPromise(); + promise.addListener((ChannelFutureListener) future -> ccCpy.pipeline().lastContext().close()); + ccCpy.writeAndFlush(data.retainedDuplicate(), promise); }); assertTrue(messageLatch.await(5, SECONDS)); @@ -501,18 +490,11 @@ public class LocalChannelTest { final Channel ccCpy = cc; // Make sure a write operation is executed in the eventloop - cc.pipeline().lastContext().executor().execute(new Runnable() { - @Override - public void run() { - ChannelPromise promise = ccCpy.newPromise(); - promise.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - ccCpy.writeAndFlush(data2.retainedDuplicate(), ccCpy.newPromise()); - } - }); - ccCpy.writeAndFlush(data.retainedDuplicate(), promise); - } + cc.pipeline().lastContext().executor().execute(() -> { + ChannelPromise promise = ccCpy.newPromise(); + promise.addListener((ChannelFutureListener) future -> + ccCpy.writeAndFlush(data2.retainedDuplicate(), ccCpy.newPromise())); + ccCpy.writeAndFlush(data.retainedDuplicate(), promise); }); assertTrue(messageLatch.await(5, SECONDS)); @@ -583,19 +565,13 @@ public class LocalChannelTest { final Channel ccCpy = cc; // Make sure a write operation is executed in the eventloop - cc.pipeline().lastContext().executor().execute(new Runnable() { - @Override - public void run() { - ChannelPromise promise = ccCpy.newPromise(); - promise.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - Channel serverChannelCpy = serverChannelRef.get(); - serverChannelCpy.writeAndFlush(data2.retainedDuplicate(), serverChannelCpy.newPromise()); - } - }); - ccCpy.writeAndFlush(data.retainedDuplicate(), promise); - } + cc.pipeline().lastContext().executor().execute(() -> { + ChannelPromise promise = ccCpy.newPromise(); + promise.addListener((ChannelFutureListener) future -> { + Channel serverChannelCpy = serverChannelRef.get(); + serverChannelCpy.writeAndFlush(data2.retainedDuplicate(), serverChannelCpy.newPromise()); + }); + ccCpy.writeAndFlush(data.retainedDuplicate(), promise); }); assertTrue(messageLatch.await(5, SECONDS)); @@ -665,20 +641,14 @@ public class LocalChannelTest { final Channel ccCpy = cc; // Make sure a write operation is executed in the eventloop - cc.pipeline().lastContext().executor().execute(new Runnable() { - @Override - public void run() { - ChannelPromise promise = ccCpy.newPromise(); - promise.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - Channel serverChannelCpy = serverChannelRef.get(); - serverChannelCpy.writeAndFlush( - data2.retainedDuplicate(), serverChannelCpy.newPromise()); - } - }); - ccCpy.writeAndFlush(data.retainedDuplicate(), promise); - } + cc.pipeline().lastContext().executor().execute(() -> { + ChannelPromise promise = ccCpy.newPromise(); + promise.addListener((ChannelFutureListener) future -> { + Channel serverChannelCpy = serverChannelRef.get(); + serverChannelCpy.writeAndFlush( + data2.retainedDuplicate(), serverChannelCpy.newPromise()); + }); + ccCpy.writeAndFlush(data.retainedDuplicate(), promise); }); assertTrue(messageLatch.await(5, SECONDS)); @@ -747,47 +717,34 @@ public class LocalChannelTest { ccCpy.closeFuture().addListener(clientChannelCloseLatch); // Make sure a write operation is executed in the eventloop - cc.pipeline().lastContext().executor().execute(new Runnable() { - @Override - public void run() { + cc.pipeline().lastContext().executor().execute(() -> ccCpy.writeAndFlush(data.retainedDuplicate(), ccCpy.newPromise()) - .addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - serverChannelCpy.eventLoop().execute(new Runnable() { - @Override - public void run() { - // The point of this test is to write while the peer is closed, so we should - // ensure the peer is actually closed before we write. - int waitCount = 0; - while (ccCpy.isOpen()) { - try { - Thread.sleep(50); - } catch (InterruptedException ignored) { - // ignored - } - if (++waitCount > 5) { - fail(); - } - } - serverChannelCpy.writeAndFlush(data2.retainedDuplicate(), - serverChannelCpy.newPromise()) - .addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (!future.isSuccess() && - future.cause() instanceof ClosedChannelException) { - writeFailLatch.countDown(); - } - } - }); - } - }); - ccCpy.close(); + .addListener((ChannelFutureListener) future -> { + serverChannelCpy.eventLoop().execute(() -> { + // The point of this test is to write while the peer is closed, so we should + // ensure the peer is actually closed before we write. + int waitCount = 0; + while (ccCpy.isOpen()) { + try { + Thread.sleep(50); + } catch (InterruptedException ignored) { + // ignored } - }); - } - }); + if (++waitCount > 5) { + fail(); + } + } + serverChannelCpy.writeAndFlush(data2.retainedDuplicate(), + serverChannelCpy.newPromise()) + .addListener((ChannelFutureListener) future1 -> { + if (!future1.isSuccess() && + future1.cause() instanceof ClosedChannelException) { + writeFailLatch.countDown(); + } + }); + }); + ccCpy.close(); + })); assertTrue(serverMessageLatch.await(5, SECONDS)); assertTrue(writeFailLatch.await(5, SECONDS)); @@ -952,12 +909,9 @@ public class LocalChannelTest { } private static void writeAndFlushReadOnSuccess(final ChannelHandlerContext ctx, Object msg) { - ctx.writeAndFlush(msg).addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) { - if (future.isSuccess()) { - ctx.read(); - } + ctx.writeAndFlush(msg).addListener((ChannelFutureListener) future -> { + if (future.isSuccess()) { + ctx.read(); } }); } @@ -1182,12 +1136,9 @@ public class LocalChannelTest { if (!autoRead) { // The read will be scheduled 100ms in the future to ensure we not receive any // channelRead calls in the meantime. - ctx.executor().schedule(new Runnable() { - @Override - public void run() { - read = 0; - ctx.read(); - } + ctx.executor().schedule(() -> { + read = 0; + ctx.read(); }, 100, TimeUnit.MILLISECONDS); } else { read = 0; diff --git a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java index 31a95c9d76..ba5f8e8549 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java +++ b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest.java @@ -266,12 +266,9 @@ public class LocalTransportThreadModelTest { final int end = i + ELEMS_PER_ROUNDS; i = end; - ch.eventLoop().execute(new Runnable() { - @Override - public void run() { - for (int j = start; j < end; j ++) { - ch.pipeline().fireChannelRead(Integer.valueOf(j)); - } + ch.eventLoop().execute(() -> { + for (int j = start; j < end; j ++) { + ch.pipeline().fireChannelRead(Integer.valueOf(j)); } }); } @@ -304,14 +301,11 @@ public class LocalTransportThreadModelTest { final int end = i + ELEMS_PER_ROUNDS; i = end; - ch.pipeline().context(h6).executor().execute(new Runnable() { - @Override - public void run() { - for (int j = start; j < end; j ++) { - ch.write(Integer.valueOf(j)); - } - ch.flush(); + ch.pipeline().context(h6).executor().execute(() -> { + for (int j = start; j < end; j ++) { + ch.write(Integer.valueOf(j)); } + ch.flush(); }); } diff --git a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest2.java b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest2.java index 10c3382001..28c87c7226 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest2.java +++ b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest2.java @@ -83,12 +83,7 @@ public class LocalTransportThreadModelTest2 { return; } - localChannel.eventLoop().execute(new Runnable() { - @Override - public void run() { - close(localChannel, localRegistrationHandler); - } - }); + localChannel.eventLoop().execute(() -> close(localChannel, localRegistrationHandler)); // Wait until the connection is closed or the connection attempt fails. localChannel.closeFuture().awaitUninterruptibly(); diff --git a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java index e6d3845d81..0be7647195 100644 --- a/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java +++ b/transport/src/test/java/io/netty/channel/local/LocalTransportThreadModelTest3.java @@ -152,25 +152,22 @@ public class LocalTransportThreadModelTest3 { Throwable cause = new Throwable(); - Thread pipelineModifier = new Thread(new Runnable() { - @Override - public void run() { - Random random = new Random(); + Thread pipelineModifier = new Thread(() -> { + Random random = new Random(); - while (true) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - return; - } - if (!ch.isRegistered()) { - continue; - } - //EventForwardHandler forwardHandler = forwarders[random.nextInt(forwarders.length)]; - ChannelHandler handler = ch.pipeline().removeFirst(); - ch.pipeline().addBefore(groups[random.nextInt(groups.length)].next(), "recorder", - UUID.randomUUID().toString(), handler); + while (true) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + return; } + if (!ch.isRegistered()) { + continue; + } + //EventForwardHandler forwardHandler = forwarders[random.nextInt(forwarders.length)]; + ChannelHandler handler = ch.pipeline().removeFirst(); + ch.pipeline().addBefore(groups[random.nextInt(groups.length)].next(), "recorder", + UUID.randomUUID().toString(), handler); } }); pipelineModifier.setDaemon(true); diff --git a/transport/src/test/java/io/netty/channel/nio/NioEventLoopTest.java b/transport/src/test/java/io/netty/channel/nio/NioEventLoopTest.java index 34b681a291..1c5b5fc7f1 100644 --- a/transport/src/test/java/io/netty/channel/nio/NioEventLoopTest.java +++ b/transport/src/test/java/io/netty/channel/nio/NioEventLoopTest.java @@ -23,7 +23,6 @@ import io.netty.channel.MultithreadEventLoopGroup; import io.netty.channel.SelectStrategy; import io.netty.channel.SelectStrategyFactory; import io.netty.channel.SingleThreadEventLoop; - import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.util.IntSupplier; @@ -38,7 +37,6 @@ import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.nio.channels.spi.SelectorProvider; -import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; @@ -66,35 +64,15 @@ public class NioEventLoopTest extends AbstractEventLoopTest { Channel channel = new NioServerSocketChannel(loop, loop); channel.register().syncUninterruptibly(); - Selector selector = loop.submit(new Callable() { - @Override - public Selector call() throws Exception { - return nioHandler.unwrappedSelector(); - } - }).syncUninterruptibly().getNow(); + Selector selector = loop.submit(nioHandler::unwrappedSelector).syncUninterruptibly().getNow(); - assertSame(selector, loop.submit(new Callable() { - @Override - public Selector call() throws Exception { - return nioHandler.unwrappedSelector(); - } - }).syncUninterruptibly().getNow()); + assertSame(selector, loop.submit(nioHandler::unwrappedSelector).syncUninterruptibly().getNow()); assertTrue(selector.isOpen()); // Submit to the EventLoop so we are sure its really executed in a non-async manner. - loop.submit(new Runnable() { - @Override - public void run() { - nioHandler.rebuildSelector(); - } - }).syncUninterruptibly(); + loop.submit(nioHandler::rebuildSelector).syncUninterruptibly(); - Selector newSelector = loop.submit(new Callable() { - @Override - public Selector call() throws Exception { - return nioHandler.unwrappedSelector(); - } - }).syncUninterruptibly().getNow(); + Selector newSelector = loop.submit(nioHandler::unwrappedSelector).syncUninterruptibly().getNow(); assertTrue(newSelector.isOpen()); assertNotSame(selector, newSelector); assertFalse(selector.isOpen()); @@ -110,11 +88,8 @@ public class NioEventLoopTest extends AbstractEventLoopTest { EventLoopGroup group = new MultithreadEventLoopGroup(1, NioHandler.newFactory()); final EventLoop el = group.next(); - Future future = el.schedule(new Runnable() { - @Override - public void run() { - // NOOP - } + Future future = el.schedule(() -> { + // NOOP }, Long.MAX_VALUE, TimeUnit.MILLISECONDS); assertFalse(future.awaitUninterruptibly(1000)); @@ -127,48 +102,25 @@ public class NioEventLoopTest extends AbstractEventLoopTest { final NioHandler nioHandler = (NioHandler) NioHandler.newFactory().newHandler(); EventLoop loop = new SingleThreadEventLoop(new DefaultThreadFactory("ioPool"), nioHandler); try { - Selector selector = loop.submit(new Callable() { - @Override - public Selector call() throws Exception { - return nioHandler.unwrappedSelector(); - } - }).syncUninterruptibly().getNow(); + Selector selector = loop.submit(nioHandler::unwrappedSelector).syncUninterruptibly().getNow(); assertTrue(selector.isOpen()); - loop.submit(new Runnable() { - @Override - public void run() { - // Interrupt the thread which should not end-up in a busy spin and - // so the selector should not have been rebuild. - Thread.currentThread().interrupt(); - } + loop.submit(() -> { + // Interrupt the thread which should not end-up in a busy spin and + // so the selector should not have been rebuild. + Thread.currentThread().interrupt(); }).syncUninterruptibly(); assertTrue(selector.isOpen()); final CountDownLatch latch = new CountDownLatch(2); - loop.submit(new Runnable() { - @Override - public void run() { - latch.countDown(); - } - }).syncUninterruptibly(); + loop.submit(latch::countDown).syncUninterruptibly(); - loop.schedule(new Runnable() { - @Override - public void run() { - latch.countDown(); - } - }, 2, TimeUnit.SECONDS).syncUninterruptibly(); + loop.schedule(latch::countDown, 2, TimeUnit.SECONDS).syncUninterruptibly(); latch.await(); - assertSame(selector, loop.submit(new Callable() { - @Override - public Selector call() throws Exception { - return nioHandler.unwrappedSelector(); - } - }).syncUninterruptibly().getNow()); + assertSame(selector, loop.submit(nioHandler::unwrappedSelector).syncUninterruptibly().getNow()); assertTrue(selector.isOpen()); } finally { loop.shutdownGracefully(); @@ -190,9 +142,7 @@ public class NioEventLoopTest extends AbstractEventLoopTest { final CountDownLatch latch = new CountDownLatch(1); - loop.execute(new Runnable() { - @Override - public void run() { + loop.execute(() -> nioHandler.register(selectableChannel, SelectionKey.OP_CONNECT, new NioTask() { @Override public void channelReady(SocketChannel ch, SelectionKey key) { @@ -202,9 +152,7 @@ public class NioEventLoopTest extends AbstractEventLoopTest { @Override public void channelUnregistered(SocketChannel ch, Throwable cause) { } - }); - } - }); + })); latch.await(); @@ -219,27 +167,21 @@ public class NioEventLoopTest extends AbstractEventLoopTest { @Test public void testTaskRemovalOnShutdownThrowsNoUnsupportedOperationException() throws Exception { final AtomicReference error = new AtomicReference<>(); - final Runnable task = new Runnable() { - @Override - public void run() { - // NOOP - } + final Runnable task = () -> { + // NOOP }; // Just run often enough to trigger it normally. for (int i = 0; i < 1000; i++) { EventLoopGroup group = new MultithreadEventLoopGroup(1, NioHandler.newFactory()); final EventLoop loop = group.next(); - Thread t = new Thread(new Runnable() { - @Override - public void run() { - try { - for (;;) { - loop.execute(task); - } - } catch (Throwable cause) { - error.set(cause); + Thread t = new Thread(() -> { + try { + for (;;) { + loop.execute(task); } + } catch (Throwable cause) { + error.set(cause); } }); t.start(); @@ -255,24 +197,19 @@ public class NioEventLoopTest extends AbstractEventLoopTest { public void testRebuildSelectorOnIOException() throws InterruptedException { final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch strategyLatch = new CountDownLatch(1); - SelectStrategyFactory selectStrategyFactory = new SelectStrategyFactory() { + SelectStrategyFactory selectStrategyFactory = () -> new SelectStrategy() { + + private boolean thrown; + @Override - public SelectStrategy newSelectStrategy() { - return new SelectStrategy() { - - private boolean thrown; - - @Override - public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception { - strategyLatch.await(); - if (!thrown) { - thrown = true; - throw new IOException(); - } - latch.countDown(); - return -1; - } - }; + public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception { + strategyLatch.await(); + if (!thrown) { + thrown = true; + throw new IOException(); + } + latch.countDown(); + return -1; } }; @@ -289,12 +226,7 @@ public class NioEventLoopTest extends AbstractEventLoopTest { latch.await(); - Selector newSelector = loop.submit(new Callable() { - @Override - public Selector call() throws Exception { - return nioHandler.unwrappedSelector(); - } - }).syncUninterruptibly().getNow(); + Selector newSelector = loop.submit(nioHandler::unwrappedSelector).syncUninterruptibly().getNow(); assertTrue(newSelector.isOpen()); assertNotSame(selector, newSelector); assertFalse(selector.isOpen()); diff --git a/transport/src/test/java/io/netty/channel/socket/nio/NioSocketChannelTest.java b/transport/src/test/java/io/netty/channel/socket/nio/NioSocketChannelTest.java index a0ca7d5b46..c2739561df 100644 --- a/transport/src/test/java/io/netty/channel/socket/nio/NioSocketChannelTest.java +++ b/transport/src/test/java/io/netty/channel/socket/nio/NioSocketChannelTest.java @@ -132,12 +132,9 @@ public class NioSocketChannelTest extends AbstractNioChannelTest { + // This message must be flushed + ctx.writeAndFlush(Unpooled.wrappedBuffer(new byte[]{'c'})); }); ctx.flush(); } @@ -197,12 +194,9 @@ public class NioSocketChannelTest extends AbstractNioChannelTest { + Channel channel = cf.channel(); + channel.register(); }); } });