From 7ddc93bed87f6b1efb062df480566ff590a8b34e Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Thu, 31 May 2012 14:54:48 -0700 Subject: [PATCH] Ported IdleStateHandler / Forward-ported the UptimeClient example - Add ChannelHandlerContext.eventLoop() for convenience - Bootstrap and ServerBootstrap handles channel initialization failure better - More strict checks for missing @Sharable annotation - A handler without @Sharable annotation cannot be added more than once now. --- .../handler/codec/StreamToMessageDecoder.java | 2 +- .../proxy/HexDumpProxyFrontendHandler.java | 2 +- .../io/netty/example/uptime/UptimeClient.java | 10 +- .../example/uptime/UptimeClientHandler.java | 37 ++- .../timeout/DefaultIdleStateEvent.java | 79 ----- .../timeout/IdleStateAwareChannelHandler.java | 46 --- .../IdleStateAwareChannelUpstreamHandler.java | 46 --- .../netty/handler/timeout/IdleStateEvent.java | 49 ++- .../handler/timeout/IdleStateHandler.java | 290 +++++++----------- .../java/io/netty/bootstrap/Bootstrap.java | 34 +- .../io/netty/bootstrap/ServerBootstrap.java | 21 +- .../netty/channel/AbstractChannelHandler.java | 31 ++ .../netty/channel/ChannelHandlerAdapter.java | 23 +- .../netty/channel/ChannelHandlerContext.java | 1 + .../channel/ChannelInboundHandlerAdapter.java | 22 +- .../io/netty/channel/ChannelInitializer.java | 2 +- .../ChannelOutboundHandlerAdapter.java | 22 +- .../netty/channel/CombinedChannelHandler.java | 4 +- .../netty/channel/DefaultChannelPipeline.java | 18 +- 19 files changed, 302 insertions(+), 437 deletions(-) delete mode 100644 handler/src/main/java/io/netty/handler/timeout/DefaultIdleStateEvent.java delete mode 100644 handler/src/main/java/io/netty/handler/timeout/IdleStateAwareChannelHandler.java delete mode 100644 handler/src/main/java/io/netty/handler/timeout/IdleStateAwareChannelUpstreamHandler.java create mode 100644 transport/src/main/java/io/netty/channel/AbstractChannelHandler.java diff --git a/codec/src/main/java/io/netty/handler/codec/StreamToMessageDecoder.java b/codec/src/main/java/io/netty/handler/codec/StreamToMessageDecoder.java index e741da0932..101ee472a0 100644 --- a/codec/src/main/java/io/netty/handler/codec/StreamToMessageDecoder.java +++ b/codec/src/main/java/io/netty/handler/codec/StreamToMessageDecoder.java @@ -100,7 +100,7 @@ public abstract class StreamToMessageDecoder extends ChannelInboundHandlerAda * inbound buffer. */ public void replace(String newHandlerName, ChannelInboundHandler newHandler) { - if (!ctx.channel().eventLoop().inEventLoop()) { + if (!ctx.eventLoop().inEventLoop()) { throw new IllegalStateException("not in event loop"); } diff --git a/example/src/main/java/io/netty/example/proxy/HexDumpProxyFrontendHandler.java b/example/src/main/java/io/netty/example/proxy/HexDumpProxyFrontendHandler.java index 0ff473af2d..c290204296 100644 --- a/example/src/main/java/io/netty/example/proxy/HexDumpProxyFrontendHandler.java +++ b/example/src/main/java/io/netty/example/proxy/HexDumpProxyFrontendHandler.java @@ -46,7 +46,7 @@ public class HexDumpProxyFrontendHandler extends ChannelInboundStreamHandlerAdap // Start the connection attempt. Bootstrap b = new Bootstrap(); - b.eventLoop(ctx.channel().eventLoop()) + b.eventLoop(ctx.eventLoop()) .channel(new NioSocketChannel()) .remoteAddress(remoteHost, remotePort) .initializer(new ChannelInitializer() { diff --git a/example/src/main/java/io/netty/example/uptime/UptimeClient.java b/example/src/main/java/io/netty/example/uptime/UptimeClient.java index aa6162ac24..a041477692 100644 --- a/example/src/main/java/io/netty/example/uptime/UptimeClient.java +++ b/example/src/main/java/io/netty/example/uptime/UptimeClient.java @@ -21,7 +21,7 @@ import io.netty.channel.EventLoop; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioEventLoop; import io.netty.channel.socket.nio.NioSocketChannel; -import io.netty.handler.timeout.ReadTimeoutHandler; +import io.netty.handler.timeout.IdleStateHandler; /** @@ -40,6 +40,10 @@ public class UptimeClient { private final String host; private final int port; + // A single handler will be reused across multiple connection attempts to keep when the last + // successful connection attempt was. + private final UptimeClientHandler handler = new UptimeClientHandler(this); + public UptimeClient(String host, int port) { this.host = host; this.port = port; @@ -60,9 +64,7 @@ public class UptimeClient { .initializer(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { - ch.pipeline().addLast( - new ReadTimeoutHandler(READ_TIMEOUT), - new UptimeClientHandler(UptimeClient.this)); + ch.pipeline().addLast(new IdleStateHandler(READ_TIMEOUT, 0, 0), handler); } }); diff --git a/example/src/main/java/io/netty/example/uptime/UptimeClientHandler.java b/example/src/main/java/io/netty/example/uptime/UptimeClientHandler.java index bd80106ddf..e52e3a7e7a 100644 --- a/example/src/main/java/io/netty/example/uptime/UptimeClientHandler.java +++ b/example/src/main/java/io/netty/example/uptime/UptimeClientHandler.java @@ -16,10 +16,13 @@ package io.netty.example.uptime; import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ChannelBuffer; +import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelInboundHandlerContext; import io.netty.channel.ChannelInboundStreamHandlerAdapter; import io.netty.channel.EventLoop; -import io.netty.handler.timeout.ReadTimeoutException; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; import java.net.ConnectException; import java.util.concurrent.TimeUnit; @@ -28,6 +31,7 @@ import java.util.concurrent.TimeUnit; * Keep reconnecting to the server while printing out the current uptime and * connection attempt status. */ +@Sharable public class UptimeClientHandler extends ChannelInboundStreamHandlerAdapter { private final UptimeClient client; @@ -45,6 +49,26 @@ public class UptimeClientHandler extends ChannelInboundStreamHandlerAdapter { println("Connected to: " + ctx.channel().remoteAddress()); } + @Override + public void inboundBufferUpdated(ChannelInboundHandlerContext ctx, ChannelBuffer in) throws Exception { + // Discard received data + in.clear(); + } + + @Override + public void userEventTriggered(ChannelInboundHandlerContext ctx, Object evt) throws Exception { + if (!(evt instanceof IdleStateEvent)) { + return; + } + + IdleStateEvent e = (IdleStateEvent) evt; + if (e.state() == IdleState.READER_IDLE) { + // The connection was OK but there was no traffic for last period. + println("Disconnecting due to no inbound traffic"); + ctx.close(); + } + } + @Override public void channelInactive(ChannelInboundHandlerContext ctx) throws Exception { println("Disconnected from: " + ctx.channel().remoteAddress()); @@ -55,7 +79,7 @@ public class UptimeClientHandler extends ChannelInboundStreamHandlerAdapter { throws Exception { println("Sleeping for: " + UptimeClient.RECONNECT_DELAY + "s"); - final EventLoop loop = ctx.channel().eventLoop(); + final EventLoop loop = ctx.eventLoop(); loop.schedule(new Runnable() { @Override public void run() { @@ -71,13 +95,8 @@ public class UptimeClientHandler extends ChannelInboundStreamHandlerAdapter { startTime = -1; println("Failed to connect: " + cause.getMessage()); } - if (cause instanceof ReadTimeoutException) { - // The connection was OK but there was no traffic for last period. - println("Disconnecting due to no inbound traffic"); - } else { - cause.printStackTrace(); - } - ctx.channel().close(); + cause.printStackTrace(); + ctx.close(); } void println(String msg) { diff --git a/handler/src/main/java/io/netty/handler/timeout/DefaultIdleStateEvent.java b/handler/src/main/java/io/netty/handler/timeout/DefaultIdleStateEvent.java deleted file mode 100644 index f1fa029cd3..0000000000 --- a/handler/src/main/java/io/netty/handler/timeout/DefaultIdleStateEvent.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.handler.timeout; - -import static io.netty.channel.Channels.*; - -import java.text.DateFormat; -import java.util.Date; -import java.util.Locale; - -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; - -/** - * The default {@link IdleStateEvent} implementation. - */ -public class DefaultIdleStateEvent implements IdleStateEvent { - - private final Channel channel; - private final IdleState state; - private final long lastActivityTimeMillis; - - /** - * Creates a new instance. - */ - public DefaultIdleStateEvent( - Channel channel, IdleState state, long lastActivityTimeMillis) { - if (channel == null) { - throw new NullPointerException("channel"); - } - if (state == null) { - throw new NullPointerException("state"); - } - this.channel = channel; - this.state = state; - this.lastActivityTimeMillis = lastActivityTimeMillis; - } - - @Override - public Channel getChannel() { - return channel; - } - - @Override - public ChannelFuture getFuture() { - return succeededFuture(getChannel()); - } - - @Override - public IdleState getState() { - return state; - } - - @Override - public long getLastActivityTimeMillis() { - return lastActivityTimeMillis; - } - - @Override - public String toString() { - return getChannel().toString() + ' ' + getState() + " since " + - DateFormat.getDateTimeInstance( - DateFormat.SHORT, DateFormat.SHORT, Locale.US).format( - new Date(getLastActivityTimeMillis())); - } -} diff --git a/handler/src/main/java/io/netty/handler/timeout/IdleStateAwareChannelHandler.java b/handler/src/main/java/io/netty/handler/timeout/IdleStateAwareChannelHandler.java deleted file mode 100644 index f4c3c18930..0000000000 --- a/handler/src/main/java/io/netty/handler/timeout/IdleStateAwareChannelHandler.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.handler.timeout; - -import io.netty.channel.Channel; -import io.netty.channel.ChannelEvent; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.SimpleChannelHandler; - -/** - * An extended {@link SimpleChannelHandler} that adds the handler method for - * an {@link IdleStateEvent}. - * @apiviz.uses io.netty.handler.timeout.IdleStateEvent - */ -public class IdleStateAwareChannelHandler extends SimpleChannelHandler { - - @Override - public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) - throws Exception { - if (e instanceof IdleStateEvent) { - channelIdle(ctx, (IdleStateEvent) e); - } else { - super.handleUpstream(ctx, e); - } - } - - /** - * Invoked when a {@link Channel} has been idle for a while. - */ - public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) throws Exception { - ctx.sendUpstream(e); - } -} diff --git a/handler/src/main/java/io/netty/handler/timeout/IdleStateAwareChannelUpstreamHandler.java b/handler/src/main/java/io/netty/handler/timeout/IdleStateAwareChannelUpstreamHandler.java deleted file mode 100644 index 48b91d8f34..0000000000 --- a/handler/src/main/java/io/netty/handler/timeout/IdleStateAwareChannelUpstreamHandler.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright 2011 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.handler.timeout; - -import io.netty.channel.Channel; -import io.netty.channel.ChannelEvent; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.SimpleChannelUpstreamHandler; - -/** - * An extended {@link SimpleChannelUpstreamHandler} that adds the handler method - * for an {@link IdleStateEvent}. - * @apiviz.uses io.netty.handler.timeout.IdleStateEvent - */ -public class IdleStateAwareChannelUpstreamHandler extends SimpleChannelUpstreamHandler { - - @Override - public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) - throws Exception { - if (e instanceof IdleStateEvent) { - channelIdle(ctx, (IdleStateEvent) e); - } else { - super.handleUpstream(ctx, e); - } - } - - /** - * Invoked when a {@link Channel} has been idle for a while. - */ - public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) throws Exception { - ctx.sendUpstream(e); - } -} diff --git a/handler/src/main/java/io/netty/handler/timeout/IdleStateEvent.java b/handler/src/main/java/io/netty/handler/timeout/IdleStateEvent.java index 5175fd391f..d63616d408 100644 --- a/handler/src/main/java/io/netty/handler/timeout/IdleStateEvent.java +++ b/handler/src/main/java/io/netty/handler/timeout/IdleStateEvent.java @@ -16,22 +16,53 @@ package io.netty.handler.timeout; import io.netty.channel.Channel; -import io.netty.channel.ChannelEvent; /** - * A {@link ChannelEvent} that is triggered when a {@link Channel} has been idle - * for a while. + * A user event triggered by {@link IdleStateHandler} when a {@link Channel} is idle. + * * @apiviz.landmark * @apiviz.has io.netty.handler.timeout.IdleState oneway - - */ -public interface IdleStateEvent extends ChannelEvent { +public class IdleStateEvent { + + private final IdleState state; + private final int count; + private final long durationMillis; + + public IdleStateEvent(IdleState state, int count, long durationMillis) { + if (state == null) { + throw new NullPointerException("state"); + } + if (count < 0) { + throw new IllegalStateException(String.format("count: %d (expected: >= 0)", count)); + } + if (durationMillis < 0) { + throw new IllegalStateException(String.format( + "durationMillis: %d (expected: >= 0)", durationMillis)); + } + + this.state = state; + this.count = count; + this.durationMillis = durationMillis; + } + /** * Returns the detailed idle state. */ - IdleState getState(); + public IdleState state() { + return state; + } - /** - * Returns the last time when I/O occurred in milliseconds. - */ - long getLastActivityTimeMillis(); + public int count() { + return count; + } + + public long durationMillis() { + return durationMillis; + } + + @Override + public String toString() { + return state + "(" + count + ", " + durationMillis + "ms)"; + } } diff --git a/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java b/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java index ad595e775f..1d5bb9eb0b 100644 --- a/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java +++ b/handler/src/main/java/io/netty/handler/timeout/IdleStateHandler.java @@ -15,28 +15,25 @@ */ package io.netty.handler.timeout; -import static io.netty.channel.Channels.*; - -import java.util.concurrent.TimeUnit; - import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; +import io.netty.channel.ChannelBufferHolder; +import io.netty.channel.ChannelBufferHolders; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerContext; +import io.netty.channel.ChannelOutboundHandlerContext; import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelPipelineFactory; -import io.netty.channel.ChannelStateEvent; -import io.netty.channel.Channels; -import io.netty.channel.LifeCycleAwareChannelHandler; -import io.netty.channel.MessageEvent; -import io.netty.channel.SimpleChannelUpstreamHandler; -import io.netty.channel.WriteCompletionEvent; -import io.netty.util.ExternalResourceReleasable; +import io.netty.channel.EventLoop; import io.netty.util.HashedWheelTimer; -import io.netty.util.Timeout; import io.netty.util.Timer; -import io.netty.util.TimerTask; + +import java.nio.channels.Channels; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; /** * Triggers an {@link IdleStateEvent} when a {@link Channel} has not performed @@ -119,23 +116,28 @@ import io.netty.util.TimerTask; * @apiviz.uses io.netty.util.HashedWheelTimer * @apiviz.has io.netty.handler.timeout.IdleStateEvent oneway - - triggers */ -@Sharable -public class IdleStateHandler extends SimpleChannelUpstreamHandler - implements LifeCycleAwareChannelHandler, - ExternalResourceReleasable { +public class IdleStateHandler extends ChannelHandlerAdapter { - final Timer timer; + private final long readerIdleTimeMillis; + private final long writerIdleTimeMillis; + private final long allIdleTimeMillis; - final long readerIdleTimeMillis; - final long writerIdleTimeMillis; - final long allIdleTimeMillis; + volatile ScheduledFuture readerIdleTimeout; + volatile long lastReadTime; + int readerIdleCount; + + volatile ScheduledFuture writerIdleTimeout; + volatile long lastWriteTime; + int writerIdleCount; + + volatile ScheduledFuture allIdleTimeout; + int allIdleCount; + + volatile boolean destroyed; /** * Creates a new instance. * - * @param timer - * the {@link Timer} that is used to trigger the scheduled event. - * The recommended {@link Timer} implementation is {@link HashedWheelTimer}. * @param readerIdleTimeSeconds * an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE} * will be triggered when no read was performed for the specified @@ -150,22 +152,17 @@ public class IdleStateHandler extends SimpleChannelUpstreamHandler * the specified period of time. Specify {@code 0} to disable. */ public IdleStateHandler( - Timer timer, int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) { - this(timer, - readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds, + this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds, TimeUnit.SECONDS); } /** * Creates a new instance. * - * @param timer - * the {@link Timer} that is used to trigger the scheduled event. - * The recommended {@link Timer} implementation is {@link HashedWheelTimer}. * @param readerIdleTime * an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE} * will be triggered when no read was performed for the specified @@ -183,18 +180,13 @@ public class IdleStateHandler extends SimpleChannelUpstreamHandler * {@code writeIdleTime}, and {@code allIdleTime} */ public IdleStateHandler( - Timer timer, long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) { - if (timer == null) { - throw new NullPointerException("timer"); - } if (unit == null) { throw new NullPointerException("unit"); } - this.timer = timer; if (readerIdleTime <= 0) { readerIdleTimeMillis = 0; } else { @@ -212,148 +204,118 @@ public class IdleStateHandler extends SimpleChannelUpstreamHandler } } - /** - * Stops the {@link Timer} which was specified in the constructor of this - * handler. You should not call this method if the {@link Timer} is in use - * by other objects. - */ @Override - public void releaseExternalResources() { - timer.stop(); + public ChannelBufferHolder newInboundBuffer(ChannelInboundHandlerContext ctx) throws Exception { + return ChannelBufferHolders.inboundBypassBuffer(ctx); + } + + @Override + public ChannelBufferHolder newOutboundBuffer(ChannelOutboundHandlerContext ctx) throws Exception { + return ChannelBufferHolders.outboundBypassBuffer(ctx); } @Override public void beforeAdd(ChannelHandlerContext ctx) throws Exception { - if (ctx.pipeline().isAttached()) { - // channelOpen event has been fired already, which means - // this.channelOpen() will not be invoked. - // We have to initialize here instead. + if (ctx.channel().isActive()) { + // channelActvie() event has been fired already, which means this.channelActive() will + // not be invoked. We have to initialize here instead. initialize(ctx); } else { - // channelOpen event has not been fired yet. - // this.channelOpen() will be invoked and initialization will occur there. + // channelActive() event has not been fired yet. this.channelOpen() will be invoked + // and initialization will occur there. } } - @Override - public void afterAdd(ChannelHandlerContext ctx) throws Exception { - // NOOP - } - @Override public void beforeRemove(ChannelHandlerContext ctx) throws Exception { - destroy(ctx); + destroy(); } @Override - public void afterRemove(ChannelHandlerContext ctx) throws Exception { - // NOOP - } - - @Override - public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) - throws Exception { + public void channelActive(ChannelInboundHandlerContext ctx) throws Exception { // This method will be invoked only if this handler was added - // before channelOpen event is fired. If a user adds this handler - // after the channelOpen event, initialize() will be called by beforeAdd(). + // before channelActive() event is fired. If a user adds this handler + // after the channelActive() event, initialize() will be called by beforeAdd(). initialize(ctx); - ctx.sendUpstream(e); + super.channelActive(ctx); } @Override - public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) - throws Exception { - destroy(ctx); - ctx.sendUpstream(e); + public void channelInactive(ChannelInboundHandlerContext ctx) throws Exception { + destroy(); + super.channelInactive(ctx); + } + + + @Override + public void inboundBufferUpdated(ChannelInboundHandlerContext ctx) throws Exception { + lastReadTime = System.currentTimeMillis(); + readerIdleCount = allIdleCount = 0; + ctx.fireInboundBufferUpdated(); } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) - throws Exception { - State state = (State) ctx.getAttachment(); - state.lastReadTime = System.currentTimeMillis(); - ctx.sendUpstream(e); - } + public void flush(final ChannelOutboundHandlerContext ctx, ChannelFuture future) throws Exception { + future.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + lastWriteTime = System.currentTimeMillis(); + writerIdleCount = allIdleCount = 0; + } + }); - @Override - public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e) - throws Exception { - if (e.getWrittenAmount() > 0) { - State state = (State) ctx.getAttachment(); - state.lastWriteTime = System.currentTimeMillis(); - } - ctx.sendUpstream(e); + super.flush(ctx, future); } private void initialize(ChannelHandlerContext ctx) { - State state = state(ctx); - // Avoid the case where destroy() is called before scheduling timeouts. // See: https://github.com/netty/netty/issues/143 - if (state.destroyed) { + if (destroyed) { return; } - state.lastReadTime = state.lastWriteTime = System.currentTimeMillis(); + EventLoop loop = ctx.eventLoop(); + + lastReadTime = lastWriteTime = System.currentTimeMillis(); if (readerIdleTimeMillis > 0) { - state.readerIdleTimeout = timer.newTimeout( + readerIdleTimeout = loop.schedule( new ReaderIdleTimeoutTask(ctx), readerIdleTimeMillis, TimeUnit.MILLISECONDS); } if (writerIdleTimeMillis > 0) { - state.writerIdleTimeout = timer.newTimeout( + writerIdleTimeout = loop.schedule( new WriterIdleTimeoutTask(ctx), writerIdleTimeMillis, TimeUnit.MILLISECONDS); } if (allIdleTimeMillis > 0) { - state.allIdleTimeout = timer.newTimeout( + allIdleTimeout = loop.schedule( new AllIdleTimeoutTask(ctx), allIdleTimeMillis, TimeUnit.MILLISECONDS); } } - private void destroy(ChannelHandlerContext ctx) { - State state; + private void destroy() { + destroyed = true; - synchronized (ctx) { - state = state(ctx); - state.destroyed = true; + if (readerIdleTimeout != null) { + readerIdleTimeout.cancel(false); + readerIdleTimeout = null; } - - if (state.readerIdleTimeout != null) { - state.readerIdleTimeout.cancel(); - state.readerIdleTimeout = null; + if (writerIdleTimeout != null) { + writerIdleTimeout.cancel(false); + writerIdleTimeout = null; } - if (state.writerIdleTimeout != null) { - state.writerIdleTimeout.cancel(); - state.writerIdleTimeout = null; - } - if (state.allIdleTimeout != null) { - state.allIdleTimeout.cancel(); - state.allIdleTimeout = null; + if (allIdleTimeout != null) { + allIdleTimeout.cancel(false); + allIdleTimeout = null; } } - - private State state(ChannelHandlerContext ctx) { - State state; - synchronized (ctx) { - // FIXME: It could have been better if there is setAttachmentIfAbsent(). - state = (State) ctx.getAttachment(); - if (state != null) { - return state; - } - state = new State(); - ctx.setAttachment(state); - } - return state; + + protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception { + ctx.fireUserEventTriggered(evt); } - protected void channelIdle( - ChannelHandlerContext ctx, IdleState state, long lastActivityTimeMillis) throws Exception { - ctx.sendUpstream(new DefaultIdleStateEvent(ctx.channel(), state, lastActivityTimeMillis)); - } - - private final class ReaderIdleTimeoutTask implements TimerTask { + private final class ReaderIdleTimeoutTask implements Runnable { private final ChannelHandlerContext ctx; @@ -362,34 +324,33 @@ public class IdleStateHandler extends SimpleChannelUpstreamHandler } @Override - public void run(Timeout timeout) throws Exception { - if (timeout.isCancelled() || !ctx.channel().isOpen()) { + public void run() { + if (!ctx.channel().isOpen()) { return; } - State state = (State) ctx.getAttachment(); long currentTime = System.currentTimeMillis(); - long lastReadTime = state.lastReadTime; + long lastReadTime = IdleStateHandler.this.lastReadTime; long nextDelay = readerIdleTimeMillis - (currentTime - lastReadTime); if (nextDelay <= 0) { // Reader is idle - set a new timeout and notify the callback. - state.readerIdleTimeout = - timer.newTimeout(this, readerIdleTimeMillis, TimeUnit.MILLISECONDS); + readerIdleTimeout = + ctx.eventLoop().schedule(this, readerIdleTimeMillis, TimeUnit.MILLISECONDS); try { - channelIdle(ctx, IdleState.READER_IDLE, lastReadTime); + channelIdle(ctx, new IdleStateEvent( + IdleState.READER_IDLE, readerIdleCount ++, currentTime - lastReadTime)); } catch (Throwable t) { - fireExceptionCaught(ctx, t); + ctx.fireExceptionCaught(t); } } else { // Read occurred before the timeout - set a new timeout with shorter delay. - state.readerIdleTimeout = - timer.newTimeout(this, nextDelay, TimeUnit.MILLISECONDS); + readerIdleTimeout = ctx.eventLoop().schedule(this, nextDelay, TimeUnit.MILLISECONDS); } } } - private final class WriterIdleTimeoutTask implements TimerTask { + private final class WriterIdleTimeoutTask implements Runnable { private final ChannelHandlerContext ctx; @@ -398,33 +359,32 @@ public class IdleStateHandler extends SimpleChannelUpstreamHandler } @Override - public void run(Timeout timeout) throws Exception { - if (timeout.isCancelled() || !ctx.channel().isOpen()) { + public void run() { + if (!ctx.channel().isOpen()) { return; } - State state = (State) ctx.getAttachment(); long currentTime = System.currentTimeMillis(); - long lastWriteTime = state.lastWriteTime; + long lastWriteTime = IdleStateHandler.this.lastWriteTime; long nextDelay = writerIdleTimeMillis - (currentTime - lastWriteTime); if (nextDelay <= 0) { // Writer is idle - set a new timeout and notify the callback. - state.writerIdleTimeout = - timer.newTimeout(this, writerIdleTimeMillis, TimeUnit.MILLISECONDS); + writerIdleTimeout = ctx.eventLoop().schedule( + this, writerIdleTimeMillis, TimeUnit.MILLISECONDS); try { - channelIdle(ctx, IdleState.WRITER_IDLE, lastWriteTime); + channelIdle(ctx, new IdleStateEvent( + IdleState.WRITER_IDLE, writerIdleCount ++, currentTime - lastWriteTime)); } catch (Throwable t) { - fireExceptionCaught(ctx, t); + ctx.fireExceptionCaught(t); } } else { // Write occurred before the timeout - set a new timeout with shorter delay. - state.writerIdleTimeout = - timer.newTimeout(this, nextDelay, TimeUnit.MILLISECONDS); + writerIdleTimeout = ctx.eventLoop().schedule(this, nextDelay, TimeUnit.MILLISECONDS); } } } - private final class AllIdleTimeoutTask implements TimerTask { + private final class AllIdleTimeoutTask implements Runnable { private final ChannelHandlerContext ctx; @@ -433,46 +393,30 @@ public class IdleStateHandler extends SimpleChannelUpstreamHandler } @Override - public void run(Timeout timeout) throws Exception { - if (timeout.isCancelled() || !ctx.channel().isOpen()) { + public void run() { + if (!ctx.channel().isOpen()) { return; } - State state = (State) ctx.getAttachment(); long currentTime = System.currentTimeMillis(); - long lastIoTime = Math.max(state.lastReadTime, state.lastWriteTime); + long lastIoTime = Math.max(lastReadTime, lastWriteTime); long nextDelay = allIdleTimeMillis - (currentTime - lastIoTime); if (nextDelay <= 0) { // Both reader and writer are idle - set a new timeout and // notify the callback. - state.allIdleTimeout = - timer.newTimeout(this, allIdleTimeMillis, TimeUnit.MILLISECONDS); + allIdleTimeout = ctx.eventLoop().schedule( + this, allIdleTimeMillis, TimeUnit.MILLISECONDS); try { - channelIdle(ctx, IdleState.ALL_IDLE, lastIoTime); + channelIdle(ctx, new IdleStateEvent( + IdleState.ALL_IDLE, allIdleCount ++, currentTime - lastIoTime)); } catch (Throwable t) { - fireExceptionCaught(ctx, t); + ctx.fireExceptionCaught(t); } } else { // Either read or write occurred before the timeout - set a new // timeout with shorter delay. - state.allIdleTimeout = - timer.newTimeout(this, nextDelay, TimeUnit.MILLISECONDS); + allIdleTimeout = ctx.eventLoop().schedule(this, nextDelay, TimeUnit.MILLISECONDS); } } } - - private static final class State { - State() { - } - - volatile Timeout readerIdleTimeout; - volatile long lastReadTime; - - volatile Timeout writerIdleTimeout; - volatile long lastWriteTime; - - volatile Timeout allIdleTimeout; - - volatile boolean destroyed; - } } diff --git a/transport/src/main/java/io/netty/bootstrap/Bootstrap.java b/transport/src/main/java/io/netty/bootstrap/Bootstrap.java index 63af450d10..f11b3753ef 100644 --- a/transport/src/main/java/io/netty/bootstrap/Bootstrap.java +++ b/transport/src/main/java/io/netty/bootstrap/Bootstrap.java @@ -1,6 +1,7 @@ package io.netty.bootstrap; import io.netty.channel.Channel; +import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; @@ -112,7 +113,7 @@ public class Bootstrap { } public ChannelFuture bind(ChannelFuture future) { - validate(); + validate(future); if (localAddress == null) { throw new IllegalStateException("localAddress not set"); } @@ -124,6 +125,10 @@ public class Bootstrap { return future; } + if (!ensureOpen(future)) { + return future; + } + return channel.bind(localAddress, future).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } @@ -133,7 +138,7 @@ public class Bootstrap { } public ChannelFuture connect(ChannelFuture future) { - validate(); + validate(future); if (remoteAddress == null) { throw new IllegalStateException("remoteAddress not set"); } @@ -145,6 +150,10 @@ public class Bootstrap { return future; } + if (!ensureOpen(future)) { + return future; + } + if (localAddress == null) { channel.connect(remoteAddress, future); } else { @@ -180,6 +189,16 @@ public class Bootstrap { eventLoop.register(channel).syncUninterruptibly(); } + private static boolean ensureOpen(ChannelFuture future) { + if (!future.channel().isOpen()) { + // Registration was successful but the channel was closed due to some failure in + // initializer. + future.setFailure(new ChannelException("initialization failure")); + return false; + } + return true; + } + public void shutdown() { if (eventLoop != null) { eventLoop.shutdown(); @@ -197,4 +216,15 @@ public class Bootstrap { throw new IllegalStateException("initializer not set"); } } + + private void validate(ChannelFuture future) { + if (future == null) { + throw new NullPointerException("future"); + } + + if (future.channel() != channel) { + throw new IllegalArgumentException("future.channel() must be the same channel."); + } + validate(); + } } diff --git a/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java b/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java index c06ddd939f..35a43c6fe3 100644 --- a/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java +++ b/transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java @@ -3,6 +3,7 @@ package io.netty.bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelBufferHolder; import io.netty.channel.ChannelBufferHolders; +import io.netty.channel.ChannelException; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; @@ -136,7 +137,7 @@ public class ServerBootstrap { } public ChannelFuture bind(ChannelFuture future) { - validate(); + validate(future); if (channel.isActive()) { future.setFailure(new IllegalStateException("channel already bound: " + channel)); return future; @@ -162,6 +163,13 @@ public class ServerBootstrap { return future; } + if (!channel.isOpen()) { + // Registration was successful but the channel was closed due to some failure in + // initializer. + future.setFailure(new ChannelException("initialization failure")); + return future; + } + channel.bind(localAddress, future).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); return future; @@ -196,6 +204,17 @@ public class ServerBootstrap { } } + private void validate(ChannelFuture future) { + if (future == null) { + throw new NullPointerException("future"); + } + + if (future.channel() != channel) { + throw new IllegalArgumentException("future.channel() must be the same channel."); + } + validate(); + } + private class Acceptor extends ChannelInboundHandlerAdapter { @Override public ChannelBufferHolder newInboundBuffer(ChannelInboundHandlerContext ctx) { diff --git a/transport/src/main/java/io/netty/channel/AbstractChannelHandler.java b/transport/src/main/java/io/netty/channel/AbstractChannelHandler.java new file mode 100644 index 0000000000..1f97609f43 --- /dev/null +++ b/transport/src/main/java/io/netty/channel/AbstractChannelHandler.java @@ -0,0 +1,31 @@ +package io.netty.channel; + +public class AbstractChannelHandler implements ChannelHandler { + + // Not using volatile because it's used only for a sanity check. + boolean added; + + final boolean isSharable() { + return getClass().isAnnotationPresent(Sharable.class); + } + + @Override + public void beforeAdd(ChannelHandlerContext ctx) throws Exception { + // NOOP + } + + @Override + public void afterAdd(ChannelHandlerContext ctx) throws Exception { + // NOOP + } + + @Override + public void beforeRemove(ChannelHandlerContext ctx) throws Exception { + // NOOP + } + + @Override + public void afterRemove(ChannelHandlerContext ctx) throws Exception { + // NOOP + } +} diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java index c5e51e4ce5..fa17fdddb9 100644 --- a/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelHandlerAdapter.java @@ -2,27 +2,8 @@ package io.netty.channel; import java.net.SocketAddress; -public abstract class ChannelHandlerAdapter implements ChannelInboundHandler, ChannelOutboundHandler { - - @Override - public void beforeAdd(ChannelHandlerContext ctx) throws Exception { - // Do nothing by default. - } - - @Override - public void afterAdd(ChannelHandlerContext ctx) throws Exception { - // Do nothing by default. - } - - @Override - public void beforeRemove(ChannelHandlerContext ctx) throws Exception { - // Do nothing by default. - } - - @Override - public void afterRemove(ChannelHandlerContext ctx) throws Exception { - // Do nothing by default. - } +public abstract class ChannelHandlerAdapter extends AbstractChannelHandler + implements ChannelInboundHandler, ChannelOutboundHandler { @Override public void channelRegistered(ChannelInboundHandlerContext ctx) throws Exception { diff --git a/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java index 16d7de2b1a..f71cca4ca8 100644 --- a/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/ChannelHandlerContext.java @@ -129,6 +129,7 @@ public interface ChannelHandlerContext ChannelInboundInvoker, ChannelOutboundInvoker { Channel channel(); ChannelPipeline pipeline(); + EventLoop eventLoop(); String name(); ChannelHandler handler(); diff --git a/transport/src/main/java/io/netty/channel/ChannelInboundHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelInboundHandlerAdapter.java index c0387e9f3d..dffb78a385 100644 --- a/transport/src/main/java/io/netty/channel/ChannelInboundHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelInboundHandlerAdapter.java @@ -4,26 +4,8 @@ import io.netty.buffer.ChannelBuffer; import java.util.Queue; -public abstract class ChannelInboundHandlerAdapter implements ChannelInboundHandler { - @Override - public void beforeAdd(ChannelHandlerContext ctx) throws Exception { - // Do nothing by default. - } - - @Override - public void afterAdd(ChannelHandlerContext ctx) throws Exception { - // Do nothing by default. - } - - @Override - public void beforeRemove(ChannelHandlerContext ctx) throws Exception { - // Do nothing by default. - } - - @Override - public void afterRemove(ChannelHandlerContext ctx) throws Exception { - // Do nothing by default. - } +public abstract class ChannelInboundHandlerAdapter extends AbstractChannelHandler + implements ChannelInboundHandler { @Override public void channelRegistered(ChannelInboundHandlerContext ctx) throws Exception { diff --git a/transport/src/main/java/io/netty/channel/ChannelInitializer.java b/transport/src/main/java/io/netty/channel/ChannelInitializer.java index 66cbf68e4d..fbeea75e1e 100644 --- a/transport/src/main/java/io/netty/channel/ChannelInitializer.java +++ b/transport/src/main/java/io/netty/channel/ChannelInitializer.java @@ -45,7 +45,7 @@ public abstract class ChannelInitializer extends ChannelInbou // inserted a handler before the initializer using pipeline.addFirst(). ctx.pipeline().fireChannelRegistered(); } catch (Throwable t) { - logger.warn("Failed to initialize a channel. Closing: " + ctx.channel()); + logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t); ctx.close(); } } diff --git a/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerAdapter.java b/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerAdapter.java index 784721364b..ba00a2a9c1 100644 --- a/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerAdapter.java +++ b/transport/src/main/java/io/netty/channel/ChannelOutboundHandlerAdapter.java @@ -5,26 +5,8 @@ import io.netty.buffer.ChannelBuffer; import java.net.SocketAddress; import java.util.Queue; -public abstract class ChannelOutboundHandlerAdapter implements ChannelOutboundHandler { - @Override - public void beforeAdd(ChannelHandlerContext ctx) throws Exception { - // Do nothing by default. - } - - @Override - public void afterAdd(ChannelHandlerContext ctx) throws Exception { - // Do nothing by default. - } - - @Override - public void beforeRemove(ChannelHandlerContext ctx) throws Exception { - // Do nothing by default. - } - - @Override - public void afterRemove(ChannelHandlerContext ctx) throws Exception { - // Do nothing by default. - } +public abstract class ChannelOutboundHandlerAdapter extends AbstractChannelHandler + implements ChannelOutboundHandler { @Override public void bind(ChannelOutboundHandlerContext ctx, SocketAddress localAddress, ChannelFuture future) throws Exception { diff --git a/transport/src/main/java/io/netty/channel/CombinedChannelHandler.java b/transport/src/main/java/io/netty/channel/CombinedChannelHandler.java index 00f4ee55ae..548a0e160b 100644 --- a/transport/src/main/java/io/netty/channel/CombinedChannelHandler.java +++ b/transport/src/main/java/io/netty/channel/CombinedChannelHandler.java @@ -2,8 +2,8 @@ package io.netty.channel; import java.net.SocketAddress; -public class CombinedChannelHandler implements ChannelInboundHandler, - ChannelOutboundHandler { +public class CombinedChannelHandler extends AbstractChannelHandler + implements ChannelInboundHandler, ChannelOutboundHandler { private ChannelOutboundHandler out; private ChannelInboundHandler in; diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index d0425ac116..076da89e99 100644 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -364,11 +364,20 @@ public class DefaultChannelPipeline implements ChannelPipeline { } private static void callBeforeAdd(ChannelHandlerContext ctx) { + ChannelHandler handler = ctx.handler(); + if (handler instanceof AbstractChannelHandler) { + AbstractChannelHandler h = (AbstractChannelHandler) handler; + if (!h.isSharable() && h.added) { + throw new ChannelHandlerLifeCycleException( + "Only a @Sharable handler can be added or removed multiple times."); + } + h.added = true; + } try { - ctx.handler().beforeAdd(ctx); + handler.beforeAdd(ctx); } catch (Throwable t) { throw new ChannelHandlerLifeCycleException( - ctx.handler().getClass().getName() + + handler.getClass().getName() + ".beforeAdd() has thrown an exception; not adding.", t); } } @@ -1150,6 +1159,11 @@ public class DefaultChannelPipeline implements ChannelPipeline { return DefaultChannelPipeline.this; } + @Override + public EventLoop eventLoop() { + return channel().eventLoop(); + } + @Override public ChannelHandler handler() { return handler;