Ported BlockingReadHandler

This commit is contained in:
Trustin Lee 2012-05-31 16:44:51 -07:00
parent 468918227a
commit 7be188f8c0

View File

@ -15,22 +15,21 @@
*/ */
package io.netty.handler.queue; package io.netty.handler.queue;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.BlockingOperationException;
import io.netty.channel.Channel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInboundHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.util.internal.QueueFactory;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import io.netty.buffer.ChannelBuffer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelEvent;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelStateEvent;
import io.netty.channel.ExceptionEvent;
import io.netty.channel.MessageEvent;
import io.netty.channel.SimpleChannelUpstreamHandler;
import io.netty.util.internal.DeadLockProofWorker;
import io.netty.util.internal.QueueFactory;
/** /**
* Emulates blocking read operation. This handler stores all received messages * Emulates blocking read operation. This handler stores all received messages
* into a {@link BlockingQueue} and returns the received messages when * into a {@link BlockingQueue} and returns the received messages when
@ -71,9 +70,12 @@ import io.netty.util.internal.QueueFactory;
* *
* @param <E> the type of the received messages * @param <E> the type of the received messages
*/ */
public class BlockingReadHandler<E> extends SimpleChannelUpstreamHandler { public class BlockingReadHandler<E> extends ChannelInboundHandlerAdapter<Object> {
private final BlockingQueue<ChannelEvent> queue; private static final Object INACTIVE = new Object();
private volatile ChannelHandlerContext ctx;
private final BlockingQueue<Object> queue;
private volatile boolean closed; private volatile boolean closed;
/** /**
@ -81,33 +83,29 @@ public class BlockingReadHandler<E> extends SimpleChannelUpstreamHandler {
* implementation. * implementation.
*/ */
public BlockingReadHandler() { public BlockingReadHandler() {
this(QueueFactory.createQueue(ChannelEvent.class)); this(QueueFactory.createQueue());
} }
/** /**
* Creates a new instance with the specified {@link BlockingQueue}. * Creates a new instance with the specified {@link BlockingQueue}.
*/ */
public BlockingReadHandler(BlockingQueue<ChannelEvent> queue) { public BlockingReadHandler(BlockingQueue<Object> queue) {
if (queue == null) { if (queue == null) {
throw new NullPointerException("queue"); throw new NullPointerException("queue");
} }
this.queue = queue; this.queue = queue;
} }
/** @Override
* Returns the queue which stores the received messages. The default public ChannelBufferHolder<Object> newInboundBuffer(
* implementation returns the queue which was specified in the constructor. ChannelInboundHandlerContext<Object> ctx) throws Exception {
*/ this.ctx = ctx;
protected BlockingQueue<ChannelEvent> getQueue() { return ChannelBufferHolders.catchAllBuffer();
return queue;
} }
/** /**
* Returns {@code true} if and only if the {@link Channel} associated with * Returns {@code true} if and only if the {@link Channel} associated with
* this handler has been closed. * this handler has been closed.
*
* @throws IllegalStateException
* if this handler was not added to a {@link ChannelPipeline} yet
*/ */
public boolean isClosed() { public boolean isClosed() {
return closed; return closed;
@ -125,18 +123,7 @@ public class BlockingReadHandler<E> extends SimpleChannelUpstreamHandler {
* if the operation has been interrupted * if the operation has been interrupted
*/ */
public E read() throws IOException, InterruptedException { public E read() throws IOException, InterruptedException {
ChannelEvent e = readEvent(); return filter(readEvent());
if (e == null) {
return null;
}
if (e instanceof MessageEvent) {
return getMessage((MessageEvent) e);
} else if (e instanceof ExceptionEvent) {
throw (IOException) new IOException().initCause(((ExceptionEvent) e).cause());
} else {
throw new IllegalStateException();
}
} }
/** /**
@ -160,115 +147,73 @@ public class BlockingReadHandler<E> extends SimpleChannelUpstreamHandler {
* if the operation has been interrupted * if the operation has been interrupted
*/ */
public E read(long timeout, TimeUnit unit) throws IOException, InterruptedException { public E read(long timeout, TimeUnit unit) throws IOException, InterruptedException {
ChannelEvent e = readEvent(timeout, unit); return filter(readEvent(timeout, unit));
if (e == null) { }
@SuppressWarnings("unchecked")
private E filter(Object e) throws IOException {
if (e == null || e == INACTIVE) {
return null; return null;
} }
if (e instanceof MessageEvent) { if (e instanceof Throwable) {
return getMessage((MessageEvent) e); throw (IOException) new IOException().initCause((Throwable) e);
} else if (e instanceof ExceptionEvent) {
throw (IOException) new IOException().initCause(((ExceptionEvent) e).cause());
} else {
throw new IllegalStateException();
} }
return (E) e;
} }
/** private Object readEvent() throws InterruptedException {
* Waits until a new {@link ChannelEvent} is received or the associated
* {@link Channel} is closed.
*
* @return a {@link MessageEvent} or an {@link ExceptionEvent}.
* {@code null} if the associated {@link Channel} has been closed
* @throws InterruptedException
* if the operation has been interrupted
*/
public ChannelEvent readEvent() throws InterruptedException {
detectDeadLock(); detectDeadLock();
if (isClosed()) { if (isClosed()) {
if (getQueue().isEmpty()) {
return null;
}
}
ChannelEvent e = getQueue().take();
if (e instanceof ChannelStateEvent) {
// channelClosed has been triggered.
assert closed;
return null; return null;
} else {
return e;
} }
return queue.take();
} }
/** private Object readEvent(long timeout, TimeUnit unit) throws InterruptedException, BlockingReadTimeoutException {
* Waits until a new {@link ChannelEvent} is received or the associated
* {@link Channel} is closed.
*
* @param timeout
* the amount time to wait until a new {@link ChannelEvent} is
* received. If no message is received within the timeout,
* {@link BlockingReadTimeoutException} is thrown.
* @param unit
* the unit of {@code timeout}
*
* @return a {@link MessageEvent} or an {@link ExceptionEvent}.
* {@code null} if the associated {@link Channel} has been closed
* @throws BlockingReadTimeoutException
* if no event was received within the specified timeout
* @throws InterruptedException
* if the operation has been interrupted
*/
public ChannelEvent readEvent(long timeout, TimeUnit unit) throws InterruptedException, BlockingReadTimeoutException {
detectDeadLock(); detectDeadLock();
if (isClosed()) { if (isClosed()) {
if (getQueue().isEmpty()) { return null;
return null;
}
} }
ChannelEvent e = getQueue().poll(timeout, unit); Object o = queue.poll(timeout, unit);
if (e == null) { if (o == null) {
throw new BlockingReadTimeoutException(); throw new BlockingReadTimeoutException();
} else if (e instanceof ChannelStateEvent) {
// channelClosed has been triggered.
assert closed;
return null;
} else { } else {
return e; return o;
} }
} }
private void detectDeadLock() { private void detectDeadLock() {
if (DeadLockProofWorker.PARENT.get() != null) { if (ctx.eventLoop().inEventLoop()) {
throw new IllegalStateException( throw new BlockingOperationException();
"read*(...) in I/O thread causes a dead lock or " +
"sudden performance drop. Implement a state machine or " +
"call read*() from a different thread.");
} }
} }
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) public void channelInactive(ChannelInboundHandlerContext<Object> ctx) throws Exception {
throws Exception { addEvent(INACTIVE);
getQueue().put(e);
} }
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) public void exceptionCaught(ChannelInboundHandlerContext<Object> ctx,
throws Exception { Throwable cause) throws Exception {
getQueue().put(e); addEvent(cause);
} }
@Override @Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) public void afterRemove(ChannelHandlerContext ctx) throws Exception {
throws Exception { addEvent(INACTIVE);
closed = true;
getQueue().put(e);
} }
@SuppressWarnings("unchecked") private void addEvent(Object e) {
private E getMessage(MessageEvent e) { if (!closed) {
return (E) e.getMessage(); if (e == INACTIVE) {
closed = true;
}
queue.add(e);
}
} }
} }