Ported IdleStateHandler / Forward-ported the UptimeClient example

- Add ChannelHandlerContext.eventLoop() for convenience
- Bootstrap and ServerBootstrap handles channel initialization failure
  better
- More strict checks for missing @Sharable annotation
  - A handler without @Sharable annotation cannot be added more than
    once now.
This commit is contained in:
Trustin Lee 2012-05-31 14:54:48 -07:00
parent 77274ae743
commit 7ddc93bed8
19 changed files with 302 additions and 437 deletions

View File

@ -100,7 +100,7 @@ public abstract class StreamToMessageDecoder<O> extends ChannelInboundHandlerAda
* inbound buffer. * inbound buffer.
*/ */
public void replace(String newHandlerName, ChannelInboundHandler<Byte> newHandler) { public void replace(String newHandlerName, ChannelInboundHandler<Byte> newHandler) {
if (!ctx.channel().eventLoop().inEventLoop()) { if (!ctx.eventLoop().inEventLoop()) {
throw new IllegalStateException("not in event loop"); throw new IllegalStateException("not in event loop");
} }

View File

@ -46,7 +46,7 @@ public class HexDumpProxyFrontendHandler extends ChannelInboundStreamHandlerAdap
// Start the connection attempt. // Start the connection attempt.
Bootstrap b = new Bootstrap(); Bootstrap b = new Bootstrap();
b.eventLoop(ctx.channel().eventLoop()) b.eventLoop(ctx.eventLoop())
.channel(new NioSocketChannel()) .channel(new NioSocketChannel())
.remoteAddress(remoteHost, remotePort) .remoteAddress(remoteHost, remotePort)
.initializer(new ChannelInitializer<SocketChannel>() { .initializer(new ChannelInitializer<SocketChannel>() {

View File

@ -21,7 +21,7 @@ import io.netty.channel.EventLoop;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioEventLoop; import io.netty.channel.socket.nio.NioEventLoop;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.ReadTimeoutHandler; import io.netty.handler.timeout.IdleStateHandler;
/** /**
@ -40,6 +40,10 @@ public class UptimeClient {
private final String host; private final String host;
private final int port; private final int port;
// A single handler will be reused across multiple connection attempts to keep when the last
// successful connection attempt was.
private final UptimeClientHandler handler = new UptimeClientHandler(this);
public UptimeClient(String host, int port) { public UptimeClient(String host, int port) {
this.host = host; this.host = host;
this.port = port; this.port = port;
@ -60,9 +64,7 @@ public class UptimeClient {
.initializer(new ChannelInitializer<SocketChannel>() { .initializer(new ChannelInitializer<SocketChannel>() {
@Override @Override
public void initChannel(SocketChannel ch) throws Exception { public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast( ch.pipeline().addLast(new IdleStateHandler(READ_TIMEOUT, 0, 0), handler);
new ReadTimeoutHandler(READ_TIMEOUT),
new UptimeClientHandler(UptimeClient.this));
} }
}); });

View File

@ -16,10 +16,13 @@
package io.netty.example.uptime; package io.netty.example.uptime;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelInboundHandlerContext; import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.ChannelInboundStreamHandlerAdapter; import io.netty.channel.ChannelInboundStreamHandlerAdapter;
import io.netty.channel.EventLoop; import io.netty.channel.EventLoop;
import io.netty.handler.timeout.ReadTimeoutException; import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import java.net.ConnectException; import java.net.ConnectException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -28,6 +31,7 @@ import java.util.concurrent.TimeUnit;
* Keep reconnecting to the server while printing out the current uptime and * Keep reconnecting to the server while printing out the current uptime and
* connection attempt status. * connection attempt status.
*/ */
@Sharable
public class UptimeClientHandler extends ChannelInboundStreamHandlerAdapter { public class UptimeClientHandler extends ChannelInboundStreamHandlerAdapter {
private final UptimeClient client; private final UptimeClient client;
@ -45,6 +49,26 @@ public class UptimeClientHandler extends ChannelInboundStreamHandlerAdapter {
println("Connected to: " + ctx.channel().remoteAddress()); println("Connected to: " + ctx.channel().remoteAddress());
} }
@Override
public void inboundBufferUpdated(ChannelInboundHandlerContext<Byte> ctx, ChannelBuffer in) throws Exception {
// Discard received data
in.clear();
}
@Override
public void userEventTriggered(ChannelInboundHandlerContext<Byte> ctx, Object evt) throws Exception {
if (!(evt instanceof IdleStateEvent)) {
return;
}
IdleStateEvent e = (IdleStateEvent) evt;
if (e.state() == IdleState.READER_IDLE) {
// The connection was OK but there was no traffic for last period.
println("Disconnecting due to no inbound traffic");
ctx.close();
}
}
@Override @Override
public void channelInactive(ChannelInboundHandlerContext<Byte> ctx) throws Exception { public void channelInactive(ChannelInboundHandlerContext<Byte> ctx) throws Exception {
println("Disconnected from: " + ctx.channel().remoteAddress()); println("Disconnected from: " + ctx.channel().remoteAddress());
@ -55,7 +79,7 @@ public class UptimeClientHandler extends ChannelInboundStreamHandlerAdapter {
throws Exception { throws Exception {
println("Sleeping for: " + UptimeClient.RECONNECT_DELAY + "s"); println("Sleeping for: " + UptimeClient.RECONNECT_DELAY + "s");
final EventLoop loop = ctx.channel().eventLoop(); final EventLoop loop = ctx.eventLoop();
loop.schedule(new Runnable() { loop.schedule(new Runnable() {
@Override @Override
public void run() { public void run() {
@ -71,13 +95,8 @@ public class UptimeClientHandler extends ChannelInboundStreamHandlerAdapter {
startTime = -1; startTime = -1;
println("Failed to connect: " + cause.getMessage()); println("Failed to connect: " + cause.getMessage());
} }
if (cause instanceof ReadTimeoutException) {
// The connection was OK but there was no traffic for last period.
println("Disconnecting due to no inbound traffic");
} else {
cause.printStackTrace(); cause.printStackTrace();
} ctx.close();
ctx.channel().close();
} }
void println(String msg) { void println(String msg) {

View File

@ -1,79 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.timeout;
import static io.netty.channel.Channels.*;
import java.text.DateFormat;
import java.util.Date;
import java.util.Locale;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
/**
* The default {@link IdleStateEvent} implementation.
*/
public class DefaultIdleStateEvent implements IdleStateEvent {
private final Channel channel;
private final IdleState state;
private final long lastActivityTimeMillis;
/**
* Creates a new instance.
*/
public DefaultIdleStateEvent(
Channel channel, IdleState state, long lastActivityTimeMillis) {
if (channel == null) {
throw new NullPointerException("channel");
}
if (state == null) {
throw new NullPointerException("state");
}
this.channel = channel;
this.state = state;
this.lastActivityTimeMillis = lastActivityTimeMillis;
}
@Override
public Channel getChannel() {
return channel;
}
@Override
public ChannelFuture getFuture() {
return succeededFuture(getChannel());
}
@Override
public IdleState getState() {
return state;
}
@Override
public long getLastActivityTimeMillis() {
return lastActivityTimeMillis;
}
@Override
public String toString() {
return getChannel().toString() + ' ' + getState() + " since " +
DateFormat.getDateTimeInstance(
DateFormat.SHORT, DateFormat.SHORT, Locale.US).format(
new Date(getLastActivityTimeMillis()));
}
}

View File

@ -1,46 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.timeout;
import io.netty.channel.Channel;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelHandler;
/**
* An extended {@link SimpleChannelHandler} that adds the handler method for
* an {@link IdleStateEvent}.
* @apiviz.uses io.netty.handler.timeout.IdleStateEvent
*/
public class IdleStateAwareChannelHandler extends SimpleChannelHandler {
@Override
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception {
if (e instanceof IdleStateEvent) {
channelIdle(ctx, (IdleStateEvent) e);
} else {
super.handleUpstream(ctx, e);
}
}
/**
* Invoked when a {@link Channel} has been idle for a while.
*/
public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) throws Exception {
ctx.sendUpstream(e);
}
}

View File

@ -1,46 +0,0 @@
/*
* Copyright 2011 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.timeout;
import io.netty.channel.Channel;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelUpstreamHandler;
/**
* An extended {@link SimpleChannelUpstreamHandler} that adds the handler method
* for an {@link IdleStateEvent}.
* @apiviz.uses io.netty.handler.timeout.IdleStateEvent
*/
public class IdleStateAwareChannelUpstreamHandler extends SimpleChannelUpstreamHandler {
@Override
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception {
if (e instanceof IdleStateEvent) {
channelIdle(ctx, (IdleStateEvent) e);
} else {
super.handleUpstream(ctx, e);
}
}
/**
* Invoked when a {@link Channel} has been idle for a while.
*/
public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) throws Exception {
ctx.sendUpstream(e);
}
}

View File

@ -16,22 +16,53 @@
package io.netty.handler.timeout; package io.netty.handler.timeout;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelEvent;
/** /**
* A {@link ChannelEvent} that is triggered when a {@link Channel} has been idle * A user event triggered by {@link IdleStateHandler} when a {@link Channel} is idle.
* for a while. *
* @apiviz.landmark * @apiviz.landmark
* @apiviz.has io.netty.handler.timeout.IdleState oneway - - * @apiviz.has io.netty.handler.timeout.IdleState oneway - -
*/ */
public interface IdleStateEvent extends ChannelEvent { public class IdleStateEvent {
private final IdleState state;
private final int count;
private final long durationMillis;
public IdleStateEvent(IdleState state, int count, long durationMillis) {
if (state == null) {
throw new NullPointerException("state");
}
if (count < 0) {
throw new IllegalStateException(String.format("count: %d (expected: >= 0)", count));
}
if (durationMillis < 0) {
throw new IllegalStateException(String.format(
"durationMillis: %d (expected: >= 0)", durationMillis));
}
this.state = state;
this.count = count;
this.durationMillis = durationMillis;
}
/** /**
* Returns the detailed idle state. * Returns the detailed idle state.
*/ */
IdleState getState(); public IdleState state() {
return state;
}
/** public int count() {
* Returns the last time when I/O occurred in milliseconds. return count;
*/ }
long getLastActivityTimeMillis();
public long durationMillis() {
return durationMillis;
}
@Override
public String toString() {
return state + "(" + count + ", " + durationMillis + "ms)";
}
} }

View File

@ -15,28 +15,25 @@
*/ */
package io.netty.handler.timeout; package io.netty.handler.timeout;
import static io.netty.channel.Channels.*;
import java.util.concurrent.TimeUnit;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.ChannelOutboundHandlerContext;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPipelineFactory; import io.netty.channel.EventLoop;
import io.netty.channel.ChannelStateEvent;
import io.netty.channel.Channels;
import io.netty.channel.LifeCycleAwareChannelHandler;
import io.netty.channel.MessageEvent;
import io.netty.channel.SimpleChannelUpstreamHandler;
import io.netty.channel.WriteCompletionEvent;
import io.netty.util.ExternalResourceReleasable;
import io.netty.util.HashedWheelTimer; import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.Timer; import io.netty.util.Timer;
import io.netty.util.TimerTask;
import java.nio.channels.Channels;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/** /**
* Triggers an {@link IdleStateEvent} when a {@link Channel} has not performed * Triggers an {@link IdleStateEvent} when a {@link Channel} has not performed
@ -119,23 +116,28 @@ import io.netty.util.TimerTask;
* @apiviz.uses io.netty.util.HashedWheelTimer * @apiviz.uses io.netty.util.HashedWheelTimer
* @apiviz.has io.netty.handler.timeout.IdleStateEvent oneway - - triggers * @apiviz.has io.netty.handler.timeout.IdleStateEvent oneway - - triggers
*/ */
@Sharable public class IdleStateHandler extends ChannelHandlerAdapter<Object, Object> {
public class IdleStateHandler extends SimpleChannelUpstreamHandler
implements LifeCycleAwareChannelHandler,
ExternalResourceReleasable {
final Timer timer; private final long readerIdleTimeMillis;
private final long writerIdleTimeMillis;
private final long allIdleTimeMillis;
final long readerIdleTimeMillis; volatile ScheduledFuture<?> readerIdleTimeout;
final long writerIdleTimeMillis; volatile long lastReadTime;
final long allIdleTimeMillis; int readerIdleCount;
volatile ScheduledFuture<?> writerIdleTimeout;
volatile long lastWriteTime;
int writerIdleCount;
volatile ScheduledFuture<?> allIdleTimeout;
int allIdleCount;
volatile boolean destroyed;
/** /**
* Creates a new instance. * Creates a new instance.
* *
* @param timer
* the {@link Timer} that is used to trigger the scheduled event.
* The recommended {@link Timer} implementation is {@link HashedWheelTimer}.
* @param readerIdleTimeSeconds * @param readerIdleTimeSeconds
* an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE} * an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE}
* will be triggered when no read was performed for the specified * will be triggered when no read was performed for the specified
@ -150,22 +152,17 @@ public class IdleStateHandler extends SimpleChannelUpstreamHandler
* the specified period of time. Specify {@code 0} to disable. * the specified period of time. Specify {@code 0} to disable.
*/ */
public IdleStateHandler( public IdleStateHandler(
Timer timer,
int readerIdleTimeSeconds, int readerIdleTimeSeconds,
int writerIdleTimeSeconds, int writerIdleTimeSeconds,
int allIdleTimeSeconds) { int allIdleTimeSeconds) {
this(timer, this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
TimeUnit.SECONDS); TimeUnit.SECONDS);
} }
/** /**
* Creates a new instance. * Creates a new instance.
* *
* @param timer
* the {@link Timer} that is used to trigger the scheduled event.
* The recommended {@link Timer} implementation is {@link HashedWheelTimer}.
* @param readerIdleTime * @param readerIdleTime
* an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE} * an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE}
* will be triggered when no read was performed for the specified * will be triggered when no read was performed for the specified
@ -183,18 +180,13 @@ public class IdleStateHandler extends SimpleChannelUpstreamHandler
* {@code writeIdleTime}, and {@code allIdleTime} * {@code writeIdleTime}, and {@code allIdleTime}
*/ */
public IdleStateHandler( public IdleStateHandler(
Timer timer,
long readerIdleTime, long writerIdleTime, long allIdleTime, long readerIdleTime, long writerIdleTime, long allIdleTime,
TimeUnit unit) { TimeUnit unit) {
if (timer == null) {
throw new NullPointerException("timer");
}
if (unit == null) { if (unit == null) {
throw new NullPointerException("unit"); throw new NullPointerException("unit");
} }
this.timer = timer;
if (readerIdleTime <= 0) { if (readerIdleTime <= 0) {
readerIdleTimeMillis = 0; readerIdleTimeMillis = 0;
} else { } else {
@ -212,148 +204,118 @@ public class IdleStateHandler extends SimpleChannelUpstreamHandler
} }
} }
/**
* Stops the {@link Timer} which was specified in the constructor of this
* handler. You should not call this method if the {@link Timer} is in use
* by other objects.
*/
@Override @Override
public void releaseExternalResources() { public ChannelBufferHolder<Object> newInboundBuffer(ChannelInboundHandlerContext<Object> ctx) throws Exception {
timer.stop(); return ChannelBufferHolders.inboundBypassBuffer(ctx);
}
@Override
public ChannelBufferHolder<Object> newOutboundBuffer(ChannelOutboundHandlerContext<Object> ctx) throws Exception {
return ChannelBufferHolders.outboundBypassBuffer(ctx);
} }
@Override @Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception { public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
if (ctx.pipeline().isAttached()) { if (ctx.channel().isActive()) {
// channelOpen event has been fired already, which means // channelActvie() event has been fired already, which means this.channelActive() will
// this.channelOpen() will not be invoked. // not be invoked. We have to initialize here instead.
// We have to initialize here instead.
initialize(ctx); initialize(ctx);
} else { } else {
// channelOpen event has not been fired yet. // channelActive() event has not been fired yet. this.channelOpen() will be invoked
// this.channelOpen() will be invoked and initialization will occur there. // and initialization will occur there.
} }
} }
@Override
public void afterAdd(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
@Override @Override
public void beforeRemove(ChannelHandlerContext ctx) throws Exception { public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
destroy(ctx); destroy();
} }
@Override @Override
public void afterRemove(ChannelHandlerContext ctx) throws Exception { public void channelActive(ChannelInboundHandlerContext<Object> ctx) throws Exception {
// NOOP
}
@Override
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
// This method will be invoked only if this handler was added // This method will be invoked only if this handler was added
// before channelOpen event is fired. If a user adds this handler // before channelActive() event is fired. If a user adds this handler
// after the channelOpen event, initialize() will be called by beforeAdd(). // after the channelActive() event, initialize() will be called by beforeAdd().
initialize(ctx); initialize(ctx);
ctx.sendUpstream(e); super.channelActive(ctx);
} }
@Override @Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) public void channelInactive(ChannelInboundHandlerContext<Object> ctx) throws Exception {
throws Exception { destroy();
destroy(ctx); super.channelInactive(ctx);
ctx.sendUpstream(e); }
@Override
public void inboundBufferUpdated(ChannelInboundHandlerContext<Object> ctx) throws Exception {
lastReadTime = System.currentTimeMillis();
readerIdleCount = allIdleCount = 0;
ctx.fireInboundBufferUpdated();
} }
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) public void flush(final ChannelOutboundHandlerContext<Object> ctx, ChannelFuture future) throws Exception {
throws Exception { future.addListener(new ChannelFutureListener() {
State state = (State) ctx.getAttachment();
state.lastReadTime = System.currentTimeMillis();
ctx.sendUpstream(e);
}
@Override @Override
public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e) public void operationComplete(ChannelFuture future) throws Exception {
throws Exception { lastWriteTime = System.currentTimeMillis();
if (e.getWrittenAmount() > 0) { writerIdleCount = allIdleCount = 0;
State state = (State) ctx.getAttachment();
state.lastWriteTime = System.currentTimeMillis();
} }
ctx.sendUpstream(e); });
super.flush(ctx, future);
} }
private void initialize(ChannelHandlerContext ctx) { private void initialize(ChannelHandlerContext ctx) {
State state = state(ctx);
// Avoid the case where destroy() is called before scheduling timeouts. // Avoid the case where destroy() is called before scheduling timeouts.
// See: https://github.com/netty/netty/issues/143 // See: https://github.com/netty/netty/issues/143
if (state.destroyed) { if (destroyed) {
return; return;
} }
state.lastReadTime = state.lastWriteTime = System.currentTimeMillis(); EventLoop loop = ctx.eventLoop();
lastReadTime = lastWriteTime = System.currentTimeMillis();
if (readerIdleTimeMillis > 0) { if (readerIdleTimeMillis > 0) {
state.readerIdleTimeout = timer.newTimeout( readerIdleTimeout = loop.schedule(
new ReaderIdleTimeoutTask(ctx), new ReaderIdleTimeoutTask(ctx),
readerIdleTimeMillis, TimeUnit.MILLISECONDS); readerIdleTimeMillis, TimeUnit.MILLISECONDS);
} }
if (writerIdleTimeMillis > 0) { if (writerIdleTimeMillis > 0) {
state.writerIdleTimeout = timer.newTimeout( writerIdleTimeout = loop.schedule(
new WriterIdleTimeoutTask(ctx), new WriterIdleTimeoutTask(ctx),
writerIdleTimeMillis, TimeUnit.MILLISECONDS); writerIdleTimeMillis, TimeUnit.MILLISECONDS);
} }
if (allIdleTimeMillis > 0) { if (allIdleTimeMillis > 0) {
state.allIdleTimeout = timer.newTimeout( allIdleTimeout = loop.schedule(
new AllIdleTimeoutTask(ctx), new AllIdleTimeoutTask(ctx),
allIdleTimeMillis, TimeUnit.MILLISECONDS); allIdleTimeMillis, TimeUnit.MILLISECONDS);
} }
} }
private void destroy(ChannelHandlerContext ctx) { private void destroy() {
State state; destroyed = true;
synchronized (ctx) { if (readerIdleTimeout != null) {
state = state(ctx); readerIdleTimeout.cancel(false);
state.destroyed = true; readerIdleTimeout = null;
} }
if (writerIdleTimeout != null) {
if (state.readerIdleTimeout != null) { writerIdleTimeout.cancel(false);
state.readerIdleTimeout.cancel(); writerIdleTimeout = null;
state.readerIdleTimeout = null;
} }
if (state.writerIdleTimeout != null) { if (allIdleTimeout != null) {
state.writerIdleTimeout.cancel(); allIdleTimeout.cancel(false);
state.writerIdleTimeout = null; allIdleTimeout = null;
}
if (state.allIdleTimeout != null) {
state.allIdleTimeout.cancel();
state.allIdleTimeout = null;
} }
} }
private State state(ChannelHandlerContext ctx) { protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
State state; ctx.fireUserEventTriggered(evt);
synchronized (ctx) {
// FIXME: It could have been better if there is setAttachmentIfAbsent().
state = (State) ctx.getAttachment();
if (state != null) {
return state;
}
state = new State();
ctx.setAttachment(state);
}
return state;
} }
protected void channelIdle( private final class ReaderIdleTimeoutTask implements Runnable {
ChannelHandlerContext ctx, IdleState state, long lastActivityTimeMillis) throws Exception {
ctx.sendUpstream(new DefaultIdleStateEvent(ctx.channel(), state, lastActivityTimeMillis));
}
private final class ReaderIdleTimeoutTask implements TimerTask {
private final ChannelHandlerContext ctx; private final ChannelHandlerContext ctx;
@ -362,34 +324,33 @@ public class IdleStateHandler extends SimpleChannelUpstreamHandler
} }
@Override @Override
public void run(Timeout timeout) throws Exception { public void run() {
if (timeout.isCancelled() || !ctx.channel().isOpen()) { if (!ctx.channel().isOpen()) {
return; return;
} }
State state = (State) ctx.getAttachment();
long currentTime = System.currentTimeMillis(); long currentTime = System.currentTimeMillis();
long lastReadTime = state.lastReadTime; long lastReadTime = IdleStateHandler.this.lastReadTime;
long nextDelay = readerIdleTimeMillis - (currentTime - lastReadTime); long nextDelay = readerIdleTimeMillis - (currentTime - lastReadTime);
if (nextDelay <= 0) { if (nextDelay <= 0) {
// Reader is idle - set a new timeout and notify the callback. // Reader is idle - set a new timeout and notify the callback.
state.readerIdleTimeout = readerIdleTimeout =
timer.newTimeout(this, readerIdleTimeMillis, TimeUnit.MILLISECONDS); ctx.eventLoop().schedule(this, readerIdleTimeMillis, TimeUnit.MILLISECONDS);
try { try {
channelIdle(ctx, IdleState.READER_IDLE, lastReadTime); channelIdle(ctx, new IdleStateEvent(
IdleState.READER_IDLE, readerIdleCount ++, currentTime - lastReadTime));
} catch (Throwable t) { } catch (Throwable t) {
fireExceptionCaught(ctx, t); ctx.fireExceptionCaught(t);
} }
} else { } else {
// Read occurred before the timeout - set a new timeout with shorter delay. // Read occurred before the timeout - set a new timeout with shorter delay.
state.readerIdleTimeout = readerIdleTimeout = ctx.eventLoop().schedule(this, nextDelay, TimeUnit.MILLISECONDS);
timer.newTimeout(this, nextDelay, TimeUnit.MILLISECONDS);
} }
} }
} }
private final class WriterIdleTimeoutTask implements TimerTask { private final class WriterIdleTimeoutTask implements Runnable {
private final ChannelHandlerContext ctx; private final ChannelHandlerContext ctx;
@ -398,33 +359,32 @@ public class IdleStateHandler extends SimpleChannelUpstreamHandler
} }
@Override @Override
public void run(Timeout timeout) throws Exception { public void run() {
if (timeout.isCancelled() || !ctx.channel().isOpen()) { if (!ctx.channel().isOpen()) {
return; return;
} }
State state = (State) ctx.getAttachment();
long currentTime = System.currentTimeMillis(); long currentTime = System.currentTimeMillis();
long lastWriteTime = state.lastWriteTime; long lastWriteTime = IdleStateHandler.this.lastWriteTime;
long nextDelay = writerIdleTimeMillis - (currentTime - lastWriteTime); long nextDelay = writerIdleTimeMillis - (currentTime - lastWriteTime);
if (nextDelay <= 0) { if (nextDelay <= 0) {
// Writer is idle - set a new timeout and notify the callback. // Writer is idle - set a new timeout and notify the callback.
state.writerIdleTimeout = writerIdleTimeout = ctx.eventLoop().schedule(
timer.newTimeout(this, writerIdleTimeMillis, TimeUnit.MILLISECONDS); this, writerIdleTimeMillis, TimeUnit.MILLISECONDS);
try { try {
channelIdle(ctx, IdleState.WRITER_IDLE, lastWriteTime); channelIdle(ctx, new IdleStateEvent(
IdleState.WRITER_IDLE, writerIdleCount ++, currentTime - lastWriteTime));
} catch (Throwable t) { } catch (Throwable t) {
fireExceptionCaught(ctx, t); ctx.fireExceptionCaught(t);
} }
} else { } else {
// Write occurred before the timeout - set a new timeout with shorter delay. // Write occurred before the timeout - set a new timeout with shorter delay.
state.writerIdleTimeout = writerIdleTimeout = ctx.eventLoop().schedule(this, nextDelay, TimeUnit.MILLISECONDS);
timer.newTimeout(this, nextDelay, TimeUnit.MILLISECONDS);
} }
} }
} }
private final class AllIdleTimeoutTask implements TimerTask { private final class AllIdleTimeoutTask implements Runnable {
private final ChannelHandlerContext ctx; private final ChannelHandlerContext ctx;
@ -433,46 +393,30 @@ public class IdleStateHandler extends SimpleChannelUpstreamHandler
} }
@Override @Override
public void run(Timeout timeout) throws Exception { public void run() {
if (timeout.isCancelled() || !ctx.channel().isOpen()) { if (!ctx.channel().isOpen()) {
return; return;
} }
State state = (State) ctx.getAttachment();
long currentTime = System.currentTimeMillis(); long currentTime = System.currentTimeMillis();
long lastIoTime = Math.max(state.lastReadTime, state.lastWriteTime); long lastIoTime = Math.max(lastReadTime, lastWriteTime);
long nextDelay = allIdleTimeMillis - (currentTime - lastIoTime); long nextDelay = allIdleTimeMillis - (currentTime - lastIoTime);
if (nextDelay <= 0) { if (nextDelay <= 0) {
// Both reader and writer are idle - set a new timeout and // Both reader and writer are idle - set a new timeout and
// notify the callback. // notify the callback.
state.allIdleTimeout = allIdleTimeout = ctx.eventLoop().schedule(
timer.newTimeout(this, allIdleTimeMillis, TimeUnit.MILLISECONDS); this, allIdleTimeMillis, TimeUnit.MILLISECONDS);
try { try {
channelIdle(ctx, IdleState.ALL_IDLE, lastIoTime); channelIdle(ctx, new IdleStateEvent(
IdleState.ALL_IDLE, allIdleCount ++, currentTime - lastIoTime));
} catch (Throwable t) { } catch (Throwable t) {
fireExceptionCaught(ctx, t); ctx.fireExceptionCaught(t);
} }
} else { } else {
// Either read or write occurred before the timeout - set a new // Either read or write occurred before the timeout - set a new
// timeout with shorter delay. // timeout with shorter delay.
state.allIdleTimeout = allIdleTimeout = ctx.eventLoop().schedule(this, nextDelay, TimeUnit.MILLISECONDS);
timer.newTimeout(this, nextDelay, TimeUnit.MILLISECONDS);
} }
} }
} }
private static final class State {
State() {
}
volatile Timeout readerIdleTimeout;
volatile long lastReadTime;
volatile Timeout writerIdleTimeout;
volatile long lastWriteTime;
volatile Timeout allIdleTimeout;
volatile boolean destroyed;
}
} }

View File

@ -1,6 +1,7 @@
package io.netty.bootstrap; package io.netty.bootstrap;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
@ -112,7 +113,7 @@ public class Bootstrap {
} }
public ChannelFuture bind(ChannelFuture future) { public ChannelFuture bind(ChannelFuture future) {
validate(); validate(future);
if (localAddress == null) { if (localAddress == null) {
throw new IllegalStateException("localAddress not set"); throw new IllegalStateException("localAddress not set");
} }
@ -124,6 +125,10 @@ public class Bootstrap {
return future; return future;
} }
if (!ensureOpen(future)) {
return future;
}
return channel.bind(localAddress, future).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); return channel.bind(localAddress, future).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} }
@ -133,7 +138,7 @@ public class Bootstrap {
} }
public ChannelFuture connect(ChannelFuture future) { public ChannelFuture connect(ChannelFuture future) {
validate(); validate(future);
if (remoteAddress == null) { if (remoteAddress == null) {
throw new IllegalStateException("remoteAddress not set"); throw new IllegalStateException("remoteAddress not set");
} }
@ -145,6 +150,10 @@ public class Bootstrap {
return future; return future;
} }
if (!ensureOpen(future)) {
return future;
}
if (localAddress == null) { if (localAddress == null) {
channel.connect(remoteAddress, future); channel.connect(remoteAddress, future);
} else { } else {
@ -180,6 +189,16 @@ public class Bootstrap {
eventLoop.register(channel).syncUninterruptibly(); eventLoop.register(channel).syncUninterruptibly();
} }
private static boolean ensureOpen(ChannelFuture future) {
if (!future.channel().isOpen()) {
// Registration was successful but the channel was closed due to some failure in
// initializer.
future.setFailure(new ChannelException("initialization failure"));
return false;
}
return true;
}
public void shutdown() { public void shutdown() {
if (eventLoop != null) { if (eventLoop != null) {
eventLoop.shutdown(); eventLoop.shutdown();
@ -197,4 +216,15 @@ public class Bootstrap {
throw new IllegalStateException("initializer not set"); throw new IllegalStateException("initializer not set");
} }
} }
private void validate(ChannelFuture future) {
if (future == null) {
throw new NullPointerException("future");
}
if (future.channel() != channel) {
throw new IllegalArgumentException("future.channel() must be the same channel.");
}
validate();
}
} }

View File

@ -3,6 +3,7 @@ package io.netty.bootstrap;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelBufferHolder; import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders; import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
@ -136,7 +137,7 @@ public class ServerBootstrap {
} }
public ChannelFuture bind(ChannelFuture future) { public ChannelFuture bind(ChannelFuture future) {
validate(); validate(future);
if (channel.isActive()) { if (channel.isActive()) {
future.setFailure(new IllegalStateException("channel already bound: " + channel)); future.setFailure(new IllegalStateException("channel already bound: " + channel));
return future; return future;
@ -162,6 +163,13 @@ public class ServerBootstrap {
return future; return future;
} }
if (!channel.isOpen()) {
// Registration was successful but the channel was closed due to some failure in
// initializer.
future.setFailure(new ChannelException("initialization failure"));
return future;
}
channel.bind(localAddress, future).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); channel.bind(localAddress, future).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
return future; return future;
@ -196,6 +204,17 @@ public class ServerBootstrap {
} }
} }
private void validate(ChannelFuture future) {
if (future == null) {
throw new NullPointerException("future");
}
if (future.channel() != channel) {
throw new IllegalArgumentException("future.channel() must be the same channel.");
}
validate();
}
private class Acceptor extends ChannelInboundHandlerAdapter<Channel> { private class Acceptor extends ChannelInboundHandlerAdapter<Channel> {
@Override @Override
public ChannelBufferHolder<Channel> newInboundBuffer(ChannelInboundHandlerContext<Channel> ctx) { public ChannelBufferHolder<Channel> newInboundBuffer(ChannelInboundHandlerContext<Channel> ctx) {

View File

@ -0,0 +1,31 @@
package io.netty.channel;
public class AbstractChannelHandler implements ChannelHandler {
// Not using volatile because it's used only for a sanity check.
boolean added;
final boolean isSharable() {
return getClass().isAnnotationPresent(Sharable.class);
}
@Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
@Override
public void afterAdd(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
@Override
public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
@Override
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
}

View File

@ -2,27 +2,8 @@ package io.netty.channel;
import java.net.SocketAddress; import java.net.SocketAddress;
public abstract class ChannelHandlerAdapter<I, O> implements ChannelInboundHandler<I>, ChannelOutboundHandler<O> { public abstract class ChannelHandlerAdapter<I, O> extends AbstractChannelHandler
implements ChannelInboundHandler<I>, ChannelOutboundHandler<O> {
@Override
public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
// Do nothing by default.
}
@Override
public void afterAdd(ChannelHandlerContext ctx) throws Exception {
// Do nothing by default.
}
@Override
public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
// Do nothing by default.
}
@Override
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
// Do nothing by default.
}
@Override @Override
public void channelRegistered(ChannelInboundHandlerContext<I> ctx) throws Exception { public void channelRegistered(ChannelInboundHandlerContext<I> ctx) throws Exception {

View File

@ -129,6 +129,7 @@ public interface ChannelHandlerContext
ChannelInboundInvoker, ChannelOutboundInvoker { ChannelInboundInvoker, ChannelOutboundInvoker {
Channel channel(); Channel channel();
ChannelPipeline pipeline(); ChannelPipeline pipeline();
EventLoop eventLoop();
String name(); String name();
ChannelHandler handler(); ChannelHandler handler();

View File

@ -4,26 +4,8 @@ import io.netty.buffer.ChannelBuffer;
import java.util.Queue; import java.util.Queue;
public abstract class ChannelInboundHandlerAdapter<I> implements ChannelInboundHandler<I> { public abstract class ChannelInboundHandlerAdapter<I> extends AbstractChannelHandler
@Override implements ChannelInboundHandler<I> {
public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
// Do nothing by default.
}
@Override
public void afterAdd(ChannelHandlerContext ctx) throws Exception {
// Do nothing by default.
}
@Override
public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
// Do nothing by default.
}
@Override
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
// Do nothing by default.
}
@Override @Override
public void channelRegistered(ChannelInboundHandlerContext<I> ctx) throws Exception { public void channelRegistered(ChannelInboundHandlerContext<I> ctx) throws Exception {

View File

@ -45,7 +45,7 @@ public abstract class ChannelInitializer<C extends Channel> extends ChannelInbou
// inserted a handler before the initializer using pipeline.addFirst(). // inserted a handler before the initializer using pipeline.addFirst().
ctx.pipeline().fireChannelRegistered(); ctx.pipeline().fireChannelRegistered();
} catch (Throwable t) { } catch (Throwable t) {
logger.warn("Failed to initialize a channel. Closing: " + ctx.channel()); logger.warn("Failed to initialize a channel. Closing: " + ctx.channel(), t);
ctx.close(); ctx.close();
} }
} }

View File

@ -5,26 +5,8 @@ import io.netty.buffer.ChannelBuffer;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.util.Queue; import java.util.Queue;
public abstract class ChannelOutboundHandlerAdapter<O> implements ChannelOutboundHandler<O> { public abstract class ChannelOutboundHandlerAdapter<O> extends AbstractChannelHandler
@Override implements ChannelOutboundHandler<O> {
public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
// Do nothing by default.
}
@Override
public void afterAdd(ChannelHandlerContext ctx) throws Exception {
// Do nothing by default.
}
@Override
public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
// Do nothing by default.
}
@Override
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
// Do nothing by default.
}
@Override @Override
public void bind(ChannelOutboundHandlerContext<O> ctx, SocketAddress localAddress, ChannelFuture future) throws Exception { public void bind(ChannelOutboundHandlerContext<O> ctx, SocketAddress localAddress, ChannelFuture future) throws Exception {

View File

@ -2,8 +2,8 @@ package io.netty.channel;
import java.net.SocketAddress; import java.net.SocketAddress;
public class CombinedChannelHandler implements ChannelInboundHandler<Object>, public class CombinedChannelHandler extends AbstractChannelHandler
ChannelOutboundHandler<Object> { implements ChannelInboundHandler<Object>, ChannelOutboundHandler<Object> {
private ChannelOutboundHandler<Object> out; private ChannelOutboundHandler<Object> out;
private ChannelInboundHandler<Object> in; private ChannelInboundHandler<Object> in;

View File

@ -364,11 +364,20 @@ public class DefaultChannelPipeline implements ChannelPipeline {
} }
private static void callBeforeAdd(ChannelHandlerContext ctx) { private static void callBeforeAdd(ChannelHandlerContext ctx) {
ChannelHandler handler = ctx.handler();
if (handler instanceof AbstractChannelHandler) {
AbstractChannelHandler h = (AbstractChannelHandler) handler;
if (!h.isSharable() && h.added) {
throw new ChannelHandlerLifeCycleException(
"Only a @Sharable handler can be added or removed multiple times.");
}
h.added = true;
}
try { try {
ctx.handler().beforeAdd(ctx); handler.beforeAdd(ctx);
} catch (Throwable t) { } catch (Throwable t) {
throw new ChannelHandlerLifeCycleException( throw new ChannelHandlerLifeCycleException(
ctx.handler().getClass().getName() + handler.getClass().getName() +
".beforeAdd() has thrown an exception; not adding.", t); ".beforeAdd() has thrown an exception; not adding.", t);
} }
} }
@ -1150,6 +1159,11 @@ public class DefaultChannelPipeline implements ChannelPipeline {
return DefaultChannelPipeline.this; return DefaultChannelPipeline.this;
} }
@Override
public EventLoop eventLoop() {
return channel().eventLoop();
}
@Override @Override
public ChannelHandler handler() { public ChannelHandler handler() {
return handler; return handler;