From 2685d03d8f6c9f95ad9ecab214c587ff95e69437 Mon Sep 17 00:00:00 2001 From: Trustin Lee Date: Mon, 7 Dec 2009 06:22:06 +0000 Subject: [PATCH] Resolved issue: NETTY-208 (Blocking read handler) * Added BlockingReadHandler and its exception --- .../handler/queue/BlockingReadHandler.java | 214 ++++++++++++++++++ .../queue/BlockingReadTimeoutException.java | 60 +++++ .../netty/handler/queue/package-info.java | 21 ++ 3 files changed, 295 insertions(+) create mode 100644 src/main/java/org/jboss/netty/handler/queue/BlockingReadHandler.java create mode 100644 src/main/java/org/jboss/netty/handler/queue/BlockingReadTimeoutException.java create mode 100644 src/main/java/org/jboss/netty/handler/queue/package-info.java diff --git a/src/main/java/org/jboss/netty/handler/queue/BlockingReadHandler.java b/src/main/java/org/jboss/netty/handler/queue/BlockingReadHandler.java new file mode 100644 index 0000000000..1e27a76221 --- /dev/null +++ b/src/main/java/org/jboss/netty/handler/queue/BlockingReadHandler.java @@ -0,0 +1,214 @@ +/* + * Copyright 2009 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version 2.0 + * (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.handler.queue; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ChannelPipeline; +import org.jboss.netty.channel.ChannelPipelineCoverage; +import org.jboss.netty.channel.LifeCycleAwareChannelHandler; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.SimpleChannelUpstreamHandler; +import org.jboss.netty.util.internal.IoWorkerRunnable; +import org.jboss.netty.util.internal.LinkedTransferQueue; + +/** + * Emulates blocking read operation. This handler stores all received messages + * into a {@link BlockingQueue} and returns the received messages when + * {@link #read()} or {@link #read(long, TimeUnit)} method is called. + *

+ * Please note that this handler is only useful for the cases where there are + * very small number of connections, such as testing and simple client-side + * application development. + *

+ * Also, any handler placed after this handler will never receive a + * {@code messageReceived} event, hence it should be placed in the last place + * in a pipeline. + *

+ * Here is an example that demonstrates the usage: + *

+ * BlockingReadHandler<ChannelBuffer> reader =
+ *         new BlockingReadHandler<ChannelBuffer>();
+ * ChannelPipeline p = ...;
+ * p.addLast("reader", reader);
+ *
+ * ...
+ *
+ * // Read a message from a channel in a blocking manner.
+ * try {
+ *     ChannelBuffer buf = reader.read(60, TimeUnit.SECONDS);
+ *     if (buf == null) {
+ *         // Connection closed.
+ *     } else {
+ *         // Handle the received message here.
+ *     }
+ * } catch (BlockingReadTimeoutException e) {
+ *     // Read timed out.
+ * }
+ * 
+ * + * @param the type of the received messages + * + * @author The Netty Project (netty-dev@lists.jboss.org) + * @author Trustin Lee (trustin@gmail.com) + * @version $Rev$, $Date$ + */ +@ChannelPipelineCoverage("one") +public class BlockingReadHandler extends SimpleChannelUpstreamHandler + implements LifeCycleAwareChannelHandler { + + private final BlockingQueue queue; + private volatile Channel channel; + + /** + * Creates a new instance with the default unbounded {@link BlockingQueue} + * implementation. + */ + public BlockingReadHandler() { + this(new LinkedTransferQueue()); + } + + /** + * Creates a new instance with the specified {@link BlockingQueue}. + */ + public BlockingReadHandler(BlockingQueue queue) { + if (queue == null) { + throw new NullPointerException("queue"); + } + this.queue = queue; + } + + /** + * Returns the queue which stores the received messages. The default + * implementation returns the queue which was specified in the constructor. + */ + protected BlockingQueue getQueue() { + return queue; + } + + /** + * Returns {@code true} if and only if the {@link Channel} associated with + * this handler has been closed. + * + * @throws IllegalStateException + * if this handler was not added to a {@link ChannelPipeline} yet + */ + public boolean isClosed() { + Channel ch = channel; + if (ch == null) { + throw new IllegalStateException("not added to the pipeline yet"); + } + + return ch.getCloseFuture().isDone(); + } + + /** + * Waits until a new message is received or the associated {@link Channel} + * is closed. + * + * @return the received message or {@code null} if the associated + * {@link Channel} has been closed + * @throws InterruptedException + * if the operation has been interrupted + */ + public E read() throws InterruptedException { + detectDeadLock(); + if (isClosed()) { + if (getQueue().isEmpty()) { + return null; + } + } + + return getQueue().take(); + } + + /** + * Waits until a new message is received or the associated {@link Channel} + * is closed. + * + * @param timeout + * the amount time to wait until a new message is received. + * If no message is received within the timeout, + * {@link BlockingReadTimeoutException} is thrown. + * @param unit + * the unit of {@code timeout} + * + * @return the received message or {@code null} if the associated + * {@link Channel} has been closed + * @throws BlockingReadTimeoutException + * if no message was received within the specified timeout + * @throws InterruptedException + * if the operation has been interrupted + */ + public E read(long timeout, TimeUnit unit) throws InterruptedException, BlockingReadTimeoutException { + detectDeadLock(); + if (isClosed()) { + if (getQueue().isEmpty()) { + return null; + } + } + + E msg = getQueue().poll(timeout, unit); + if (msg == null) { + throw new BlockingReadTimeoutException(); + } else { + return msg; + } + } + + private void detectDeadLock() { + if (IoWorkerRunnable.IN_IO_THREAD.get()) { + throw new IllegalStateException( + "read(...) in I/O thread causes a dead lock or " + + "sudden performance drop. Implement a state machine or " + + "call await*() from a different thread."); + } + } + + @Override + public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) + throws Exception { + getQueue().put(getMessage(e)); + } + + @SuppressWarnings("unchecked") + private E getMessage(MessageEvent e) { + return (E) e.getMessage(); + } + + public void beforeAdd(ChannelHandlerContext ctx) throws Exception { + if (channel != null) { + throw new IllegalStateException( + "cannot be added to multiple pipelines"); + } + channel = ctx.getChannel(); + } + + public void afterAdd(ChannelHandlerContext ctx) throws Exception { + // NOOP + } + + public void afterRemove(ChannelHandlerContext ctx) throws Exception { + // NOOP + } + + public void beforeRemove(ChannelHandlerContext ctx) throws Exception { + // NOOP + } +} diff --git a/src/main/java/org/jboss/netty/handler/queue/BlockingReadTimeoutException.java b/src/main/java/org/jboss/netty/handler/queue/BlockingReadTimeoutException.java new file mode 100644 index 0000000000..91b888447f --- /dev/null +++ b/src/main/java/org/jboss/netty/handler/queue/BlockingReadTimeoutException.java @@ -0,0 +1,60 @@ +/* + * Copyright 2009 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version 2.0 + * (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package org.jboss.netty.handler.queue; + +import java.io.IOException; + +/** + * A {@link IOException} raised by {@link BlockingReadHandler} when no data + * was read within a certain period of time. + * + * @author The Netty Project (netty-dev@lists.jboss.org) + * @author Trustin Lee (trustin@gmail.com) + * @version $Rev$, $Date$ + */ +public class BlockingReadTimeoutException extends IOException { + + private static final long serialVersionUID = 356009226872649493L; + + /** + * Creates a new instance. + */ + public BlockingReadTimeoutException() { + super(); + } + + /** + * Creates a new instance. + */ + public BlockingReadTimeoutException(String message, Throwable cause) { + super(message); + initCause(cause); + } + + /** + * Creates a new instance. + */ + public BlockingReadTimeoutException(String message) { + super(message); + } + + /** + * Creates a new instance. + */ + public BlockingReadTimeoutException(Throwable cause) { + initCause(cause); + } +} diff --git a/src/main/java/org/jboss/netty/handler/queue/package-info.java b/src/main/java/org/jboss/netty/handler/queue/package-info.java new file mode 100644 index 0000000000..0a581a067c --- /dev/null +++ b/src/main/java/org/jboss/netty/handler/queue/package-info.java @@ -0,0 +1,21 @@ +/* + * Copyright 2009 Red Hat, Inc. + * + * Red Hat licenses this file to you under the Apache License, version 2.0 + * (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +/** + * A collection of special-purpose handlers that store an event into their + * internal queue instead of propagating the event immediately. + */ +package org.jboss.netty.handler.queue;