Resolved issue: NETTY-208 (Blocking read handler)
* Added BlockingReadHandler and its exception
This commit is contained in:
parent
f7a0a4db11
commit
2685d03d8f
@ -0,0 +1,214 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2009 Red Hat, Inc.
|
||||||
|
*
|
||||||
|
* Red Hat licenses this file to you under the Apache License, version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with the
|
||||||
|
* License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package org.jboss.netty.handler.queue;
|
||||||
|
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.jboss.netty.channel.Channel;
|
||||||
|
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||||
|
import org.jboss.netty.channel.ChannelPipeline;
|
||||||
|
import org.jboss.netty.channel.ChannelPipelineCoverage;
|
||||||
|
import org.jboss.netty.channel.LifeCycleAwareChannelHandler;
|
||||||
|
import org.jboss.netty.channel.MessageEvent;
|
||||||
|
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
|
||||||
|
import org.jboss.netty.util.internal.IoWorkerRunnable;
|
||||||
|
import org.jboss.netty.util.internal.LinkedTransferQueue;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Emulates blocking read operation. This handler stores all received messages
|
||||||
|
* into a {@link BlockingQueue} and returns the received messages when
|
||||||
|
* {@link #read()} or {@link #read(long, TimeUnit)} method is called.
|
||||||
|
* <p>
|
||||||
|
* Please note that this handler is only useful for the cases where there are
|
||||||
|
* very small number of connections, such as testing and simple client-side
|
||||||
|
* application development.
|
||||||
|
* <p>
|
||||||
|
* Also, any handler placed after this handler will never receive a
|
||||||
|
* {@code messageReceived} event, hence it should be placed in the last place
|
||||||
|
* in a pipeline.
|
||||||
|
* <p>
|
||||||
|
* Here is an example that demonstrates the usage:
|
||||||
|
* <pre>
|
||||||
|
* BlockingReadHandler<ChannelBuffer> reader =
|
||||||
|
* new BlockingReadHandler<ChannelBuffer>();
|
||||||
|
* ChannelPipeline p = ...;
|
||||||
|
* p.addLast("reader", reader);
|
||||||
|
*
|
||||||
|
* ...
|
||||||
|
*
|
||||||
|
* // Read a message from a channel in a blocking manner.
|
||||||
|
* try {
|
||||||
|
* ChannelBuffer buf = reader.read(60, TimeUnit.SECONDS);
|
||||||
|
* if (buf == null) {
|
||||||
|
* // Connection closed.
|
||||||
|
* } else {
|
||||||
|
* // Handle the received message here.
|
||||||
|
* }
|
||||||
|
* } catch (BlockingReadTimeoutException e) {
|
||||||
|
* // Read timed out.
|
||||||
|
* }
|
||||||
|
* </pre>
|
||||||
|
*
|
||||||
|
* @param <E> the type of the received messages
|
||||||
|
*
|
||||||
|
* @author The Netty Project (netty-dev@lists.jboss.org)
|
||||||
|
* @author Trustin Lee (trustin@gmail.com)
|
||||||
|
* @version $Rev$, $Date$
|
||||||
|
*/
|
||||||
|
@ChannelPipelineCoverage("one")
|
||||||
|
public class BlockingReadHandler<E> extends SimpleChannelUpstreamHandler
|
||||||
|
implements LifeCycleAwareChannelHandler {
|
||||||
|
|
||||||
|
private final BlockingQueue<E> queue;
|
||||||
|
private volatile Channel channel;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a new instance with the default unbounded {@link BlockingQueue}
|
||||||
|
* implementation.
|
||||||
|
*/
|
||||||
|
public BlockingReadHandler() {
|
||||||
|
this(new LinkedTransferQueue<E>());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a new instance with the specified {@link BlockingQueue}.
|
||||||
|
*/
|
||||||
|
public BlockingReadHandler(BlockingQueue<E> queue) {
|
||||||
|
if (queue == null) {
|
||||||
|
throw new NullPointerException("queue");
|
||||||
|
}
|
||||||
|
this.queue = queue;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the queue which stores the received messages. The default
|
||||||
|
* implementation returns the queue which was specified in the constructor.
|
||||||
|
*/
|
||||||
|
protected BlockingQueue<E> getQueue() {
|
||||||
|
return queue;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns {@code true} if and only if the {@link Channel} associated with
|
||||||
|
* this handler has been closed.
|
||||||
|
*
|
||||||
|
* @throws IllegalStateException
|
||||||
|
* if this handler was not added to a {@link ChannelPipeline} yet
|
||||||
|
*/
|
||||||
|
public boolean isClosed() {
|
||||||
|
Channel ch = channel;
|
||||||
|
if (ch == null) {
|
||||||
|
throw new IllegalStateException("not added to the pipeline yet");
|
||||||
|
}
|
||||||
|
|
||||||
|
return ch.getCloseFuture().isDone();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Waits until a new message is received or the associated {@link Channel}
|
||||||
|
* is closed.
|
||||||
|
*
|
||||||
|
* @return the received message or {@code null} if the associated
|
||||||
|
* {@link Channel} has been closed
|
||||||
|
* @throws InterruptedException
|
||||||
|
* if the operation has been interrupted
|
||||||
|
*/
|
||||||
|
public E read() throws InterruptedException {
|
||||||
|
detectDeadLock();
|
||||||
|
if (isClosed()) {
|
||||||
|
if (getQueue().isEmpty()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return getQueue().take();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Waits until a new message is received or the associated {@link Channel}
|
||||||
|
* is closed.
|
||||||
|
*
|
||||||
|
* @param timeout
|
||||||
|
* the amount time to wait until a new message is received.
|
||||||
|
* If no message is received within the timeout,
|
||||||
|
* {@link BlockingReadTimeoutException} is thrown.
|
||||||
|
* @param unit
|
||||||
|
* the unit of {@code timeout}
|
||||||
|
*
|
||||||
|
* @return the received message or {@code null} if the associated
|
||||||
|
* {@link Channel} has been closed
|
||||||
|
* @throws BlockingReadTimeoutException
|
||||||
|
* if no message was received within the specified timeout
|
||||||
|
* @throws InterruptedException
|
||||||
|
* if the operation has been interrupted
|
||||||
|
*/
|
||||||
|
public E read(long timeout, TimeUnit unit) throws InterruptedException, BlockingReadTimeoutException {
|
||||||
|
detectDeadLock();
|
||||||
|
if (isClosed()) {
|
||||||
|
if (getQueue().isEmpty()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
E msg = getQueue().poll(timeout, unit);
|
||||||
|
if (msg == null) {
|
||||||
|
throw new BlockingReadTimeoutException();
|
||||||
|
} else {
|
||||||
|
return msg;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void detectDeadLock() {
|
||||||
|
if (IoWorkerRunnable.IN_IO_THREAD.get()) {
|
||||||
|
throw new IllegalStateException(
|
||||||
|
"read(...) in I/O thread causes a dead lock or " +
|
||||||
|
"sudden performance drop. Implement a state machine or " +
|
||||||
|
"call await*() from a different thread.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
|
||||||
|
throws Exception {
|
||||||
|
getQueue().put(getMessage(e));
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
private E getMessage(MessageEvent e) {
|
||||||
|
return (E) e.getMessage();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
|
||||||
|
if (channel != null) {
|
||||||
|
throw new IllegalStateException(
|
||||||
|
"cannot be added to multiple pipelines");
|
||||||
|
}
|
||||||
|
channel = ctx.getChannel();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void afterAdd(ChannelHandlerContext ctx) throws Exception {
|
||||||
|
// NOOP
|
||||||
|
}
|
||||||
|
|
||||||
|
public void afterRemove(ChannelHandlerContext ctx) throws Exception {
|
||||||
|
// NOOP
|
||||||
|
}
|
||||||
|
|
||||||
|
public void beforeRemove(ChannelHandlerContext ctx) throws Exception {
|
||||||
|
// NOOP
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,60 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2009 Red Hat, Inc.
|
||||||
|
*
|
||||||
|
* Red Hat licenses this file to you under the Apache License, version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with the
|
||||||
|
* License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package org.jboss.netty.handler.queue;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A {@link IOException} raised by {@link BlockingReadHandler} when no data
|
||||||
|
* was read within a certain period of time.
|
||||||
|
*
|
||||||
|
* @author The Netty Project (netty-dev@lists.jboss.org)
|
||||||
|
* @author Trustin Lee (trustin@gmail.com)
|
||||||
|
* @version $Rev$, $Date$
|
||||||
|
*/
|
||||||
|
public class BlockingReadTimeoutException extends IOException {
|
||||||
|
|
||||||
|
private static final long serialVersionUID = 356009226872649493L;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a new instance.
|
||||||
|
*/
|
||||||
|
public BlockingReadTimeoutException() {
|
||||||
|
super();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a new instance.
|
||||||
|
*/
|
||||||
|
public BlockingReadTimeoutException(String message, Throwable cause) {
|
||||||
|
super(message);
|
||||||
|
initCause(cause);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a new instance.
|
||||||
|
*/
|
||||||
|
public BlockingReadTimeoutException(String message) {
|
||||||
|
super(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a new instance.
|
||||||
|
*/
|
||||||
|
public BlockingReadTimeoutException(Throwable cause) {
|
||||||
|
initCause(cause);
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,21 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2009 Red Hat, Inc.
|
||||||
|
*
|
||||||
|
* Red Hat licenses this file to you under the Apache License, version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with the
|
||||||
|
* License. You may obtain a copy of the License at:
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A collection of special-purpose handlers that store an event into their
|
||||||
|
* internal queue instead of propagating the event immediately.
|
||||||
|
*/
|
||||||
|
package org.jboss.netty.handler.queue;
|
Loading…
x
Reference in New Issue
Block a user