* Fixed a problem where BlockingReadHandler does not handle exceptions properly
* Added BlockingReadHandler.readEvent()
This commit is contained in:
parent
38c60ef807
commit
bab9af2b52
@ -15,13 +15,16 @@
|
||||
*/
|
||||
package org.jboss.netty.handler.queue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.jboss.netty.channel.Channel;
|
||||
import org.jboss.netty.channel.ChannelEvent;
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.jboss.netty.channel.ChannelPipeline;
|
||||
import org.jboss.netty.channel.ChannelPipelineCoverage;
|
||||
import org.jboss.netty.channel.ExceptionEvent;
|
||||
import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
|
||||
import org.jboss.netty.channel.MessageEvent;
|
||||
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
|
||||
@ -38,8 +41,8 @@ import org.jboss.netty.util.internal.LinkedTransferQueue;
|
||||
* application development.
|
||||
* <p>
|
||||
* Also, any handler placed after this handler will never receive a
|
||||
* {@code messageReceived} event, hence it should be placed in the last place
|
||||
* in a pipeline.
|
||||
* {@code messageReceived} event and an {@code exceptionCaught} event, hence it
|
||||
* should be placed in the last place in a pipeline.
|
||||
* <p>
|
||||
* Here is an example that demonstrates the usage:
|
||||
* <pre>
|
||||
@ -60,6 +63,8 @@ import org.jboss.netty.util.internal.LinkedTransferQueue;
|
||||
* }
|
||||
* } catch (BlockingReadTimeoutException e) {
|
||||
* // Read timed out.
|
||||
* } catch (IOException e) {
|
||||
* // Other read errors
|
||||
* }
|
||||
* </pre>
|
||||
*
|
||||
@ -73,7 +78,7 @@ import org.jboss.netty.util.internal.LinkedTransferQueue;
|
||||
public class BlockingReadHandler<E> extends SimpleChannelUpstreamHandler
|
||||
implements LifeCycleAwareChannelHandler {
|
||||
|
||||
private final BlockingQueue<E> queue;
|
||||
private final BlockingQueue<ChannelEvent> queue;
|
||||
private volatile Channel channel;
|
||||
|
||||
/**
|
||||
@ -81,13 +86,13 @@ public class BlockingReadHandler<E> extends SimpleChannelUpstreamHandler
|
||||
* implementation.
|
||||
*/
|
||||
public BlockingReadHandler() {
|
||||
this(new LinkedTransferQueue<E>());
|
||||
this(new LinkedTransferQueue<ChannelEvent>());
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new instance with the specified {@link BlockingQueue}.
|
||||
*/
|
||||
public BlockingReadHandler(BlockingQueue<E> queue) {
|
||||
public BlockingReadHandler(BlockingQueue<ChannelEvent> queue) {
|
||||
if (queue == null) {
|
||||
throw new NullPointerException("queue");
|
||||
}
|
||||
@ -98,7 +103,7 @@ public class BlockingReadHandler<E> extends SimpleChannelUpstreamHandler
|
||||
* Returns the queue which stores the received messages. The default
|
||||
* implementation returns the queue which was specified in the constructor.
|
||||
*/
|
||||
protected BlockingQueue<E> getQueue() {
|
||||
protected BlockingQueue<ChannelEvent> getQueue() {
|
||||
return queue;
|
||||
}
|
||||
|
||||
@ -124,18 +129,24 @@ public class BlockingReadHandler<E> extends SimpleChannelUpstreamHandler
|
||||
*
|
||||
* @return the received message or {@code null} if the associated
|
||||
* {@link Channel} has been closed
|
||||
* @throws IOException
|
||||
* if failed to receive a new message
|
||||
* @throws InterruptedException
|
||||
* if the operation has been interrupted
|
||||
*/
|
||||
public E read() throws InterruptedException {
|
||||
detectDeadLock();
|
||||
if (isClosed()) {
|
||||
if (getQueue().isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
public E read() throws IOException, InterruptedException {
|
||||
ChannelEvent e = readEvent();
|
||||
if (e == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return getQueue().take();
|
||||
if (e instanceof MessageEvent) {
|
||||
return getMessage((MessageEvent) e);
|
||||
} else if (e instanceof ExceptionEvent) {
|
||||
throw (IOException) new IOException().initCause(((ExceptionEvent) e).getCause());
|
||||
} else {
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -153,10 +164,36 @@ public class BlockingReadHandler<E> extends SimpleChannelUpstreamHandler
|
||||
* {@link Channel} has been closed
|
||||
* @throws BlockingReadTimeoutException
|
||||
* if no message was received within the specified timeout
|
||||
* @throws IOException
|
||||
* if failed to receive a new message
|
||||
* @throws InterruptedException
|
||||
* if the operation has been interrupted
|
||||
*/
|
||||
public E read(long timeout, TimeUnit unit) throws InterruptedException, BlockingReadTimeoutException {
|
||||
public E read(long timeout, TimeUnit unit) throws IOException, InterruptedException {
|
||||
ChannelEvent e = readEvent(timeout, unit);
|
||||
if (e == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if (e instanceof MessageEvent) {
|
||||
return getMessage((MessageEvent) e);
|
||||
} else if (e instanceof ExceptionEvent) {
|
||||
throw (IOException) new IOException().initCause(((ExceptionEvent) e).getCause());
|
||||
} else {
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Waits until a new {@link ChannelEvent} is received or the associated
|
||||
* {@link Channel} is closed.
|
||||
*
|
||||
* @return a {@link MessageEvent} or an {@link ExceptionEvent}.
|
||||
* {@code null} if the associated {@link Channel} has been closed
|
||||
* @throws InterruptedException
|
||||
* if the operation has been interrupted
|
||||
*/
|
||||
public ChannelEvent readEvent() throws InterruptedException {
|
||||
detectDeadLock();
|
||||
if (isClosed()) {
|
||||
if (getQueue().isEmpty()) {
|
||||
@ -164,11 +201,40 @@ public class BlockingReadHandler<E> extends SimpleChannelUpstreamHandler
|
||||
}
|
||||
}
|
||||
|
||||
E msg = getQueue().poll(timeout, unit);
|
||||
if (msg == null) {
|
||||
return getQueue().take();
|
||||
}
|
||||
|
||||
/**
|
||||
* Waits until a new {@link ChannelEvent} is received or the associated
|
||||
* {@link Channel} is closed.
|
||||
*
|
||||
* @param timeout
|
||||
* the amount time to wait until a new {@link ChannelEvent} is
|
||||
* received. If no message is received within the timeout,
|
||||
* {@link BlockingReadTimeoutException} is thrown.
|
||||
* @param unit
|
||||
* the unit of {@code timeout}
|
||||
*
|
||||
* @return a {@link MessageEvent} or an {@link ExceptionEvent}.
|
||||
* {@code null} if the associated {@link Channel} has been closed
|
||||
* @throws BlockingReadTimeoutException
|
||||
* if no event was received within the specified timeout
|
||||
* @throws InterruptedException
|
||||
* if the operation has been interrupted
|
||||
*/
|
||||
public ChannelEvent readEvent(long timeout, TimeUnit unit) throws InterruptedException, BlockingReadTimeoutException {
|
||||
detectDeadLock();
|
||||
if (isClosed()) {
|
||||
if (getQueue().isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
ChannelEvent e = getQueue().poll(timeout, unit);
|
||||
if (e == null) {
|
||||
throw new BlockingReadTimeoutException();
|
||||
} else {
|
||||
return msg;
|
||||
return e;
|
||||
}
|
||||
}
|
||||
|
||||
@ -184,9 +250,16 @@ public class BlockingReadHandler<E> extends SimpleChannelUpstreamHandler
|
||||
@Override
|
||||
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
|
||||
throws Exception {
|
||||
getQueue().put(getMessage(e));
|
||||
getQueue().put(e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
|
||||
throws Exception {
|
||||
getQueue().put(e);
|
||||
}
|
||||
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private E getMessage(MessageEvent e) {
|
||||
return (E) e.getMessage();
|
||||
|
Loading…
Reference in New Issue
Block a user