From 587c0ac27f69b7b4b63a21a04d61bd41683deb71 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Wed, 11 Feb 2009 05:59:35 +0000 Subject: [PATCH] Added IdlenessHandler, IdlenessEvent, and DefaultIdlenessEvent --- .../handler/timeout/DefaultIdlenessEvent.java | 81 +++++++ .../netty/handler/timeout/IdlenessEvent.java | 35 +++ .../handler/timeout/IdlenessHandler.java | 227 ++++++++++++++++++ 3 files changed, 343 insertions(+) create mode 100644 src/main/java/org/jboss/netty/handler/timeout/DefaultIdlenessEvent.java create mode 100644 src/main/java/org/jboss/netty/handler/timeout/IdlenessEvent.java create mode 100644 src/main/java/org/jboss/netty/handler/timeout/IdlenessHandler.java diff --git a/src/main/java/org/jboss/netty/handler/timeout/DefaultIdlenessEvent.java b/src/main/java/org/jboss/netty/handler/timeout/DefaultIdlenessEvent.java new file mode 100644 index 0000000000..77c6f242e0 --- /dev/null +++ b/src/main/java/org/jboss/netty/handler/timeout/DefaultIdlenessEvent.java @@ -0,0 +1,81 @@ +/* + * JBoss, Home of Professional Open Source + * + * Copyright 2009, Red Hat Middleware LLC, and individual contributors + * by the @author tags. See the COPYRIGHT.txt in the distribution for a + * full listing of individual contributors. + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.jboss.netty.handler.timeout; + +import static org.jboss.netty.channel.Channels.*; + +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelFuture; + +/** + * @author The Netty Project (netty-dev@lists.jboss.org) + * @author Trustin Lee (tlee@redhat.com) + * @version $Rev$, $Date$ + */ +final class DefaultIdlenessEvent implements IdlenessEvent { + + private final Channel channel; + private final long lastReadTime; + private final long lastWriteTime; + private final long readerIdleTime; + private final long writerIdleTime; + + DefaultIdlenessEvent( + Channel channel, + long lastReadTime, long lastWriteTime, + long readerIdleTime, long writerIdleTime) { + + if (channel == null) { + throw new NullPointerException("channel"); + } + + this.channel = channel; + this.lastReadTime = lastReadTime; + this.lastWriteTime = lastWriteTime; + this.readerIdleTime = readerIdleTime; + this.writerIdleTime = writerIdleTime; + } + + public Channel getChannel() { + return channel; + } + + public boolean isReaderIdle() { + return System.currentTimeMillis() + readerIdleTime > lastReadTime; + } + + public boolean isWriterIdle() { + return System.currentTimeMillis() + writerIdleTime > lastWriteTime; + } + + public ChannelFuture getFuture() { + return succeededFuture(getChannel()); + } + + @Override + public String toString() { + return getChannel().toString() + " - (IDLE: " + + (isReaderIdle()? 'R' : '_') + + (isWriterIdle()? 'W' : '_') + ')'; + } +} diff --git a/src/main/java/org/jboss/netty/handler/timeout/IdlenessEvent.java b/src/main/java/org/jboss/netty/handler/timeout/IdlenessEvent.java new file mode 100644 index 0000000000..81c537511f --- /dev/null +++ b/src/main/java/org/jboss/netty/handler/timeout/IdlenessEvent.java @@ -0,0 +1,35 @@ +/* + * JBoss, Home of Professional Open Source + * + * Copyright 2009, Red Hat Middleware LLC, and individual contributors + * by the @author tags. See the COPYRIGHT.txt in the distribution for a + * full listing of individual contributors. + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.jboss.netty.handler.timeout; + +import org.jboss.netty.channel.ChannelEvent; + +/** + * @author The Netty Project (netty-dev@lists.jboss.org) + * @author Trustin Lee (tlee@redhat.com) + * @version $Rev$, $Date$ + */ +public interface IdlenessEvent extends ChannelEvent { + boolean isReaderIdle(); + boolean isWriterIdle(); +} diff --git a/src/main/java/org/jboss/netty/handler/timeout/IdlenessHandler.java b/src/main/java/org/jboss/netty/handler/timeout/IdlenessHandler.java new file mode 100644 index 0000000000..f4242e6938 --- /dev/null +++ b/src/main/java/org/jboss/netty/handler/timeout/IdlenessHandler.java @@ -0,0 +1,227 @@ +/* + * JBoss, Home of Professional Open Source + * + * Copyright 2009, Red Hat Middleware LLC, and individual contributors + * by the @author tags. See the COPYRIGHT.txt in the distribution for a + * full listing of individual contributors. + * + * This is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as + * published by the Free Software Foundation; either version 2.1 of + * the License, or (at your option) any later version. + * + * This software is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this software; if not, write to the Free + * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + * 02110-1301 USA, or see the FSF site: http://www.fsf.org. + */ +package org.jboss.netty.handler.timeout; + +import java.util.concurrent.TimeUnit; + +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ChannelPipelineCoverage; +import org.jboss.netty.channel.ChannelStateEvent; +import org.jboss.netty.channel.Channels; +import org.jboss.netty.channel.LifeCycleAwareChannelHandler; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.SimpleChannelUpstreamHandler; +import org.jboss.netty.channel.WriteCompletionEvent; +import org.jboss.netty.util.ExternalResourceReleasable; + +/** + * @author The Netty Project (netty-dev@lists.jboss.org) + * @author Trustin Lee (tlee@redhat.com) + * @version $Rev$, $Date$ + */ +@ChannelPipelineCoverage("one") +public class IdlenessHandler extends SimpleChannelUpstreamHandler implements LifeCycleAwareChannelHandler, ExternalResourceReleasable { + + static final ReadTimeoutException EXCEPTION = new ReadTimeoutException(); + + final Timer timer; + + final long readerIdleTimeMillis; + volatile Timeout readerIdleTimeout; + private volatile ReaderIdleTimeoutTask readerIdleTimeoutTask; + volatile long lastReadTime; + + final long writerIdleTimeMillis; + volatile Timeout writerIdleTimeout; + private volatile WriterIdleTimeoutTask writerIdleTimeoutTask; + volatile long lastWriteTime; + + public IdlenessHandler( + Timer timer, long readerIdleTimeMillis, long writerIdleTimeMillis) { + this(timer, readerIdleTimeMillis, writerIdleTimeMillis, TimeUnit.MILLISECONDS); + } + + public IdlenessHandler( + Timer timer, long readerIdleTime, long writerIdleTime, TimeUnit unit) { + if (timer == null) { + throw new NullPointerException("timer"); + } + if (unit == null) { + throw new NullPointerException("unit"); + } + if (readerIdleTime < 0) { + throw new IllegalArgumentException( + "readerIdleTime must not be less than 0: " + readerIdleTime); + } + if (writerIdleTime < 0) { + throw new IllegalArgumentException( + "writerIdleTime must not be less than 0: " + writerIdleTime); + } + + this.timer = timer; + readerIdleTimeMillis = unit.toMillis(readerIdleTime); + writerIdleTimeMillis = unit.toMillis(writerIdleTime); + } + + public void releaseExternalResources() { + timer.stop(); + } + + public void beforeAdd(ChannelHandlerContext ctx) throws Exception { + initialize(ctx); + } + + public void afterAdd(ChannelHandlerContext ctx) throws Exception { + // NOOP + } + + public void beforeRemove(ChannelHandlerContext ctx) throws Exception { + destroy(); + } + + public void afterRemove(ChannelHandlerContext ctx) throws Exception { + // NOOP + } + + @Override + public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) + throws Exception { + initialize(ctx); + ctx.sendUpstream(e); + } + + @Override + public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) + throws Exception { + destroy(); + ctx.sendUpstream(e); + } + + @Override + public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) + throws Exception { + lastReadTime = System.currentTimeMillis(); + ctx.sendUpstream(e); + } + + @Override + public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e) + throws Exception { + if (e.getWrittenAmount() > 0) { + lastWriteTime = System.currentTimeMillis(); + } + ctx.sendUpstream(e); + } + + private void initialize(ChannelHandlerContext ctx) { + lastReadTime = lastWriteTime = System.currentTimeMillis(); + readerIdleTimeoutTask = new ReaderIdleTimeoutTask(ctx); + writerIdleTimeoutTask = new WriterIdleTimeoutTask(ctx); + readerIdleTimeout = timer.newTimeout(readerIdleTimeoutTask, readerIdleTimeMillis, TimeUnit.MILLISECONDS); + writerIdleTimeout = timer.newTimeout(writerIdleTimeoutTask, writerIdleTimeMillis, TimeUnit.MILLISECONDS); + } + + private void destroy() { + if (readerIdleTimeout != null) { + readerIdleTimeout.cancel(); + } + if (writerIdleTimeout != null) { + writerIdleTimeout.cancel(); + } + readerIdleTimeout = null; + readerIdleTimeoutTask = null; + writerIdleTimeout = null; + writerIdleTimeoutTask = null; + } + + private final class ReaderIdleTimeoutTask implements TimerTask { + + private final ChannelHandlerContext ctx; + + ReaderIdleTimeoutTask(ChannelHandlerContext ctx) { + this.ctx = ctx; + } + + public void run(Timeout timeout) throws Exception { + if (timeout.isCancelled()) { + return; + } + + if (!ctx.getChannel().isOpen()) { + return; + } + + long currentTime = System.currentTimeMillis(); + long lastReadTime = IdlenessHandler.this.lastReadTime; + long nextDelay = readerIdleTimeMillis - (currentTime - lastReadTime); + if (nextDelay <= 0) { + // Reader is idle - set a new timeout and notify the callback. + readerIdleTimeout = + timer.newTimeout(this, readerIdleTimeMillis, TimeUnit.MILLISECONDS); + ctx.sendUpstream(new DefaultIdlenessEvent( + ctx.getChannel(), lastReadTime, lastWriteTime, + readerIdleTimeMillis, writerIdleTimeMillis)); + Channels.fireExceptionCaught(ctx, EXCEPTION); + } else { + // Read occurred before the timeout - set a new timeout with shorter delay. + readerIdleTimeout = + timer.newTimeout(this, nextDelay, TimeUnit.MILLISECONDS); + } + } + } + + private final class WriterIdleTimeoutTask implements TimerTask { + + private final ChannelHandlerContext ctx; + + WriterIdleTimeoutTask(ChannelHandlerContext ctx) { + this.ctx = ctx; + } + + public void run(Timeout timeout) throws Exception { + if (timeout.isCancelled()) { + return; + } + + if (!ctx.getChannel().isOpen()) { + return; + } + + long currentTime = System.currentTimeMillis(); + long lastWriteTime = IdlenessHandler.this.lastWriteTime; + long nextDelay = writerIdleTimeMillis - (currentTime - lastWriteTime); + if (nextDelay <= 0) { + // Writer is idle - set a new timeout and notify the callback. + writerIdleTimeout = + timer.newTimeout(this, writerIdleTimeMillis, TimeUnit.MILLISECONDS); + ctx.sendUpstream(new DefaultIdlenessEvent( + ctx.getChannel(), lastReadTime, lastWriteTime, + readerIdleTimeMillis, writerIdleTimeMillis)); + } else { + // Write occurred before the timeout - set a new timeout with shorter delay. + writerIdleTimeout = + timer.newTimeout(this, nextDelay, TimeUnit.MILLISECONDS); + } + } + } +}